/*
 * kernel/workqueue.c - generic async execution with shared worker pool
 *
 * Copyright (C) 2002		Ingo Molnar
 *
 * Derived from the taskqueue/keventd code by:
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 *
 * Copyright (C) 2010		SUSE Linux Products GmbH
 * Copyright (C) 2010		Tejun Heo <tj@kernel.org>
 *
 * This is the generic async execution mechanism.  Work items are
 * executed in process context.  The worker pool is shared and
 * automatically managed.  There is one worker pool for each CPU and
 * one extra for works which are better served by workers which are
 * not bound to any specific CPU.
 *
 * Please read Documentation/workqueue.txt for details.
 */

#include <linux/export.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>

#include "workqueue_sched.h"

enum {
	/*
	 * global_cwq flags
	 *
	 * A bound gcwq is either associated or disassociated with its CPU.
	 * While associated (!DISASSOCIATED), all workers are bound to the
	 * CPU and none has %WORKER_UNBOUND set and concurrency management
	 * is in effect.
	 *
	 * While DISASSOCIATED, the cpu may be offline and all workers have
	 * %WORKER_UNBOUND set and concurrency management disabled, and may
	 * be executing on any CPU.  The gcwq behaves as an unbound one.
	 *
	 * Note that DISASSOCIATED can be flipped only while holding
	 * managership of all pools on the gcwq to avoid changing binding
	 * state while create_worker() is in progress.
	 */
	GCWQ_DISASSOCIATED	= 1 << 0,	/* cpu can't serve workers */
	GCWQ_FREEZING		= 1 << 1,	/* freeze in progress */

	/* pool flags */
	POOL_MANAGE_WORKERS	= 1 << 0,	/* need to manage workers */
	POOL_MANAGING_WORKERS	= 1 << 1,	/* managing workers */

	/* worker flags */
	WORKER_STARTED		= 1 << 0,	/* started */
	WORKER_DIE		= 1 << 1,	/* die die die */
	WORKER_IDLE		= 1 << 2,	/* is idle */
	WORKER_PREP		= 1 << 3,	/* preparing to run works */
	WORKER_REBIND		= 1 << 5,	/* mom is home, come back */
	WORKER_CPU_INTENSIVE	= 1 << 6,	/* cpu intensive */
	WORKER_UNBOUND		= 1 << 7,	/* worker is unbound */

	WORKER_NOT_RUNNING	= WORKER_PREP | WORKER_REBIND | WORKER_UNBOUND |
				  WORKER_CPU_INTENSIVE,

	NR_WORKER_POOLS		= 2,		/* # worker pools per gcwq */

	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
	BUSY_WORKER_HASH_MASK	= BUSY_WORKER_HASH_SIZE - 1,

	MAX_IDLE_WORKERS_RATIO	= 4,		/* 1/4 of busy can be idle */
	IDLE_WORKER_TIMEOUT	= 300 * HZ,	/* keep idle ones for 5 mins */

	MAYDAY_INITIAL_TIMEOUT	= HZ / 100 >= 2 ? HZ / 100 : 2,
						/* call for help after 10ms
						   (min two ticks) */
	MAYDAY_INTERVAL		= HZ / 10,	/* and then every 100ms */
	CREATE_COOLDOWN		= HZ,		/* time to breathe after fail */

	/*
	 * Rescue workers are used only on emergencies and shared by
	 * all cpus.  Give -20.
	 */
	RESCUER_NICE_LEVEL	= -20,
	HIGHPRI_NICE_LEVEL	= -20,
};
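/*
 * A quick sanity check of MAYDAY_INITIAL_TIMEOUT above: with HZ=1000,
 * HZ / 100 is 10 ticks = 10ms, so the first mayday goes out after 10ms.
 * With HZ=100, HZ / 100 is a single tick, which is below the two-tick
 * minimum, so the expression clamps it to 2 ticks (20ms).
 * MAYDAY_INTERVAL then repeats the call for help every HZ / 10 = 100ms.
 */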
/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Modifiable by initialization/destruction paths and read-only for
 *    everyone else.
 *
 * P: Preemption protected.  Disabling preemption is enough and should
 *    only be modified and accessed from the local cpu.
 *
 * L: gcwq->lock protected.  Access with gcwq->lock held.
 *
 * X: During normal operation, modification requires gcwq->lock and
 *    should be done only from local cpu.  Either disabling preemption
 *    on local cpu or grabbing gcwq->lock is enough for read access.
 *    If GCWQ_DISASSOCIATED is set, it's identical to L.
 *
 * F: wq->flush_mutex protected.
 *
 * W: workqueue_lock protected.
 */

struct global_cwq;
struct worker_pool;
struct idle_rebind;

/*
 * The poor guys doing the actual heavy lifting.  All on-duty workers
 * are either serving the manager role, on idle list or on busy hash.
 */
struct worker {
	/* on idle list while idle, on busy hash table while busy */
	union {
		struct list_head	entry;	/* L: while idle */
		struct hlist_node	hentry;	/* L: while busy */
	};

	struct work_struct	*current_work;	/* L: work being processed */
	struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
	struct list_head	scheduled;	/* L: scheduled works */
	struct task_struct	*task;		/* I: worker task */
	struct worker_pool	*pool;		/* I: the associated pool */
	/* 64 bytes boundary on 64bit, 32 on 32bit */
	unsigned long		last_active;	/* L: last active timestamp */
	unsigned int		flags;		/* X: flags */
	int			id;		/* I: worker id */

	/* for rebinding worker to CPU */
	struct idle_rebind	*idle_rebind;	/* L: for idle worker */
	struct work_struct	rebind_work;	/* L: for busy worker */
};

struct worker_pool {
	struct global_cwq	*gcwq;		/* I: the owning gcwq */
	unsigned int		flags;		/* X: flags */

	struct list_head	worklist;	/* L: list of pending works */
	int			nr_workers;	/* L: total number of workers */
	int			nr_idle;	/* L: currently idle ones */

	struct list_head	idle_list;	/* X: list of idle workers */
	struct timer_list	idle_timer;	/* L: worker idle timeout */
	struct timer_list	mayday_timer;	/* L: SOS timer for workers */

	struct mutex		manager_mutex;	/* mutex manager should hold */
	struct ida		worker_ida;	/* L: for worker IDs */
};

/*
 * Global per-cpu workqueue.  There's one and only one for each cpu
 * and all works are queued and processed here regardless of their
 * target workqueues.
 */
struct global_cwq {
	spinlock_t		lock;		/* the gcwq lock */
	unsigned int		cpu;		/* I: the associated cpu */
	unsigned int		flags;		/* L: GCWQ_* flags */

	/* workers are chained either in busy_hash or pool idle_list */
	struct hlist_head	busy_hash[BUSY_WORKER_HASH_SIZE];
						/* L: hash of busy workers */

	struct worker_pool	pools[2];	/* normal and highpri pools */

	wait_queue_head_t	rebind_hold;	/* rebind hold wait */
} ____cacheline_aligned_in_smp;

/*
 * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
 * work_struct->data are used for flags and thus cwqs need to be
 * aligned at two's power of the number of flag bits.
 */
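/*
 * In other words, because the cwq pointer shares the data word with
 * WORK_STRUCT_FLAG_BITS of flag bits, a cwq address must be a multiple
 * of (1 << WORK_STRUCT_FLAG_BITS) so that its low bits are guaranteed
 * to be zero and can carry the flags; get_work_cwq() later recovers the
 * pointer by masking with WORK_STRUCT_WQ_DATA_MASK.
 */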
struct cpu_workqueue_struct {
	struct worker_pool	*pool;		/* I: the associated pool */
	struct workqueue_struct *wq;		/* I: the owning workqueue */
	int			work_color;	/* L: current color */
	int			flush_color;	/* L: flushing color */
	int			nr_in_flight[WORK_NR_COLORS];
						/* L: nr of in_flight works */
	int			nr_active;	/* L: nr of active works */
	int			max_active;	/* L: max active works */
	struct list_head	delayed_works;	/* L: delayed works */
};

/*
 * Structure used to wait for workqueue flush.
 */
struct wq_flusher {
	struct list_head	list;		/* F: list of flushers */
	int			flush_color;	/* F: flush color waiting for */
	struct completion	done;		/* flush completion */
};

/*
 * All cpumasks are assumed to be always set on UP and thus can't be
 * used to determine whether there's something to be done.
 */
#ifdef CONFIG_SMP
typedef cpumask_var_t mayday_mask_t;
#define mayday_test_and_set_cpu(cpu, mask)	\
	cpumask_test_and_set_cpu((cpu), (mask))
#define mayday_clear_cpu(cpu, mask)		cpumask_clear_cpu((cpu), (mask))
#define for_each_mayday_cpu(cpu, mask)		for_each_cpu((cpu), (mask))
#define alloc_mayday_mask(maskp, gfp)		zalloc_cpumask_var((maskp), (gfp))
#define free_mayday_mask(mask)			free_cpumask_var((mask))
#else
typedef unsigned long mayday_mask_t;
#define mayday_test_and_set_cpu(cpu, mask)	test_and_set_bit(0, &(mask))
#define mayday_clear_cpu(cpu, mask)		clear_bit(0, &(mask))
#define for_each_mayday_cpu(cpu, mask)		if ((cpu) = 0, (mask))
#define alloc_mayday_mask(maskp, gfp)		true
#define free_mayday_mask(mask)			do { } while (0)
#endif

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	unsigned int		flags;		/* W: WQ_* flags */
	union {
		struct cpu_workqueue_struct __percpu	*pcpu;
		struct cpu_workqueue_struct		*single;
		unsigned long				v;
	} cpu_wq;				/* I: cwq's */
	struct list_head	list;		/* W: list of all workqueues */

	struct mutex		flush_mutex;	/* protects wq flushing */
	int			work_color;	/* F: current work color */
	int			flush_color;	/* F: current flush color */
	atomic_t		nr_cwqs_to_flush; /* flush in progress */
	struct wq_flusher	*first_flusher;	/* F: first flusher */
	struct list_head	flusher_queue;	/* F: flush waiters */
	struct list_head	flusher_overflow; /* F: flush overflow list */

	mayday_mask_t		mayday_mask;	/* cpus requesting rescue */
	struct worker		*rescuer;	/* I: rescue worker */

	int			nr_drainers;	/* W: drain in progress */
	int			saved_max_active; /* W: saved cwq max_active */
#ifdef CONFIG_LOCKDEP
	struct lockdep_map	lockdep_map;
#endif
	char			name[];		/* I: workqueue name */
};

struct workqueue_struct *system_wq __read_mostly;
struct workqueue_struct *system_long_wq __read_mostly;
struct workqueue_struct *system_nrt_wq __read_mostly;
struct workqueue_struct *system_unbound_wq __read_mostly;
struct workqueue_struct *system_freezable_wq __read_mostly;
struct workqueue_struct *system_nrt_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
EXPORT_SYMBOL_GPL(system_long_wq);
EXPORT_SYMBOL_GPL(system_nrt_wq);
EXPORT_SYMBOL_GPL(system_unbound_wq);
EXPORT_SYMBOL_GPL(system_freezable_wq);
EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);

#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
#define for_each_worker_pool(pool, gcwq)				\
	for ((pool) = &(gcwq)->pools[0];				\
	     (pool) < &(gcwq)->pools[NR_WORKER_POOLS]; (pool)++)

#define for_each_busy_worker(worker, i, pos, gcwq)			\
	for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)			\
		hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)

static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
				  unsigned int sw)
{
	if (cpu < nr_cpu_ids) {
		if (sw & 1) {
			cpu = cpumask_next(cpu, mask);
			if (cpu < nr_cpu_ids)
				return cpu;
		}
		if (sw & 2)
			return WORK_CPU_UNBOUND;
	}
	return WORK_CPU_NONE;
}

static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
				struct workqueue_struct *wq)
{
	return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
}

/*
 * CPU iterators
 *
 * An extra gcwq is defined for an invalid cpu number
 * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
 * specific CPU.  The following iterators are similar to
 * for_each_*_cpu() iterators but also consider the unbound gcwq.
 *
 * for_each_gcwq_cpu()		: possible CPUs + WORK_CPU_UNBOUND
 * for_each_online_gcwq_cpu()	: online CPUs + WORK_CPU_UNBOUND
 * for_each_cwq_cpu()		: possible CPUs for bound workqueues,
 *				  WORK_CPU_UNBOUND for unbound workqueues
 */
#define for_each_gcwq_cpu(cpu)						\
	for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3);		\
	     (cpu) < WORK_CPU_NONE;					\
	     (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3))

#define for_each_online_gcwq_cpu(cpu)					\
	for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3);		\
	     (cpu) < WORK_CPU_NONE;					\
	     (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3))

#define for_each_cwq_cpu(cpu, wq)					\
	for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq));	\
	     (cpu) < WORK_CPU_NONE;					\
	     (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))

#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;

static void *work_debug_hint(void *addr)
{
	return ((struct work_struct *) addr)->func;
}

/*
 * fixup_init is called when:
 * - an active object is initialized
 */
static int work_fixup_init(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_init(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

/*
 * fixup_activate is called when:
 * - an active object is activated
 * - an unknown object is activated (might be a statically initialized object)
 */
static int work_fixup_activate(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {

	case ODEBUG_STATE_NOTAVAILABLE:
		/*
		 * This is not really a fixup. The work struct was
		 * statically initialized. We just make sure that it
		 * is tracked in the object tracker.
		 */
		if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
			debug_object_init(work, &work_debug_descr);
			debug_object_activate(work, &work_debug_descr);
			return 0;
		}
		WARN_ON_ONCE(1);
		return 0;

	case ODEBUG_STATE_ACTIVE:
		WARN_ON(1);

	default:
		return 0;
	}
}

/*
 * fixup_free is called when:
 * - an active object is freed
 */
static int work_fixup_free(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_free(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

static struct debug_obj_descr work_debug_descr = {
	.name		= "work_struct",
	.debug_hint	= work_debug_hint,
	.fixup_init	= work_fixup_init,
	.fixup_activate	= work_fixup_activate,
	.fixup_free	= work_fixup_free,
};

static inline void debug_work_activate(struct work_struct *work)
{
	debug_object_activate(work, &work_debug_descr);
}

static inline void debug_work_deactivate(struct work_struct *work)
{
	debug_object_deactivate(work, &work_debug_descr);
}

void __init_work(struct work_struct *work, int onstack)
{
	if (onstack)
		debug_object_init_on_stack(work, &work_debug_descr);
	else
		debug_object_init(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(__init_work);

void destroy_work_on_stack(struct work_struct *work)
{
	debug_object_free(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(destroy_work_on_stack);

#else
static inline void debug_work_activate(struct work_struct *work) { }
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
static bool workqueue_freezing;		/* W: have wqs started freezing? */

/*
 * The almighty global cpu workqueues.  nr_running is the only field
 * which is expected to be used frequently by other cpus via
 * try_to_wake_up().  Put it in a separate cacheline.
 */
static DEFINE_PER_CPU(struct global_cwq, global_cwq);
static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, pool_nr_running[NR_WORKER_POOLS]);

/*
 * Global cpu workqueue and nr_running counter for unbound gcwq.  The
 * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its
 * workers have WORKER_UNBOUND set.
 */
static struct global_cwq unbound_global_cwq;
static atomic_t unbound_pool_nr_running[NR_WORKER_POOLS] = {
	[0 ... NR_WORKER_POOLS - 1]	= ATOMIC_INIT(0),	/* always 0 */
};

static int worker_thread(void *__worker);

static int worker_pool_pri(struct worker_pool *pool)
{
	return pool - pool->gcwq->pools;
}

static struct global_cwq *get_gcwq(unsigned int cpu)
{
	if (cpu != WORK_CPU_UNBOUND)
		return &per_cpu(global_cwq, cpu);
	else
		return &unbound_global_cwq;
}

static atomic_t *get_pool_nr_running(struct worker_pool *pool)
{
	int cpu = pool->gcwq->cpu;
	int idx = worker_pool_pri(pool);

	if (cpu != WORK_CPU_UNBOUND)
		return &per_cpu(pool_nr_running, cpu)[idx];
	else
		return &unbound_pool_nr_running[idx];
}

static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
					    struct workqueue_struct *wq)
{
	if (!(wq->flags & WQ_UNBOUND)) {
		if (likely(cpu < nr_cpu_ids))
			return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
	} else if (likely(cpu == WORK_CPU_UNBOUND))
		return wq->cpu_wq.single;
	return NULL;
}

static unsigned int work_color_to_flags(int color)
{
	return color << WORK_STRUCT_COLOR_SHIFT;
}

static int get_work_color(struct work_struct *work)
{
	return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
		((1 << WORK_STRUCT_COLOR_BITS) - 1);
}

static int work_next_color(int color)
{
	return (color + 1) % WORK_NR_COLORS;
}

/*
 * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
 * work is on queue.  Once execution starts, WORK_STRUCT_CWQ is
 * cleared and the work data contains the cpu number it was last on.
 *
 * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
 * cwq, cpu or clear work->data.  These functions should only be
 * called while the work is owned - ie. while the PENDING bit is set.
 *
 * get_work_[g]cwq() can be used to obtain the gcwq or cwq
 * corresponding to a work.  gcwq is available once the work has been
 * queued anywhere after initialization.  cwq is available only from
 * queueing until execution starts.
 */
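/*
 * Concretely, the two encodings of work->data used below look like this
 * (the low WORK_STRUCT_FLAG_BITS hold the flags in both cases):
 *
 *   queued:    [ cwq pointer                  | CWQ | PENDING | color ]
 *   executing: [ cpu << WORK_STRUCT_FLAG_BITS |       PENDING         ]
 *
 * set_work_cwq() stores the first form, set_work_cpu() the second, and
 * get_work_gcwq() tells them apart by testing WORK_STRUCT_CWQ.
 */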
static inline void set_work_data(struct work_struct *work, unsigned long data,
				 unsigned long flags)
{
	BUG_ON(!work_pending(work));
	atomic_long_set(&work->data, data | flags | work_static(work));
}

static void set_work_cwq(struct work_struct *work,
			 struct cpu_workqueue_struct *cwq,
			 unsigned long extra_flags)
{
	set_work_data(work, (unsigned long)cwq,
		      WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
}

static void set_work_cpu(struct work_struct *work, unsigned int cpu)
{
	set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
}

static void clear_work_data(struct work_struct *work)
{
	set_work_data(work, WORK_STRUCT_NO_CPU, 0);
}

static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);

	if (data & WORK_STRUCT_CWQ)
		return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
	else
		return NULL;
}

static struct global_cwq *get_work_gcwq(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);
	unsigned int cpu;

	if (data & WORK_STRUCT_CWQ)
		return ((struct cpu_workqueue_struct *)
			(data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq;

	cpu = data >> WORK_STRUCT_FLAG_BITS;
	if (cpu == WORK_CPU_NONE)
		return NULL;

	BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
	return get_gcwq(cpu);
}

/*
 * Policy functions.  These define the policies on how the global worker
 * pools are managed.  Unless noted otherwise, these functions assume that
 * they're being called with gcwq->lock held.
 */

static bool __need_more_worker(struct worker_pool *pool)
{
	return !atomic_read(get_pool_nr_running(pool));
}

/*
 * Need to wake up a worker?  Called from anything but currently
 * running workers.
 *
 * Note that, because unbound workers never contribute to nr_running, this
 * function will always return %true for unbound gcwq as long as the
 * worklist isn't empty.
 */
static bool need_more_worker(struct worker_pool *pool)
{
	return !list_empty(&pool->worklist) && __need_more_worker(pool);
}

/* Can I start working?  Called from busy but !running workers. */
static bool may_start_working(struct worker_pool *pool)
{
	return pool->nr_idle;
}

/* Do I need to keep working?  Called from currently running workers. */
static bool keep_working(struct worker_pool *pool)
{
	atomic_t *nr_running = get_pool_nr_running(pool);

	return !list_empty(&pool->worklist) && atomic_read(nr_running) <= 1;
}

/* Do we need a new worker?  Called from manager. */
static bool need_to_create_worker(struct worker_pool *pool)
{
	return need_more_worker(pool) && !may_start_working(pool);
}

/* Do I need to be the manager? */
static bool need_to_manage_workers(struct worker_pool *pool)
{
	return need_to_create_worker(pool) ||
		(pool->flags & POOL_MANAGE_WORKERS);
}

/* Do we have too many workers and should some go away? */
static bool too_many_workers(struct worker_pool *pool)
{
	bool managing = pool->flags & POOL_MANAGING_WORKERS;
	int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
	int nr_busy = pool->nr_workers - nr_idle;

	return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
}
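/*
 * To put numbers on the check above: with MAX_IDLE_WORKERS_RATIO == 4,
 * a pool with 20 busy workers tolerates up to 6 idle ones; a 7th idle
 * worker makes (7 - 2) * 4 = 20 >= 20 true, and the idle timer will ask
 * the manager to start trimming the surplus.
 */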
/*
 * Wake up functions.
 */

/* Return the first worker.  Safe with preemption disabled */
static struct worker *first_worker(struct worker_pool *pool)
{
	if (unlikely(list_empty(&pool->idle_list)))
		return NULL;

	return list_first_entry(&pool->idle_list, struct worker, entry);
}

/**
 * wake_up_worker - wake up an idle worker
 * @pool: worker pool to wake worker from
 *
 * Wake up the first idle worker of @pool.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void wake_up_worker(struct worker_pool *pool)
{
	struct worker *worker = first_worker(pool);

	if (likely(worker))
		wake_up_process(worker->task);
}

/**
 * wq_worker_waking_up - a worker is waking up
 * @task: task waking up
 * @cpu: CPU @task is waking up to
 *
 * This function is called during try_to_wake_up() when a worker is
 * being awoken.
 *
 * CONTEXT:
 * spin_lock_irq(rq->lock)
 */
void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
{
	struct worker *worker = kthread_data(task);

	if (!(worker->flags & WORKER_NOT_RUNNING))
		atomic_inc(get_pool_nr_running(worker->pool));
}

/**
 * wq_worker_sleeping - a worker is going to sleep
 * @task: task going to sleep
 * @cpu: CPU in question, must be the current CPU number
 *
 * This function is called during schedule() when a busy worker is
 * going to sleep.  A worker on the same cpu can be woken up by
 * returning a pointer to its task.
 *
 * CONTEXT:
 * spin_lock_irq(rq->lock)
 *
 * RETURNS:
 * Worker task on @cpu to wake up, %NULL if none.
 */
struct task_struct *wq_worker_sleeping(struct task_struct *task,
				       unsigned int cpu)
{
	struct worker *worker = kthread_data(task), *to_wakeup = NULL;
	struct worker_pool *pool = worker->pool;
	atomic_t *nr_running = get_pool_nr_running(pool);

	if (worker->flags & WORKER_NOT_RUNNING)
		return NULL;

	/* this can only happen on the local cpu */
	BUG_ON(cpu != raw_smp_processor_id());

	/*
	 * The counterpart of the following dec_and_test, implied mb,
	 * worklist not empty test sequence is in insert_work().
	 * Please read comment there.
	 *
	 * NOT_RUNNING is clear.  This means that we're bound to and
	 * running on the local cpu w/ rq lock held and preemption
	 * disabled, which in turn means that no one else could be
	 * manipulating idle_list, so dereferencing idle_list without gcwq
	 * lock is safe.
	 */
	if (atomic_dec_and_test(nr_running) && !list_empty(&pool->worklist))
		to_wakeup = first_worker(pool);
	return to_wakeup ? to_wakeup->task : NULL;
}
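/*
 * Taken together, the two scheduler hooks above keep nr_running equal to
 * the number of this pool's workers that are currently runnable: waking a
 * !NOT_RUNNING worker increments it, a busy worker blocking decrements it,
 * and if the count hits zero while works are still pending, the first idle
 * worker is handed back to the scheduler for wakeup, so the pool never
 * stalls with pending work and no runnable worker.
 */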
/**
 * worker_set_flags - set worker flags and adjust nr_running accordingly
 * @worker: self
 * @flags: flags to set
 * @wakeup: wakeup an idle worker if necessary
 *
 * Set @flags in @worker->flags and adjust nr_running accordingly.  If
 * nr_running becomes zero and @wakeup is %true, an idle worker is
 * woken up.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock)
 */
static inline void worker_set_flags(struct worker *worker, unsigned int flags,
				    bool wakeup)
{
	struct worker_pool *pool = worker->pool;

	WARN_ON_ONCE(worker->task != current);

	/*
	 * If transitioning into NOT_RUNNING, adjust nr_running and
	 * wake up an idle worker as necessary if requested by
	 * @wakeup.
	 */
	if ((flags & WORKER_NOT_RUNNING) &&
	    !(worker->flags & WORKER_NOT_RUNNING)) {
		atomic_t *nr_running = get_pool_nr_running(pool);

		if (wakeup) {
			if (atomic_dec_and_test(nr_running) &&
			    !list_empty(&pool->worklist))
				wake_up_worker(pool);
		} else
			atomic_dec(nr_running);
	}

	worker->flags |= flags;
}

/**
 * worker_clr_flags - clear worker flags and adjust nr_running accordingly
 * @worker: self
 * @flags: flags to clear
 *
 * Clear @flags in @worker->flags and adjust nr_running accordingly.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock)
 */
static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
	struct worker_pool *pool = worker->pool;
	unsigned int oflags = worker->flags;

	WARN_ON_ONCE(worker->task != current);

	worker->flags &= ~flags;

	/*
	 * If transitioning out of NOT_RUNNING, increment nr_running.  Note
	 * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is mask
	 * of multiple flags, not a single flag.
	 */
	if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
		if (!(worker->flags & WORKER_NOT_RUNNING))
			atomic_inc(get_pool_nr_running(pool));
}

/**
 * busy_worker_head - return the busy hash head for a work
 * @gcwq: gcwq of interest
 * @work: work to be hashed
 *
 * Return hash head of @gcwq for @work.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 *
 * RETURNS:
 * Pointer to the hash head.
 */
static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
					   struct work_struct *work)
{
	const int base_shift = ilog2(sizeof(struct work_struct));
	unsigned long v = (unsigned long)work;

	/* simple shift and fold hash, do we need something better? */
	v >>= base_shift;
	v += v >> BUSY_WORKER_HASH_ORDER;
	v &= BUSY_WORKER_HASH_MASK;

	return &gcwq->busy_hash[v];
}
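/*
 * The shift-and-fold above works as follows: the right shift by
 * base_shift drops the low address bits, which carry little entropy
 * because work_structs are at least sizeof(struct work_struct) apart in
 * typical allocations; the add folds the next BUSY_WORKER_HASH_ORDER
 * bits on top of the low ones; and the final mask selects one of the
 * BUSY_WORKER_HASH_SIZE (64) buckets.
 */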
/**
 * __find_worker_executing_work - find worker which is executing a work
 * @gcwq: gcwq of interest
 * @bwh: hash head as returned by busy_worker_head()
 * @work: work to find worker for
 *
 * Find a worker which is executing @work on @gcwq.  @bwh should be
 * the hash head obtained by calling busy_worker_head() with the same
 * work.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 *
 * RETURNS:
 * Pointer to worker which is executing @work if found, NULL
 * otherwise.
 */
static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
						   struct hlist_head *bwh,
						   struct work_struct *work)
{
	struct worker *worker;
	struct hlist_node *tmp;

	hlist_for_each_entry(worker, tmp, bwh, hentry)
		if (worker->current_work == work)
			return worker;
	return NULL;
}

/**
 * find_worker_executing_work - find worker which is executing a work
 * @gcwq: gcwq of interest
 * @work: work to find worker for
 *
 * Find a worker which is executing @work on @gcwq.  This function is
 * identical to __find_worker_executing_work() except that this
 * function calculates @bwh itself.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 *
 * RETURNS:
 * Pointer to worker which is executing @work if found, NULL
 * otherwise.
 */
static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
						 struct work_struct *work)
{
	return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
					    work);
}

/**
 * insert_work - insert a work into gcwq
 * @cwq: cwq @work belongs to
 * @work: work to insert
 * @head: insertion point
 * @extra_flags: extra WORK_STRUCT_* flags to set
 *
 * Insert @work which belongs to @cwq into @gcwq after @head.
 * @extra_flags is or'd to work_struct flags.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head,
			unsigned int extra_flags)
{
	struct worker_pool *pool = cwq->pool;

	/* we own @work, set data and link */
	set_work_cwq(work, cwq, extra_flags);

	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();

	list_add_tail(&work->entry, head);

	/*
	 * Ensure either worker_sched_deactivated() sees the above
	 * list_add_tail() or we see zero nr_running to avoid workers
	 * lying around lazily while there are works to be processed.
	 */
	smp_mb();

	if (__need_more_worker(pool))
		wake_up_worker(pool);
}

/*
 * Test whether @work is being queued from another work executing on the
 * same workqueue.  This is rather expensive and should only be used from
 * cold paths.
 */
static bool is_chained_work(struct workqueue_struct *wq)
{
	unsigned long flags;
	unsigned int cpu;

	for_each_gcwq_cpu(cpu) {
		struct global_cwq *gcwq = get_gcwq(cpu);
		struct worker *worker;
		struct hlist_node *pos;
		int i;

		spin_lock_irqsave(&gcwq->lock, flags);
		for_each_busy_worker(worker, i, pos, gcwq) {
			if (worker->task != current)
				continue;
			spin_unlock_irqrestore(&gcwq->lock, flags);
			/*
			 * I'm @worker, no locking necessary.  See if @work
			 * is headed to the same workqueue.
			 */
			return worker->current_cwq->wq == wq;
		}
		spin_unlock_irqrestore(&gcwq->lock, flags);
	}
	return false;
}

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
			 struct work_struct *work)
{
	struct global_cwq *gcwq;
	struct cpu_workqueue_struct *cwq;
	struct list_head *worklist;
	unsigned int work_flags;
	unsigned long flags;

	debug_work_activate(work);

	/* if dying, only works from the same workqueue are allowed */
	if (unlikely(wq->flags & WQ_DRAINING) &&
	    WARN_ON_ONCE(!is_chained_work(wq)))
		return;

	/* determine gcwq to use */
	if (!(wq->flags & WQ_UNBOUND)) {
		struct global_cwq *last_gcwq;

		if (unlikely(cpu == WORK_CPU_UNBOUND))
			cpu = raw_smp_processor_id();

		/*
		 * It's multi cpu.  If @wq is non-reentrant and @work
		 * was previously on a different cpu, it might still
		 * be running there, in which case the work needs to
		 * be queued on that cpu to guarantee non-reentrance.
		 */
		gcwq = get_gcwq(cpu);
		if (wq->flags & WQ_NON_REENTRANT &&
		    (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
			struct worker *worker;

			spin_lock_irqsave(&last_gcwq->lock, flags);

			worker = find_worker_executing_work(last_gcwq, work);

			if (worker && worker->current_cwq->wq == wq)
				gcwq = last_gcwq;
			else {
				/* meh... not running there, queue here */
				spin_unlock_irqrestore(&last_gcwq->lock, flags);
				spin_lock_irqsave(&gcwq->lock, flags);
			}
		} else
			spin_lock_irqsave(&gcwq->lock, flags);
	} else {
		gcwq = get_gcwq(WORK_CPU_UNBOUND);
		spin_lock_irqsave(&gcwq->lock, flags);
	}

	/* gcwq determined, get cwq and queue */
	cwq = get_cwq(gcwq->cpu, wq);
	trace_workqueue_queue_work(cpu, cwq, work);

	if (WARN_ON(!list_empty(&work->entry))) {
		spin_unlock_irqrestore(&gcwq->lock, flags);
		return;
	}

	cwq->nr_in_flight[cwq->work_color]++;
	work_flags = work_color_to_flags(cwq->work_color);

	if (likely(cwq->nr_active < cwq->max_active)) {
		trace_workqueue_activate_work(work);
		cwq->nr_active++;
		worklist = &cwq->pool->worklist;
	} else {
		work_flags |= WORK_STRUCT_DELAYED;
		worklist = &cwq->delayed_works;
	}

	insert_work(cwq, work, worklist, work_flags);

	spin_unlock_irqrestore(&gcwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		__queue_work(cpu, wq, work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);
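/*
 * For reference, typical usage of the interface above (a sketch; my_fn
 * and my_work are illustrative names, not part of this file):
 *
 *	static void my_fn(struct work_struct *work) { ... }
 *	static DECLARE_WORK(my_work, my_fn);
 *	...
 *	queue_work(system_wq, &my_work);
 *
 * Calling queue_work() again while @my_work is still pending simply
 * returns 0 because PENDING is already set.
 */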
static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);

	__queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		unsigned int lcpu;

		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/*
		 * This stores cwq for the moment, for the timer_fn.
		 * Note that the work's gcwq is preserved to allow
		 * reentrance detection for delayed works.
		 */
		if (!(wq->flags & WQ_UNBOUND)) {
			struct global_cwq *gcwq = get_work_gcwq(work);

			if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
				lcpu = gcwq->cpu;
			else
				lcpu = raw_smp_processor_id();
		} else
			lcpu = WORK_CPU_UNBOUND;

		set_work_cwq(work, get_cwq(lcpu, wq), 0);

		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

/**
 * worker_enter_idle - enter idle state
 * @worker: worker which is entering idle state
 *
 * @worker is entering idle state.  Update stats and idle timer if
 * necessary.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock).
 */
static void worker_enter_idle(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;
	struct global_cwq *gcwq = pool->gcwq;

	BUG_ON(worker->flags & WORKER_IDLE);
	BUG_ON(!list_empty(&worker->entry) &&
	       (worker->hentry.next || worker->hentry.pprev));

	/* can't use worker_set_flags(), also called from start_worker() */
	worker->flags |= WORKER_IDLE;
	pool->nr_idle++;
	worker->last_active = jiffies;

	/* idle_list is LIFO */
	list_add(&worker->entry, &pool->idle_list);

	if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
		mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);

	/*
	 * Sanity check nr_running.  Because gcwq_unbind_fn() releases
	 * gcwq->lock between setting %WORKER_UNBOUND and zapping
	 * nr_running, the warning may trigger spuriously.  Check iff
	 * unbind is not in progress.
	 */
	WARN_ON_ONCE(!(gcwq->flags & GCWQ_DISASSOCIATED) &&
		     pool->nr_workers == pool->nr_idle &&
		     atomic_read(get_pool_nr_running(pool)));
}

/**
 * worker_leave_idle - leave idle state
 * @worker: worker which is leaving idle state
 *
 * @worker is leaving idle state.  Update stats.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock).
 */
static void worker_leave_idle(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;

	BUG_ON(!(worker->flags & WORKER_IDLE));
	worker_clr_flags(worker, WORKER_IDLE);
	pool->nr_idle--;
	list_del_init(&worker->entry);
}
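/*
 * Because idle_list is LIFO, the workers that have been idle the longest
 * accumulate at the tail; idle_worker_timeout() below therefore only needs
 * to look at the last entry and compare its last_active stamp against
 * IDLE_WORKER_TIMEOUT to decide whether the manager should trim workers.
 */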
/**
 * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
 * @worker: self
 *
 * Works which are scheduled while the cpu is online must at least be
 * scheduled to a worker which is bound to the cpu so that if they are
 * flushed from cpu callbacks while cpu is going down, they are
 * guaranteed to execute on the cpu.
 *
 * This function is to be used by rogue workers and rescuers to bind
 * themselves to the target cpu and may race with cpu going down or
 * coming online.  kthread_bind() can't be used because it may put the
 * worker to already dead cpu and set_cpus_allowed_ptr() can't be used
 * verbatim as it's best effort and blocking and gcwq may be
 * [dis]associated in the meantime.
 *
 * This function tries set_cpus_allowed() and locks gcwq and verifies the
 * binding against %GCWQ_DISASSOCIATED which is set during
 * %CPU_DOWN_PREPARE and cleared during %CPU_ONLINE, so if the worker
 * enters idle state or fetches works without dropping lock, it can
 * guarantee the scheduling requirement described in the first paragraph.
 *
 * CONTEXT:
 * Might sleep.  Called without any lock but returns with gcwq->lock
 * held.
 *
 * RETURNS:
 * %true if the associated gcwq is online (@worker is successfully
 * bound), %false if offline.
 */
static bool worker_maybe_bind_and_lock(struct worker *worker)
__acquires(&gcwq->lock)
{
	struct global_cwq *gcwq = worker->pool->gcwq;
	struct task_struct *task = worker->task;

	while (true) {
		/*
		 * The following call may fail, succeed or succeed
		 * without actually migrating the task to the cpu if
		 * it races with cpu hotunplug operation.  Verify
		 * against GCWQ_DISASSOCIATED.
		 */
		if (!(gcwq->flags & GCWQ_DISASSOCIATED))
			set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));

		spin_lock_irq(&gcwq->lock);
		if (gcwq->flags & GCWQ_DISASSOCIATED)
			return false;
		if (task_cpu(task) == gcwq->cpu &&
		    cpumask_equal(&current->cpus_allowed,
				  get_cpu_mask(gcwq->cpu)))
			return true;
		spin_unlock_irq(&gcwq->lock);

		/*
		 * We've raced with CPU hot[un]plug.  Give it a breather
		 * and retry migration.  cond_resched() is required here;
		 * otherwise, we might deadlock against cpu_stop trying to
		 * bring down the CPU on non-preemptive kernel.
		 */
		cpu_relax();
		cond_resched();
	}
}

struct idle_rebind {
	int			cnt;		/* # workers to be rebound */
	struct completion	done;		/* all workers rebound */
};

/*
 * Rebind an idle @worker to its CPU.  During CPU onlining, this has to
 * happen synchronously for idle workers.  worker_thread() will test
 * %WORKER_REBIND before leaving idle and call this function.
 */
static void idle_worker_rebind(struct worker *worker)
{
	struct global_cwq *gcwq = worker->pool->gcwq;

	/* CPU must be online at this point */
	WARN_ON(!worker_maybe_bind_and_lock(worker));
	if (!--worker->idle_rebind->cnt)
		complete(&worker->idle_rebind->done);
	spin_unlock_irq(&worker->pool->gcwq->lock);

	/* we did our part, wait for rebind_workers() to finish up */
	wait_event(gcwq->rebind_hold, !(worker->flags & WORKER_REBIND));

	/*
	 * rebind_workers() shouldn't finish until all workers passed the
	 * above WORKER_REBIND wait.  Tell it when done.
	 */
	spin_lock_irq(&worker->pool->gcwq->lock);
	if (!--worker->idle_rebind->cnt)
		complete(&worker->idle_rebind->done);
	spin_unlock_irq(&worker->pool->gcwq->lock);
}

/*
 * Function for @worker->rebind.work used to rebind unbound busy workers to
 * the associated cpu which is coming back online.  This is scheduled by
 * cpu up but can race with other cpu hotplug operations and may be
 * executed twice without intervening cpu down.
 */
static void busy_worker_rebind_fn(struct work_struct *work)
{
	struct worker *worker = container_of(work, struct worker, rebind_work);
	struct global_cwq *gcwq = worker->pool->gcwq;

	worker_maybe_bind_and_lock(worker);

	/*
	 * %WORKER_REBIND must be cleared even if the above binding failed;
	 * otherwise, we may confuse the next CPU_UP cycle or oops / get
	 * stuck by calling idle_worker_rebind() prematurely.  If CPU went
	 * down again in between, %WORKER_UNBOUND would be set, so clearing
	 * %WORKER_REBIND is always safe.
	 */
	worker_clr_flags(worker, WORKER_REBIND);

	spin_unlock_irq(&gcwq->lock);
}

/**
 * rebind_workers - rebind all workers of a gcwq to the associated CPU
 * @gcwq: gcwq of interest
 *
 * @gcwq->cpu is coming online.  Rebind all workers to the CPU.  Rebinding
 * is different for idle and busy ones.
 *
 * The idle ones should be rebound synchronously and idle rebinding should
 * be complete before any worker starts executing work items with
 * concurrency management enabled; otherwise, scheduler may oops trying to
 * wake up non-local idle worker from wq_worker_sleeping().
 *
 * This is achieved by repeatedly requesting rebinding until all idle
 * workers are known to have been rebound under @gcwq->lock and holding all
 * idle workers from becoming busy until idle rebinding is complete.
 *
 * Once idle workers are rebound, busy workers can be rebound as they
 * finish executing their current work items.  Queueing the rebind work at
 * the head of their scheduled lists is enough.  Note that nr_running will
 * be properly bumped as busy workers rebind.
 *
 * On return, all workers are guaranteed to either be bound or have rebind
 * work item scheduled.
 */
static void rebind_workers(struct global_cwq *gcwq)
	__releases(&gcwq->lock) __acquires(&gcwq->lock)
{
	struct idle_rebind idle_rebind;
	struct worker_pool *pool;
	struct worker *worker;
	struct hlist_node *pos;
	int i;

	lockdep_assert_held(&gcwq->lock);

	for_each_worker_pool(pool, gcwq)
		lockdep_assert_held(&pool->manager_mutex);

	/*
	 * Rebind idle workers.  Interlocked both ways.  We wait for
	 * workers to rebind via @idle_rebind.done.
	 * Workers will wait for us to finish up by watching %WORKER_REBIND.
	 */
	init_completion(&idle_rebind.done);
retry:
	idle_rebind.cnt = 1;
	INIT_COMPLETION(idle_rebind.done);

	/* set REBIND and kick idle ones, we'll wait for these later */
	for_each_worker_pool(pool, gcwq) {
		list_for_each_entry(worker, &pool->idle_list, entry) {
			unsigned long worker_flags = worker->flags;

			if (worker->flags & WORKER_REBIND)
				continue;

			/* morph UNBOUND to REBIND atomically */
			worker_flags &= ~WORKER_UNBOUND;
			worker_flags |= WORKER_REBIND;
			ACCESS_ONCE(worker->flags) = worker_flags;

			idle_rebind.cnt++;
			worker->idle_rebind = &idle_rebind;

			/* worker_thread() will call idle_worker_rebind() */
			wake_up_process(worker->task);
		}
	}

	if (--idle_rebind.cnt) {
		spin_unlock_irq(&gcwq->lock);
		wait_for_completion(&idle_rebind.done);
		spin_lock_irq(&gcwq->lock);
		/* busy ones might have become idle while waiting, retry */
		goto retry;
	}

	/* all idle workers are rebound, rebind busy workers */
	for_each_busy_worker(worker, i, pos, gcwq) {
		struct work_struct *rebind_work = &worker->rebind_work;
		unsigned long worker_flags = worker->flags;

		/* morph UNBOUND to REBIND atomically */
		worker_flags &= ~WORKER_UNBOUND;
		worker_flags |= WORKER_REBIND;
		ACCESS_ONCE(worker->flags) = worker_flags;

		if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
				     work_data_bits(rebind_work)))
			continue;

		/* wq doesn't matter, use the default one */
		debug_work_activate(rebind_work);
		insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
			    worker->scheduled.next,
			    work_color_to_flags(WORK_NO_COLOR));
	}

	/*
	 * All idle workers are rebound and waiting for %WORKER_REBIND to
	 * be cleared inside idle_worker_rebind().  Clear and release.
	 * Clearing %WORKER_REBIND from this foreign context is safe
	 * because these workers are still guaranteed to be idle.
	 *
	 * We need to make sure all idle workers passed WORKER_REBIND wait
	 * in idle_worker_rebind() before returning; otherwise, workers can
	 * get stuck at the wait if hotplug cycle repeats.
	 */
	idle_rebind.cnt = 1;
	INIT_COMPLETION(idle_rebind.done);

	for_each_worker_pool(pool, gcwq) {
		list_for_each_entry(worker, &pool->idle_list, entry) {
			worker->flags &= ~WORKER_REBIND;
			idle_rebind.cnt++;
		}
	}

	wake_up_all(&gcwq->rebind_hold);

	if (--idle_rebind.cnt) {
		spin_unlock_irq(&gcwq->lock);
		wait_for_completion(&idle_rebind.done);
		spin_lock_irq(&gcwq->lock);
	}
}

static struct worker *alloc_worker(void)
{
	struct worker *worker;

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (worker) {
		INIT_LIST_HEAD(&worker->entry);
		INIT_LIST_HEAD(&worker->scheduled);
		INIT_WORK(&worker->rebind_work, busy_worker_rebind_fn);
		/* on creation a worker is in !idle && prep state */
		worker->flags = WORKER_PREP;
	}
	return worker;
}

/**
 * create_worker - create a new workqueue worker
 * @pool: pool the new worker will belong to
 *
 * Create a new worker which is bound to @pool.  The returned worker
 * can be started by calling start_worker() or destroyed using
 * destroy_worker().
 *
 * CONTEXT:
 * Might sleep.  Does GFP_KERNEL allocations.
 *
 * RETURNS:
 * Pointer to the newly created worker.
 */
static struct worker *create_worker(struct worker_pool *pool)
{
	struct global_cwq *gcwq = pool->gcwq;
	const char *pri = worker_pool_pri(pool) ? "H" : "";
	struct worker *worker = NULL;
	int id = -1;

	spin_lock_irq(&gcwq->lock);
	while (ida_get_new(&pool->worker_ida, &id)) {
		spin_unlock_irq(&gcwq->lock);
		if (!ida_pre_get(&pool->worker_ida, GFP_KERNEL))
			goto fail;
		spin_lock_irq(&gcwq->lock);
	}
	spin_unlock_irq(&gcwq->lock);

	worker = alloc_worker();
	if (!worker)
		goto fail;

	worker->pool = pool;
	worker->id = id;

	if (gcwq->cpu != WORK_CPU_UNBOUND)
		worker->task = kthread_create_on_node(worker_thread,
					worker, cpu_to_node(gcwq->cpu),
					"kworker/%u:%d%s", gcwq->cpu, id, pri);
	else
		worker->task = kthread_create(worker_thread, worker,
					      "kworker/u:%d%s", id, pri);
	if (IS_ERR(worker->task))
		goto fail;

	if (worker_pool_pri(pool))
		set_user_nice(worker->task, HIGHPRI_NICE_LEVEL);

	/*
	 * Determine CPU binding of the new worker depending on
	 * %GCWQ_DISASSOCIATED.  The caller is responsible for ensuring the
	 * flag remains stable across this function.  See the comments
	 * above the flag definition for details.
	 *
	 * As an unbound worker may later become a regular one if CPU comes
	 * online, make sure every worker has %PF_THREAD_BOUND set.
	 */
	if (!(gcwq->flags & GCWQ_DISASSOCIATED)) {
		kthread_bind(worker->task, gcwq->cpu);
	} else {
		worker->task->flags |= PF_THREAD_BOUND;
		worker->flags |= WORKER_UNBOUND;
	}

	return worker;
fail:
	if (id >= 0) {
		spin_lock_irq(&gcwq->lock);
		ida_remove(&pool->worker_ida, id);
		spin_unlock_irq(&gcwq->lock);
	}
	kfree(worker);
	return NULL;
}
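/*
 * The name formats above are what show up in ps: a bound worker is
 * "kworker/<cpu>:<id>" (e.g. kworker/0:1), an unbound one is
 * "kworker/u:<id>", and workers of the highpri pool carry an "H"
 * suffix (e.g. kworker/0:1H) since @pri is "H" for them.
 */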
/**
 * start_worker - start a newly created worker
 * @worker: worker to start
 *
 * Make the gcwq aware of @worker and start it.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void start_worker(struct worker *worker)
{
	worker->flags |= WORKER_STARTED;
	worker->pool->nr_workers++;
	worker_enter_idle(worker);
	wake_up_process(worker->task);
}

/**
 * destroy_worker - destroy a workqueue worker
 * @worker: worker to be destroyed
 *
 * Destroy @worker and adjust @gcwq stats accordingly.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock) which is released and regrabbed.
 */
static void destroy_worker(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;
	struct global_cwq *gcwq = pool->gcwq;
	int id = worker->id;

	/* sanity check frenzy */
	BUG_ON(worker->current_work);
	BUG_ON(!list_empty(&worker->scheduled));

	if (worker->flags & WORKER_STARTED)
		pool->nr_workers--;
	if (worker->flags & WORKER_IDLE)
		pool->nr_idle--;

	list_del_init(&worker->entry);
	worker->flags |= WORKER_DIE;

	spin_unlock_irq(&gcwq->lock);

	kthread_stop(worker->task);
	kfree(worker);

	spin_lock_irq(&gcwq->lock);
	ida_remove(&pool->worker_ida, id);
}

static void idle_worker_timeout(unsigned long __pool)
{
	struct worker_pool *pool = (void *)__pool;
	struct global_cwq *gcwq = pool->gcwq;

	spin_lock_irq(&gcwq->lock);

	if (too_many_workers(pool)) {
		struct worker *worker;
		unsigned long expires;

		/* idle_list is kept in LIFO order, check the last one */
		worker = list_entry(pool->idle_list.prev, struct worker, entry);
		expires = worker->last_active + IDLE_WORKER_TIMEOUT;

		if (time_before(jiffies, expires))
			mod_timer(&pool->idle_timer, expires);
		else {
			/* it's been idle for too long, wake up manager */
			pool->flags |= POOL_MANAGE_WORKERS;
			wake_up_worker(pool);
		}
	}

	spin_unlock_irq(&gcwq->lock);
}

static bool send_mayday(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq = get_work_cwq(work);
	struct workqueue_struct *wq = cwq->wq;
	unsigned int cpu;

	if (!(wq->flags & WQ_RESCUER))
		return false;

	/* mayday mayday mayday */
	cpu = cwq->pool->gcwq->cpu;
	/* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
	if (cpu == WORK_CPU_UNBOUND)
		cpu = 0;
	if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
		wake_up_process(wq->rescuer->task);
	return true;
}

static void gcwq_mayday_timeout(unsigned long __pool)
{
	struct worker_pool *pool = (void *)__pool;
	struct global_cwq *gcwq = pool->gcwq;
	struct work_struct *work;

	spin_lock_irq(&gcwq->lock);

	if (need_to_create_worker(pool)) {
		/*
		 * We've been trying to create a new worker but
		 * haven't been successful.  We might be hitting an
		 * allocation deadlock.  Send distress signals to
		 * rescuers.
		 */
		list_for_each_entry(work, &pool->worklist, entry)
			send_mayday(work);
	}

	spin_unlock_irq(&gcwq->lock);

	mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
}
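/*
 * Putting the two timers together: maybe_create_worker() below arms the
 * mayday timer for MAYDAY_INITIAL_TIMEOUT (~10ms) before attempting its
 * first allocation.  If worker creation keeps failing,
 * gcwq_mayday_timeout() re-arms itself every MAYDAY_INTERVAL (100ms) and
 * sends a mayday for each pending work whose workqueue has WQ_RESCUER
 * set, waking that workqueue's rescuer so forward progress doesn't
 * depend on being able to allocate a new kthread.
 */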
/**
 * maybe_create_worker - create a new worker if necessary
 * @pool: pool to create a new worker for
 *
 * Create a new worker for @pool if necessary.  @pool is guaranteed to
 * have at least one idle worker on return from this function.  If
 * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
 * sent to all rescuers with works scheduled on @pool to resolve
 * possible allocation deadlock.
 *
 * On return, need_to_create_worker() is guaranteed to be false and
 * may_start_working() true.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
 * multiple times.  Does GFP_KERNEL allocations.  Called only from
 * manager.
 *
 * RETURNS:
 * false if no action was taken and gcwq->lock stayed locked, true
 * otherwise.
 */
static bool maybe_create_worker(struct worker_pool *pool)
__releases(&gcwq->lock)
__acquires(&gcwq->lock)
{
	struct global_cwq *gcwq = pool->gcwq;

	if (!need_to_create_worker(pool))
		return false;
restart:
	spin_unlock_irq(&gcwq->lock);

	/* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
	mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);

	while (true) {
		struct worker *worker;

		worker = create_worker(pool);
		if (worker) {
			del_timer_sync(&pool->mayday_timer);
			spin_lock_irq(&gcwq->lock);
			start_worker(worker);
			BUG_ON(need_to_create_worker(pool));
			return true;
		}

		if (!need_to_create_worker(pool))
			break;

		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(CREATE_COOLDOWN);

		if (!need_to_create_worker(pool))
			break;
	}

	del_timer_sync(&pool->mayday_timer);
	spin_lock_irq(&gcwq->lock);
	if (need_to_create_worker(pool))
		goto restart;
	return true;
}

/**
 * maybe_destroy_workers - destroy workers which have been idle for a while
 * @pool: pool to destroy workers for
 *
 * Destroy @pool workers which have been idle for longer than
 * IDLE_WORKER_TIMEOUT.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
 * multiple times.  Called only from manager.
 *
 * RETURNS:
 * false if no action was taken and gcwq->lock stayed locked, true
 * otherwise.
 */
static bool maybe_destroy_workers(struct worker_pool *pool)
{
	bool ret = false;

	while (too_many_workers(pool)) {
		struct worker *worker;
		unsigned long expires;

		worker = list_entry(pool->idle_list.prev, struct worker, entry);
		expires = worker->last_active + IDLE_WORKER_TIMEOUT;

		if (time_before(jiffies, expires)) {
			mod_timer(&pool->idle_timer, expires);
			break;
		}

		destroy_worker(worker);
		ret = true;
	}

	return ret;
}

/**
 * manage_workers - manage worker pool
 * @worker: self
 *
 * Assume the manager role and manage gcwq worker pool @worker belongs
 * to.  At any given time, there can be only zero or one manager per
 * gcwq.  The exclusion is handled automatically by this function.
 *
 * The caller can safely start processing works on false return.  On
 * true return, it's guaranteed that need_to_create_worker() is false
 * and may_start_working() is true.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
 * multiple times.  Does GFP_KERNEL allocations.
 *
 * RETURNS:
 * false if no action was taken and gcwq->lock stayed locked, true if
 * some action was taken.
 */
static bool manage_workers(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;
	bool ret = false;

	if (pool->flags & POOL_MANAGING_WORKERS)
		return ret;

	pool->flags |= POOL_MANAGING_WORKERS;

	/*
	 * To simplify both worker management and CPU hotplug, hold off
	 * management while hotplug is in progress.  CPU hotplug path can't
CPU hotplug path can't 1844 * grab %POOL_MANAGING_WORKERS to achieve this because that can 1845 * lead to idle worker depletion (all become busy thinking someone 1846 * else is managing) which in turn can result in deadlock under 1847 * extreme circumstances. Use @pool->manager_mutex to synchronize 1848 * manager against CPU hotplug. 1849 * 1850 * manager_mutex would always be free unless CPU hotplug is in 1851 * progress. trylock first without dropping @gcwq->lock. 1852 */ 1853 if (unlikely(!mutex_trylock(&pool->manager_mutex))) { 1854 spin_unlock_irq(&pool->gcwq->lock); 1855 mutex_lock(&pool->manager_mutex); 1856 /* 1857 * CPU hotplug could have happened while we were waiting 1858 * for manager_mutex. Hotplug itself can't handle us 1859 * because manager isn't either on idle or busy list, and 1860 * @gcwq's state and ours could have deviated. 1861 * 1862 * As hotplug is now excluded via manager_mutex, we can 1863 * simply try to bind. It will succeed or fail depending 1864 * on @gcwq's current state. Try it and adjust 1865 * %WORKER_UNBOUND accordingly. 1866 */ 1867 if (worker_maybe_bind_and_lock(worker)) 1868 worker->flags &= ~WORKER_UNBOUND; 1869 else 1870 worker->flags |= WORKER_UNBOUND; 1871 1872 ret = true; 1873 } 1874 1875 pool->flags &= ~POOL_MANAGE_WORKERS; 1876 1877 /* 1878 * Destroy and then create so that may_start_working() is true 1879 * on return. 1880 */ 1881 ret |= maybe_destroy_workers(pool); 1882 ret |= maybe_create_worker(pool); 1883 1884 pool->flags &= ~POOL_MANAGING_WORKERS; 1885 mutex_unlock(&pool->manager_mutex); 1886 return ret; 1887 } 1888 1889 /** 1890 * move_linked_works - move linked works to a list 1891 * @work: start of series of works to be scheduled 1892 * @head: target list to append @work to 1893 * @nextp: out parameter for nested worklist walking 1894 * 1895 * Schedule linked works starting from @work to @head. Work series to 1896 * be scheduled starts at @work and includes any consecutive work with 1897 * WORK_STRUCT_LINKED set in its predecessor. 1898 * 1899 * If @nextp is not NULL, it's updated to point to the next work of 1900 * the last scheduled work. This allows move_linked_works() to be 1901 * nested inside outer list_for_each_entry_safe(). 1902 * 1903 * CONTEXT: 1904 * spin_lock_irq(gcwq->lock). 1905 */ 1906 static void move_linked_works(struct work_struct *work, struct list_head *head, 1907 struct work_struct **nextp) 1908 { 1909 struct work_struct *n; 1910 1911 /* 1912 * Linked worklist will always end before the end of the list, 1913 * use NULL for list head. 1914 */ 1915 list_for_each_entry_safe_from(work, n, NULL, entry) { 1916 list_move_tail(&work->entry, head); 1917 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED)) 1918 break; 1919 } 1920 1921 /* 1922 * If we're already inside safe list traversal and have moved 1923 * multiple works to the scheduled queue, the next position 1924 * needs to be updated.
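 * rescuer_thread() below relies on exactly this: it passes &n from its
 * own list_for_each_entry_safe() walk over pool->worklist so that the
 * outer loop remains valid after the works have been moved.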
1925 */ 1926 if (nextp) 1927 *nextp = n; 1928 } 1929 1930 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq) 1931 { 1932 struct work_struct *work = list_first_entry(&cwq->delayed_works, 1933 struct work_struct, entry); 1934 1935 trace_workqueue_activate_work(work); 1936 move_linked_works(work, &cwq->pool->worklist, NULL); 1937 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work)); 1938 cwq->nr_active++; 1939 } 1940 1941 /** 1942 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight 1943 * @cwq: cwq of interest 1944 * @color: color of work which left the queue 1945 * @delayed: for a delayed work 1946 * 1947 * A work either has completed or is removed from pending queue, 1948 * decrement nr_in_flight of its cwq and handle workqueue flushing. 1949 * 1950 * CONTEXT: 1951 * spin_lock_irq(gcwq->lock). 1952 */ 1953 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color, 1954 bool delayed) 1955 { 1956 /* ignore uncolored works */ 1957 if (color == WORK_NO_COLOR) 1958 return; 1959 1960 cwq->nr_in_flight[color]--; 1961 1962 if (!delayed) { 1963 cwq->nr_active--; 1964 if (!list_empty(&cwq->delayed_works)) { 1965 /* one down, submit a delayed one */ 1966 if (cwq->nr_active < cwq->max_active) 1967 cwq_activate_first_delayed(cwq); 1968 } 1969 } 1970 1971 /* is flush in progress and are we at the flushing tip? */ 1972 if (likely(cwq->flush_color != color)) 1973 return; 1974 1975 /* are there still in-flight works? */ 1976 if (cwq->nr_in_flight[color]) 1977 return; 1978 1979 /* this cwq is done, clear flush_color */ 1980 cwq->flush_color = -1; 1981 1982 /* 1983 * If this was the last cwq, wake up the first flusher. It 1984 * will handle the rest. 1985 */ 1986 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush)) 1987 complete(&cwq->wq->first_flusher->done); 1988 } 1989 1990 /** 1991 * process_one_work - process single work 1992 * @worker: self 1993 * @work: work to process 1994 * 1995 * Process @work. This function contains all the logics necessary to 1996 * process a single work including synchronization against and 1997 * interaction with other workers on the same cpu, queueing and 1998 * flushing. As long as context requirement is met, any worker can 1999 * call this function to process a work. 2000 * 2001 * CONTEXT: 2002 * spin_lock_irq(gcwq->lock) which is released and regrabbed. 2003 */ 2004 static void process_one_work(struct worker *worker, struct work_struct *work) 2005 __releases(&gcwq->lock) 2006 __acquires(&gcwq->lock) 2007 { 2008 struct cpu_workqueue_struct *cwq = get_work_cwq(work); 2009 struct worker_pool *pool = worker->pool; 2010 struct global_cwq *gcwq = pool->gcwq; 2011 struct hlist_head *bwh = busy_worker_head(gcwq, work); 2012 bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE; 2013 work_func_t f = work->func; 2014 int work_color; 2015 struct worker *collision; 2016 #ifdef CONFIG_LOCKDEP 2017 /* 2018 * It is permissible to free the struct work_struct from 2019 * inside the function that is called from it, this we need to 2020 * take into account for lockdep too. To avoid bogus "held 2021 * lock freed" warnings as well as problems when looking into 2022 * work->lockdep_map, make a copy and use that here. 2023 */ 2024 struct lockdep_map lockdep_map; 2025 2026 lockdep_copy_map(&lockdep_map, &work->lockdep_map); 2027 #endif 2028 /* 2029 * Ensure we're on the correct CPU. DISASSOCIATED test is 2030 * necessary to avoid spurious warnings from rescuers servicing the 2031 * unbound or a disassociated gcwq. 
2032 */ 2033 WARN_ON_ONCE(!(worker->flags & (WORKER_UNBOUND | WORKER_REBIND)) && 2034 !(gcwq->flags & GCWQ_DISASSOCIATED) && 2035 raw_smp_processor_id() != gcwq->cpu); 2036 2037 /* 2038 * A single work shouldn't be executed concurrently by 2039 * multiple workers on a single cpu. Check whether anyone is 2040 * already processing the work. If so, defer the work to the 2041 * currently executing one. 2042 */ 2043 collision = __find_worker_executing_work(gcwq, bwh, work); 2044 if (unlikely(collision)) { 2045 move_linked_works(work, &collision->scheduled, NULL); 2046 return; 2047 } 2048 2049 /* claim and process */ 2050 debug_work_deactivate(work); 2051 hlist_add_head(&worker->hentry, bwh); 2052 worker->current_work = work; 2053 worker->current_cwq = cwq; 2054 work_color = get_work_color(work); 2055 2056 /* record the current cpu number in the work data and dequeue */ 2057 set_work_cpu(work, gcwq->cpu); 2058 list_del_init(&work->entry); 2059 2060 /* 2061 * CPU intensive works don't participate in concurrency 2062 * management. They're the scheduler's responsibility. 2063 */ 2064 if (unlikely(cpu_intensive)) 2065 worker_set_flags(worker, WORKER_CPU_INTENSIVE, true); 2066 2067 /* 2068 * Unbound gcwq isn't concurrency managed and work items should be 2069 * executed ASAP. Wake up another worker if necessary. 2070 */ 2071 if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool)) 2072 wake_up_worker(pool); 2073 2074 spin_unlock_irq(&gcwq->lock); 2075 2076 work_clear_pending(work); 2077 lock_map_acquire_read(&cwq->wq->lockdep_map); 2078 lock_map_acquire(&lockdep_map); 2079 trace_workqueue_execute_start(work); 2080 f(work); 2081 /* 2082 * While we must be careful to not use "work" after this, the trace 2083 * point will only record its address. 2084 */ 2085 trace_workqueue_execute_end(work); 2086 lock_map_release(&lockdep_map); 2087 lock_map_release(&cwq->wq->lockdep_map); 2088 2089 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 2090 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 2091 "%s/0x%08x/%d\n", 2092 current->comm, preempt_count(), task_pid_nr(current)); 2093 printk(KERN_ERR " last function: "); 2094 print_symbol("%s\n", (unsigned long)f); 2095 debug_show_held_locks(current); 2096 dump_stack(); 2097 } 2098 2099 spin_lock_irq(&gcwq->lock); 2100 2101 /* clear cpu intensive status */ 2102 if (unlikely(cpu_intensive)) 2103 worker_clr_flags(worker, WORKER_CPU_INTENSIVE); 2104 2105 /* we're done with it, release */ 2106 hlist_del_init(&worker->hentry); 2107 worker->current_work = NULL; 2108 worker->current_cwq = NULL; 2109 cwq_dec_nr_in_flight(cwq, work_color, false); 2110 } 2111 2112 /** 2113 * process_scheduled_works - process scheduled works 2114 * @worker: self 2115 * 2116 * Process all scheduled works. Please note that the scheduled list 2117 * may change while processing a work, so this function repeatedly 2118 * fetches a work from the top and executes it. 2119 * 2120 * CONTEXT: 2121 * spin_lock_irq(gcwq->lock) which may be released and regrabbed 2122 * multiple times. 2123 */ 2124 static void process_scheduled_works(struct worker *worker) 2125 { 2126 while (!list_empty(&worker->scheduled)) { 2127 struct work_struct *work = list_first_entry(&worker->scheduled, 2128 struct work_struct, entry); 2129 process_one_work(worker, work); 2130 } 2131 } 2132 2133 /** 2134 * worker_thread - the worker thread function 2135 * @__worker: self 2136 * 2137 * The gcwq worker thread function. There's a single dynamic pool of 2138 * these per each cpu. 
These workers process all works regardless of 2139 * their specific target workqueue. The only exception is works which 2140 * belong to workqueues with a rescuer which will be explained in 2141 * rescuer_thread(). 2142 */ 2143 static int worker_thread(void *__worker) 2144 { 2145 struct worker *worker = __worker; 2146 struct worker_pool *pool = worker->pool; 2147 struct global_cwq *gcwq = pool->gcwq; 2148 2149 /* tell the scheduler that this is a workqueue worker */ 2150 worker->task->flags |= PF_WQ_WORKER; 2151 woke_up: 2152 spin_lock_irq(&gcwq->lock); 2153 2154 /* 2155 * DIE can be set only while idle and REBIND set while busy has 2156 * @worker->rebind_work scheduled. Checking here is enough. 2157 */ 2158 if (unlikely(worker->flags & (WORKER_REBIND | WORKER_DIE))) { 2159 spin_unlock_irq(&gcwq->lock); 2160 2161 if (worker->flags & WORKER_DIE) { 2162 worker->task->flags &= ~PF_WQ_WORKER; 2163 return 0; 2164 } 2165 2166 idle_worker_rebind(worker); 2167 goto woke_up; 2168 } 2169 2170 worker_leave_idle(worker); 2171 recheck: 2172 /* no more worker necessary? */ 2173 if (!need_more_worker(pool)) 2174 goto sleep; 2175 2176 /* do we need to manage? */ 2177 if (unlikely(!may_start_working(pool)) && manage_workers(worker)) 2178 goto recheck; 2179 2180 /* 2181 * ->scheduled list can only be filled while a worker is 2182 * preparing to process a work or actually processing it. 2183 * Make sure nobody diddled with it while I was sleeping. 2184 */ 2185 BUG_ON(!list_empty(&worker->scheduled)); 2186 2187 /* 2188 * When control reaches this point, we're guaranteed to have 2189 * at least one idle worker or that someone else has already 2190 * assumed the manager role. 2191 */ 2192 worker_clr_flags(worker, WORKER_PREP); 2193 2194 do { 2195 struct work_struct *work = 2196 list_first_entry(&pool->worklist, 2197 struct work_struct, entry); 2198 2199 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { 2200 /* optimization path, not strictly necessary */ 2201 process_one_work(worker, work); 2202 if (unlikely(!list_empty(&worker->scheduled))) 2203 process_scheduled_works(worker); 2204 } else { 2205 move_linked_works(work, &worker->scheduled, NULL); 2206 process_scheduled_works(worker); 2207 } 2208 } while (keep_working(pool)); 2209 2210 worker_set_flags(worker, WORKER_PREP, false); 2211 sleep: 2212 if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker)) 2213 goto recheck; 2214 2215 /* 2216 * gcwq->lock is held and there's no work to process and no 2217 * need to manage, sleep. Workers are woken up only while 2218 * holding gcwq->lock or from local cpu, so setting the 2219 * current state before releasing gcwq->lock is enough to 2220 * prevent losing any event. 2221 */ 2222 worker_enter_idle(worker); 2223 __set_current_state(TASK_INTERRUPTIBLE); 2224 spin_unlock_irq(&gcwq->lock); 2225 schedule(); 2226 goto woke_up; 2227 } 2228 2229 /** 2230 * rescuer_thread - the rescuer thread function 2231 * @__wq: the associated workqueue 2232 * 2233 * Workqueue rescuer thread function. There's one rescuer for each 2234 * workqueue which has WQ_RESCUER set. 2235 * 2236 * Regular work processing on a gcwq may block trying to create a new 2237 * worker which uses GFP_KERNEL allocation which has slight chance of 2238 * developing into deadlock if some works currently on the same queue 2239 * need to be processed to satisfy the GFP_KERNEL allocation. This is 2240 * the problem rescuer solves. 
2241 * 2242 * When such condition is possible, the gcwq summons rescuers of all 2243 * workqueues which have works queued on the gcwq and let them process 2244 * those works so that forward progress can be guaranteed. 2245 * 2246 * This should happen rarely. 2247 */ 2248 static int rescuer_thread(void *__wq) 2249 { 2250 struct workqueue_struct *wq = __wq; 2251 struct worker *rescuer = wq->rescuer; 2252 struct list_head *scheduled = &rescuer->scheduled; 2253 bool is_unbound = wq->flags & WQ_UNBOUND; 2254 unsigned int cpu; 2255 2256 set_user_nice(current, RESCUER_NICE_LEVEL); 2257 repeat: 2258 set_current_state(TASK_INTERRUPTIBLE); 2259 2260 if (kthread_should_stop()) 2261 return 0; 2262 2263 /* 2264 * See whether any cpu is asking for help. Unbounded 2265 * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND. 2266 */ 2267 for_each_mayday_cpu(cpu, wq->mayday_mask) { 2268 unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu; 2269 struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq); 2270 struct worker_pool *pool = cwq->pool; 2271 struct global_cwq *gcwq = pool->gcwq; 2272 struct work_struct *work, *n; 2273 2274 __set_current_state(TASK_RUNNING); 2275 mayday_clear_cpu(cpu, wq->mayday_mask); 2276 2277 /* migrate to the target cpu if possible */ 2278 rescuer->pool = pool; 2279 worker_maybe_bind_and_lock(rescuer); 2280 2281 /* 2282 * Slurp in all works issued via this workqueue and 2283 * process'em. 2284 */ 2285 BUG_ON(!list_empty(&rescuer->scheduled)); 2286 list_for_each_entry_safe(work, n, &pool->worklist, entry) 2287 if (get_work_cwq(work) == cwq) 2288 move_linked_works(work, scheduled, &n); 2289 2290 process_scheduled_works(rescuer); 2291 2292 /* 2293 * Leave this gcwq. If keep_working() is %true, notify a 2294 * regular worker; otherwise, we end up with 0 concurrency 2295 * and stalling the execution. 2296 */ 2297 if (keep_working(pool)) 2298 wake_up_worker(pool); 2299 2300 spin_unlock_irq(&gcwq->lock); 2301 } 2302 2303 schedule(); 2304 goto repeat; 2305 } 2306 2307 struct wq_barrier { 2308 struct work_struct work; 2309 struct completion done; 2310 }; 2311 2312 static void wq_barrier_func(struct work_struct *work) 2313 { 2314 struct wq_barrier *barr = container_of(work, struct wq_barrier, work); 2315 complete(&barr->done); 2316 } 2317 2318 /** 2319 * insert_wq_barrier - insert a barrier work 2320 * @cwq: cwq to insert barrier into 2321 * @barr: wq_barrier to insert 2322 * @target: target work to attach @barr to 2323 * @worker: worker currently executing @target, NULL if @target is not executing 2324 * 2325 * @barr is linked to @target such that @barr is completed only after 2326 * @target finishes execution. Please note that the ordering 2327 * guarantee is observed only with respect to @target and on the local 2328 * cpu. 2329 * 2330 * Currently, a queued barrier can't be canceled. This is because 2331 * try_to_grab_pending() can't determine whether the work to be 2332 * grabbed is at the head of the queue and thus can't clear LINKED 2333 * flag of the previous work while there must be a valid next work 2334 * after a work with LINKED flag set. 2335 * 2336 * Note that when @worker is non-NULL, @target may be modified 2337 * underneath us, so we can't reliably determine cwq from @target. 2338 * 2339 * CONTEXT: 2340 * spin_lock_irq(gcwq->lock). 
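 *
 * A sketch of the typical call pattern, mirroring start_flush_work() and
 * flush_work() below (illustrative only, not a new code path):
 *
 *	struct wq_barrier barr;
 *
 *	spin_lock_irq(&gcwq->lock);
 *	insert_wq_barrier(cwq, &barr, work, worker);
 *	spin_unlock_irq(&gcwq->lock);
 *	wait_for_completion(&barr.done);
 *	destroy_work_on_stack(&barr.work);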
2341 */ 2342 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, 2343 struct wq_barrier *barr, 2344 struct work_struct *target, struct worker *worker) 2345 { 2346 struct list_head *head; 2347 unsigned int linked = 0; 2348 2349 /* 2350 * debugobject calls are safe here even with gcwq->lock locked 2351 * as we know for sure that this will not trigger any of the 2352 * checks and call back into the fixup functions where we 2353 * might deadlock. 2354 */ 2355 INIT_WORK_ONSTACK(&barr->work, wq_barrier_func); 2356 __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work)); 2357 init_completion(&barr->done); 2358 2359 /* 2360 * If @target is currently being executed, schedule the 2361 * barrier to the worker; otherwise, put it after @target. 2362 */ 2363 if (worker) 2364 head = worker->scheduled.next; 2365 else { 2366 unsigned long *bits = work_data_bits(target); 2367 2368 head = target->entry.next; 2369 /* there can already be other linked works, inherit and set */ 2370 linked = *bits & WORK_STRUCT_LINKED; 2371 __set_bit(WORK_STRUCT_LINKED_BIT, bits); 2372 } 2373 2374 debug_work_activate(&barr->work); 2375 insert_work(cwq, &barr->work, head, 2376 work_color_to_flags(WORK_NO_COLOR) | linked); 2377 } 2378 2379 /** 2380 * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing 2381 * @wq: workqueue being flushed 2382 * @flush_color: new flush color, < 0 for no-op 2383 * @work_color: new work color, < 0 for no-op 2384 * 2385 * Prepare cwqs for workqueue flushing. 2386 * 2387 * If @flush_color is non-negative, flush_color on all cwqs should be 2388 * -1. If no cwq has in-flight commands at the specified color, all 2389 * cwq->flush_color's stay at -1 and %false is returned. If any cwq 2390 * has in flight commands, its cwq->flush_color is set to 2391 * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq 2392 * wakeup logic is armed and %true is returned. 2393 * 2394 * The caller should have initialized @wq->first_flusher prior to 2395 * calling this function with non-negative @flush_color. If 2396 * @flush_color is negative, no flush color update is done and %false 2397 * is returned. 2398 * 2399 * If @work_color is non-negative, all cwqs should have the same 2400 * work_color which is previous to @work_color and all will be 2401 * advanced to @work_color. 2402 * 2403 * CONTEXT: 2404 * mutex_lock(wq->flush_mutex). 2405 * 2406 * RETURNS: 2407 * %true if @flush_color >= 0 and there's something to flush. %false 2408 * otherwise. 
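 *
 * flush_workqueue() below uses both modes: the first flusher arms both
 * colors, while queued and overflowed flushers only advance work_color
 * (excerpts for illustration):
 *
 *	flush_workqueue_prep_cwqs(wq, wq->flush_color, wq->work_color);
 *	flush_workqueue_prep_cwqs(wq, -1, wq->work_color);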
2409 */ 2410 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq, 2411 int flush_color, int work_color) 2412 { 2413 bool wait = false; 2414 unsigned int cpu; 2415 2416 if (flush_color >= 0) { 2417 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush)); 2418 atomic_set(&wq->nr_cwqs_to_flush, 1); 2419 } 2420 2421 for_each_cwq_cpu(cpu, wq) { 2422 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 2423 struct global_cwq *gcwq = cwq->pool->gcwq; 2424 2425 spin_lock_irq(&gcwq->lock); 2426 2427 if (flush_color >= 0) { 2428 BUG_ON(cwq->flush_color != -1); 2429 2430 if (cwq->nr_in_flight[flush_color]) { 2431 cwq->flush_color = flush_color; 2432 atomic_inc(&wq->nr_cwqs_to_flush); 2433 wait = true; 2434 } 2435 } 2436 2437 if (work_color >= 0) { 2438 BUG_ON(work_color != work_next_color(cwq->work_color)); 2439 cwq->work_color = work_color; 2440 } 2441 2442 spin_unlock_irq(&gcwq->lock); 2443 } 2444 2445 if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush)) 2446 complete(&wq->first_flusher->done); 2447 2448 return wait; 2449 } 2450 2451 /** 2452 * flush_workqueue - ensure that any scheduled work has run to completion. 2453 * @wq: workqueue to flush 2454 * 2455 * Forces execution of the workqueue and blocks until its completion. 2456 * This is typically used in driver shutdown handlers. 2457 * 2458 * We sleep until all works which were queued on entry have been handled, 2459 * but we are not livelocked by new incoming ones. 2460 */ 2461 void flush_workqueue(struct workqueue_struct *wq) 2462 { 2463 struct wq_flusher this_flusher = { 2464 .list = LIST_HEAD_INIT(this_flusher.list), 2465 .flush_color = -1, 2466 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done), 2467 }; 2468 int next_color; 2469 2470 lock_map_acquire(&wq->lockdep_map); 2471 lock_map_release(&wq->lockdep_map); 2472 2473 mutex_lock(&wq->flush_mutex); 2474 2475 /* 2476 * Start-to-wait phase 2477 */ 2478 next_color = work_next_color(wq->work_color); 2479 2480 if (next_color != wq->flush_color) { 2481 /* 2482 * Color space is not full. The current work_color 2483 * becomes our flush_color and work_color is advanced 2484 * by one. 2485 */ 2486 BUG_ON(!list_empty(&wq->flusher_overflow)); 2487 this_flusher.flush_color = wq->work_color; 2488 wq->work_color = next_color; 2489 2490 if (!wq->first_flusher) { 2491 /* no flush in progress, become the first flusher */ 2492 BUG_ON(wq->flush_color != this_flusher.flush_color); 2493 2494 wq->first_flusher = &this_flusher; 2495 2496 if (!flush_workqueue_prep_cwqs(wq, wq->flush_color, 2497 wq->work_color)) { 2498 /* nothing to flush, done */ 2499 wq->flush_color = next_color; 2500 wq->first_flusher = NULL; 2501 goto out_unlock; 2502 } 2503 } else { 2504 /* wait in queue */ 2505 BUG_ON(wq->flush_color == this_flusher.flush_color); 2506 list_add_tail(&this_flusher.list, &wq->flusher_queue); 2507 flush_workqueue_prep_cwqs(wq, -1, wq->work_color); 2508 } 2509 } else { 2510 /* 2511 * Oops, color space is full, wait on overflow queue. 2512 * The next flush completion will assign us 2513 * flush_color and transfer to flusher_queue. 2514 */ 2515 list_add_tail(&this_flusher.list, &wq->flusher_overflow); 2516 } 2517 2518 mutex_unlock(&wq->flush_mutex); 2519 2520 wait_for_completion(&this_flusher.done); 2521 2522 /* 2523 * Wake-up-and-cascade phase 2524 * 2525 * First flushers are responsible for cascading flushes and 2526 * handling overflow. Non-first flushers can simply return. 
2527 */ 2528 if (wq->first_flusher != &this_flusher) 2529 return; 2530 2531 mutex_lock(&wq->flush_mutex); 2532 2533 /* we might have raced, check again with mutex held */ 2534 if (wq->first_flusher != &this_flusher) 2535 goto out_unlock; 2536 2537 wq->first_flusher = NULL; 2538 2539 BUG_ON(!list_empty(&this_flusher.list)); 2540 BUG_ON(wq->flush_color != this_flusher.flush_color); 2541 2542 while (true) { 2543 struct wq_flusher *next, *tmp; 2544 2545 /* complete all the flushers sharing the current flush color */ 2546 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) { 2547 if (next->flush_color != wq->flush_color) 2548 break; 2549 list_del_init(&next->list); 2550 complete(&next->done); 2551 } 2552 2553 BUG_ON(!list_empty(&wq->flusher_overflow) && 2554 wq->flush_color != work_next_color(wq->work_color)); 2555 2556 /* this flush_color is finished, advance by one */ 2557 wq->flush_color = work_next_color(wq->flush_color); 2558 2559 /* one color has been freed, handle overflow queue */ 2560 if (!list_empty(&wq->flusher_overflow)) { 2561 /* 2562 * Assign the same color to all overflowed 2563 * flushers, advance work_color and append to 2564 * flusher_queue. This is the start-to-wait 2565 * phase for these overflowed flushers. 2566 */ 2567 list_for_each_entry(tmp, &wq->flusher_overflow, list) 2568 tmp->flush_color = wq->work_color; 2569 2570 wq->work_color = work_next_color(wq->work_color); 2571 2572 list_splice_tail_init(&wq->flusher_overflow, 2573 &wq->flusher_queue); 2574 flush_workqueue_prep_cwqs(wq, -1, wq->work_color); 2575 } 2576 2577 if (list_empty(&wq->flusher_queue)) { 2578 BUG_ON(wq->flush_color != wq->work_color); 2579 break; 2580 } 2581 2582 /* 2583 * Need to flush more colors. Make the next flusher 2584 * the new first flusher and arm cwqs. 2585 */ 2586 BUG_ON(wq->flush_color == wq->work_color); 2587 BUG_ON(wq->flush_color != next->flush_color); 2588 2589 list_del_init(&next->list); 2590 wq->first_flusher = next; 2591 2592 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1)) 2593 break; 2594 2595 /* 2596 * Meh... this color is already done, clear first 2597 * flusher and repeat cascading. 2598 */ 2599 wq->first_flusher = NULL; 2600 } 2601 2602 out_unlock: 2603 mutex_unlock(&wq->flush_mutex); 2604 } 2605 EXPORT_SYMBOL_GPL(flush_workqueue); 2606 2607 /** 2608 * drain_workqueue - drain a workqueue 2609 * @wq: workqueue to drain 2610 * 2611 * Wait until the workqueue becomes empty. While draining is in progress, 2612 * only chain queueing is allowed. IOW, only currently pending or running 2613 * work items on @wq can queue further work items on it. @wq is flushed 2614 * repeatedly until it becomes empty. The number of flushing is detemined 2615 * by the depth of chaining and should be relatively short. Whine if it 2616 * takes too long. 2617 */ 2618 void drain_workqueue(struct workqueue_struct *wq) 2619 { 2620 unsigned int flush_cnt = 0; 2621 unsigned int cpu; 2622 2623 /* 2624 * __queue_work() needs to test whether there are drainers, is much 2625 * hotter than drain_workqueue() and already looks at @wq->flags. 2626 * Use WQ_DRAINING so that queue doesn't have to check nr_drainers. 
2627 */ 2628 spin_lock(&workqueue_lock); 2629 if (!wq->nr_drainers++) 2630 wq->flags |= WQ_DRAINING; 2631 spin_unlock(&workqueue_lock); 2632 reflush: 2633 flush_workqueue(wq); 2634 2635 for_each_cwq_cpu(cpu, wq) { 2636 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 2637 bool drained; 2638 2639 spin_lock_irq(&cwq->pool->gcwq->lock); 2640 drained = !cwq->nr_active && list_empty(&cwq->delayed_works); 2641 spin_unlock_irq(&cwq->pool->gcwq->lock); 2642 2643 if (drained) 2644 continue; 2645 2646 if (++flush_cnt == 10 || 2647 (flush_cnt % 100 == 0 && flush_cnt <= 1000)) 2648 pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n", 2649 wq->name, flush_cnt); 2650 goto reflush; 2651 } 2652 2653 spin_lock(&workqueue_lock); 2654 if (!--wq->nr_drainers) 2655 wq->flags &= ~WQ_DRAINING; 2656 spin_unlock(&workqueue_lock); 2657 } 2658 EXPORT_SYMBOL_GPL(drain_workqueue); 2659 2660 static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr, 2661 bool wait_executing) 2662 { 2663 struct worker *worker = NULL; 2664 struct global_cwq *gcwq; 2665 struct cpu_workqueue_struct *cwq; 2666 2667 might_sleep(); 2668 gcwq = get_work_gcwq(work); 2669 if (!gcwq) 2670 return false; 2671 2672 spin_lock_irq(&gcwq->lock); 2673 if (!list_empty(&work->entry)) { 2674 /* 2675 * See the comment near try_to_grab_pending()->smp_rmb(). 2676 * If it was re-queued to a different gcwq under us, we 2677 * are not going to wait. 2678 */ 2679 smp_rmb(); 2680 cwq = get_work_cwq(work); 2681 if (unlikely(!cwq || gcwq != cwq->pool->gcwq)) 2682 goto already_gone; 2683 } else if (wait_executing) { 2684 worker = find_worker_executing_work(gcwq, work); 2685 if (!worker) 2686 goto already_gone; 2687 cwq = worker->current_cwq; 2688 } else 2689 goto already_gone; 2690 2691 insert_wq_barrier(cwq, barr, work, worker); 2692 spin_unlock_irq(&gcwq->lock); 2693 2694 /* 2695 * If @max_active is 1 or rescuer is in use, flushing another work 2696 * item on the same workqueue may lead to deadlock. Make sure the 2697 * flusher is not running on the same workqueue by verifying write 2698 * access. 2699 */ 2700 if (cwq->wq->saved_max_active == 1 || cwq->wq->flags & WQ_RESCUER) 2701 lock_map_acquire(&cwq->wq->lockdep_map); 2702 else 2703 lock_map_acquire_read(&cwq->wq->lockdep_map); 2704 lock_map_release(&cwq->wq->lockdep_map); 2705 2706 return true; 2707 already_gone: 2708 spin_unlock_irq(&gcwq->lock); 2709 return false; 2710 } 2711 2712 /** 2713 * flush_work - wait for a work to finish executing the last queueing instance 2714 * @work: the work to flush 2715 * 2716 * Wait until @work has finished execution. This function considers 2717 * only the last queueing instance of @work. If @work has been 2718 * enqueued across different CPUs on a non-reentrant workqueue or on 2719 * multiple workqueues, @work might still be executing on return on 2720 * some of the CPUs from earlier queueing. 2721 * 2722 * If @work was queued only on a non-reentrant, ordered or unbound 2723 * workqueue, @work is guaranteed to be idle on return if it hasn't 2724 * been requeued since flush started. 2725 * 2726 * RETURNS: 2727 * %true if flush_work() waited for the work to finish execution, 2728 * %false if it was already idle. 
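 *
 * Minimal usage sketch (illustrative only; my_work and my_work_fn() are
 * hypothetical caller-side names, not part of this file):
 *
 *	static void my_work_fn(struct work_struct *work);
 *	static DECLARE_WORK(my_work, my_work_fn);
 *
 *	schedule_work(&my_work);
 *	...
 *	if (flush_work(&my_work))
 *		pr_debug("waited for my_work to finish\n");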
2729 */ 2730 bool flush_work(struct work_struct *work) 2731 { 2732 struct wq_barrier barr; 2733 2734 lock_map_acquire(&work->lockdep_map); 2735 lock_map_release(&work->lockdep_map); 2736 2737 if (start_flush_work(work, &barr, true)) { 2738 wait_for_completion(&barr.done); 2739 destroy_work_on_stack(&barr.work); 2740 return true; 2741 } else 2742 return false; 2743 } 2744 EXPORT_SYMBOL_GPL(flush_work); 2745 2746 static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work) 2747 { 2748 struct wq_barrier barr; 2749 struct worker *worker; 2750 2751 spin_lock_irq(&gcwq->lock); 2752 2753 worker = find_worker_executing_work(gcwq, work); 2754 if (unlikely(worker)) 2755 insert_wq_barrier(worker->current_cwq, &barr, work, worker); 2756 2757 spin_unlock_irq(&gcwq->lock); 2758 2759 if (unlikely(worker)) { 2760 wait_for_completion(&barr.done); 2761 destroy_work_on_stack(&barr.work); 2762 return true; 2763 } else 2764 return false; 2765 } 2766 2767 static bool wait_on_work(struct work_struct *work) 2768 { 2769 bool ret = false; 2770 int cpu; 2771 2772 might_sleep(); 2773 2774 lock_map_acquire(&work->lockdep_map); 2775 lock_map_release(&work->lockdep_map); 2776 2777 for_each_gcwq_cpu(cpu) 2778 ret |= wait_on_cpu_work(get_gcwq(cpu), work); 2779 return ret; 2780 } 2781 2782 /** 2783 * flush_work_sync - wait until a work has finished execution 2784 * @work: the work to flush 2785 * 2786 * Wait until @work has finished execution. On return, it's 2787 * guaranteed that all queueing instances of @work which happened 2788 * before this function is called are finished. In other words, if 2789 * @work hasn't been requeued since this function was called, @work is 2790 * guaranteed to be idle on return. 2791 * 2792 * RETURNS: 2793 * %true if flush_work_sync() waited for the work to finish execution, 2794 * %false if it was already idle. 2795 */ 2796 bool flush_work_sync(struct work_struct *work) 2797 { 2798 struct wq_barrier barr; 2799 bool pending, waited; 2800 2801 /* we'll wait for executions separately, queue barr only if pending */ 2802 pending = start_flush_work(work, &barr, false); 2803 2804 /* wait for executions to finish */ 2805 waited = wait_on_work(work); 2806 2807 /* wait for the pending one */ 2808 if (pending) { 2809 wait_for_completion(&barr.done); 2810 destroy_work_on_stack(&barr.work); 2811 } 2812 2813 return pending || waited; 2814 } 2815 EXPORT_SYMBOL_GPL(flush_work_sync); 2816 2817 /* 2818 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit, 2819 * so this work can't be re-armed in any way. 2820 */ 2821 static int try_to_grab_pending(struct work_struct *work) 2822 { 2823 struct global_cwq *gcwq; 2824 int ret = -1; 2825 2826 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) 2827 return 0; 2828 2829 /* 2830 * The queueing is in progress, or it is already queued. Try to 2831 * steal it from ->worklist without clearing WORK_STRUCT_PENDING. 2832 */ 2833 gcwq = get_work_gcwq(work); 2834 if (!gcwq) 2835 return ret; 2836 2837 spin_lock_irq(&gcwq->lock); 2838 if (!list_empty(&work->entry)) { 2839 /* 2840 * This work is queued, but perhaps we locked the wrong gcwq. 2841 * In that case we must see the new value after rmb(), see 2842 * insert_work()->wmb(). 
2843 */ 2844 smp_rmb(); 2845 if (gcwq == get_work_gcwq(work)) { 2846 debug_work_deactivate(work); 2847 list_del_init(&work->entry); 2848 cwq_dec_nr_in_flight(get_work_cwq(work), 2849 get_work_color(work), 2850 *work_data_bits(work) & WORK_STRUCT_DELAYED); 2851 ret = 1; 2852 } 2853 } 2854 spin_unlock_irq(&gcwq->lock); 2855 2856 return ret; 2857 } 2858 2859 static bool __cancel_work_timer(struct work_struct *work, 2860 struct timer_list* timer) 2861 { 2862 int ret; 2863 2864 do { 2865 ret = (timer && likely(del_timer(timer))); 2866 if (!ret) 2867 ret = try_to_grab_pending(work); 2868 wait_on_work(work); 2869 } while (unlikely(ret < 0)); 2870 2871 clear_work_data(work); 2872 return ret; 2873 } 2874 2875 /** 2876 * cancel_work_sync - cancel a work and wait for it to finish 2877 * @work: the work to cancel 2878 * 2879 * Cancel @work and wait for its execution to finish. This function 2880 * can be used even if the work re-queues itself or migrates to 2881 * another workqueue. On return from this function, @work is 2882 * guaranteed to be not pending or executing on any CPU. 2883 * 2884 * cancel_work_sync(&delayed_work->work) must not be used for 2885 * delayed_work's. Use cancel_delayed_work_sync() instead. 2886 * 2887 * The caller must ensure that the workqueue on which @work was last 2888 * queued can't be destroyed before this function returns. 2889 * 2890 * RETURNS: 2891 * %true if @work was pending, %false otherwise. 2892 */ 2893 bool cancel_work_sync(struct work_struct *work) 2894 { 2895 return __cancel_work_timer(work, NULL); 2896 } 2897 EXPORT_SYMBOL_GPL(cancel_work_sync); 2898 2899 /** 2900 * flush_delayed_work - wait for a dwork to finish executing the last queueing 2901 * @dwork: the delayed work to flush 2902 * 2903 * Delayed timer is cancelled and the pending work is queued for 2904 * immediate execution. Like flush_work(), this function only 2905 * considers the last queueing instance of @dwork. 2906 * 2907 * RETURNS: 2908 * %true if flush_work() waited for the work to finish execution, 2909 * %false if it was already idle. 2910 */ 2911 bool flush_delayed_work(struct delayed_work *dwork) 2912 { 2913 if (del_timer_sync(&dwork->timer)) 2914 __queue_work(raw_smp_processor_id(), 2915 get_work_cwq(&dwork->work)->wq, &dwork->work); 2916 return flush_work(&dwork->work); 2917 } 2918 EXPORT_SYMBOL(flush_delayed_work); 2919 2920 /** 2921 * flush_delayed_work_sync - wait for a dwork to finish 2922 * @dwork: the delayed work to flush 2923 * 2924 * Delayed timer is cancelled and the pending work is queued for 2925 * execution immediately. Other than timer handling, its behavior 2926 * is identical to flush_work_sync(). 2927 * 2928 * RETURNS: 2929 * %true if flush_work_sync() waited for the work to finish execution, 2930 * %false if it was already idle. 2931 */ 2932 bool flush_delayed_work_sync(struct delayed_work *dwork) 2933 { 2934 if (del_timer_sync(&dwork->timer)) 2935 __queue_work(raw_smp_processor_id(), 2936 get_work_cwq(&dwork->work)->wq, &dwork->work); 2937 return flush_work_sync(&dwork->work); 2938 } 2939 EXPORT_SYMBOL(flush_delayed_work_sync); 2940 2941 /** 2942 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish 2943 * @dwork: the delayed work cancel 2944 * 2945 * This is cancel_work_sync() for delayed works. 2946 * 2947 * RETURNS: 2948 * %true if @dwork was pending, %false otherwise. 
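 *
 * Usage sketch (illustrative only; my_dwork and my_dwork_fn() are
 * hypothetical caller-side names, not part of this file):
 *
 *	static void my_dwork_fn(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(my_dwork, my_dwork_fn);
 *
 *	schedule_delayed_work(&my_dwork, msecs_to_jiffies(100));
 *	...
 *	cancel_delayed_work_sync(&my_dwork);
 *
 * On return my_dwork is neither pending nor executing on any CPU.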
2949 */ 2950 bool cancel_delayed_work_sync(struct delayed_work *dwork) 2951 { 2952 return __cancel_work_timer(&dwork->work, &dwork->timer); 2953 } 2954 EXPORT_SYMBOL(cancel_delayed_work_sync); 2955 2956 /** 2957 * schedule_work - put work task in global workqueue 2958 * @work: job to be done 2959 * 2960 * Returns zero if @work was already on the kernel-global workqueue and 2961 * non-zero otherwise. 2962 * 2963 * This puts a job in the kernel-global workqueue if it was not already 2964 * queued and leaves it in the same position on the kernel-global 2965 * workqueue otherwise. 2966 */ 2967 int schedule_work(struct work_struct *work) 2968 { 2969 return queue_work(system_wq, work); 2970 } 2971 EXPORT_SYMBOL(schedule_work); 2972 2973 /* 2974 * schedule_work_on - put work task on a specific cpu 2975 * @cpu: cpu to put the work task on 2976 * @work: job to be done 2977 * 2978 * This puts a job on a specific cpu 2979 */ 2980 int schedule_work_on(int cpu, struct work_struct *work) 2981 { 2982 return queue_work_on(cpu, system_wq, work); 2983 } 2984 EXPORT_SYMBOL(schedule_work_on); 2985 2986 /** 2987 * schedule_delayed_work - put work task in global workqueue after delay 2988 * @dwork: job to be done 2989 * @delay: number of jiffies to wait or 0 for immediate execution 2990 * 2991 * After waiting for a given time this puts a job in the kernel-global 2992 * workqueue. 2993 */ 2994 int schedule_delayed_work(struct delayed_work *dwork, 2995 unsigned long delay) 2996 { 2997 return queue_delayed_work(system_wq, dwork, delay); 2998 } 2999 EXPORT_SYMBOL(schedule_delayed_work); 3000 3001 /** 3002 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 3003 * @cpu: cpu to use 3004 * @dwork: job to be done 3005 * @delay: number of jiffies to wait 3006 * 3007 * After waiting for a given time this puts a job in the kernel-global 3008 * workqueue on the specified CPU. 3009 */ 3010 int schedule_delayed_work_on(int cpu, 3011 struct delayed_work *dwork, unsigned long delay) 3012 { 3013 return queue_delayed_work_on(cpu, system_wq, dwork, delay); 3014 } 3015 EXPORT_SYMBOL(schedule_delayed_work_on); 3016 3017 /** 3018 * schedule_on_each_cpu - execute a function synchronously on each online CPU 3019 * @func: the function to call 3020 * 3021 * schedule_on_each_cpu() executes @func on each online CPU using the 3022 * system workqueue and blocks until all CPUs have completed. 3023 * schedule_on_each_cpu() is very slow. 3024 * 3025 * RETURNS: 3026 * 0 on success, -errno on failure. 3027 */ 3028 int schedule_on_each_cpu(work_func_t func) 3029 { 3030 int cpu; 3031 struct work_struct __percpu *works; 3032 3033 works = alloc_percpu(struct work_struct); 3034 if (!works) 3035 return -ENOMEM; 3036 3037 get_online_cpus(); 3038 3039 for_each_online_cpu(cpu) { 3040 struct work_struct *work = per_cpu_ptr(works, cpu); 3041 3042 INIT_WORK(work, func); 3043 schedule_work_on(cpu, work); 3044 } 3045 3046 for_each_online_cpu(cpu) 3047 flush_work(per_cpu_ptr(works, cpu)); 3048 3049 put_online_cpus(); 3050 free_percpu(works); 3051 return 0; 3052 } 3053 3054 /** 3055 * flush_scheduled_work - ensure that any scheduled work has run to completion. 3056 * 3057 * Forces execution of the kernel-global workqueue and blocks until its 3058 * completion. 3059 * 3060 * Think twice before calling this function! It's very easy to get into 3061 * trouble if you don't take great care. 
Either of the following situations 3062 * will lead to deadlock: 3063 * 3064 * One of the work items currently on the workqueue needs to acquire 3065 * a lock held by your code or its caller. 3066 * 3067 * Your code is running in the context of a work routine. 3068 * 3069 * They will be detected by lockdep when they occur, but the first might not 3070 * occur very often. It depends on what work items are on the workqueue and 3071 * what locks they need, which you have no control over. 3072 * 3073 * In most situations flushing the entire workqueue is overkill; you merely 3074 * need to know that a particular work item isn't queued and isn't running. 3075 * In such cases you should use cancel_delayed_work_sync() or 3076 * cancel_work_sync() instead. 3077 */ 3078 void flush_scheduled_work(void) 3079 { 3080 flush_workqueue(system_wq); 3081 } 3082 EXPORT_SYMBOL(flush_scheduled_work); 3083 3084 /** 3085 * execute_in_process_context - reliably execute the routine with user context 3086 * @fn: the function to execute 3087 * @ew: guaranteed storage for the execute work structure (must 3088 * be available when the work executes) 3089 * 3090 * Executes the function immediately if process context is available, 3091 * otherwise schedules the function for delayed execution. 3092 * 3093 * Returns: 0 - function was executed 3094 * 1 - function was scheduled for execution 3095 */ 3096 int execute_in_process_context(work_func_t fn, struct execute_work *ew) 3097 { 3098 if (!in_interrupt()) { 3099 fn(&ew->work); 3100 return 0; 3101 } 3102 3103 INIT_WORK(&ew->work, fn); 3104 schedule_work(&ew->work); 3105 3106 return 1; 3107 } 3108 EXPORT_SYMBOL_GPL(execute_in_process_context); 3109 3110 int keventd_up(void) 3111 { 3112 return system_wq != NULL; 3113 } 3114 3115 static int alloc_cwqs(struct workqueue_struct *wq) 3116 { 3117 /* 3118 * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS. 3119 * Make sure that the alignment isn't lower than that of 3120 * unsigned long long. 3121 */ 3122 const size_t size = sizeof(struct cpu_workqueue_struct); 3123 const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS, 3124 __alignof__(unsigned long long)); 3125 3126 if (!(wq->flags & WQ_UNBOUND)) 3127 wq->cpu_wq.pcpu = __alloc_percpu(size, align); 3128 else { 3129 void *ptr; 3130 3131 /* 3132 * Allocate enough room to align cwq and put an extra 3133 * pointer at the end pointing back to the originally 3134 * allocated pointer which will be used for free. 3135 */ 3136 ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL); 3137 if (ptr) { 3138 wq->cpu_wq.single = PTR_ALIGN(ptr, align); 3139 *(void **)(wq->cpu_wq.single + 1) = ptr; 3140 } 3141 } 3142 3143 /* just in case, make sure it's actually aligned */ 3144 BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align)); 3145 return wq->cpu_wq.v ? 0 : -ENOMEM; 3146 } 3147 3148 static void free_cwqs(struct workqueue_struct *wq) 3149 { 3150 if (!(wq->flags & WQ_UNBOUND)) 3151 free_percpu(wq->cpu_wq.pcpu); 3152 else if (wq->cpu_wq.single) { 3153 /* the pointer to free is stored right after the cwq */ 3154 kfree(*(void **)(wq->cpu_wq.single + 1)); 3155 } 3156 } 3157 3158 static int wq_clamp_max_active(int max_active, unsigned int flags, 3159 const char *name) 3160 { 3161 int lim = flags & WQ_UNBOUND ? 
WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE; 3162 3163 if (max_active < 1 || max_active > lim) 3164 printk(KERN_WARNING "workqueue: max_active %d requested for %s " 3165 "is out of range, clamping between %d and %d\n", 3166 max_active, name, 1, lim); 3167 3168 return clamp_val(max_active, 1, lim); 3169 } 3170 3171 struct workqueue_struct *__alloc_workqueue_key(const char *fmt, 3172 unsigned int flags, 3173 int max_active, 3174 struct lock_class_key *key, 3175 const char *lock_name, ...) 3176 { 3177 va_list args, args1; 3178 struct workqueue_struct *wq; 3179 unsigned int cpu; 3180 size_t namelen; 3181 3182 /* determine namelen, allocate wq and format name */ 3183 va_start(args, lock_name); 3184 va_copy(args1, args); 3185 namelen = vsnprintf(NULL, 0, fmt, args) + 1; 3186 3187 wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL); 3188 if (!wq) 3189 goto err; 3190 3191 vsnprintf(wq->name, namelen, fmt, args1); 3192 va_end(args); 3193 va_end(args1); 3194 3195 /* 3196 * Workqueues which may be used during memory reclaim should 3197 * have a rescuer to guarantee forward progress. 3198 */ 3199 if (flags & WQ_MEM_RECLAIM) 3200 flags |= WQ_RESCUER; 3201 3202 max_active = max_active ?: WQ_DFL_ACTIVE; 3203 max_active = wq_clamp_max_active(max_active, flags, wq->name); 3204 3205 /* init wq */ 3206 wq->flags = flags; 3207 wq->saved_max_active = max_active; 3208 mutex_init(&wq->flush_mutex); 3209 atomic_set(&wq->nr_cwqs_to_flush, 0); 3210 INIT_LIST_HEAD(&wq->flusher_queue); 3211 INIT_LIST_HEAD(&wq->flusher_overflow); 3212 3213 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 3214 INIT_LIST_HEAD(&wq->list); 3215 3216 if (alloc_cwqs(wq) < 0) 3217 goto err; 3218 3219 for_each_cwq_cpu(cpu, wq) { 3220 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3221 struct global_cwq *gcwq = get_gcwq(cpu); 3222 int pool_idx = (bool)(flags & WQ_HIGHPRI); 3223 3224 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); 3225 cwq->pool = &gcwq->pools[pool_idx]; 3226 cwq->wq = wq; 3227 cwq->flush_color = -1; 3228 cwq->max_active = max_active; 3229 INIT_LIST_HEAD(&cwq->delayed_works); 3230 } 3231 3232 if (flags & WQ_RESCUER) { 3233 struct worker *rescuer; 3234 3235 if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL)) 3236 goto err; 3237 3238 wq->rescuer = rescuer = alloc_worker(); 3239 if (!rescuer) 3240 goto err; 3241 3242 rescuer->task = kthread_create(rescuer_thread, wq, "%s", 3243 wq->name); 3244 if (IS_ERR(rescuer->task)) 3245 goto err; 3246 3247 rescuer->task->flags |= PF_THREAD_BOUND; 3248 wake_up_process(rescuer->task); 3249 } 3250 3251 /* 3252 * workqueue_lock protects global freeze state and workqueues 3253 * list. Grab it, set max_active accordingly and add the new 3254 * workqueue to workqueues list. 3255 */ 3256 spin_lock(&workqueue_lock); 3257 3258 if (workqueue_freezing && wq->flags & WQ_FREEZABLE) 3259 for_each_cwq_cpu(cpu, wq) 3260 get_cwq(cpu, wq)->max_active = 0; 3261 3262 list_add(&wq->list, &workqueues); 3263 3264 spin_unlock(&workqueue_lock); 3265 3266 return wq; 3267 err: 3268 if (wq) { 3269 free_cwqs(wq); 3270 free_mayday_mask(wq->mayday_mask); 3271 kfree(wq->rescuer); 3272 kfree(wq); 3273 } 3274 return NULL; 3275 } 3276 EXPORT_SYMBOL_GPL(__alloc_workqueue_key); 3277 3278 /** 3279 * destroy_workqueue - safely terminate a workqueue 3280 * @wq: target workqueue 3281 * 3282 * Safely destroy a workqueue. All work currently pending will be done first. 
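 *
 * Typical lifecycle sketch (illustrative only; my_wq and my_work are
 * hypothetical caller-side names, not part of this file):
 *
 *	struct workqueue_struct *my_wq;
 *
 *	my_wq = alloc_workqueue("my_wq", WQ_MEM_RECLAIM, 0);
 *	if (!my_wq)
 *		return -ENOMEM;
 *	queue_work(my_wq, &my_work);
 *	...
 *	destroy_workqueue(my_wq);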
3283 */ 3284 void destroy_workqueue(struct workqueue_struct *wq) 3285 { 3286 unsigned int cpu; 3287 3288 /* drain it before proceeding with destruction */ 3289 drain_workqueue(wq); 3290 3291 /* 3292 * wq list is used to freeze wq, remove from list after 3293 * flushing is complete in case freeze races us. 3294 */ 3295 spin_lock(&workqueue_lock); 3296 list_del(&wq->list); 3297 spin_unlock(&workqueue_lock); 3298 3299 /* sanity check */ 3300 for_each_cwq_cpu(cpu, wq) { 3301 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3302 int i; 3303 3304 for (i = 0; i < WORK_NR_COLORS; i++) 3305 BUG_ON(cwq->nr_in_flight[i]); 3306 BUG_ON(cwq->nr_active); 3307 BUG_ON(!list_empty(&cwq->delayed_works)); 3308 } 3309 3310 if (wq->flags & WQ_RESCUER) { 3311 kthread_stop(wq->rescuer->task); 3312 free_mayday_mask(wq->mayday_mask); 3313 kfree(wq->rescuer); 3314 } 3315 3316 free_cwqs(wq); 3317 kfree(wq); 3318 } 3319 EXPORT_SYMBOL_GPL(destroy_workqueue); 3320 3321 /** 3322 * workqueue_set_max_active - adjust max_active of a workqueue 3323 * @wq: target workqueue 3324 * @max_active: new max_active value. 3325 * 3326 * Set max_active of @wq to @max_active. 3327 * 3328 * CONTEXT: 3329 * Don't call from IRQ context. 3330 */ 3331 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active) 3332 { 3333 unsigned int cpu; 3334 3335 max_active = wq_clamp_max_active(max_active, wq->flags, wq->name); 3336 3337 spin_lock(&workqueue_lock); 3338 3339 wq->saved_max_active = max_active; 3340 3341 for_each_cwq_cpu(cpu, wq) { 3342 struct global_cwq *gcwq = get_gcwq(cpu); 3343 3344 spin_lock_irq(&gcwq->lock); 3345 3346 if (!(wq->flags & WQ_FREEZABLE) || 3347 !(gcwq->flags & GCWQ_FREEZING)) 3348 get_cwq(gcwq->cpu, wq)->max_active = max_active; 3349 3350 spin_unlock_irq(&gcwq->lock); 3351 } 3352 3353 spin_unlock(&workqueue_lock); 3354 } 3355 EXPORT_SYMBOL_GPL(workqueue_set_max_active); 3356 3357 /** 3358 * workqueue_congested - test whether a workqueue is congested 3359 * @cpu: CPU in question 3360 * @wq: target workqueue 3361 * 3362 * Test whether @wq's cpu workqueue for @cpu is congested. There is 3363 * no synchronization around this function and the test result is 3364 * unreliable and only useful as advisory hints or for debugging. 3365 * 3366 * RETURNS: 3367 * %true if congested, %false otherwise. 3368 */ 3369 bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq) 3370 { 3371 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3372 3373 return !list_empty(&cwq->delayed_works); 3374 } 3375 EXPORT_SYMBOL_GPL(workqueue_congested); 3376 3377 /** 3378 * work_cpu - return the last known associated cpu for @work 3379 * @work: the work of interest 3380 * 3381 * RETURNS: 3382 * CPU number if @work was ever queued. WORK_CPU_NONE otherwise. 3383 */ 3384 unsigned int work_cpu(struct work_struct *work) 3385 { 3386 struct global_cwq *gcwq = get_work_gcwq(work); 3387 3388 return gcwq ? gcwq->cpu : WORK_CPU_NONE; 3389 } 3390 EXPORT_SYMBOL_GPL(work_cpu); 3391 3392 /** 3393 * work_busy - test whether a work is currently pending or running 3394 * @work: the work to be tested 3395 * 3396 * Test whether @work is currently pending or running. There is no 3397 * synchronization around this function and the test result is 3398 * unreliable and only useful as advisory hints or for debugging. 3399 * Especially for reentrant wqs, the pending state might hide the 3400 * running state. 3401 * 3402 * RETURNS: 3403 * OR'd bitmask of WORK_BUSY_* bits. 
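 *
 * Advisory usage sketch (illustrative only; my_work is a hypothetical
 * caller-side name):
 *
 *	unsigned int busy = work_busy(&my_work);
 *
 *	if (busy & WORK_BUSY_PENDING)
 *		pr_debug("my_work is pending\n");
 *	if (busy & WORK_BUSY_RUNNING)
 *		pr_debug("my_work is running\n");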
3404 */ 3405 unsigned int work_busy(struct work_struct *work) 3406 { 3407 struct global_cwq *gcwq = get_work_gcwq(work); 3408 unsigned long flags; 3409 unsigned int ret = 0; 3410 3411 if (!gcwq) 3412 return false; 3413 3414 spin_lock_irqsave(&gcwq->lock, flags); 3415 3416 if (work_pending(work)) 3417 ret |= WORK_BUSY_PENDING; 3418 if (find_worker_executing_work(gcwq, work)) 3419 ret |= WORK_BUSY_RUNNING; 3420 3421 spin_unlock_irqrestore(&gcwq->lock, flags); 3422 3423 return ret; 3424 } 3425 EXPORT_SYMBOL_GPL(work_busy); 3426 3427 /* 3428 * CPU hotplug. 3429 * 3430 * There are two challenges in supporting CPU hotplug. Firstly, there 3431 * are a lot of assumptions on strong associations among work, cwq and 3432 * gcwq which make migrating pending and scheduled works very 3433 * difficult to implement without impacting hot paths. Secondly, 3434 * gcwqs serve mix of short, long and very long running works making 3435 * blocked draining impractical. 3436 * 3437 * This is solved by allowing a gcwq to be disassociated from the CPU 3438 * running as an unbound one and allowing it to be reattached later if the 3439 * cpu comes back online. 3440 */ 3441 3442 /* claim manager positions of all pools */ 3443 static void gcwq_claim_management_and_lock(struct global_cwq *gcwq) 3444 { 3445 struct worker_pool *pool; 3446 3447 for_each_worker_pool(pool, gcwq) 3448 mutex_lock_nested(&pool->manager_mutex, pool - gcwq->pools); 3449 spin_lock_irq(&gcwq->lock); 3450 } 3451 3452 /* release manager positions */ 3453 static void gcwq_release_management_and_unlock(struct global_cwq *gcwq) 3454 { 3455 struct worker_pool *pool; 3456 3457 spin_unlock_irq(&gcwq->lock); 3458 for_each_worker_pool(pool, gcwq) 3459 mutex_unlock(&pool->manager_mutex); 3460 } 3461 3462 static void gcwq_unbind_fn(struct work_struct *work) 3463 { 3464 struct global_cwq *gcwq = get_gcwq(smp_processor_id()); 3465 struct worker_pool *pool; 3466 struct worker *worker; 3467 struct hlist_node *pos; 3468 int i; 3469 3470 BUG_ON(gcwq->cpu != smp_processor_id()); 3471 3472 gcwq_claim_management_and_lock(gcwq); 3473 3474 /* 3475 * We've claimed all manager positions. Make all workers unbound 3476 * and set DISASSOCIATED. Before this, all workers except for the 3477 * ones which are still executing works from before the last CPU 3478 * down must be on the cpu. After this, they may become diasporas. 3479 */ 3480 for_each_worker_pool(pool, gcwq) 3481 list_for_each_entry(worker, &pool->idle_list, entry) 3482 worker->flags |= WORKER_UNBOUND; 3483 3484 for_each_busy_worker(worker, i, pos, gcwq) 3485 worker->flags |= WORKER_UNBOUND; 3486 3487 gcwq->flags |= GCWQ_DISASSOCIATED; 3488 3489 gcwq_release_management_and_unlock(gcwq); 3490 3491 /* 3492 * Call schedule() so that we cross rq->lock and thus can guarantee 3493 * sched callbacks see the %WORKER_UNBOUND flag. This is necessary 3494 * as scheduler callbacks may be invoked from other cpus. 3495 */ 3496 schedule(); 3497 3498 /* 3499 * Sched callbacks are disabled now. Zap nr_running. After this, 3500 * nr_running stays zero and need_more_worker() and keep_working() 3501 * are always true as long as the worklist is not empty. @gcwq now 3502 * behaves as unbound (in terms of concurrency management) gcwq 3503 * which is served by workers tied to the CPU. 3504 * 3505 * On return from this function, the current worker would trigger 3506 * unbound chain execution of pending work items if other workers 3507 * didn't already. 
3508 */ 3509 for_each_worker_pool(pool, gcwq) 3510 atomic_set(get_pool_nr_running(pool), 0); 3511 } 3512 3513 /* 3514 * Workqueues should be brought up before normal priority CPU notifiers. 3515 * This will be registered high priority CPU notifier. 3516 */ 3517 static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb, 3518 unsigned long action, 3519 void *hcpu) 3520 { 3521 unsigned int cpu = (unsigned long)hcpu; 3522 struct global_cwq *gcwq = get_gcwq(cpu); 3523 struct worker_pool *pool; 3524 3525 switch (action & ~CPU_TASKS_FROZEN) { 3526 case CPU_UP_PREPARE: 3527 for_each_worker_pool(pool, gcwq) { 3528 struct worker *worker; 3529 3530 if (pool->nr_workers) 3531 continue; 3532 3533 worker = create_worker(pool); 3534 if (!worker) 3535 return NOTIFY_BAD; 3536 3537 spin_lock_irq(&gcwq->lock); 3538 start_worker(worker); 3539 spin_unlock_irq(&gcwq->lock); 3540 } 3541 break; 3542 3543 case CPU_DOWN_FAILED: 3544 case CPU_ONLINE: 3545 gcwq_claim_management_and_lock(gcwq); 3546 gcwq->flags &= ~GCWQ_DISASSOCIATED; 3547 rebind_workers(gcwq); 3548 gcwq_release_management_and_unlock(gcwq); 3549 break; 3550 } 3551 return NOTIFY_OK; 3552 } 3553 3554 /* 3555 * Workqueues should be brought down after normal priority CPU notifiers. 3556 * This will be registered as low priority CPU notifier. 3557 */ 3558 static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb, 3559 unsigned long action, 3560 void *hcpu) 3561 { 3562 unsigned int cpu = (unsigned long)hcpu; 3563 struct work_struct unbind_work; 3564 3565 switch (action & ~CPU_TASKS_FROZEN) { 3566 case CPU_DOWN_PREPARE: 3567 /* unbinding should happen on the local CPU */ 3568 INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn); 3569 schedule_work_on(cpu, &unbind_work); 3570 flush_work(&unbind_work); 3571 break; 3572 } 3573 return NOTIFY_OK; 3574 } 3575 3576 #ifdef CONFIG_SMP 3577 3578 struct work_for_cpu { 3579 struct work_struct work; 3580 long (*fn)(void *); 3581 void *arg; 3582 long ret; 3583 }; 3584 3585 static void work_for_cpu_fn(struct work_struct *work) 3586 { 3587 struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work); 3588 3589 wfc->ret = wfc->fn(wfc->arg); 3590 } 3591 3592 /** 3593 * work_on_cpu - run a function in user context on a particular cpu 3594 * @cpu: the cpu to run on 3595 * @fn: the function to run 3596 * @arg: the function arg 3597 * 3598 * This will return the value @fn returns. 3599 * It is up to the caller to ensure that the cpu doesn't go offline. 3600 * The caller must not hold any locks which would prevent @fn from completing. 3601 */ 3602 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg) 3603 { 3604 struct work_for_cpu wfc = { .fn = fn, .arg = arg }; 3605 3606 INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn); 3607 schedule_work_on(cpu, &wfc.work); 3608 flush_work(&wfc.work); 3609 return wfc.ret; 3610 } 3611 EXPORT_SYMBOL_GPL(work_on_cpu); 3612 #endif /* CONFIG_SMP */ 3613 3614 #ifdef CONFIG_FREEZER 3615 3616 /** 3617 * freeze_workqueues_begin - begin freezing workqueues 3618 * 3619 * Start freezing workqueues. After this function returns, all freezable 3620 * workqueues will queue new works to their frozen_works list instead of 3621 * gcwq->worklist. 3622 * 3623 * CONTEXT: 3624 * Grabs and releases workqueue_lock and gcwq->lock's. 
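 *
 * The freezer core is expected to drive the three entry points below
 * roughly as follows (sketch only, not the actual PM freezer code):
 *
 *	freeze_workqueues_begin();
 *	while (freeze_workqueues_busy())
 *		msleep(10);
 *	...
 *	thaw_workqueues();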
3625 */ 3626 void freeze_workqueues_begin(void) 3627 { 3628 unsigned int cpu; 3629 3630 spin_lock(&workqueue_lock); 3631 3632 BUG_ON(workqueue_freezing); 3633 workqueue_freezing = true; 3634 3635 for_each_gcwq_cpu(cpu) { 3636 struct global_cwq *gcwq = get_gcwq(cpu); 3637 struct workqueue_struct *wq; 3638 3639 spin_lock_irq(&gcwq->lock); 3640 3641 BUG_ON(gcwq->flags & GCWQ_FREEZING); 3642 gcwq->flags |= GCWQ_FREEZING; 3643 3644 list_for_each_entry(wq, &workqueues, list) { 3645 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3646 3647 if (cwq && wq->flags & WQ_FREEZABLE) 3648 cwq->max_active = 0; 3649 } 3650 3651 spin_unlock_irq(&gcwq->lock); 3652 } 3653 3654 spin_unlock(&workqueue_lock); 3655 } 3656 3657 /** 3658 * freeze_workqueues_busy - are freezable workqueues still busy? 3659 * 3660 * Check whether freezing is complete. This function must be called 3661 * between freeze_workqueues_begin() and thaw_workqueues(). 3662 * 3663 * CONTEXT: 3664 * Grabs and releases workqueue_lock. 3665 * 3666 * RETURNS: 3667 * %true if some freezable workqueues are still busy. %false if freezing 3668 * is complete. 3669 */ 3670 bool freeze_workqueues_busy(void) 3671 { 3672 unsigned int cpu; 3673 bool busy = false; 3674 3675 spin_lock(&workqueue_lock); 3676 3677 BUG_ON(!workqueue_freezing); 3678 3679 for_each_gcwq_cpu(cpu) { 3680 struct workqueue_struct *wq; 3681 /* 3682 * nr_active is monotonically decreasing. It's safe 3683 * to peek without lock. 3684 */ 3685 list_for_each_entry(wq, &workqueues, list) { 3686 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3687 3688 if (!cwq || !(wq->flags & WQ_FREEZABLE)) 3689 continue; 3690 3691 BUG_ON(cwq->nr_active < 0); 3692 if (cwq->nr_active) { 3693 busy = true; 3694 goto out_unlock; 3695 } 3696 } 3697 } 3698 out_unlock: 3699 spin_unlock(&workqueue_lock); 3700 return busy; 3701 } 3702 3703 /** 3704 * thaw_workqueues - thaw workqueues 3705 * 3706 * Thaw workqueues. Normal queueing is restored and all collected 3707 * frozen works are transferred to their respective gcwq worklists. 3708 * 3709 * CONTEXT: 3710 * Grabs and releases workqueue_lock and gcwq->lock's. 
3711 */ 3712 void thaw_workqueues(void) 3713 { 3714 unsigned int cpu; 3715 3716 spin_lock(&workqueue_lock); 3717 3718 if (!workqueue_freezing) 3719 goto out_unlock; 3720 3721 for_each_gcwq_cpu(cpu) { 3722 struct global_cwq *gcwq = get_gcwq(cpu); 3723 struct worker_pool *pool; 3724 struct workqueue_struct *wq; 3725 3726 spin_lock_irq(&gcwq->lock); 3727 3728 BUG_ON(!(gcwq->flags & GCWQ_FREEZING)); 3729 gcwq->flags &= ~GCWQ_FREEZING; 3730 3731 list_for_each_entry(wq, &workqueues, list) { 3732 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3733 3734 if (!cwq || !(wq->flags & WQ_FREEZABLE)) 3735 continue; 3736 3737 /* restore max_active and repopulate worklist */ 3738 cwq->max_active = wq->saved_max_active; 3739 3740 while (!list_empty(&cwq->delayed_works) && 3741 cwq->nr_active < cwq->max_active) 3742 cwq_activate_first_delayed(cwq); 3743 } 3744 3745 for_each_worker_pool(pool, gcwq) 3746 wake_up_worker(pool); 3747 3748 spin_unlock_irq(&gcwq->lock); 3749 } 3750 3751 workqueue_freezing = false; 3752 out_unlock: 3753 spin_unlock(&workqueue_lock); 3754 } 3755 #endif /* CONFIG_FREEZER */ 3756 3757 static int __init init_workqueues(void) 3758 { 3759 unsigned int cpu; 3760 int i; 3761 3762 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); 3763 cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); 3764 3765 /* initialize gcwqs */ 3766 for_each_gcwq_cpu(cpu) { 3767 struct global_cwq *gcwq = get_gcwq(cpu); 3768 struct worker_pool *pool; 3769 3770 spin_lock_init(&gcwq->lock); 3771 gcwq->cpu = cpu; 3772 gcwq->flags |= GCWQ_DISASSOCIATED; 3773 3774 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) 3775 INIT_HLIST_HEAD(&gcwq->busy_hash[i]); 3776 3777 for_each_worker_pool(pool, gcwq) { 3778 pool->gcwq = gcwq; 3779 INIT_LIST_HEAD(&pool->worklist); 3780 INIT_LIST_HEAD(&pool->idle_list); 3781 3782 init_timer_deferrable(&pool->idle_timer); 3783 pool->idle_timer.function = idle_worker_timeout; 3784 pool->idle_timer.data = (unsigned long)pool; 3785 3786 setup_timer(&pool->mayday_timer, gcwq_mayday_timeout, 3787 (unsigned long)pool); 3788 3789 mutex_init(&pool->manager_mutex); 3790 ida_init(&pool->worker_ida); 3791 } 3792 3793 init_waitqueue_head(&gcwq->rebind_hold); 3794 } 3795 3796 /* create the initial worker */ 3797 for_each_online_gcwq_cpu(cpu) { 3798 struct global_cwq *gcwq = get_gcwq(cpu); 3799 struct worker_pool *pool; 3800 3801 if (cpu != WORK_CPU_UNBOUND) 3802 gcwq->flags &= ~GCWQ_DISASSOCIATED; 3803 3804 for_each_worker_pool(pool, gcwq) { 3805 struct worker *worker; 3806 3807 worker = create_worker(pool); 3808 BUG_ON(!worker); 3809 spin_lock_irq(&gcwq->lock); 3810 start_worker(worker); 3811 spin_unlock_irq(&gcwq->lock); 3812 } 3813 } 3814 3815 system_wq = alloc_workqueue("events", 0, 0); 3816 system_long_wq = alloc_workqueue("events_long", 0, 0); 3817 system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0); 3818 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, 3819 WQ_UNBOUND_MAX_ACTIVE); 3820 system_freezable_wq = alloc_workqueue("events_freezable", 3821 WQ_FREEZABLE, 0); 3822 system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable", 3823 WQ_NON_REENTRANT | WQ_FREEZABLE, 0); 3824 BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq || 3825 !system_unbound_wq || !system_freezable_wq || 3826 !system_nrt_freezable_wq); 3827 return 0; 3828 } 3829 early_initcall(init_workqueues); 3830
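
/*
 * Usage sketches for two of the helpers above (illustrative only; my_fn(),
 * my_arg and my_percpu_init() are hypothetical caller-side names):
 *
 *	long ret = work_on_cpu(3, my_fn, my_arg);
 *
 * runs my_fn(my_arg) from a workqueue worker bound to CPU 3 and returns its
 * result, while
 *
 *	int err = schedule_on_each_cpu(my_percpu_init);
 *
 * blocks until my_percpu_init() has run on every online CPU.
 */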