xref: /freebsd/contrib/lib9p/threadpool.c (revision 134e17798c9af53632b372348ab828e75e65bf46)
1 /*
2  * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
3  * All rights reserved
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted providing that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
15  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
18  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
22  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
23  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  *
26  */
27 
28 #include <errno.h>
29 #include <stdlib.h>
30 #include <pthread.h>
31 #if defined(__FreeBSD__)
32 #include <pthread_np.h>
33 #endif
34 #include <sys/queue.h>
35 #include "lib9p.h"
36 #include "threadpool.h"
37 
38 static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
39     struct l9p_request *req);
40 
41 static void *
l9p_responder(void * arg)42 l9p_responder(void *arg)
43 {
44 	struct l9p_threadpool *tp;
45 	struct l9p_worker *worker = arg;
46 	struct l9p_request *req;
47 
48 	tp = worker->ltw_tp;
49 	for (;;) {
50 		/* get next reply to send */
51 		pthread_mutex_lock(&tp->ltp_mtx);
52 		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
53 			pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
54 		if (worker->ltw_exiting) {
55 			pthread_mutex_unlock(&tp->ltp_mtx);
56 			break;
57 		}
58 
59 		/* off reply queue */
60 		req = STAILQ_FIRST(&tp->ltp_replyq);
61 		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);
62 
63 		/* request is now in final glide path, can't be Tflush-ed */
64 		req->lr_workstate = L9P_WS_REPLYING;
65 
66 		/* any flushers waiting for this request can go now */
67 		if (req->lr_flushstate != L9P_FLUSH_NONE)
68 			l9p_threadpool_rflush(tp, req);
69 
70 		pthread_mutex_unlock(&tp->ltp_mtx);
71 
72 		/* send response */
73 		l9p_respond(req, false, true);
74 	}
75 	return (NULL);
76 }
77 
78 static void *
l9p_worker(void * arg)79 l9p_worker(void *arg)
80 {
81 	struct l9p_threadpool *tp;
82 	struct l9p_worker *worker = arg;
83 	struct l9p_request *req;
84 
85 	tp = worker->ltw_tp;
86 	pthread_mutex_lock(&tp->ltp_mtx);
87 	for (;;) {
88 		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
89 			pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
90 		if (worker->ltw_exiting)
91 			break;
92 
93 		/* off work queue; now work-in-progress, by us */
94 		req = STAILQ_FIRST(&tp->ltp_workq);
95 		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
96 		req->lr_workstate = L9P_WS_INPROGRESS;
97 		req->lr_worker = worker;
98 		pthread_mutex_unlock(&tp->ltp_mtx);
99 
100 		/* actually try the request */
101 		req->lr_error = l9p_dispatch_request(req);
102 
103 		/* move to responder queue, updating work-state */
104 		pthread_mutex_lock(&tp->ltp_mtx);
105 		req->lr_workstate = L9P_WS_RESPQUEUED;
106 		req->lr_worker = NULL;
107 		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
108 
109 		/* signal the responder */
110 		pthread_cond_signal(&tp->ltp_reply_cv);
111 	}
112 	pthread_mutex_unlock(&tp->ltp_mtx);
113 	return (NULL);
114 }
115 
116 /*
117  * Just before finally replying to a request that got touched by
118  * a Tflush request, we enqueue its flushers (requests of type
119  * Tflush, which are now on the flushee's lr_flushq) onto the
120  * response queue.
121  */
122 static void
l9p_threadpool_rflush(struct l9p_threadpool * tp,struct l9p_request * req)123 l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
124 {
125 	struct l9p_request *flusher;
126 
127 	/*
128 	 * https://swtch.com/plan9port/man/man9/flush.html says:
129 	 *
130 	 * "Should multiple Tflushes be received for a pending
131 	 * request, they must be answered in order.  A Rflush for
132 	 * any of the multiple Tflushes implies an answer for all
133 	 * previous ones.  Therefore, should a server receive a
134 	 * request and then multiple flushes for that request, it
135 	 * need respond only to the last flush."  This means
136 	 * we could march through the queue of flushers here,
137 	 * marking all but the last one as "to be dropped" rather
138 	 * than "to be replied-to".
139 	 *
140 	 * However, we'll leave that for later, if ever -- it
141 	 * should be harmless to respond to each, in order.
142 	 */
143 	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
144 		flusher->lr_workstate = L9P_WS_RESPQUEUED;
145 #ifdef notdef
146 		if (not the last) {
147 			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
148 			/* or, flusher->lr_drop = true ? */
149 		}
150 #endif
151 		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
152 	}
153 }
154 
155 int
l9p_threadpool_init(struct l9p_threadpool * tp,int size)156 l9p_threadpool_init(struct l9p_threadpool *tp, int size)
157 {
158 	struct l9p_worker *worker;
159 #if defined(__FreeBSD__)
160 	char threadname[16];
161 #endif
162 	int error;
163 	int i, nworkers, nresponders;
164 
165 	if (size <= 0)
166 		return (EINVAL);
167 	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
168 	if (error)
169 		return (error);
170 	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
171 	if (error)
172 		goto fail_work_cv;
173 	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
174 	if (error)
175 		goto fail_reply_cv;
176 
177 	STAILQ_INIT(&tp->ltp_workq);
178 	STAILQ_INIT(&tp->ltp_replyq);
179 	LIST_INIT(&tp->ltp_workers);
180 
181 	nresponders = 0;
182 	nworkers = 0;
183 	for (i = 0; i <= size; i++) {
184 		worker = calloc(1, sizeof(struct l9p_worker));
185 		worker->ltw_tp = tp;
186 		worker->ltw_responder = i == 0;
187 		error = pthread_create(&worker->ltw_thread, NULL,
188 		    worker->ltw_responder ? l9p_responder : l9p_worker,
189 		    (void *)worker);
190 		if (error) {
191 			free(worker);
192 			break;
193 		}
194 		if (worker->ltw_responder)
195 			nresponders++;
196 		else
197 			nworkers++;
198 
199 #if defined(__FreeBSD__)
200 		if (worker->ltw_responder) {
201 			pthread_set_name_np(worker->ltw_thread, "9p-responder");
202 		} else {
203 			sprintf(threadname, "9p-worker:%d", i - 1);
204 			pthread_set_name_np(worker->ltw_thread, threadname);
205 		}
206 #endif
207 
208 		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
209 	}
210 	if (nresponders == 0 || nworkers == 0) {
211 		/* need the one responder, and at least one worker */
212 		l9p_threadpool_shutdown(tp);
213 		return (error);
214 	}
215 	return (0);
216 
217 	/*
218 	 * We could avoid these labels by having multiple destroy
219 	 * paths (one for each error case), or by having booleans
220 	 * for which variables were initialized.  Neither is very
221 	 * appealing...
222 	 */
223 fail_reply_cv:
224 	pthread_cond_destroy(&tp->ltp_work_cv);
225 fail_work_cv:
226 	pthread_mutex_destroy(&tp->ltp_mtx);
227 
228 	return (error);
229 }
230 
231 /*
232  * Run a request, usually by queueing it.
233  */
234 void
l9p_threadpool_run(struct l9p_threadpool * tp,struct l9p_request * req)235 l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
236 {
237 
238 	/*
239 	 * Flush requests must be handled specially, since they
240 	 * can cancel / kill off regular requests.  (But we can
241 	 * run them through the regular dispatch mechanism.)
242 	 */
243 	if (req->lr_req.hdr.type == L9P_TFLUSH) {
244 		/* not on a work queue yet so we can touch state */
245 		req->lr_workstate = L9P_WS_IMMEDIATE;
246 		(void) l9p_dispatch_request(req);
247 	} else {
248 		pthread_mutex_lock(&tp->ltp_mtx);
249 		req->lr_workstate = L9P_WS_NOTSTARTED;
250 		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
251 		pthread_cond_signal(&tp->ltp_work_cv);
252 		pthread_mutex_unlock(&tp->ltp_mtx);
253 	}
254 }
255 
256 /*
257  * Run a Tflush request.  Called via l9p_dispatch_request() since
258  * it has some debug code in it, but not called from worker thread.
259  */
260 int
l9p_threadpool_tflush(struct l9p_request * req)261 l9p_threadpool_tflush(struct l9p_request *req)
262 {
263 	struct l9p_connection *conn;
264 	struct l9p_threadpool *tp;
265 	struct l9p_request *flushee;
266 	uint16_t oldtag;
267 	enum l9p_flushstate nstate;
268 
269 	/*
270 	 * Find what we're supposed to flush (the flushee, as it were).
271 	 */
272 	req->lr_error = 0;	/* Tflush always succeeds */
273 	conn = req->lr_conn;
274 	tp = &conn->lc_tp;
275 	oldtag = req->lr_req.tflush.oldtag;
276 	ht_wrlock(&conn->lc_requests);
277 	flushee = ht_find_locked(&conn->lc_requests, oldtag);
278 	if (flushee == NULL) {
279 		/*
280 		 * Nothing to flush!  The old request must have
281 		 * been done and gone already.  Just queue this
282 		 * Tflush for a success reply.
283 		 */
284 		ht_unlock(&conn->lc_requests);
285 		pthread_mutex_lock(&tp->ltp_mtx);
286 		goto done;
287 	}
288 
289 	/*
290 	 * Found the original request.  We'll need to inspect its
291 	 * work-state to figure out what to do.
292 	 */
293 	pthread_mutex_lock(&tp->ltp_mtx);
294 	ht_unlock(&conn->lc_requests);
295 
296 	switch (flushee->lr_workstate) {
297 
298 	case L9P_WS_NOTSTARTED:
299 		/*
300 		 * Flushee is on work queue, but not yet being
301 		 * handled by a worker.
302 		 *
303 		 * The documentation -- see
304 		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
305 		 * https://swtch.com/plan9port/man/man9/flush.html
306 		 * -- says that "the server should answer the
307 		 * flush message immediately".  However, Linux
308 		 * sends flush requests for operations that
309 		 * must finish, such as Tclunk, and it's not
310 		 * possible to *answer* the flush request until
311 		 * it has been handled (if necessary) or aborted
312 		 * (if allowed).
313 		 *
314 		 * We therefore now just  the original request
315 		 * and let the request-handler do whatever is
316 		 * appropriate.  NOTE: we could have a table of
317 		 * "requests that can be aborted without being
318 		 * run" vs "requests that must be run to be
319 		 * aborted", but for now that seems like an
320 		 * unnecessary complication.
321 		 */
322 		nstate = L9P_FLUSH_REQUESTED_PRE_START;
323 		break;
324 
325 	case L9P_WS_IMMEDIATE:
326 		/*
327 		 * This state only applies to Tflush requests, and
328 		 * flushing a Tflush is illegal.  But we'll do nothing
329 		 * special here, which will make us act like a flush
330 		 * request for the flushee that arrived too late to
331 		 * do anything about the flushee.
332 		 */
333 		nstate = L9P_FLUSH_REQUESTED_POST_START;
334 		break;
335 
336 	case L9P_WS_INPROGRESS:
337 		/*
338 		 * Worker thread flushee->lr_worker is working on it.
339 		 * Kick it to get it out of blocking system calls.
340 		 * (This requires that it carefully set up some
341 		 * signal handlers, and may be FreeBSD-dependent,
342 		 * it probably cannot be handled this way on MacOS.)
343 		 */
344 #ifdef notyet
345 		pthread_kill(...);
346 #endif
347 		nstate = L9P_FLUSH_REQUESTED_POST_START;
348 		break;
349 
350 	case L9P_WS_RESPQUEUED:
351 		/*
352 		 * The flushee is already in the response queue.
353 		 * We'll just mark it as having had some flush
354 		 * action applied.
355 		 */
356 		nstate = L9P_FLUSH_TOOLATE;
357 		break;
358 
359 	case L9P_WS_REPLYING:
360 		/*
361 		 * Although we found the flushee, it's too late to
362 		 * make us depend on it: it's already heading out
363 		 * the door as a reply.
364 		 *
365 		 * We don't want to do anything to the flushee.
366 		 * Instead, we want to work the same way as if
367 		 * we had never found the tag.
368 		 */
369 		goto done;
370 	}
371 
372 	/*
373 	 * Now add us to the list of Tflush-es that are waiting
374 	 * for the flushee (creating the list if needed, i.e., if
375 	 * this is the first Tflush for the flushee).  We (req)
376 	 * will get queued for reply later, when the responder
377 	 * processes the flushee and calls l9p_threadpool_rflush().
378 	 */
379 	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
380 		STAILQ_INIT(&flushee->lr_flushq);
381 	flushee->lr_flushstate = nstate;
382 	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);
383 
384 	pthread_mutex_unlock(&tp->ltp_mtx);
385 
386 	return (0);
387 
388 done:
389 	/*
390 	 * This immediate op is ready to be replied-to now, so just
391 	 * stick it onto the reply queue.
392 	 */
393 	req->lr_workstate = L9P_WS_RESPQUEUED;
394 	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
395 	pthread_mutex_unlock(&tp->ltp_mtx);
396 	pthread_cond_signal(&tp->ltp_reply_cv);
397 	return (0);
398 }
399 
400 int
l9p_threadpool_shutdown(struct l9p_threadpool * tp)401 l9p_threadpool_shutdown(struct l9p_threadpool *tp)
402 {
403 	struct l9p_worker *worker, *tmp;
404 
405 	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
406 		pthread_mutex_lock(&tp->ltp_mtx);
407 		worker->ltw_exiting = true;
408 		if (worker->ltw_responder)
409 			pthread_cond_signal(&tp->ltp_reply_cv);
410 		else
411 			pthread_cond_broadcast(&tp->ltp_work_cv);
412 		pthread_mutex_unlock(&tp->ltp_mtx);
413 		pthread_join(worker->ltw_thread, NULL);
414 		LIST_REMOVE(worker, ltw_link);
415 		free(worker);
416 	}
417 	pthread_cond_destroy(&tp->ltp_reply_cv);
418 	pthread_cond_destroy(&tp->ltp_work_cv);
419 	pthread_mutex_destroy(&tp->ltp_mtx);
420 
421 	return (0);
422 }
423