/*
 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
 * All rights reserved
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

static void *
l9p_responder(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	for (;;) {
		/* get next reply to send */
		pthread_mutex_lock(&tp->ltp_mtx);
		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting) {
			pthread_mutex_unlock(&tp->ltp_mtx);
			break;
		}

		/* off reply queue */
		req = STAILQ_FIRST(&tp->ltp_replyq);
		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

		/* request is now in final glide path, can't be Tflush-ed */
		req->lr_workstate = L9P_WS_REPLYING;

		/* any flushers waiting for this request can go now */
		if (req->lr_flushstate != L9P_FLUSH_NONE)
			l9p_threadpool_rflush(tp, req);

		pthread_mutex_unlock(&tp->ltp_mtx);

		/* send response */
		l9p_respond(req, false, true);
	}
	return (NULL);
}

static void *
l9p_worker(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	pthread_mutex_lock(&tp->ltp_mtx);
	for (;;) {
		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting)
			break;

		/* off work queue; now work-in-progress, by us */
		req = STAILQ_FIRST(&tp->ltp_workq);
		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
		req->lr_workstate = L9P_WS_INPROGRESS;
		req->lr_worker = worker;
		pthread_mutex_unlock(&tp->ltp_mtx);

		/* actually try the request */
		req->lr_error = l9p_dispatch_request(req);

		/* move to responder queue, updating work-state */
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_RESPQUEUED;
		req->lr_worker = NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

		/* signal the responder */
		pthread_cond_signal(&tp->ltp_reply_cv);
	}
	pthread_mutex_unlock(&tp->ltp_mtx);
	return (NULL);
}
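
/*
 * How a request normally flows through the pool:
 *
 *   l9p_threadpool_run()  puts it on ltp_workq       (L9P_WS_NOTSTARTED)
 *   l9p_worker()          dispatches it              (L9P_WS_INPROGRESS)
 *   l9p_worker()          moves it to ltp_replyq     (L9P_WS_RESPQUEUED)
 *   l9p_responder()       sends the reply            (L9P_WS_REPLYING)
 *
 * Tflush is the exception: l9p_threadpool_run() dispatches it
 * immediately (L9P_WS_IMMEDIATE), and l9p_threadpool_tflush() then
 * either queues it straight for reply or hangs it off the request
 * it is trying to flush; see below.
 */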

/*
 * Just before finally replying to a request that got touched by
 * a Tflush request, we enqueue its flushers (requests of type
 * Tflush, which are now on the flushee's lr_flushq) onto the
 * response queue.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
	struct l9p_request *flusher;

	/*
	 * https://swtch.com/plan9port/man/man9/flush.html says:
	 *
	 * "Should multiple Tflushes be received for a pending
	 * request, they must be answered in order. A Rflush for
	 * any of the multiple Tflushes implies an answer for all
	 * previous ones. Therefore, should a server receive a
	 * request and then multiple flushes for that request, it
	 * need respond only to the last flush." This means
	 * we could march through the queue of flushers here,
	 * marking all but the last one as "to be dropped" rather
	 * than "to be replied-to".
	 *
	 * However, we'll leave that for later, if ever -- it
	 * should be harmless to respond to each, in order.
	 */
	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
		flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
		if (not the last) {
			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
			/* or, flusher->lr_drop = true ? */
		}
#endif
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
	}
}

int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
	struct l9p_worker *worker;
#if defined(__FreeBSD__)
	char threadname[16];
#endif
	int error;
	int i, nworkers, nresponders;

	if (size <= 0)
		return (EINVAL);
	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
	if (error)
		return (error);
	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
	if (error)
		goto fail_work_cv;
	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
	if (error)
		goto fail_reply_cv;

	STAILQ_INIT(&tp->ltp_workq);
	STAILQ_INIT(&tp->ltp_replyq);
	LIST_INIT(&tp->ltp_workers);

	nresponders = 0;
	nworkers = 0;
	for (i = 0; i <= size; i++) {
		worker = calloc(1, sizeof(struct l9p_worker));
		worker->ltw_tp = tp;
		worker->ltw_responder = i == 0;
		error = pthread_create(&worker->ltw_thread, NULL,
		    worker->ltw_responder ? l9p_responder : l9p_worker,
		    (void *)worker);
		if (error) {
			free(worker);
			break;
		}
		if (worker->ltw_responder)
			nresponders++;
		else
			nworkers++;

#if defined(__FreeBSD__)
		if (worker->ltw_responder) {
			pthread_set_name_np(worker->ltw_thread, "9p-responder");
		} else {
			sprintf(threadname, "9p-worker:%d", i - 1);
			pthread_set_name_np(worker->ltw_thread, threadname);
		}
#endif

		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
	}
	if (nresponders == 0 || nworkers == 0) {
		/* need the one responder, and at least one worker */
		l9p_threadpool_shutdown(tp);
		return (error);
	}
	return (0);

	/*
	 * We could avoid these labels by having multiple destroy
	 * paths (one for each error case), or by having booleans
	 * for which variables were initialized. Neither is very
	 * appealing...
	 */
fail_reply_cv:
	pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (error);
}
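
/*
 * A rough sketch of how a caller drives the pool; in lib9p itself the
 * pool is embedded in the connection (conn->lc_tp), so the standalone
 * "tp" below is purely illustrative:
 *
 *	struct l9p_threadpool tp;
 *
 *	if (l9p_threadpool_init(&tp, 4) != 0)	(four workers + the responder)
 *		handle the error;
 *	...for each incoming request req:
 *	l9p_threadpool_run(&tp, req);
 *	...when the connection goes away:
 *	l9p_threadpool_shutdown(&tp);
 */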

/*
 * Run a request, usually by queueing it.
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

	/*
	 * Flush requests must be handled specially, since they
	 * can cancel / kill off regular requests. (But we can
	 * run them through the regular dispatch mechanism.)
	 */
	if (req->lr_req.hdr.type == L9P_TFLUSH) {
		/* not on a work queue yet so we can touch state */
		req->lr_workstate = L9P_WS_IMMEDIATE;
		(void) l9p_dispatch_request(req);
	} else {
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_NOTSTARTED;
		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
		pthread_cond_signal(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
	}
}

/*
 * Run a Tflush request. Called via l9p_dispatch_request() since
 * it has some debug code in it, but not called from worker thread.
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
	struct l9p_connection *conn;
	struct l9p_threadpool *tp;
	struct l9p_request *flushee;
	uint16_t oldtag;
	enum l9p_flushstate nstate;

	/*
	 * Find what we're supposed to flush (the flushee, as it were).
	 */
	req->lr_error = 0;	/* Tflush always succeeds */
	conn = req->lr_conn;
	tp = &conn->lc_tp;
	oldtag = req->lr_req.tflush.oldtag;
	ht_wrlock(&conn->lc_requests);
	flushee = ht_find_locked(&conn->lc_requests, oldtag);
	if (flushee == NULL) {
		/*
		 * Nothing to flush! The old request must have
		 * been done and gone already. Just queue this
		 * Tflush for a success reply.
		 */
		ht_unlock(&conn->lc_requests);
		pthread_mutex_lock(&tp->ltp_mtx);
		goto done;
	}

	/*
	 * Found the original request. We'll need to inspect its
	 * work-state to figure out what to do.
	 */
	pthread_mutex_lock(&tp->ltp_mtx);
	ht_unlock(&conn->lc_requests);

	switch (flushee->lr_workstate) {

	case L9P_WS_NOTSTARTED:
		/*
		 * Flushee is on work queue, but not yet being
		 * handled by a worker.
		 *
		 * The documentation -- see
		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
		 * https://swtch.com/plan9port/man/man9/flush.html
		 * -- says that "the server should answer the
		 * flush message immediately". However, Linux
		 * sends flush requests for operations that
		 * must finish, such as Tclunk, and it's not
		 * possible to *answer* the flush request until
		 * it has been handled (if necessary) or aborted
		 * (if allowed).
		 *
		 * We therefore now just mark the original request
		 * and let the request-handler do whatever is
		 * appropriate. NOTE: we could have a table of
		 * "requests that can be aborted without being
		 * run" vs "requests that must be run to be
		 * aborted", but for now that seems like an
		 * unnecessary complication.
		 */
		nstate = L9P_FLUSH_REQUESTED_PRE_START;
		break;

	case L9P_WS_IMMEDIATE:
		/*
		 * This state only applies to Tflush requests, and
		 * flushing a Tflush is illegal. But we'll do nothing
		 * special here, which will make us act like a flush
		 * request that arrived too late to do anything about
		 * the flushee.
		 */
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_INPROGRESS:
		/*
		 * Worker thread flushee->lr_worker is working on it.
		 * Kick it to get it out of blocking system calls.
		 * (This requires that it carefully set up some
		 * signal handlers, and may be FreeBSD-dependent;
		 * it probably cannot be handled this way on MacOS.)
		 */
#ifdef notyet
		pthread_kill(...);
#endif
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;
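
	/*
	 * A possible shape for that "kick", assuming each worker
	 * installed a do-nothing handler for some signal (say
	 * SIGUSR1) without SA_RESTART, so that blocking system
	 * calls in the worker fail with EINTR when it arrives:
	 *
	 *	pthread_kill(flushee->lr_worker->ltw_thread, SIGUSR1);
	 *
	 * The handler setup and the signal choice are only
	 * assumptions here; none of this is wired up yet.
	 */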

	case L9P_WS_RESPQUEUED:
		/*
		 * The flushee is already in the response queue.
		 * We'll just mark it as having had some flush
		 * action applied.
		 */
		nstate = L9P_FLUSH_TOOLATE;
		break;

	case L9P_WS_REPLYING:
		/*
		 * Although we found the flushee, it's too late to
		 * make us depend on it: it's already heading out
		 * the door as a reply.
		 *
		 * We don't want to do anything to the flushee.
		 * Instead, we want to work the same way as if
		 * we had never found the tag.
		 */
		goto done;
	}

	/*
	 * Now add us to the list of Tflush-es that are waiting
	 * for the flushee (creating the list if needed, i.e., if
	 * this is the first Tflush for the flushee). We (req)
	 * will get queued for reply later, when the responder
	 * processes the flushee and calls l9p_threadpool_rflush().
	 */
	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
		STAILQ_INIT(&flushee->lr_flushq);
	flushee->lr_flushstate = nstate;
	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

	pthread_mutex_unlock(&tp->ltp_mtx);

	return (0);

done:
	/*
	 * This immediate op is ready to be replied-to now, so just
	 * stick it onto the reply queue.
	 */
	req->lr_workstate = L9P_WS_RESPQUEUED;
	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
	pthread_mutex_unlock(&tp->ltp_mtx);
	pthread_cond_signal(&tp->ltp_reply_cv);
	return (0);
}

int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
	struct l9p_worker *worker, *tmp;

	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
		pthread_mutex_lock(&tp->ltp_mtx);
		worker->ltw_exiting = true;
		if (worker->ltw_responder)
			pthread_cond_signal(&tp->ltp_reply_cv);
		else
			pthread_cond_broadcast(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
		pthread_join(worker->ltw_thread, NULL);
		LIST_REMOVE(worker, ltw_link);
		free(worker);
	}
	pthread_cond_destroy(&tp->ltp_reply_cv);
	pthread_cond_destroy(&tp->ltp_work_cv);
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (0);
}
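
/*
 * Note that shutting down only stops and reaps the threads. Any
 * requests still sitting on ltp_workq or ltp_replyq are left alone
 * here; presumably whoever tears down the connection is responsible
 * for dealing with those, since nothing in this file frees them.
 */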