1 /* 2 * Copyright (C) 2007 Oracle. All rights reserved. 3 * 4 * This program is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU General Public 6 * License v2 as published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 11 * General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public 14 * License along with this program; if not, write to the 15 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 16 * Boston, MA 021110-1307, USA. 17 */ 18 19 #include <linux/kthread.h> 20 #include <linux/slab.h> 21 #include <linux/list.h> 22 #include <linux/spinlock.h> 23 #include <linux/freezer.h> 24 #include "async-thread.h" 25 26 #define WORK_QUEUED_BIT 0 27 #define WORK_DONE_BIT 1 28 #define WORK_ORDER_DONE_BIT 2 29 #define WORK_HIGH_PRIO_BIT 3 30 31 /* 32 * container for the kthread task pointer and the list of pending work 33 * One of these is allocated per thread. 34 */ 35 struct btrfs_worker_thread { 36 /* pool we belong to */ 37 struct btrfs_workers *workers; 38 39 /* list of struct btrfs_work that are waiting for service */ 40 struct list_head pending; 41 struct list_head prio_pending; 42 43 /* list of worker threads from struct btrfs_workers */ 44 struct list_head worker_list; 45 46 /* kthread */ 47 struct task_struct *task; 48 49 /* number of things on the pending list */ 50 atomic_t num_pending; 51 52 /* reference counter for this struct */ 53 atomic_t refs; 54 55 unsigned long sequence; 56 57 /* protects the pending list. */ 58 spinlock_t lock; 59 60 /* set to non-zero when this thread is already awake and kicking */ 61 int working; 62 63 /* are we currently idle */ 64 int idle; 65 }; 66 67 /* 68 * btrfs_start_workers uses kthread_run, which can block waiting for memory 69 * for a very long time. It will actually throttle on page writeback, 70 * and so it may not make progress until after our btrfs worker threads 71 * process all of the pending work structs in their queue 72 * 73 * This means we can't use btrfs_start_workers from inside a btrfs worker 74 * thread that is used as part of cleaning dirty memory, which pretty much 75 * involves all of the worker threads. 76 * 77 * Instead we have a helper queue who never has more than one thread 78 * where we scheduler thread start operations. This worker_start struct 79 * is used to contain the work and hold a pointer to the queue that needs 80 * another worker. 81 */ 82 struct worker_start { 83 struct btrfs_work work; 84 struct btrfs_workers *queue; 85 }; 86 87 static void start_new_worker_func(struct btrfs_work *work) 88 { 89 struct worker_start *start; 90 start = container_of(work, struct worker_start, work); 91 btrfs_start_workers(start->queue, 1); 92 kfree(start); 93 } 94 95 static int start_new_worker(struct btrfs_workers *queue) 96 { 97 struct worker_start *start; 98 int ret; 99 100 start = kzalloc(sizeof(*start), GFP_NOFS); 101 if (!start) 102 return -ENOMEM; 103 104 start->work.func = start_new_worker_func; 105 start->queue = queue; 106 ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work); 107 if (ret) 108 kfree(start); 109 return ret; 110 } 111 112 /* 113 * helper function to move a thread onto the idle list after it 114 * has finished some requests. 115 */ 116 static void check_idle_worker(struct btrfs_worker_thread *worker) 117 { 118 if (!worker->idle && atomic_read(&worker->num_pending) < 119 worker->workers->idle_thresh / 2) { 120 unsigned long flags; 121 spin_lock_irqsave(&worker->workers->lock, flags); 122 worker->idle = 1; 123 124 /* the list may be empty if the worker is just starting */ 125 if (!list_empty(&worker->worker_list)) { 126 list_move(&worker->worker_list, 127 &worker->workers->idle_list); 128 } 129 spin_unlock_irqrestore(&worker->workers->lock, flags); 130 } 131 } 132 133 /* 134 * helper function to move a thread off the idle list after new 135 * pending work is added. 136 */ 137 static void check_busy_worker(struct btrfs_worker_thread *worker) 138 { 139 if (worker->idle && atomic_read(&worker->num_pending) >= 140 worker->workers->idle_thresh) { 141 unsigned long flags; 142 spin_lock_irqsave(&worker->workers->lock, flags); 143 worker->idle = 0; 144 145 if (!list_empty(&worker->worker_list)) { 146 list_move_tail(&worker->worker_list, 147 &worker->workers->worker_list); 148 } 149 spin_unlock_irqrestore(&worker->workers->lock, flags); 150 } 151 } 152 153 static void check_pending_worker_creates(struct btrfs_worker_thread *worker) 154 { 155 struct btrfs_workers *workers = worker->workers; 156 unsigned long flags; 157 158 rmb(); 159 if (!workers->atomic_start_pending) 160 return; 161 162 spin_lock_irqsave(&workers->lock, flags); 163 if (!workers->atomic_start_pending) 164 goto out; 165 166 workers->atomic_start_pending = 0; 167 if (workers->num_workers + workers->num_workers_starting >= 168 workers->max_workers) 169 goto out; 170 171 workers->num_workers_starting += 1; 172 spin_unlock_irqrestore(&workers->lock, flags); 173 start_new_worker(workers); 174 return; 175 176 out: 177 spin_unlock_irqrestore(&workers->lock, flags); 178 } 179 180 static noinline int run_ordered_completions(struct btrfs_workers *workers, 181 struct btrfs_work *work) 182 { 183 if (!workers->ordered) 184 return 0; 185 186 set_bit(WORK_DONE_BIT, &work->flags); 187 188 spin_lock(&workers->order_lock); 189 190 while (1) { 191 if (!list_empty(&workers->prio_order_list)) { 192 work = list_entry(workers->prio_order_list.next, 193 struct btrfs_work, order_list); 194 } else if (!list_empty(&workers->order_list)) { 195 work = list_entry(workers->order_list.next, 196 struct btrfs_work, order_list); 197 } else { 198 break; 199 } 200 if (!test_bit(WORK_DONE_BIT, &work->flags)) 201 break; 202 203 /* we are going to call the ordered done function, but 204 * we leave the work item on the list as a barrier so 205 * that later work items that are done don't have their 206 * functions called before this one returns 207 */ 208 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags)) 209 break; 210 211 spin_unlock(&workers->order_lock); 212 213 work->ordered_func(work); 214 215 /* now take the lock again and call the freeing code */ 216 spin_lock(&workers->order_lock); 217 list_del(&work->order_list); 218 work->ordered_free(work); 219 } 220 221 spin_unlock(&workers->order_lock); 222 return 0; 223 } 224 225 static void put_worker(struct btrfs_worker_thread *worker) 226 { 227 if (atomic_dec_and_test(&worker->refs)) 228 kfree(worker); 229 } 230 231 static int try_worker_shutdown(struct btrfs_worker_thread *worker) 232 { 233 int freeit = 0; 234 235 spin_lock_irq(&worker->lock); 236 spin_lock(&worker->workers->lock); 237 if (worker->workers->num_workers > 1 && 238 worker->idle && 239 !worker->working && 240 !list_empty(&worker->worker_list) && 241 list_empty(&worker->prio_pending) && 242 list_empty(&worker->pending) && 243 atomic_read(&worker->num_pending) == 0) { 244 freeit = 1; 245 list_del_init(&worker->worker_list); 246 worker->workers->num_workers--; 247 } 248 spin_unlock(&worker->workers->lock); 249 spin_unlock_irq(&worker->lock); 250 251 if (freeit) 252 put_worker(worker); 253 return freeit; 254 } 255 256 static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker, 257 struct list_head *prio_head, 258 struct list_head *head) 259 { 260 struct btrfs_work *work = NULL; 261 struct list_head *cur = NULL; 262 263 if(!list_empty(prio_head)) 264 cur = prio_head->next; 265 266 smp_mb(); 267 if (!list_empty(&worker->prio_pending)) 268 goto refill; 269 270 if (!list_empty(head)) 271 cur = head->next; 272 273 if (cur) 274 goto out; 275 276 refill: 277 spin_lock_irq(&worker->lock); 278 list_splice_tail_init(&worker->prio_pending, prio_head); 279 list_splice_tail_init(&worker->pending, head); 280 281 if (!list_empty(prio_head)) 282 cur = prio_head->next; 283 else if (!list_empty(head)) 284 cur = head->next; 285 spin_unlock_irq(&worker->lock); 286 287 if (!cur) 288 goto out_fail; 289 290 out: 291 work = list_entry(cur, struct btrfs_work, list); 292 293 out_fail: 294 return work; 295 } 296 297 /* 298 * main loop for servicing work items 299 */ 300 static int worker_loop(void *arg) 301 { 302 struct btrfs_worker_thread *worker = arg; 303 struct list_head head; 304 struct list_head prio_head; 305 struct btrfs_work *work; 306 307 INIT_LIST_HEAD(&head); 308 INIT_LIST_HEAD(&prio_head); 309 310 do { 311 again: 312 while (1) { 313 314 315 work = get_next_work(worker, &prio_head, &head); 316 if (!work) 317 break; 318 319 list_del(&work->list); 320 clear_bit(WORK_QUEUED_BIT, &work->flags); 321 322 work->worker = worker; 323 324 work->func(work); 325 326 atomic_dec(&worker->num_pending); 327 /* 328 * unless this is an ordered work queue, 329 * 'work' was probably freed by func above. 330 */ 331 run_ordered_completions(worker->workers, work); 332 333 check_pending_worker_creates(worker); 334 335 } 336 337 spin_lock_irq(&worker->lock); 338 check_idle_worker(worker); 339 340 if (freezing(current)) { 341 worker->working = 0; 342 spin_unlock_irq(&worker->lock); 343 refrigerator(); 344 } else { 345 spin_unlock_irq(&worker->lock); 346 if (!kthread_should_stop()) { 347 cpu_relax(); 348 /* 349 * we've dropped the lock, did someone else 350 * jump_in? 351 */ 352 smp_mb(); 353 if (!list_empty(&worker->pending) || 354 !list_empty(&worker->prio_pending)) 355 continue; 356 357 /* 358 * this short schedule allows more work to 359 * come in without the queue functions 360 * needing to go through wake_up_process() 361 * 362 * worker->working is still 1, so nobody 363 * is going to try and wake us up 364 */ 365 schedule_timeout(1); 366 smp_mb(); 367 if (!list_empty(&worker->pending) || 368 !list_empty(&worker->prio_pending)) 369 continue; 370 371 if (kthread_should_stop()) 372 break; 373 374 /* still no more work?, sleep for real */ 375 spin_lock_irq(&worker->lock); 376 set_current_state(TASK_INTERRUPTIBLE); 377 if (!list_empty(&worker->pending) || 378 !list_empty(&worker->prio_pending)) { 379 spin_unlock_irq(&worker->lock); 380 goto again; 381 } 382 383 /* 384 * this makes sure we get a wakeup when someone 385 * adds something new to the queue 386 */ 387 worker->working = 0; 388 spin_unlock_irq(&worker->lock); 389 390 if (!kthread_should_stop()) { 391 schedule_timeout(HZ * 120); 392 if (!worker->working && 393 try_worker_shutdown(worker)) { 394 return 0; 395 } 396 } 397 } 398 __set_current_state(TASK_RUNNING); 399 } 400 } while (!kthread_should_stop()); 401 return 0; 402 } 403 404 /* 405 * this will wait for all the worker threads to shutdown 406 */ 407 int btrfs_stop_workers(struct btrfs_workers *workers) 408 { 409 struct list_head *cur; 410 struct btrfs_worker_thread *worker; 411 int can_stop; 412 413 spin_lock_irq(&workers->lock); 414 list_splice_init(&workers->idle_list, &workers->worker_list); 415 while (!list_empty(&workers->worker_list)) { 416 cur = workers->worker_list.next; 417 worker = list_entry(cur, struct btrfs_worker_thread, 418 worker_list); 419 420 atomic_inc(&worker->refs); 421 workers->num_workers -= 1; 422 if (!list_empty(&worker->worker_list)) { 423 list_del_init(&worker->worker_list); 424 put_worker(worker); 425 can_stop = 1; 426 } else 427 can_stop = 0; 428 spin_unlock_irq(&workers->lock); 429 if (can_stop) 430 kthread_stop(worker->task); 431 spin_lock_irq(&workers->lock); 432 put_worker(worker); 433 } 434 spin_unlock_irq(&workers->lock); 435 return 0; 436 } 437 438 /* 439 * simple init on struct btrfs_workers 440 */ 441 void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max, 442 struct btrfs_workers *async_helper) 443 { 444 workers->num_workers = 0; 445 workers->num_workers_starting = 0; 446 INIT_LIST_HEAD(&workers->worker_list); 447 INIT_LIST_HEAD(&workers->idle_list); 448 INIT_LIST_HEAD(&workers->order_list); 449 INIT_LIST_HEAD(&workers->prio_order_list); 450 spin_lock_init(&workers->lock); 451 spin_lock_init(&workers->order_lock); 452 workers->max_workers = max; 453 workers->idle_thresh = 32; 454 workers->name = name; 455 workers->ordered = 0; 456 workers->atomic_start_pending = 0; 457 workers->atomic_worker_start = async_helper; 458 } 459 460 /* 461 * starts new worker threads. This does not enforce the max worker 462 * count in case you need to temporarily go past it. 463 */ 464 static int __btrfs_start_workers(struct btrfs_workers *workers, 465 int num_workers) 466 { 467 struct btrfs_worker_thread *worker; 468 int ret = 0; 469 int i; 470 471 for (i = 0; i < num_workers; i++) { 472 worker = kzalloc(sizeof(*worker), GFP_NOFS); 473 if (!worker) { 474 ret = -ENOMEM; 475 goto fail; 476 } 477 478 INIT_LIST_HEAD(&worker->pending); 479 INIT_LIST_HEAD(&worker->prio_pending); 480 INIT_LIST_HEAD(&worker->worker_list); 481 spin_lock_init(&worker->lock); 482 483 atomic_set(&worker->num_pending, 0); 484 atomic_set(&worker->refs, 1); 485 worker->workers = workers; 486 worker->task = kthread_run(worker_loop, worker, 487 "btrfs-%s-%d", workers->name, 488 workers->num_workers + i); 489 if (IS_ERR(worker->task)) { 490 ret = PTR_ERR(worker->task); 491 kfree(worker); 492 goto fail; 493 } 494 spin_lock_irq(&workers->lock); 495 list_add_tail(&worker->worker_list, &workers->idle_list); 496 worker->idle = 1; 497 workers->num_workers++; 498 workers->num_workers_starting--; 499 WARN_ON(workers->num_workers_starting < 0); 500 spin_unlock_irq(&workers->lock); 501 } 502 return 0; 503 fail: 504 btrfs_stop_workers(workers); 505 return ret; 506 } 507 508 int btrfs_start_workers(struct btrfs_workers *workers, int num_workers) 509 { 510 spin_lock_irq(&workers->lock); 511 workers->num_workers_starting += num_workers; 512 spin_unlock_irq(&workers->lock); 513 return __btrfs_start_workers(workers, num_workers); 514 } 515 516 /* 517 * run through the list and find a worker thread that doesn't have a lot 518 * to do right now. This can return null if we aren't yet at the thread 519 * count limit and all of the threads are busy. 520 */ 521 static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers) 522 { 523 struct btrfs_worker_thread *worker; 524 struct list_head *next; 525 int enforce_min; 526 527 enforce_min = (workers->num_workers + workers->num_workers_starting) < 528 workers->max_workers; 529 530 /* 531 * if we find an idle thread, don't move it to the end of the 532 * idle list. This improves the chance that the next submission 533 * will reuse the same thread, and maybe catch it while it is still 534 * working 535 */ 536 if (!list_empty(&workers->idle_list)) { 537 next = workers->idle_list.next; 538 worker = list_entry(next, struct btrfs_worker_thread, 539 worker_list); 540 return worker; 541 } 542 if (enforce_min || list_empty(&workers->worker_list)) 543 return NULL; 544 545 /* 546 * if we pick a busy task, move the task to the end of the list. 547 * hopefully this will keep things somewhat evenly balanced. 548 * Do the move in batches based on the sequence number. This groups 549 * requests submitted at roughly the same time onto the same worker. 550 */ 551 next = workers->worker_list.next; 552 worker = list_entry(next, struct btrfs_worker_thread, worker_list); 553 worker->sequence++; 554 555 if (worker->sequence % workers->idle_thresh == 0) 556 list_move_tail(next, &workers->worker_list); 557 return worker; 558 } 559 560 /* 561 * selects a worker thread to take the next job. This will either find 562 * an idle worker, start a new worker up to the max count, or just return 563 * one of the existing busy workers. 564 */ 565 static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers) 566 { 567 struct btrfs_worker_thread *worker; 568 unsigned long flags; 569 struct list_head *fallback; 570 571 again: 572 spin_lock_irqsave(&workers->lock, flags); 573 worker = next_worker(workers); 574 575 if (!worker) { 576 if (workers->num_workers + workers->num_workers_starting >= 577 workers->max_workers) { 578 goto fallback; 579 } else if (workers->atomic_worker_start) { 580 workers->atomic_start_pending = 1; 581 goto fallback; 582 } else { 583 workers->num_workers_starting++; 584 spin_unlock_irqrestore(&workers->lock, flags); 585 /* we're below the limit, start another worker */ 586 __btrfs_start_workers(workers, 1); 587 goto again; 588 } 589 } 590 goto found; 591 592 fallback: 593 fallback = NULL; 594 /* 595 * we have failed to find any workers, just 596 * return the first one we can find. 597 */ 598 if (!list_empty(&workers->worker_list)) 599 fallback = workers->worker_list.next; 600 if (!list_empty(&workers->idle_list)) 601 fallback = workers->idle_list.next; 602 BUG_ON(!fallback); 603 worker = list_entry(fallback, 604 struct btrfs_worker_thread, worker_list); 605 found: 606 /* 607 * this makes sure the worker doesn't exit before it is placed 608 * onto a busy/idle list 609 */ 610 atomic_inc(&worker->num_pending); 611 spin_unlock_irqrestore(&workers->lock, flags); 612 return worker; 613 } 614 615 /* 616 * btrfs_requeue_work just puts the work item back on the tail of the list 617 * it was taken from. It is intended for use with long running work functions 618 * that make some progress and want to give the cpu up for others. 619 */ 620 int btrfs_requeue_work(struct btrfs_work *work) 621 { 622 struct btrfs_worker_thread *worker = work->worker; 623 unsigned long flags; 624 int wake = 0; 625 626 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags)) 627 goto out; 628 629 spin_lock_irqsave(&worker->lock, flags); 630 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) 631 list_add_tail(&work->list, &worker->prio_pending); 632 else 633 list_add_tail(&work->list, &worker->pending); 634 atomic_inc(&worker->num_pending); 635 636 /* by definition we're busy, take ourselves off the idle 637 * list 638 */ 639 if (worker->idle) { 640 spin_lock(&worker->workers->lock); 641 worker->idle = 0; 642 list_move_tail(&worker->worker_list, 643 &worker->workers->worker_list); 644 spin_unlock(&worker->workers->lock); 645 } 646 if (!worker->working) { 647 wake = 1; 648 worker->working = 1; 649 } 650 651 if (wake) 652 wake_up_process(worker->task); 653 spin_unlock_irqrestore(&worker->lock, flags); 654 out: 655 656 return 0; 657 } 658 659 void btrfs_set_work_high_prio(struct btrfs_work *work) 660 { 661 set_bit(WORK_HIGH_PRIO_BIT, &work->flags); 662 } 663 664 /* 665 * places a struct btrfs_work into the pending queue of one of the kthreads 666 */ 667 int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) 668 { 669 struct btrfs_worker_thread *worker; 670 unsigned long flags; 671 int wake = 0; 672 673 /* don't requeue something already on a list */ 674 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags)) 675 goto out; 676 677 worker = find_worker(workers); 678 if (workers->ordered) { 679 /* 680 * you're not allowed to do ordered queues from an 681 * interrupt handler 682 */ 683 spin_lock(&workers->order_lock); 684 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) { 685 list_add_tail(&work->order_list, 686 &workers->prio_order_list); 687 } else { 688 list_add_tail(&work->order_list, &workers->order_list); 689 } 690 spin_unlock(&workers->order_lock); 691 } else { 692 INIT_LIST_HEAD(&work->order_list); 693 } 694 695 spin_lock_irqsave(&worker->lock, flags); 696 697 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) 698 list_add_tail(&work->list, &worker->prio_pending); 699 else 700 list_add_tail(&work->list, &worker->pending); 701 check_busy_worker(worker); 702 703 /* 704 * avoid calling into wake_up_process if this thread has already 705 * been kicked 706 */ 707 if (!worker->working) 708 wake = 1; 709 worker->working = 1; 710 711 if (wake) 712 wake_up_process(worker->task); 713 spin_unlock_irqrestore(&worker->lock, flags); 714 715 out: 716 return 0; 717 } 718