/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time. It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations. This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	__btrfs_start_workers(start->queue);
	kfree(start);
}
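
/*
 * A minimal sketch of the helper-queue pattern described above; the
 * variable names are illustrative only, real callers embed their pools
 * in their own structures:
 *
 *	struct btrfs_workers helper;
 *	struct btrfs_workers workers;
 *
 *	btrfs_init_workers(&helper, "helper", 1, NULL);
 *	btrfs_start_workers(&helper);
 *
 *	btrfs_init_workers(&workers, "worker", 8, &helper);
 *	btrfs_start_workers(&workers);
 *
 * The one-thread 'helper' pool has no helper of its own, and in this
 * sketch it only runs the start_new_worker_func items that are queued
 * on behalf of 'workers'.
 */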

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	struct worker_start *start;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
	return;

out:
	kfree(start);
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
	return 0;
}
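
/*
 * How a caller uses an ordered pool, as a rough sketch (my_csum_work,
 * my_csum_done and my_free are hypothetical callbacks, not btrfs
 * functions):
 *
 *	work->func = my_csum_work;		may run out of order
 *	work->ordered_func = my_csum_done;	runs in submission order
 *	work->ordered_free = my_free;		called last, frees the item
 *	btrfs_queue_worker(&ordered_pool, work);
 *
 * ordered_pool is assumed to be a pool whose owner set ->ordered to 1
 * right after btrfs_init_workers().
 */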

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}
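
/*
 * Notes on how worker_loop() below stays in sync with btrfs_queue_worker():
 *
 * - btrfs_queue_worker() adds the work item under worker->lock and only
 *   calls wake_up_process() when it flips worker->working from 0 to 1.
 * - worker_loop() re-checks the pending lists and clears worker->working
 *   under that same lock before it sleeps, so a racing submission either
 *   lands on a list the worker is about to see, or finds working == 0
 *   and wakes the thread.
 */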

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			try_to_freeze();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
}

/*
 * starts new worker threads. This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}

	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_run(worker_loop, worker,
				   "btrfs-%s-%d", workers->name,
				   workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		kfree(worker);
		goto fail;
	}
	spin_lock_irq(&workers->lock);
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

	return 0;
fail:
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting++;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now. This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list. This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number. This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job. This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;
	int ret;

	spin_lock_irqsave(&workers->lock, flags);
again:
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			ret = __btrfs_start_workers(workers);
			spin_lock_irqsave(&workers->lock, flags);
			if (ret)
				goto fallback;
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from. It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:
	return 0;
}
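
/*
 * A long running work function can use btrfs_requeue_work to yield the
 * thread between chunks of progress, roughly like this (my_long_func,
 * make_some_progress and finish_up are hypothetical, not btrfs code):
 *
 *	static void my_long_func(struct btrfs_work *work)
 *	{
 *		if (make_some_progress(work)) {
 *			btrfs_requeue_work(work);
 *			return;
 *		}
 *		finish_up(work);
 *	}
 */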

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}
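
/*
 * Basic submission from a caller's point of view, as a sketch (my_func is
 * a hypothetical callback, and the btrfs_work is assumed to start zeroed
 * so its flags are clear):
 *
 *	work->func = my_func;
 *	btrfs_set_work_high_prio(work);		(optional)
 *	btrfs_queue_worker(&workers, work);
 */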