/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	/* incremented by next_worker() to batch busy-list rotation */
	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers >= workers->max_workers)
		goto out;

	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_start_workers(workers, 1);
	return;

out:
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
	return 0;
}
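
/*
 * Note on ordering: run_ordered_completions() always starts at the head
 * of prio_order_list, then order_list, and stops at the first item whose
 * WORK_DONE_BIT is not yet set.  Within each list, ordered_func() and
 * ordered_free() therefore run in the order the items were queued, even
 * when the func() calls themselves finish out of order on different
 * worker threads.
 */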

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {

			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shutdown
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
{
	workers->num_workers = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = 0;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);

		atomic_set(&worker->num_pending, 0);
		atomic_set(&worker->refs, 1);
		worker->workers = workers;
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}
		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}
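
/*
 * Usage sketch (illustrative only; "pool" and "demo" are placeholder
 * names, not anything defined in this file): a pool is embedded in some
 * long-lived structure, initialized once, and given at least one thread
 * before any work is queued:
 *
 *	struct btrfs_workers pool;
 *
 *	btrfs_init_workers(&pool, "demo", 4);
 *	pool.idle_thresh = 4;		optional tuning, defaults to 32
 *	btrfs_start_workers(&pool, 1);
 *
 * The threads show up as "btrfs-demo-0", "btrfs-demo-1" and so on,
 * following the "btrfs-%s-%d" name passed to kthread_run() above.
 */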

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min = workers->num_workers < workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers >= workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:
	return 0;
}

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);

out:
	return 0;
}
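
/*
 * Usage sketch (illustrative only; my_func, my_done, my_free and "pool"
 * are made-up names, and the allocation strategy shown is just one
 * possibility): a caller allocates a struct btrfs_work, fills in the
 * callbacks and queues it.  With an ordered pool (workers->ordered set),
 * both ordered_func and ordered_free must be provided, because
 * run_ordered_completions() calls them unconditionally; ordered_func()
 * then runs in queue order and ordered_free() releases the item.  With a
 * plain pool, func() itself normally frees the item when it is done.
 *
 *	static void my_func(struct btrfs_work *work)
 *	{
 *		... do the heavy lifting ...
 *	}
 *
 *	static void my_done(struct btrfs_work *work)
 *	{
 *		... runs in submission order ...
 *	}
 *
 *	static void my_free(struct btrfs_work *work)
 *	{
 *		kfree(work);
 *	}
 *
 *	struct btrfs_work *work = kzalloc(sizeof(*work), GFP_NOFS);
 *
 *	work->func = my_func;
 *	work->ordered_func = my_done;
 *	work->ordered_free = my_free;
 *	btrfs_set_work_high_prio(work);		optional
 *	btrfs_queue_worker(&pool, work);
 *
 * btrfs_stop_workers(&pool) stops and frees the worker threads once the
 * pool is no longer needed.
 */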