/*
 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
 * All rights reserved
 *
 * Copyright 2020 Joyent, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

static void *
l9p_responder(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	for (;;) {
		/* get next reply to send */

		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			break;
		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting) {
			(void) pthread_cond_wait(&tp->ltp_reply_cv,
			    &tp->ltp_mtx);
		}
		if (worker->ltw_exiting) {
			(void) pthread_mutex_unlock(&tp->ltp_mtx);
			break;
		}

		/* off reply queue */
		req = STAILQ_FIRST(&tp->ltp_replyq);
		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

		/* request is now in final glide path, can't be Tflush-ed */
		req->lr_workstate = L9P_WS_REPLYING;

		/* any flushers waiting for this request can go now */
		if (req->lr_flushstate != L9P_FLUSH_NONE)
			l9p_threadpool_rflush(tp, req);

		if (pthread_mutex_unlock(&tp->ltp_mtx) != 0)
			break;

		/* send response */
		l9p_respond(req, false, true);
	}
	return (NULL);
}

static void *
l9p_worker(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
		return (NULL);
	for (;;) {
		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting) {
			(void) pthread_cond_wait(&tp->ltp_work_cv,
			    &tp->ltp_mtx);
		}
		if (worker->ltw_exiting)
			break;

		/* off work queue; now work-in-progress, by us */
		req = STAILQ_FIRST(&tp->ltp_workq);
		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
		req->lr_workstate = L9P_WS_INPROGRESS;
		req->lr_worker = worker;
		(void) pthread_mutex_unlock(&tp->ltp_mtx);

		/* actually try the request */
		req->lr_error = l9p_dispatch_request(req);

		/* move to responder queue, updating work-state */
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			return (NULL);
		req->lr_workstate = L9P_WS_RESPQUEUED;
		req->lr_worker = NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

		/* signal the responder */
		(void) pthread_cond_signal(&tp->ltp_reply_cv);
	}
	(void) pthread_mutex_unlock(&tp->ltp_mtx);
	return (NULL);
}

/*
 * Just before finally replying to a request that got touched by
 * a Tflush request, we enqueue its flushers (requests of type
 * Tflush, which are now on the flushee's lr_flushq) onto the
 * response queue.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
	struct l9p_request *flusher;

	/*
	 * https://swtch.com/plan9port/man/man9/flush.html says:
	 *
	 * "Should multiple Tflushes be received for a pending
	 * request, they must be answered in order.  A Rflush for
	 * any of the multiple Tflushes implies an answer for all
	 * previous ones.  Therefore, should a server receive a
	 * request and then multiple flushes for that request, it
	 * need respond only to the last flush."  This means
	 * we could march through the queue of flushers here,
	 * marking all but the last one as "to be dropped" rather
	 * than "to be replied-to".
	 *
	 * However, we'll leave that for later, if ever -- it
	 * should be harmless to respond to each, in order.
	 */
	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
		flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
		if (not the last) {
			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
			/* or, flusher->lr_drop = true ? */
		}
#endif
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
	}
}
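
/*
 * A possible refinement of l9p_threadpool_rflush(), sketched here but
 * not compiled in: per the flush(9) text quoted above, only the last
 * flusher strictly needs a reply, so earlier ones could be marked to
 * be dropped instead of answered.  The lr_drop field is hypothetical
 * (it corresponds to the "flusher->lr_drop = true ?" idea in the
 * comment above) and does not exist in struct l9p_request today.
 */
#ifdef notdef
static void
l9p_threadpool_rflush_lastonly(struct l9p_threadpool *tp,
    struct l9p_request *req)
{
	struct l9p_request *flusher;

	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
		flusher->lr_workstate = L9P_WS_RESPQUEUED;
		/* drop every reply except the one for the final flusher */
		flusher->lr_drop =
		    STAILQ_NEXT(flusher, lr_flushlink) != NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
	}
}
#endif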

int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
	struct l9p_worker *worker;
#if defined(__FreeBSD__)
	char threadname[16];
#endif
	int error;
	int i, nworkers, nresponders;

	if (size <= 0)
		return (EINVAL);
#ifdef __illumos__
	pthread_mutexattr_t attr;

	if ((error = pthread_mutexattr_init(&attr)) != 0)
		return (error);
	if ((error = pthread_mutexattr_settype(&attr,
	    PTHREAD_MUTEX_ERRORCHECK)) != 0) {
		return (error);
	}
	error = pthread_mutex_init(&tp->ltp_mtx, &attr);
#else
	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
#endif
	if (error)
		return (error);
	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
	if (error)
		goto fail_work_cv;
	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
	if (error)
		goto fail_reply_cv;

	STAILQ_INIT(&tp->ltp_workq);
	STAILQ_INIT(&tp->ltp_replyq);
	LIST_INIT(&tp->ltp_workers);

	nresponders = 0;
	nworkers = 0;
	for (i = 0; i <= size; i++) {
		worker = calloc(1, sizeof(struct l9p_worker));
		if (worker == NULL) {
			error = ENOMEM;
			break;
		}
		worker->ltw_tp = tp;
		worker->ltw_responder = i == 0;
		error = pthread_create(&worker->ltw_thread, NULL,
		    worker->ltw_responder ? l9p_responder : l9p_worker,
		    (void *)worker);
		if (error) {
			free(worker);
			break;
		}
		if (worker->ltw_responder)
			nresponders++;
		else
			nworkers++;

#if defined(__FreeBSD__)
		if (worker->ltw_responder) {
			pthread_set_name_np(worker->ltw_thread, "9p-responder");
		} else {
			sprintf(threadname, "9p-worker:%d", i - 1);
			pthread_set_name_np(worker->ltw_thread, threadname);
		}
#elif defined(__illumos__)
		if (worker->ltw_responder) {
			(void) pthread_setname_np(worker->ltw_thread,
			    "9p-responder");
		} else {
			char threadname[PTHREAD_MAX_NAMELEN_NP];

			(void) snprintf(threadname, sizeof (threadname),
			    "9p-worker:%d", i - 1);
			(void) pthread_setname_np(worker->ltw_thread,
			    threadname);
		}
#endif

		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
	}
	if (nresponders == 0 || nworkers == 0) {
		/* need the one responder, and at least one worker */
		l9p_threadpool_shutdown(tp);
		return (error);
	}
	return (0);

	/*
	 * We could avoid these labels by having multiple destroy
	 * paths (one for each error case), or by having booleans
	 * for which variables were initialized.  Neither is very
	 * appealing...
	 */
fail_reply_cv:
	(void) pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
	(void) pthread_mutex_destroy(&tp->ltp_mtx);

	return (error);
}

/*
 * Run a request, usually by queueing it.
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

	/*
	 * Flush requests must be handled specially, since they
	 * can cancel / kill off regular requests.  (But we can
	 * run them through the regular dispatch mechanism.)
	 */
	if (req->lr_req.hdr.type == L9P_TFLUSH) {
		/* not on a work queue yet so we can touch state */
		req->lr_workstate = L9P_WS_IMMEDIATE;
		(void) l9p_dispatch_request(req);
	} else {
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			return;
		req->lr_workstate = L9P_WS_NOTSTARTED;
		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
		(void) pthread_cond_signal(&tp->ltp_work_cv);
		(void) pthread_mutex_unlock(&tp->ltp_mtx);
	}
}
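
/*
 * Example of how a transport might drive the pool (a sketch only, kept
 * out of the build): how the caller obtains the connection and the
 * decoded request is hypothetical and out of scope here; the
 * l9p_threadpool_*() calls and the lc_tp member are the real interface
 * used elsewhere in this file.
 */
#ifdef notdef
static int
example_transport(struct l9p_connection *conn, struct l9p_request *req)
{
	struct l9p_threadpool *tp = &conn->lc_tp;
	int error;

	/* one responder thread plus four worker threads */
	error = l9p_threadpool_init(tp, 4);
	if (error != 0)
		return (error);

	/*
	 * Hand each decoded request to the pool; the reply is sent
	 * later, from the responder thread.
	 */
	l9p_threadpool_run(tp, req);

	/* ... when the connection is torn down ... */
	return (l9p_threadpool_shutdown(tp));
}
#endif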

/*
 * Run a Tflush request.  Called via l9p_dispatch_request() since
 * it has some debug code in it, but not called from worker thread.
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
	struct l9p_connection *conn;
	struct l9p_threadpool *tp;
	struct l9p_request *flushee;
	uint16_t oldtag;
	enum l9p_flushstate nstate = L9P_FLUSH_NONE;
	int err;

	/*
	 * Find what we're supposed to flush (the flushee, as it were).
	 */
	req->lr_error = 0;	/* Tflush always succeeds */
	conn = req->lr_conn;
	tp = &conn->lc_tp;
	oldtag = req->lr_req.tflush.oldtag;
	if ((err = ht_wrlock(&conn->lc_requests)) != 0)
		return (err);
	flushee = ht_find_locked(&conn->lc_requests, oldtag);
	if (flushee == NULL) {
		/*
		 * Nothing to flush!  The old request must have
		 * been done and gone already.  Just queue this
		 * Tflush for a success reply.
		 */
		(void) ht_unlock(&conn->lc_requests);
		if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0)
			return (err);
		goto done;
	}

	/*
	 * Found the original request.  We'll need to inspect its
	 * work-state to figure out what to do.
	 */
	if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0) {
		(void) ht_unlock(&conn->lc_requests);
		return (err);
	}
	(void) ht_unlock(&conn->lc_requests);

	switch (flushee->lr_workstate) {

	case L9P_WS_NOTSTARTED:
		/*
		 * Flushee is on work queue, but not yet being
		 * handled by a worker.
		 *
		 * The documentation -- see
		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
		 * https://swtch.com/plan9port/man/man9/flush.html
		 * -- says that "the server should answer the
		 * flush message immediately".  However, Linux
		 * sends flush requests for operations that
		 * must finish, such as Tclunk, and it's not
		 * possible to *answer* the flush request until
		 * it has been handled (if necessary) or aborted
		 * (if allowed).
		 *
		 * We therefore now just mark the original request
		 * and let the request-handler do whatever is
		 * appropriate.  NOTE: we could have a table of
		 * "requests that can be aborted without being
		 * run" vs "requests that must be run to be
		 * aborted", but for now that seems like an
		 * unnecessary complication.
		 */
		nstate = L9P_FLUSH_REQUESTED_PRE_START;
		break;

	case L9P_WS_IMMEDIATE:
		/*
		 * This state only applies to Tflush requests, and
		 * flushing a Tflush is illegal.  But we'll do nothing
		 * special here, which will make us act like a flush
		 * request for the flushee that arrived too late to
		 * do anything about the flushee.
		 */
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_INPROGRESS:
		/*
		 * Worker thread flushee->lr_worker is working on it.
		 * Kick it to get it out of blocking system calls.
		 * (This requires that it carefully set up some
		 * signal handlers, and may be FreeBSD-dependent;
		 * it probably cannot be handled this way on MacOS.)
		 */
#ifdef notyet
		pthread_kill(...);
#endif
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_RESPQUEUED:
		/*
		 * The flushee is already in the response queue.
		 * We'll just mark it as having had some flush
		 * action applied.
		 */
		nstate = L9P_FLUSH_TOOLATE;
		break;

	case L9P_WS_REPLYING:
		/*
		 * Although we found the flushee, it's too late to
		 * make us depend on it: it's already heading out
		 * the door as a reply.
		 *
		 * We don't want to do anything to the flushee.
		 * Instead, we want to work the same way as if
		 * we had never found the tag.
		 */
		goto done;
	}

	/*
	 * Now add us to the list of Tflush-es that are waiting
	 * for the flushee (creating the list if needed, i.e., if
	 * this is the first Tflush for the flushee).  We (req)
	 * will get queued for reply later, when the responder
	 * processes the flushee and calls l9p_threadpool_rflush().
	 */
	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
		STAILQ_INIT(&flushee->lr_flushq);
	flushee->lr_flushstate = nstate;
	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

	(void) pthread_mutex_unlock(&tp->ltp_mtx);

	return (0);

done:
	/*
	 * This immediate op is ready to be replied-to now, so just
	 * stick it onto the reply queue.
	 */
	req->lr_workstate = L9P_WS_RESPQUEUED;
	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
	(void) pthread_mutex_unlock(&tp->ltp_mtx);
	(void) pthread_cond_signal(&tp->ltp_reply_cv);
	return (0);
}
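
/*
 * A sketch (not compiled in) of one way the L9P_WS_INPROGRESS "kick"
 * above could be implemented: install a no-op SIGUSR1 handler without
 * SA_RESTART in each worker, so that pthread_kill() makes a blocked
 * system call in the flushee's worker fail with EINTR.  The handler
 * setup, the signal choice, and the helper names here are assumptions,
 * not part of the current code.
 */
#ifdef notdef
#include <signal.h>

static void
l9p_flush_sigusr1(int sig)
{
	(void) sig;	/* nothing to do; we only want the EINTR */
}

static int
l9p_install_flush_handler(void)
{
	struct sigaction sa;

	sa.sa_handler = l9p_flush_sigusr1;
	(void) sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;	/* deliberately no SA_RESTART */
	return (sigaction(SIGUSR1, &sa, NULL) == -1 ? errno : 0);
}

/* In the L9P_WS_INPROGRESS case, the kick itself might then be: */
/*	(void) pthread_kill(flushee->lr_worker->ltw_thread, SIGUSR1); */
#endif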

int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
	struct l9p_worker *worker, *tmp;

	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			continue;
		worker->ltw_exiting = true;
		if (worker->ltw_responder)
			(void) pthread_cond_signal(&tp->ltp_reply_cv);
		else
			(void) pthread_cond_broadcast(&tp->ltp_work_cv);
		(void) pthread_mutex_unlock(&tp->ltp_mtx);
		(void) pthread_join(worker->ltw_thread, NULL);
		LIST_REMOVE(worker, ltw_link);
		free(worker);
	}
	(void) pthread_cond_destroy(&tp->ltp_reply_cv);
	(void) pthread_cond_destroy(&tp->ltp_work_cv);
	(void) pthread_mutex_destroy(&tp->ltp_mtx);

	return (0);
}