/*
 * kernel/workqueue.c - generic async execution with shared worker pool
 *
 * Copyright (C) 2002		Ingo Molnar
 *
 *   Derived from the taskqueue/keventd code by:
 *     David Woodhouse <dwmw2@infradead.org>
 *     Andrew Morton
 *     Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *     Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 *
 * Copyright (C) 2010		SUSE Linux Products GmbH
 * Copyright (C) 2010		Tejun Heo <tj@kernel.org>
 *
 * This is the generic async execution mechanism.  Work items are
 * executed in process context.  The worker pool is shared and
 * automatically managed.  There is one worker pool for each CPU and
 * one extra for works which are better served by workers which are
 * not bound to any specific CPU.
 *
 * Please read Documentation/workqueue.txt for details.
 */

#include <linux/export.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>

#include "workqueue_sched.h"

enum {
	/*
	 * global_cwq flags
	 *
	 * A bound gcwq is either associated or disassociated with its CPU.
	 * While associated (!DISASSOCIATED), all workers are bound to the
	 * CPU and none has %WORKER_UNBOUND set and concurrency management
	 * is in effect.
	 *
	 * While DISASSOCIATED, the cpu may be offline and all workers have
	 * %WORKER_UNBOUND set and concurrency management disabled, and may
	 * be executing on any CPU.  The gcwq behaves as an unbound one.
	 *
	 * Note that DISASSOCIATED can be flipped only while holding
	 * managership of all pools on the gcwq to avoid changing binding
	 * state while create_worker() is in progress.
	 */
	GCWQ_DISASSOCIATED	= 1 << 0,	/* cpu can't serve workers */
	GCWQ_FREEZING		= 1 << 1,	/* freeze in progress */

	/* pool flags */
	POOL_MANAGE_WORKERS	= 1 << 0,	/* need to manage workers */
	POOL_MANAGING_WORKERS	= 1 << 1,	/* managing workers */

	/* worker flags */
	WORKER_STARTED		= 1 << 0,	/* started */
	WORKER_DIE		= 1 << 1,	/* die die die */
	WORKER_IDLE		= 1 << 2,	/* is idle */
	WORKER_PREP		= 1 << 3,	/* preparing to run works */
	WORKER_REBIND		= 1 << 5,	/* mom is home, come back */
	WORKER_CPU_INTENSIVE	= 1 << 6,	/* cpu intensive */
	WORKER_UNBOUND		= 1 << 7,	/* worker is unbound */

	WORKER_NOT_RUNNING	= WORKER_PREP | WORKER_REBIND | WORKER_UNBOUND |
				  WORKER_CPU_INTENSIVE,

	NR_WORKER_POOLS		= 2,		/* # worker pools per gcwq */

	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
	BUSY_WORKER_HASH_MASK	= BUSY_WORKER_HASH_SIZE - 1,

	MAX_IDLE_WORKERS_RATIO	= 4,		/* 1/4 of busy can be idle */
	IDLE_WORKER_TIMEOUT	= 300 * HZ,	/* keep idle ones for 5 mins */

	MAYDAY_INITIAL_TIMEOUT	= HZ / 100 >= 2 ? HZ / 100 : 2,
						/* call for help after 10ms
						   (min two ticks) */
	MAYDAY_INTERVAL		= HZ / 10,	/* and then every 100ms */
	CREATE_COOLDOWN		= HZ,		/* time to breathe after fail */

	/*
	 * Rescue workers are used only on emergencies and shared by
	 * all cpus.  Give -20.
	 */
	RESCUER_NICE_LEVEL	= -20,
	HIGHPRI_NICE_LEVEL	= -20,
};

/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Modifiable by initialization/destruction paths and read-only for
 *    everyone else.
 *
 * P: Preemption protected.  Disabling preemption is enough and should
 *    only be modified and accessed from the local cpu.
 *
 * L: gcwq->lock protected.  Access with gcwq->lock held.
 *
 * X: During normal operation, modification requires gcwq->lock and
 *    should be done only from local cpu.  Either disabling preemption
 *    on local cpu or grabbing gcwq->lock is enough for read access.
 *    If GCWQ_DISASSOCIATED is set, it's identical to L.
 *
 * F: wq->flush_mutex protected.
 *
 * W: workqueue_lock protected.
 */

struct global_cwq;
struct worker_pool;
struct idle_rebind;

/*
 * The poor guys doing the actual heavy lifting.  All on-duty workers
 * are either serving the manager role, on idle list or on busy hash.
 */
struct worker {
	/* on idle list while idle, on busy hash table while busy */
	union {
		struct list_head	entry;	/* L: while idle */
		struct hlist_node	hentry;	/* L: while busy */
	};

	struct work_struct	*current_work;	/* L: work being processed */
	struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
	struct list_head	scheduled;	/* L: scheduled works */
	struct task_struct	*task;		/* I: worker task */
	struct worker_pool	*pool;		/* I: the associated pool */
	/* 64 bytes boundary on 64bit, 32 on 32bit */
	unsigned long		last_active;	/* L: last active timestamp */
	unsigned int		flags;		/* X: flags */
	int			id;		/* I: worker id */

	/* for rebinding worker to CPU */
	struct idle_rebind	*idle_rebind;	/* L: for idle worker */
	struct work_struct	rebind_work;	/* L: for busy worker */
};

struct worker_pool {
	struct global_cwq	*gcwq;		/* I: the owning gcwq */
	unsigned int		flags;		/* X: flags */

	struct list_head	worklist;	/* L: list of pending works */
	int			nr_workers;	/* L: total number of workers */
	int			nr_idle;	/* L: currently idle ones */

	struct list_head	idle_list;	/* X: list of idle workers */
	struct timer_list	idle_timer;	/* L: worker idle timeout */
	struct timer_list	mayday_timer;	/* L: SOS timer for workers */

	struct mutex		manager_mutex;	/* mutex manager should hold */
	struct ida		worker_ida;	/* L: for worker IDs */
};

/*
 * Global per-cpu workqueue.  There's one and only one for each cpu
 * and all works are queued and processed here regardless of their
 * target workqueues.
 */
struct global_cwq {
	spinlock_t		lock;		/* the gcwq lock */
	unsigned int		cpu;		/* I: the associated cpu */
	unsigned int		flags;		/* L: GCWQ_* flags */

	/* workers are chained either in busy_hash or pool idle_list */
	struct hlist_head	busy_hash[BUSY_WORKER_HASH_SIZE];
						/* L: hash of busy workers */

	struct worker_pool	pools[2];	/* normal and highpri pools */

	wait_queue_head_t	rebind_hold;	/* rebind hold wait */
} ____cacheline_aligned_in_smp;

/*
 * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
 * work_struct->data are used for flags and thus cwqs need to be
 * aligned at two's power of the number of flag bits.
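 *
 * For example (illustrative only): if WORK_STRUCT_FLAG_BITS were 8, each
 * cwq would have to be 256-byte aligned so that the low 8 bits of the cwq
 * pointer stored in work_struct->data stay free to carry those flags.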
 */
struct cpu_workqueue_struct {
	struct worker_pool	*pool;		/* I: the associated pool */
	struct workqueue_struct *wq;		/* I: the owning workqueue */
	int			work_color;	/* L: current color */
	int			flush_color;	/* L: flushing color */
	int			nr_in_flight[WORK_NR_COLORS];
						/* L: nr of in_flight works */
	int			nr_active;	/* L: nr of active works */
	int			max_active;	/* L: max active works */
	struct list_head	delayed_works;	/* L: delayed works */
};

/*
 * Structure used to wait for workqueue flush.
 */
struct wq_flusher {
	struct list_head	list;		/* F: list of flushers */
	int			flush_color;	/* F: flush color waiting for */
	struct completion	done;		/* flush completion */
};

/*
 * All cpumasks are assumed to be always set on UP and thus can't be
 * used to determine whether there's something to be done.
 */
#ifdef CONFIG_SMP
typedef cpumask_var_t mayday_mask_t;
#define mayday_test_and_set_cpu(cpu, mask)	\
	cpumask_test_and_set_cpu((cpu), (mask))
#define mayday_clear_cpu(cpu, mask)	cpumask_clear_cpu((cpu), (mask))
#define for_each_mayday_cpu(cpu, mask)	for_each_cpu((cpu), (mask))
#define alloc_mayday_mask(maskp, gfp)	zalloc_cpumask_var((maskp), (gfp))
#define free_mayday_mask(mask)		free_cpumask_var((mask))
#else
typedef unsigned long mayday_mask_t;
#define mayday_test_and_set_cpu(cpu, mask)	test_and_set_bit(0, &(mask))
#define mayday_clear_cpu(cpu, mask)	clear_bit(0, &(mask))
#define for_each_mayday_cpu(cpu, mask)	if ((cpu) = 0, (mask))
#define alloc_mayday_mask(maskp, gfp)	true
#define free_mayday_mask(mask)		do { } while (0)
#endif

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	unsigned int		flags;		/* W: WQ_* flags */
	union {
		struct cpu_workqueue_struct __percpu	*pcpu;
		struct cpu_workqueue_struct		*single;
		unsigned long				v;
	} cpu_wq;				/* I: cwq's */
	struct list_head	list;		/* W: list of all workqueues */

	struct mutex		flush_mutex;	/* protects wq flushing */
	int			work_color;	/* F: current work color */
	int			flush_color;	/* F: current flush color */
	atomic_t		nr_cwqs_to_flush; /* flush in progress */
	struct wq_flusher	*first_flusher;	/* F: first flusher */
	struct list_head	flusher_queue;	/* F: flush waiters */
	struct list_head	flusher_overflow; /* F: flush overflow list */

	mayday_mask_t		mayday_mask;	/* cpus requesting rescue */
	struct worker		*rescuer;	/* I: rescue worker */

	int			nr_drainers;	/* W: drain in progress */
	int			saved_max_active; /* W: saved cwq max_active */
#ifdef CONFIG_LOCKDEP
	struct lockdep_map	lockdep_map;
#endif
	char			name[];		/* I: workqueue name */
};

struct workqueue_struct *system_wq __read_mostly;
struct workqueue_struct *system_long_wq __read_mostly;
struct workqueue_struct *system_nrt_wq __read_mostly;
struct workqueue_struct *system_unbound_wq __read_mostly;
struct workqueue_struct *system_freezable_wq __read_mostly;
struct workqueue_struct *system_nrt_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
EXPORT_SYMBOL_GPL(system_long_wq);
EXPORT_SYMBOL_GPL(system_nrt_wq);
EXPORT_SYMBOL_GPL(system_unbound_wq);
EXPORT_SYMBOL_GPL(system_freezable_wq);
EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
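
/*
 * Minimal usage sketch (illustrative only, not part of this file): most
 * users embed a work_struct in an object of their own and queue it on one
 * of the system workqueues declared above, e.g.
 *
 *	INIT_WORK(&obj->work, my_work_fn);
 *	schedule_work(&obj->work);	(queues on system_wq)
 *
 * Users needing their own queue obtain one with alloc_workqueue() and
 * submit work items to it with queue_work().
 */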

#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>

#define for_each_worker_pool(pool, gcwq)				\
	for ((pool) = &(gcwq)->pools[0];				\
	     (pool) < &(gcwq)->pools[NR_WORKER_POOLS]; (pool)++)

#define for_each_busy_worker(worker, i, pos, gcwq)			\
	for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)			\
		hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)

static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
				  unsigned int sw)
{
	if (cpu < nr_cpu_ids) {
		if (sw & 1) {
			cpu = cpumask_next(cpu, mask);
			if (cpu < nr_cpu_ids)
				return cpu;
		}
		if (sw & 2)
			return WORK_CPU_UNBOUND;
	}
	return WORK_CPU_NONE;
}

static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
				struct workqueue_struct *wq)
{
	return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
}

/*
 * CPU iterators
 *
 * An extra gcwq is defined for an invalid cpu number
 * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
 * specific CPU.  The following iterators are similar to the
 * for_each_*_cpu() iterators but also consider the unbound gcwq.
 *
 * for_each_gcwq_cpu()		: possible CPUs + WORK_CPU_UNBOUND
 * for_each_online_gcwq_cpu()	: online CPUs + WORK_CPU_UNBOUND
 * for_each_cwq_cpu()		: possible CPUs for bound workqueues,
 *				  WORK_CPU_UNBOUND for unbound workqueues
 */
#define for_each_gcwq_cpu(cpu)						\
	for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3);		\
	     (cpu) < WORK_CPU_NONE;					\
	     (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3))

#define for_each_online_gcwq_cpu(cpu)					\
	for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3);		\
	     (cpu) < WORK_CPU_NONE;					\
	     (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3))

#define for_each_cwq_cpu(cpu, wq)					\
	for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq));	\
	     (cpu) < WORK_CPU_NONE;					\
	     (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))

#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;

static void *work_debug_hint(void *addr)
{
	return ((struct work_struct *) addr)->func;
}

/*
 * fixup_init is called when:
 * - an active object is initialized
 */
static int work_fixup_init(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_init(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

/*
 * fixup_activate is called when:
 * - an active object is activated
 * - an unknown object is activated (might be a statically initialized object)
 */
static int work_fixup_activate(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {

	case ODEBUG_STATE_NOTAVAILABLE:
		/*
		 * This is not really a fixup. The work struct was
		 * statically initialized. We just make sure that it
		 * is tracked in the object tracker.
		 */
		if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
			debug_object_init(work, &work_debug_descr);
			debug_object_activate(work, &work_debug_descr);
			return 0;
		}
		WARN_ON_ONCE(1);
		return 0;

	case ODEBUG_STATE_ACTIVE:
		WARN_ON(1);

	default:
		return 0;
	}
}

/*
 * fixup_free is called when:
 * - an active object is freed
 */
static int work_fixup_free(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_free(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

static struct debug_obj_descr work_debug_descr = {
	.name		= "work_struct",
	.debug_hint	= work_debug_hint,
	.fixup_init	= work_fixup_init,
	.fixup_activate	= work_fixup_activate,
	.fixup_free	= work_fixup_free,
};

static inline void debug_work_activate(struct work_struct *work)
{
	debug_object_activate(work, &work_debug_descr);
}

static inline void debug_work_deactivate(struct work_struct *work)
{
	debug_object_deactivate(work, &work_debug_descr);
}

void __init_work(struct work_struct *work, int onstack)
{
	if (onstack)
		debug_object_init_on_stack(work, &work_debug_descr);
	else
		debug_object_init(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(__init_work);

void destroy_work_on_stack(struct work_struct *work)
{
	debug_object_free(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(destroy_work_on_stack);

#else
static inline void debug_work_activate(struct work_struct *work) { }
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
static bool workqueue_freezing;		/* W: have wqs started freezing? */

/*
 * The almighty global cpu workqueues.  nr_running is the only field
 * which is expected to be used frequently by other cpus via
 * try_to_wake_up().  Put it in a separate cacheline.
 */
static DEFINE_PER_CPU(struct global_cwq, global_cwq);
static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, pool_nr_running[NR_WORKER_POOLS]);

/*
 * Global cpu workqueue and nr_running counter for unbound gcwq.  The
 * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its
 * workers have WORKER_UNBOUND set.
 */
static struct global_cwq unbound_global_cwq;
static atomic_t unbound_pool_nr_running[NR_WORKER_POOLS] = {
	[0 ... NR_WORKER_POOLS - 1]	= ATOMIC_INIT(0),	/* always 0 */
};

static int worker_thread(void *__worker);

static int worker_pool_pri(struct worker_pool *pool)
{
	return pool - pool->gcwq->pools;
}

static struct global_cwq *get_gcwq(unsigned int cpu)
{
	if (cpu != WORK_CPU_UNBOUND)
		return &per_cpu(global_cwq, cpu);
	else
		return &unbound_global_cwq;
}

static atomic_t *get_pool_nr_running(struct worker_pool *pool)
{
	int cpu = pool->gcwq->cpu;
	int idx = worker_pool_pri(pool);

	if (cpu != WORK_CPU_UNBOUND)
		return &per_cpu(pool_nr_running, cpu)[idx];
	else
		return &unbound_pool_nr_running[idx];
}

static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
					    struct workqueue_struct *wq)
{
	if (!(wq->flags & WQ_UNBOUND)) {
		if (likely(cpu < nr_cpu_ids))
			return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
	} else if (likely(cpu == WORK_CPU_UNBOUND))
		return wq->cpu_wq.single;
	return NULL;
}

static unsigned int work_color_to_flags(int color)
{
	return color << WORK_STRUCT_COLOR_SHIFT;
}

static int get_work_color(struct work_struct *work)
{
	return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
		((1 << WORK_STRUCT_COLOR_BITS) - 1);
}

static int work_next_color(int color)
{
	return (color + 1) % WORK_NR_COLORS;
}

/*
 * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
 * work is on queue.  Once execution starts, WORK_STRUCT_CWQ is
 * cleared and the work data contains the cpu number it was last on.
 *
 * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
 * cwq, cpu or clear work->data.  These functions should only be
 * called while the work is owned - i.e. while the PENDING bit is set.
 *
 * get_work_[g]cwq() can be used to obtain the gcwq or cwq
 * corresponding to a work.  gcwq is available once the work has been
 * queued anywhere after initialization.  cwq is available only from
 * queueing until execution starts.
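 *
 * Illustrative sketch of the encoding (mirrors the helpers below):
 *
 *	set_work_cwq():	data = (unsigned long)cwq | WORK_STRUCT_CWQ |
 *			       WORK_STRUCT_PENDING | extra_flags
 *	set_work_cpu():	data = cpu << WORK_STRUCT_FLAG_BITS |
 *			       WORK_STRUCT_PENDING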
 */
static inline void set_work_data(struct work_struct *work, unsigned long data,
				 unsigned long flags)
{
	BUG_ON(!work_pending(work));
	atomic_long_set(&work->data, data | flags | work_static(work));
}

static void set_work_cwq(struct work_struct *work,
			 struct cpu_workqueue_struct *cwq,
			 unsigned long extra_flags)
{
	set_work_data(work, (unsigned long)cwq,
		      WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
}

static void set_work_cpu(struct work_struct *work, unsigned int cpu)
{
	set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
}

static void clear_work_data(struct work_struct *work)
{
	set_work_data(work, WORK_STRUCT_NO_CPU, 0);
}

static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);

	if (data & WORK_STRUCT_CWQ)
		return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
	else
		return NULL;
}

static struct global_cwq *get_work_gcwq(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);
	unsigned int cpu;

	if (data & WORK_STRUCT_CWQ)
		return ((struct cpu_workqueue_struct *)
			(data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq;

	cpu = data >> WORK_STRUCT_FLAG_BITS;
	if (cpu == WORK_CPU_NONE)
		return NULL;

	BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
	return get_gcwq(cpu);
}

/*
 * Policy functions.  These define the policies on how the global worker
 * pools are managed.  Unless noted otherwise, these functions assume that
 * they're being called with gcwq->lock held.
 */

static bool __need_more_worker(struct worker_pool *pool)
{
	return !atomic_read(get_pool_nr_running(pool));
}

/*
 * Need to wake up a worker?  Called from anything but currently
 * running workers.
 *
 * Note that, because unbound workers never contribute to nr_running, this
 * function will always return %true for unbound gcwq as long as the
 * worklist isn't empty.
 */
static bool need_more_worker(struct worker_pool *pool)
{
	return !list_empty(&pool->worklist) && __need_more_worker(pool);
}

/* Can I start working?  Called from busy but !running workers. */
static bool may_start_working(struct worker_pool *pool)
{
	return pool->nr_idle;
}

/* Do I need to keep working?  Called from currently running workers. */
static bool keep_working(struct worker_pool *pool)
{
	atomic_t *nr_running = get_pool_nr_running(pool);

	return !list_empty(&pool->worklist) && atomic_read(nr_running) <= 1;
}

/* Do we need a new worker?  Called from manager. */
static bool need_to_create_worker(struct worker_pool *pool)
{
	return need_more_worker(pool) && !may_start_working(pool);
}

/* Do I need to be the manager? */
static bool need_to_manage_workers(struct worker_pool *pool)
{
	return need_to_create_worker(pool) ||
		(pool->flags & POOL_MANAGE_WORKERS);
}

/* Do we have too many workers and should some go away? */
static bool too_many_workers(struct worker_pool *pool)
{
	bool managing = pool->flags & POOL_MANAGING_WORKERS;
	int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
	int nr_busy = pool->nr_workers - nr_idle;

	return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
}

/*
 * Wake up functions.
 */

/* Return the first worker.  Safe with preemption disabled */
static struct worker *first_worker(struct worker_pool *pool)
{
	if (unlikely(list_empty(&pool->idle_list)))
		return NULL;

	return list_first_entry(&pool->idle_list, struct worker, entry);
}

/**
 * wake_up_worker - wake up an idle worker
 * @pool: worker pool to wake worker from
 *
 * Wake up the first idle worker of @pool.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void wake_up_worker(struct worker_pool *pool)
{
	struct worker *worker = first_worker(pool);

	if (likely(worker))
		wake_up_process(worker->task);
}

/**
 * wq_worker_waking_up - a worker is waking up
 * @task: task waking up
 * @cpu: CPU @task is waking up to
 *
 * This function is called during try_to_wake_up() when a worker is
 * being awoken.
 *
 * CONTEXT:
 * spin_lock_irq(rq->lock)
 */
void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
{
	struct worker *worker = kthread_data(task);

	if (!(worker->flags & WORKER_NOT_RUNNING))
		atomic_inc(get_pool_nr_running(worker->pool));
}

/**
 * wq_worker_sleeping - a worker is going to sleep
 * @task: task going to sleep
 * @cpu: CPU in question, must be the current CPU number
 *
 * This function is called during schedule() when a busy worker is
 * going to sleep.  A worker on the same cpu can be woken up by
 * returning a pointer to its task.
 *
 * CONTEXT:
 * spin_lock_irq(rq->lock)
 *
 * RETURNS:
 * Worker task on @cpu to wake up, %NULL if none.
 */
struct task_struct *wq_worker_sleeping(struct task_struct *task,
				       unsigned int cpu)
{
	struct worker *worker = kthread_data(task), *to_wakeup = NULL;
	struct worker_pool *pool = worker->pool;
	atomic_t *nr_running = get_pool_nr_running(pool);

	if (worker->flags & WORKER_NOT_RUNNING)
		return NULL;

	/* this can only happen on the local cpu */
	BUG_ON(cpu != raw_smp_processor_id());

	/*
	 * The counterpart of the following dec_and_test, implied mb,
	 * worklist not empty test sequence is in insert_work().
	 * Please read comment there.
	 *
	 * NOT_RUNNING is clear.  This means that we're bound to and
	 * running on the local cpu w/ rq lock held and preemption
	 * disabled, which in turn means that no one else could be
	 * manipulating idle_list, so dereferencing idle_list without gcwq
	 * lock is safe.
	 */
	if (atomic_dec_and_test(nr_running) && !list_empty(&pool->worklist))
		to_wakeup = first_worker(pool);
	return to_wakeup ? to_wakeup->task : NULL;
}

/**
 * worker_set_flags - set worker flags and adjust nr_running accordingly
 * @worker: self
 * @flags: flags to set
 * @wakeup: wakeup an idle worker if necessary
 *
 * Set @flags in @worker->flags and adjust nr_running accordingly.  If
 * nr_running becomes zero and @wakeup is %true, an idle worker is
 * woken up.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock)
 */
static inline void worker_set_flags(struct worker *worker, unsigned int flags,
				    bool wakeup)
{
	struct worker_pool *pool = worker->pool;

	WARN_ON_ONCE(worker->task != current);

	/*
	 * If transitioning into NOT_RUNNING, adjust nr_running and
	 * wake up an idle worker as necessary if requested by
	 * @wakeup.
	 */
	if ((flags & WORKER_NOT_RUNNING) &&
	    !(worker->flags & WORKER_NOT_RUNNING)) {
		atomic_t *nr_running = get_pool_nr_running(pool);

		if (wakeup) {
			if (atomic_dec_and_test(nr_running) &&
			    !list_empty(&pool->worklist))
				wake_up_worker(pool);
		} else
			atomic_dec(nr_running);
	}

	worker->flags |= flags;
}

/**
 * worker_clr_flags - clear worker flags and adjust nr_running accordingly
 * @worker: self
 * @flags: flags to clear
 *
 * Clear @flags in @worker->flags and adjust nr_running accordingly.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock)
 */
static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
	struct worker_pool *pool = worker->pool;
	unsigned int oflags = worker->flags;

	WARN_ON_ONCE(worker->task != current);

	worker->flags &= ~flags;

	/*
	 * If transitioning out of NOT_RUNNING, increment nr_running.  Note
	 * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is a mask
	 * of multiple flags, not a single flag.
	 */
	if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
		if (!(worker->flags & WORKER_NOT_RUNNING))
			atomic_inc(get_pool_nr_running(pool));
}

/**
 * busy_worker_head - return the busy hash head for a work
 * @gcwq: gcwq of interest
 * @work: work to be hashed
 *
 * Return hash head of @gcwq for @work.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 *
 * RETURNS:
 * Pointer to the hash head.
 */
static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
					   struct work_struct *work)
{
	const int base_shift = ilog2(sizeof(struct work_struct));
	unsigned long v = (unsigned long)work;

	/* simple shift and fold hash, do we need something better? */
	v >>= base_shift;
	v += v >> BUSY_WORKER_HASH_ORDER;
	v &= BUSY_WORKER_HASH_MASK;

	return &gcwq->busy_hash[v];
}

/**
 * __find_worker_executing_work - find worker which is executing a work
 * @gcwq: gcwq of interest
 * @bwh: hash head as returned by busy_worker_head()
 * @work: work to find worker for
 *
 * Find a worker which is executing @work on @gcwq.  @bwh should be
 * the hash head obtained by calling busy_worker_head() with the same
 * work.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 *
 * RETURNS:
 * Pointer to worker which is executing @work if found, NULL
 * otherwise.
 */
static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
						   struct hlist_head *bwh,
						   struct work_struct *work)
{
	struct worker *worker;
	struct hlist_node *tmp;

	hlist_for_each_entry(worker, tmp, bwh, hentry)
		if (worker->current_work == work)
			return worker;
	return NULL;
}

/**
 * find_worker_executing_work - find worker which is executing a work
 * @gcwq: gcwq of interest
 * @work: work to find worker for
 *
 * Find a worker which is executing @work on @gcwq.  This function is
 * identical to __find_worker_executing_work() except that this
 * function calculates @bwh itself.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 *
 * RETURNS:
 * Pointer to worker which is executing @work if found, NULL
 * otherwise.
 */
static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
						 struct work_struct *work)
{
	return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
					    work);
}

/**
 * insert_work - insert a work into gcwq
 * @cwq: cwq @work belongs to
 * @work: work to insert
 * @head: insertion point
 * @extra_flags: extra WORK_STRUCT_* flags to set
 *
 * Insert @work which belongs to @cwq into @gcwq after @head.
 * @extra_flags is or'd to work_struct flags.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head,
			unsigned int extra_flags)
{
	struct worker_pool *pool = cwq->pool;

	/* we own @work, set data and link */
	set_work_cwq(work, cwq, extra_flags);

	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();

	list_add_tail(&work->entry, head);

	/*
	 * Ensure either worker_sched_deactivated() sees the above
	 * list_add_tail() or we see zero nr_running to avoid workers
	 * lying around lazily while there are works to be processed.
	 */
	smp_mb();

	if (__need_more_worker(pool))
		wake_up_worker(pool);
}

/*
 * Test whether @work is being queued from another work executing on the
 * same workqueue.  This is rather expensive and should only be used from
 * cold paths.
 */
static bool is_chained_work(struct workqueue_struct *wq)
{
	unsigned long flags;
	unsigned int cpu;

	for_each_gcwq_cpu(cpu) {
		struct global_cwq *gcwq = get_gcwq(cpu);
		struct worker *worker;
		struct hlist_node *pos;
		int i;

		spin_lock_irqsave(&gcwq->lock, flags);
		for_each_busy_worker(worker, i, pos, gcwq) {
			if (worker->task != current)
				continue;
			spin_unlock_irqrestore(&gcwq->lock, flags);
			/*
			 * I'm @worker, no locking necessary.  See if @work
			 * is headed to the same workqueue.
			 */
			return worker->current_cwq->wq == wq;
		}
		spin_unlock_irqrestore(&gcwq->lock, flags);
	}
	return false;
}

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
			 struct work_struct *work)
{
	struct global_cwq *gcwq;
	struct cpu_workqueue_struct *cwq;
	struct list_head *worklist;
	unsigned int work_flags;
	unsigned long flags;

	debug_work_activate(work);

	/* if dying, only works from the same workqueue are allowed */
	if (unlikely(wq->flags & WQ_DRAINING) &&
	    WARN_ON_ONCE(!is_chained_work(wq)))
		return;

	/* determine gcwq to use */
	if (!(wq->flags & WQ_UNBOUND)) {
		struct global_cwq *last_gcwq;

		if (unlikely(cpu == WORK_CPU_UNBOUND))
			cpu = raw_smp_processor_id();

		/*
		 * It's multi cpu.  If @wq is non-reentrant and @work
		 * was previously on a different cpu, it might still
		 * be running there, in which case the work needs to
		 * be queued on that cpu to guarantee non-reentrance.
		 */
		gcwq = get_gcwq(cpu);
		if (wq->flags & WQ_NON_REENTRANT &&
		    (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
			struct worker *worker;

			spin_lock_irqsave(&last_gcwq->lock, flags);

			worker = find_worker_executing_work(last_gcwq, work);

			if (worker && worker->current_cwq->wq == wq)
				gcwq = last_gcwq;
			else {
				/* meh... not running there, queue here */
				spin_unlock_irqrestore(&last_gcwq->lock, flags);
				spin_lock_irqsave(&gcwq->lock, flags);
			}
		} else
			spin_lock_irqsave(&gcwq->lock, flags);
	} else {
		gcwq = get_gcwq(WORK_CPU_UNBOUND);
		spin_lock_irqsave(&gcwq->lock, flags);
	}

	/* gcwq determined, get cwq and queue */
	cwq = get_cwq(gcwq->cpu, wq);
	trace_workqueue_queue_work(cpu, cwq, work);

	if (WARN_ON(!list_empty(&work->entry))) {
		spin_unlock_irqrestore(&gcwq->lock, flags);
		return;
	}

	cwq->nr_in_flight[cwq->work_color]++;
	work_flags = work_color_to_flags(cwq->work_color);

	if (likely(cwq->nr_active < cwq->max_active)) {
		trace_workqueue_activate_work(work);
		cwq->nr_active++;
		worklist = &cwq->pool->worklist;
	} else {
		work_flags |= WORK_STRUCT_DELAYED;
		worklist = &cwq->delayed_works;
	}

	insert_work(cwq, work, worklist, work_flags);

	spin_unlock_irqrestore(&gcwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		__queue_work(cpu, wq, work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);

static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);

	__queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
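 *
 * Minimal usage sketch (illustrative; @my_wq and my_timeout_fn are
 * hypothetical caller-side names):
 *
 *	INIT_DELAYED_WORK(&dwork, my_timeout_fn);
 *	queue_delayed_work(my_wq, &dwork, msecs_to_jiffies(100));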
 */
int queue_delayed_work(struct workqueue_struct *wq,
		       struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			  struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		unsigned int lcpu;

		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/*
		 * This stores cwq for the moment, for the timer_fn.
		 * Note that the work's gcwq is preserved to allow
		 * reentrance detection for delayed works.
		 */
		if (!(wq->flags & WQ_UNBOUND)) {
			struct global_cwq *gcwq = get_work_gcwq(work);

			if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
				lcpu = gcwq->cpu;
			else
				lcpu = raw_smp_processor_id();
		} else
			lcpu = WORK_CPU_UNBOUND;

		set_work_cwq(work, get_cwq(lcpu, wq), 0);

		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

/**
 * worker_enter_idle - enter idle state
 * @worker: worker which is entering idle state
 *
 * @worker is entering idle state.  Update stats and idle timer if
 * necessary.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock).
 */
static void worker_enter_idle(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;
	struct global_cwq *gcwq = pool->gcwq;

	BUG_ON(worker->flags & WORKER_IDLE);
	BUG_ON(!list_empty(&worker->entry) &&
	       (worker->hentry.next || worker->hentry.pprev));

	/* can't use worker_set_flags(), also called from start_worker() */
	worker->flags |= WORKER_IDLE;
	pool->nr_idle++;
	worker->last_active = jiffies;

	/* idle_list is LIFO */
	list_add(&worker->entry, &pool->idle_list);

	if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
		mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);

	/*
	 * Sanity check nr_running.  Because gcwq_unbind_fn() releases
	 * gcwq->lock between setting %WORKER_UNBOUND and zapping
	 * nr_running, the warning may trigger spuriously.  Check iff
	 * unbind is not in progress.
	 */
	WARN_ON_ONCE(!(gcwq->flags & GCWQ_DISASSOCIATED) &&
		     pool->nr_workers == pool->nr_idle &&
		     atomic_read(get_pool_nr_running(pool)));
}

/**
 * worker_leave_idle - leave idle state
 * @worker: worker which is leaving idle state
 *
 * @worker is leaving idle state.  Update stats.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock).
 */
static void worker_leave_idle(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;

	BUG_ON(!(worker->flags & WORKER_IDLE));
	worker_clr_flags(worker, WORKER_IDLE);
	pool->nr_idle--;
	list_del_init(&worker->entry);
}

/**
 * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
 * @worker: self
 *
 * Works which are scheduled while the cpu is online must at least be
 * scheduled to a worker which is bound to the cpu so that if they are
 * flushed from cpu callbacks while cpu is going down, they are
 * guaranteed to execute on the cpu.
 *
 * This function is to be used by rogue workers and rescuers to bind
 * themselves to the target cpu and may race with cpu going down or
 * coming online.  kthread_bind() can't be used because it may put the
 * worker to an already dead cpu and set_cpus_allowed_ptr() can't be used
 * verbatim as it's best effort and blocking and gcwq may be
 * [dis]associated in the meantime.
 *
 * This function tries set_cpus_allowed() and locks gcwq and verifies the
 * binding against %GCWQ_DISASSOCIATED which is set during
 * %CPU_DOWN_PREPARE and cleared during %CPU_ONLINE, so if the worker
 * enters idle state or fetches works without dropping lock, it can
 * guarantee the scheduling requirement described in the first paragraph.
 *
 * CONTEXT:
 * Might sleep.  Called without any lock but returns with gcwq->lock
 * held.
 *
 * RETURNS:
 * %true if the associated gcwq is online (@worker is successfully
 * bound), %false if offline.
 */
static bool worker_maybe_bind_and_lock(struct worker *worker)
__acquires(&gcwq->lock)
{
	struct global_cwq *gcwq = worker->pool->gcwq;
	struct task_struct *task = worker->task;

	while (true) {
		/*
		 * The following call may fail, succeed or succeed
		 * without actually migrating the task to the cpu if
		 * it races with cpu hotunplug operation.  Verify
		 * against GCWQ_DISASSOCIATED.
		 */
		if (!(gcwq->flags & GCWQ_DISASSOCIATED))
			set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));

		spin_lock_irq(&gcwq->lock);
		if (gcwq->flags & GCWQ_DISASSOCIATED)
			return false;
		if (task_cpu(task) == gcwq->cpu &&
		    cpumask_equal(&current->cpus_allowed,
				  get_cpu_mask(gcwq->cpu)))
			return true;
		spin_unlock_irq(&gcwq->lock);

		/*
		 * We've raced with CPU hot[un]plug.  Give it a breather
		 * and retry migration.  cond_resched() is required here;
		 * otherwise, we might deadlock against cpu_stop trying to
		 * bring down the CPU on non-preemptive kernel.
		 */
		cpu_relax();
		cond_resched();
	}
}

struct idle_rebind {
	int			cnt;		/* # workers to be rebound */
	struct completion	done;		/* all workers rebound */
};

/*
 * Rebind an idle @worker to its CPU.  During CPU onlining, this has to
 * happen synchronously for idle workers.  worker_thread() will test
 * %WORKER_REBIND before leaving idle and call this function.
 */
static void idle_worker_rebind(struct worker *worker)
{
	struct global_cwq *gcwq = worker->pool->gcwq;

	/* CPU must be online at this point */
	WARN_ON(!worker_maybe_bind_and_lock(worker));
	if (!--worker->idle_rebind->cnt)
		complete(&worker->idle_rebind->done);
	spin_unlock_irq(&worker->pool->gcwq->lock);

	/* we did our part, wait for rebind_workers() to finish up */
	wait_event(gcwq->rebind_hold, !(worker->flags & WORKER_REBIND));

	/*
	 * rebind_workers() shouldn't finish until all workers passed the
	 * above WORKER_REBIND wait.  Tell it when done.
	 */
	spin_lock_irq(&worker->pool->gcwq->lock);
	if (!--worker->idle_rebind->cnt)
		complete(&worker->idle_rebind->done);
	spin_unlock_irq(&worker->pool->gcwq->lock);
}

/*
 * Function for @worker->rebind.work used to rebind unbound busy workers to
 * the associated cpu which is coming back online.  This is scheduled by
 * cpu up but can race with other cpu hotplug operations and may be
 * executed twice without intervening cpu down.
 */
static void busy_worker_rebind_fn(struct work_struct *work)
{
	struct worker *worker = container_of(work, struct worker, rebind_work);
	struct global_cwq *gcwq = worker->pool->gcwq;

	if (worker_maybe_bind_and_lock(worker))
		worker_clr_flags(worker, WORKER_REBIND);

	spin_unlock_irq(&gcwq->lock);
}

/**
 * rebind_workers - rebind all workers of a gcwq to the associated CPU
 * @gcwq: gcwq of interest
 *
 * @gcwq->cpu is coming online.  Rebind all workers to the CPU.  Rebinding
 * is different for idle and busy ones.
 *
 * The idle ones should be rebound synchronously and idle rebinding should
 * be complete before any worker starts executing work items with
 * concurrency management enabled; otherwise, scheduler may oops trying to
 * wake up non-local idle worker from wq_worker_sleeping().
 *
 * This is achieved by repeatedly requesting rebinding until all idle
 * workers are known to have been rebound under @gcwq->lock and holding all
 * idle workers from becoming busy until idle rebinding is complete.
 *
 * Once idle workers are rebound, busy workers can be rebound as they
 * finish executing their current work items.  Queueing the rebind work at
 * the head of their scheduled lists is enough.  Note that nr_running will
 * be properly bumped as busy workers rebind.
 *
 * On return, all workers are guaranteed to either be bound or have rebind
 * work item scheduled.
 */
static void rebind_workers(struct global_cwq *gcwq)
	__releases(&gcwq->lock) __acquires(&gcwq->lock)
{
	struct idle_rebind idle_rebind;
	struct worker_pool *pool;
	struct worker *worker;
	struct hlist_node *pos;
	int i;

	lockdep_assert_held(&gcwq->lock);

	for_each_worker_pool(pool, gcwq)
		lockdep_assert_held(&pool->manager_mutex);

	/*
	 * Rebind idle workers.  Interlocked both ways.  We wait for
	 * workers to rebind via @idle_rebind.done.  Workers will wait for
	 * us to finish up by watching %WORKER_REBIND.
	 */
	init_completion(&idle_rebind.done);
retry:
	idle_rebind.cnt = 1;
	INIT_COMPLETION(idle_rebind.done);

	/* set REBIND and kick idle ones, we'll wait for these later */
	for_each_worker_pool(pool, gcwq) {
		list_for_each_entry(worker, &pool->idle_list, entry) {
			unsigned long worker_flags = worker->flags;

			if (worker->flags & WORKER_REBIND)
				continue;

			/* morph UNBOUND to REBIND atomically */
			worker_flags &= ~WORKER_UNBOUND;
			worker_flags |= WORKER_REBIND;
			ACCESS_ONCE(worker->flags) = worker_flags;

			idle_rebind.cnt++;
			worker->idle_rebind = &idle_rebind;

			/* worker_thread() will call idle_worker_rebind() */
			wake_up_process(worker->task);
		}
	}

	if (--idle_rebind.cnt) {
		spin_unlock_irq(&gcwq->lock);
		wait_for_completion(&idle_rebind.done);
		spin_lock_irq(&gcwq->lock);
		/* busy ones might have become idle while waiting, retry */
		goto retry;
	}

	/* all idle workers are rebound, rebind busy workers */
	for_each_busy_worker(worker, i, pos, gcwq) {
		struct work_struct *rebind_work = &worker->rebind_work;
		unsigned long worker_flags = worker->flags;

		/* morph UNBOUND to REBIND atomically */
		worker_flags &= ~WORKER_UNBOUND;
		worker_flags |= WORKER_REBIND;
		ACCESS_ONCE(worker->flags) = worker_flags;

		if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
				     work_data_bits(rebind_work)))
			continue;

		/* wq doesn't matter, use the default one */
		debug_work_activate(rebind_work);
		insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
			    worker->scheduled.next,
			    work_color_to_flags(WORK_NO_COLOR));
	}

	/*
	 * All idle workers are rebound and waiting for %WORKER_REBIND to
	 * be cleared inside idle_worker_rebind().  Clear and release.
	 * Clearing %WORKER_REBIND from this foreign context is safe
	 * because these workers are still guaranteed to be idle.
	 *
	 * We need to make sure all idle workers passed WORKER_REBIND wait
	 * in idle_worker_rebind() before returning; otherwise, workers can
	 * get stuck at the wait if hotplug cycle repeats.
	 */
	idle_rebind.cnt = 1;
	INIT_COMPLETION(idle_rebind.done);

	for_each_worker_pool(pool, gcwq) {
		list_for_each_entry(worker, &pool->idle_list, entry) {
			worker->flags &= ~WORKER_REBIND;
			idle_rebind.cnt++;
		}
	}

	wake_up_all(&gcwq->rebind_hold);

	if (--idle_rebind.cnt) {
		spin_unlock_irq(&gcwq->lock);
		wait_for_completion(&idle_rebind.done);
		spin_lock_irq(&gcwq->lock);
	}
}

static struct worker *alloc_worker(void)
{
	struct worker *worker;

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (worker) {
		INIT_LIST_HEAD(&worker->entry);
		INIT_LIST_HEAD(&worker->scheduled);
		INIT_WORK(&worker->rebind_work, busy_worker_rebind_fn);
		/* on creation a worker is in !idle && prep state */
		worker->flags = WORKER_PREP;
	}
	return worker;
}

/**
 * create_worker - create a new workqueue worker
 * @pool: pool the new worker will belong to
 *
 * Create a new worker which is bound to @pool.  The returned worker
 * can be started by calling start_worker() or destroyed using
 * destroy_worker().
 *
 * CONTEXT:
 * Might sleep.  Does GFP_KERNEL allocations.
 *
 * RETURNS:
 * Pointer to the newly created worker, or %NULL on failure.
 */
static struct worker *create_worker(struct worker_pool *pool)
{
	struct global_cwq *gcwq = pool->gcwq;
	const char *pri = worker_pool_pri(pool) ? "H" : "";
	struct worker *worker = NULL;
	int id = -1;

	spin_lock_irq(&gcwq->lock);
	while (ida_get_new(&pool->worker_ida, &id)) {
		spin_unlock_irq(&gcwq->lock);
		if (!ida_pre_get(&pool->worker_ida, GFP_KERNEL))
			goto fail;
		spin_lock_irq(&gcwq->lock);
	}
	spin_unlock_irq(&gcwq->lock);

	worker = alloc_worker();
	if (!worker)
		goto fail;

	worker->pool = pool;
	worker->id = id;

	if (gcwq->cpu != WORK_CPU_UNBOUND)
		worker->task = kthread_create_on_node(worker_thread,
					worker, cpu_to_node(gcwq->cpu),
					"kworker/%u:%d%s", gcwq->cpu, id, pri);
	else
		worker->task = kthread_create(worker_thread, worker,
					      "kworker/u:%d%s", id, pri);
	if (IS_ERR(worker->task))
		goto fail;

	if (worker_pool_pri(pool))
		set_user_nice(worker->task, HIGHPRI_NICE_LEVEL);

	/*
	 * Determine CPU binding of the new worker depending on
	 * %GCWQ_DISASSOCIATED.  The caller is responsible for ensuring the
	 * flag remains stable across this function.  See the comments
	 * above the flag definition for details.
	 *
	 * As an unbound worker may later become a regular one if CPU comes
	 * online, make sure every worker has %PF_THREAD_BOUND set.
	 */
	if (!(gcwq->flags & GCWQ_DISASSOCIATED)) {
		kthread_bind(worker->task, gcwq->cpu);
	} else {
		worker->task->flags |= PF_THREAD_BOUND;
		worker->flags |= WORKER_UNBOUND;
	}

	return worker;
fail:
	if (id >= 0) {
		spin_lock_irq(&gcwq->lock);
		ida_remove(&pool->worker_ida, id);
		spin_unlock_irq(&gcwq->lock);
	}
	kfree(worker);
	return NULL;
}

/**
 * start_worker - start a newly created worker
 * @worker: worker to start
 *
 * Make the gcwq aware of @worker and start it.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void start_worker(struct worker *worker)
{
	worker->flags |= WORKER_STARTED;
	worker->pool->nr_workers++;
	worker_enter_idle(worker);
	wake_up_process(worker->task);
}

/**
 * destroy_worker - destroy a workqueue worker
 * @worker: worker to be destroyed
 *
 * Destroy @worker and adjust @gcwq stats accordingly.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock) which is released and regrabbed.
 */
static void destroy_worker(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;
	struct global_cwq *gcwq = pool->gcwq;
	int id = worker->id;

	/* sanity check frenzy */
	BUG_ON(worker->current_work);
	BUG_ON(!list_empty(&worker->scheduled));

	if (worker->flags & WORKER_STARTED)
		pool->nr_workers--;
	if (worker->flags & WORKER_IDLE)
		pool->nr_idle--;

	list_del_init(&worker->entry);
	worker->flags |= WORKER_DIE;

	spin_unlock_irq(&gcwq->lock);

	kthread_stop(worker->task);
	kfree(worker);

	spin_lock_irq(&gcwq->lock);
	ida_remove(&pool->worker_ida, id);
}

static void idle_worker_timeout(unsigned long __pool)
{
	struct worker_pool *pool = (void *)__pool;
	struct global_cwq *gcwq = pool->gcwq;

	spin_lock_irq(&gcwq->lock);

	if (too_many_workers(pool)) {
		struct worker *worker;
		unsigned long expires;

		/* idle_list is kept in LIFO order, check the last one */
		worker = list_entry(pool->idle_list.prev, struct worker, entry);
		expires = worker->last_active + IDLE_WORKER_TIMEOUT;

		if (time_before(jiffies, expires))
			mod_timer(&pool->idle_timer, expires);
		else {
			/* it's been idle for too long, wake up manager */
			pool->flags |= POOL_MANAGE_WORKERS;
			wake_up_worker(pool);
		}
	}

	spin_unlock_irq(&gcwq->lock);
}

static bool send_mayday(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq = get_work_cwq(work);
	struct workqueue_struct *wq = cwq->wq;
	unsigned int cpu;

	if (!(wq->flags & WQ_RESCUER))
		return false;

	/* mayday mayday mayday */
	cpu = cwq->pool->gcwq->cpu;
	/* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
	if (cpu == WORK_CPU_UNBOUND)
		cpu = 0;
	if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
		wake_up_process(wq->rescuer->task);
	return true;
}

static void gcwq_mayday_timeout(unsigned long __pool)
{
	struct worker_pool *pool = (void *)__pool;
	struct global_cwq *gcwq = pool->gcwq;
	struct work_struct *work;

	spin_lock_irq(&gcwq->lock);

	if (need_to_create_worker(pool)) {
		/*
		 * We've been trying to create a new worker but
		 * haven't been successful.  We might be hitting an
		 * allocation deadlock.  Send distress signals to
		 * rescuers.
		 */
		list_for_each_entry(work, &pool->worklist, entry)
			send_mayday(work);
	}

	spin_unlock_irq(&gcwq->lock);

	mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
}

/**
 * maybe_create_worker - create a new worker if necessary
 * @pool: pool to create a new worker for
 *
 * Create a new worker for @pool if necessary.  @pool is guaranteed to
 * have at least one idle worker on return from this function.  If
 * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
 * sent to all rescuers with works scheduled on @pool to resolve
 * possible allocation deadlock.
 *
 * On return, need_to_create_worker() is guaranteed to be false and
 * may_start_working() true.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
 * multiple times.  Does GFP_KERNEL allocations.  Called only from
 * manager.
 *
 * RETURNS:
 * false if no action was taken and gcwq->lock stayed locked, true
 * otherwise.
 */
static bool maybe_create_worker(struct worker_pool *pool)
__releases(&gcwq->lock)
__acquires(&gcwq->lock)
{
	struct global_cwq *gcwq = pool->gcwq;

	if (!need_to_create_worker(pool))
		return false;
restart:
	spin_unlock_irq(&gcwq->lock);

	/* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
	mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);

	while (true) {
		struct worker *worker;

		worker = create_worker(pool);
		if (worker) {
			del_timer_sync(&pool->mayday_timer);
			spin_lock_irq(&gcwq->lock);
			start_worker(worker);
			BUG_ON(need_to_create_worker(pool));
			return true;
		}

		if (!need_to_create_worker(pool))
			break;

		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(CREATE_COOLDOWN);

		if (!need_to_create_worker(pool))
			break;
	}

	del_timer_sync(&pool->mayday_timer);
	spin_lock_irq(&gcwq->lock);
	if (need_to_create_worker(pool))
		goto restart;
	return true;
}

/**
 * maybe_destroy_workers - destroy workers which have been idle for a while
 * @pool: pool to destroy workers for
 *
 * Destroy @pool workers which have been idle for longer than
 * IDLE_WORKER_TIMEOUT.
 *
 * LOCKING:
 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
 * multiple times.  Called only from manager.
 *
 * RETURNS:
 * false if no action was taken and gcwq->lock stayed locked, true
 * otherwise.
 */
static bool maybe_destroy_workers(struct worker_pool *pool)
{
	bool ret = false;

	while (too_many_workers(pool)) {
		struct worker *worker;
		unsigned long expires;

		worker = list_entry(pool->idle_list.prev, struct worker, entry);
		expires = worker->last_active + IDLE_WORKER_TIMEOUT;

		if (time_before(jiffies, expires)) {
			mod_timer(&pool->idle_timer, expires);
			break;
		}

		destroy_worker(worker);
		ret = true;
	}

	return ret;
}

/**
 * manage_workers - manage worker pool
 * @worker: self
 *
 * Assume the manager role and manage gcwq worker pool @worker belongs
 * to.  At any given time, there can be only zero or one manager per
 * gcwq.  The exclusion is handled automatically by this function.
 *
 * The caller can safely start processing works on false return.  On
 * true return, it's guaranteed that need_to_create_worker() is false
 * and may_start_working() is true.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock) which may be released and regrabbed
 * multiple times.  Does GFP_KERNEL allocations.
 *
 * RETURNS:
 * false if no action was taken and gcwq->lock stayed locked, true if
 * some action was taken.
 */
static bool manage_workers(struct worker *worker)
{
	struct worker_pool *pool = worker->pool;
	bool ret = false;

	if (pool->flags & POOL_MANAGING_WORKERS)
		return ret;

	pool->flags |= POOL_MANAGING_WORKERS;

	/*
	 * To simplify both worker management and CPU hotplug, hold off
	 * management while hotplug is in progress.  CPU hotplug path can't
	 * CPU hotplug path can't grab %POOL_MANAGING_WORKERS to achieve this
	 * because that can lead to idle worker depletion (all become busy
	 * thinking someone else is managing) which in turn can result in
	 * deadlock under extreme circumstances.  Use @pool->manager_mutex to
	 * synchronize manager against CPU hotplug.
	 *
	 * manager_mutex would always be free unless CPU hotplug is in
	 * progress.  trylock first without dropping @gcwq->lock.
	 */
	if (unlikely(!mutex_trylock(&pool->manager_mutex))) {
		spin_unlock_irq(&pool->gcwq->lock);
		mutex_lock(&pool->manager_mutex);
		/*
		 * CPU hotplug could have happened while we were waiting
		 * for manager_mutex.  Hotplug itself can't handle us
		 * because manager isn't either on idle or busy list, and
		 * @gcwq's state and ours could have deviated.
		 *
		 * As hotplug is now excluded via manager_mutex, we can
		 * simply try to bind.  It will succeed or fail depending
		 * on @gcwq's current state.  Try it and adjust
		 * %WORKER_UNBOUND accordingly.
		 */
		if (worker_maybe_bind_and_lock(worker))
			worker->flags &= ~WORKER_UNBOUND;
		else
			worker->flags |= WORKER_UNBOUND;

		ret = true;
	}

	pool->flags &= ~POOL_MANAGE_WORKERS;

	/*
	 * Destroy and then create so that may_start_working() is true
	 * on return.
	 */
	ret |= maybe_destroy_workers(pool);
	ret |= maybe_create_worker(pool);

	pool->flags &= ~POOL_MANAGING_WORKERS;
	mutex_unlock(&pool->manager_mutex);
	return ret;
}

/**
 * move_linked_works - move linked works to a list
 * @work: start of series of works to be scheduled
 * @head: target list to append @work to
 * @nextp: out parameter for nested worklist walking
 *
 * Schedule linked works starting from @work to @head.  Work series to
 * be scheduled starts at @work and includes any consecutive work with
 * WORK_STRUCT_LINKED set in its predecessor.
 *
 * If @nextp is not NULL, it's updated to point to the next work of
 * the last scheduled work.  This allows move_linked_works() to be
 * nested inside outer list_for_each_entry_safe().
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void move_linked_works(struct work_struct *work, struct list_head *head,
			      struct work_struct **nextp)
{
	struct work_struct *n;

	/*
	 * Linked worklist will always end before the end of the list,
	 * use NULL for list head.
	 */
	list_for_each_entry_safe_from(work, n, NULL, entry) {
		list_move_tail(&work->entry, head);
		if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
			break;
	}

	/*
	 * If we're already inside safe list traversal and have moved
	 * multiple works to the scheduled queue, the next position
	 * needs to be updated.
1917 */ 1918 if (nextp) 1919 *nextp = n; 1920 } 1921 1922 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq) 1923 { 1924 struct work_struct *work = list_first_entry(&cwq->delayed_works, 1925 struct work_struct, entry); 1926 1927 trace_workqueue_activate_work(work); 1928 move_linked_works(work, &cwq->pool->worklist, NULL); 1929 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work)); 1930 cwq->nr_active++; 1931 } 1932 1933 /** 1934 * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight 1935 * @cwq: cwq of interest 1936 * @color: color of work which left the queue 1937 * @delayed: for a delayed work 1938 * 1939 * A work either has completed or is removed from pending queue, 1940 * decrement nr_in_flight of its cwq and handle workqueue flushing. 1941 * 1942 * CONTEXT: 1943 * spin_lock_irq(gcwq->lock). 1944 */ 1945 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color, 1946 bool delayed) 1947 { 1948 /* ignore uncolored works */ 1949 if (color == WORK_NO_COLOR) 1950 return; 1951 1952 cwq->nr_in_flight[color]--; 1953 1954 if (!delayed) { 1955 cwq->nr_active--; 1956 if (!list_empty(&cwq->delayed_works)) { 1957 /* one down, submit a delayed one */ 1958 if (cwq->nr_active < cwq->max_active) 1959 cwq_activate_first_delayed(cwq); 1960 } 1961 } 1962 1963 /* is flush in progress and are we at the flushing tip? */ 1964 if (likely(cwq->flush_color != color)) 1965 return; 1966 1967 /* are there still in-flight works? */ 1968 if (cwq->nr_in_flight[color]) 1969 return; 1970 1971 /* this cwq is done, clear flush_color */ 1972 cwq->flush_color = -1; 1973 1974 /* 1975 * If this was the last cwq, wake up the first flusher. It 1976 * will handle the rest. 1977 */ 1978 if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush)) 1979 complete(&cwq->wq->first_flusher->done); 1980 } 1981 1982 /** 1983 * process_one_work - process single work 1984 * @worker: self 1985 * @work: work to process 1986 * 1987 * Process @work. This function contains all the logics necessary to 1988 * process a single work including synchronization against and 1989 * interaction with other workers on the same cpu, queueing and 1990 * flushing. As long as context requirement is met, any worker can 1991 * call this function to process a work. 1992 * 1993 * CONTEXT: 1994 * spin_lock_irq(gcwq->lock) which is released and regrabbed. 1995 */ 1996 static void process_one_work(struct worker *worker, struct work_struct *work) 1997 __releases(&gcwq->lock) 1998 __acquires(&gcwq->lock) 1999 { 2000 struct cpu_workqueue_struct *cwq = get_work_cwq(work); 2001 struct worker_pool *pool = worker->pool; 2002 struct global_cwq *gcwq = pool->gcwq; 2003 struct hlist_head *bwh = busy_worker_head(gcwq, work); 2004 bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE; 2005 work_func_t f = work->func; 2006 int work_color; 2007 struct worker *collision; 2008 #ifdef CONFIG_LOCKDEP 2009 /* 2010 * It is permissible to free the struct work_struct from 2011 * inside the function that is called from it, this we need to 2012 * take into account for lockdep too. To avoid bogus "held 2013 * lock freed" warnings as well as problems when looking into 2014 * work->lockdep_map, make a copy and use that here. 2015 */ 2016 struct lockdep_map lockdep_map; 2017 2018 lockdep_copy_map(&lockdep_map, &work->lockdep_map); 2019 #endif 2020 /* 2021 * Ensure we're on the correct CPU. DISASSOCIATED test is 2022 * necessary to avoid spurious warnings from rescuers servicing the 2023 * unbound or a disassociated gcwq. 
2024 */ 2025 WARN_ON_ONCE(!(worker->flags & (WORKER_UNBOUND | WORKER_REBIND)) && 2026 !(gcwq->flags & GCWQ_DISASSOCIATED) && 2027 raw_smp_processor_id() != gcwq->cpu); 2028 2029 /* 2030 * A single work shouldn't be executed concurrently by 2031 * multiple workers on a single cpu. Check whether anyone is 2032 * already processing the work. If so, defer the work to the 2033 * currently executing one. 2034 */ 2035 collision = __find_worker_executing_work(gcwq, bwh, work); 2036 if (unlikely(collision)) { 2037 move_linked_works(work, &collision->scheduled, NULL); 2038 return; 2039 } 2040 2041 /* claim and process */ 2042 debug_work_deactivate(work); 2043 hlist_add_head(&worker->hentry, bwh); 2044 worker->current_work = work; 2045 worker->current_cwq = cwq; 2046 work_color = get_work_color(work); 2047 2048 /* record the current cpu number in the work data and dequeue */ 2049 set_work_cpu(work, gcwq->cpu); 2050 list_del_init(&work->entry); 2051 2052 /* 2053 * CPU intensive works don't participate in concurrency 2054 * management. They're the scheduler's responsibility. 2055 */ 2056 if (unlikely(cpu_intensive)) 2057 worker_set_flags(worker, WORKER_CPU_INTENSIVE, true); 2058 2059 /* 2060 * Unbound gcwq isn't concurrency managed and work items should be 2061 * executed ASAP. Wake up another worker if necessary. 2062 */ 2063 if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool)) 2064 wake_up_worker(pool); 2065 2066 spin_unlock_irq(&gcwq->lock); 2067 2068 work_clear_pending(work); 2069 lock_map_acquire_read(&cwq->wq->lockdep_map); 2070 lock_map_acquire(&lockdep_map); 2071 trace_workqueue_execute_start(work); 2072 f(work); 2073 /* 2074 * While we must be careful to not use "work" after this, the trace 2075 * point will only record its address. 2076 */ 2077 trace_workqueue_execute_end(work); 2078 lock_map_release(&lockdep_map); 2079 lock_map_release(&cwq->wq->lockdep_map); 2080 2081 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 2082 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 2083 "%s/0x%08x/%d\n", 2084 current->comm, preempt_count(), task_pid_nr(current)); 2085 printk(KERN_ERR " last function: "); 2086 print_symbol("%s\n", (unsigned long)f); 2087 debug_show_held_locks(current); 2088 dump_stack(); 2089 } 2090 2091 spin_lock_irq(&gcwq->lock); 2092 2093 /* clear cpu intensive status */ 2094 if (unlikely(cpu_intensive)) 2095 worker_clr_flags(worker, WORKER_CPU_INTENSIVE); 2096 2097 /* we're done with it, release */ 2098 hlist_del_init(&worker->hentry); 2099 worker->current_work = NULL; 2100 worker->current_cwq = NULL; 2101 cwq_dec_nr_in_flight(cwq, work_color, false); 2102 } 2103 2104 /** 2105 * process_scheduled_works - process scheduled works 2106 * @worker: self 2107 * 2108 * Process all scheduled works. Please note that the scheduled list 2109 * may change while processing a work, so this function repeatedly 2110 * fetches a work from the top and executes it. 2111 * 2112 * CONTEXT: 2113 * spin_lock_irq(gcwq->lock) which may be released and regrabbed 2114 * multiple times. 2115 */ 2116 static void process_scheduled_works(struct worker *worker) 2117 { 2118 while (!list_empty(&worker->scheduled)) { 2119 struct work_struct *work = list_first_entry(&worker->scheduled, 2120 struct work_struct, entry); 2121 process_one_work(worker, work); 2122 } 2123 } 2124 2125 /** 2126 * worker_thread - the worker thread function 2127 * @__worker: self 2128 * 2129 * The gcwq worker thread function. There's a single dynamic pool of 2130 * these per each cpu. 
These workers process all works regardless of 2131 * their specific target workqueue. The only exception is works which 2132 * belong to workqueues with a rescuer which will be explained in 2133 * rescuer_thread(). 2134 */ 2135 static int worker_thread(void *__worker) 2136 { 2137 struct worker *worker = __worker; 2138 struct worker_pool *pool = worker->pool; 2139 struct global_cwq *gcwq = pool->gcwq; 2140 2141 /* tell the scheduler that this is a workqueue worker */ 2142 worker->task->flags |= PF_WQ_WORKER; 2143 woke_up: 2144 spin_lock_irq(&gcwq->lock); 2145 2146 /* 2147 * DIE can be set only while idle and REBIND set while busy has 2148 * @worker->rebind_work scheduled. Checking here is enough. 2149 */ 2150 if (unlikely(worker->flags & (WORKER_REBIND | WORKER_DIE))) { 2151 spin_unlock_irq(&gcwq->lock); 2152 2153 if (worker->flags & WORKER_DIE) { 2154 worker->task->flags &= ~PF_WQ_WORKER; 2155 return 0; 2156 } 2157 2158 idle_worker_rebind(worker); 2159 goto woke_up; 2160 } 2161 2162 worker_leave_idle(worker); 2163 recheck: 2164 /* no more worker necessary? */ 2165 if (!need_more_worker(pool)) 2166 goto sleep; 2167 2168 /* do we need to manage? */ 2169 if (unlikely(!may_start_working(pool)) && manage_workers(worker)) 2170 goto recheck; 2171 2172 /* 2173 * ->scheduled list can only be filled while a worker is 2174 * preparing to process a work or actually processing it. 2175 * Make sure nobody diddled with it while I was sleeping. 2176 */ 2177 BUG_ON(!list_empty(&worker->scheduled)); 2178 2179 /* 2180 * When control reaches this point, we're guaranteed to have 2181 * at least one idle worker or that someone else has already 2182 * assumed the manager role. 2183 */ 2184 worker_clr_flags(worker, WORKER_PREP); 2185 2186 do { 2187 struct work_struct *work = 2188 list_first_entry(&pool->worklist, 2189 struct work_struct, entry); 2190 2191 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { 2192 /* optimization path, not strictly necessary */ 2193 process_one_work(worker, work); 2194 if (unlikely(!list_empty(&worker->scheduled))) 2195 process_scheduled_works(worker); 2196 } else { 2197 move_linked_works(work, &worker->scheduled, NULL); 2198 process_scheduled_works(worker); 2199 } 2200 } while (keep_working(pool)); 2201 2202 worker_set_flags(worker, WORKER_PREP, false); 2203 sleep: 2204 if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker)) 2205 goto recheck; 2206 2207 /* 2208 * gcwq->lock is held and there's no work to process and no 2209 * need to manage, sleep. Workers are woken up only while 2210 * holding gcwq->lock or from local cpu, so setting the 2211 * current state before releasing gcwq->lock is enough to 2212 * prevent losing any event. 2213 */ 2214 worker_enter_idle(worker); 2215 __set_current_state(TASK_INTERRUPTIBLE); 2216 spin_unlock_irq(&gcwq->lock); 2217 schedule(); 2218 goto woke_up; 2219 } 2220 2221 /** 2222 * rescuer_thread - the rescuer thread function 2223 * @__wq: the associated workqueue 2224 * 2225 * Workqueue rescuer thread function. There's one rescuer for each 2226 * workqueue which has WQ_RESCUER set. 2227 * 2228 * Regular work processing on a gcwq may block trying to create a new 2229 * worker which uses GFP_KERNEL allocation which has slight chance of 2230 * developing into deadlock if some works currently on the same queue 2231 * need to be processed to satisfy the GFP_KERNEL allocation. This is 2232 * the problem rescuer solves. 
2233 * 2234 * When such condition is possible, the gcwq summons rescuers of all 2235 * workqueues which have works queued on the gcwq and let them process 2236 * those works so that forward progress can be guaranteed. 2237 * 2238 * This should happen rarely. 2239 */ 2240 static int rescuer_thread(void *__wq) 2241 { 2242 struct workqueue_struct *wq = __wq; 2243 struct worker *rescuer = wq->rescuer; 2244 struct list_head *scheduled = &rescuer->scheduled; 2245 bool is_unbound = wq->flags & WQ_UNBOUND; 2246 unsigned int cpu; 2247 2248 set_user_nice(current, RESCUER_NICE_LEVEL); 2249 repeat: 2250 set_current_state(TASK_INTERRUPTIBLE); 2251 2252 if (kthread_should_stop()) 2253 return 0; 2254 2255 /* 2256 * See whether any cpu is asking for help. Unbounded 2257 * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND. 2258 */ 2259 for_each_mayday_cpu(cpu, wq->mayday_mask) { 2260 unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu; 2261 struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq); 2262 struct worker_pool *pool = cwq->pool; 2263 struct global_cwq *gcwq = pool->gcwq; 2264 struct work_struct *work, *n; 2265 2266 __set_current_state(TASK_RUNNING); 2267 mayday_clear_cpu(cpu, wq->mayday_mask); 2268 2269 /* migrate to the target cpu if possible */ 2270 rescuer->pool = pool; 2271 worker_maybe_bind_and_lock(rescuer); 2272 2273 /* 2274 * Slurp in all works issued via this workqueue and 2275 * process'em. 2276 */ 2277 BUG_ON(!list_empty(&rescuer->scheduled)); 2278 list_for_each_entry_safe(work, n, &pool->worklist, entry) 2279 if (get_work_cwq(work) == cwq) 2280 move_linked_works(work, scheduled, &n); 2281 2282 process_scheduled_works(rescuer); 2283 2284 /* 2285 * Leave this gcwq. If keep_working() is %true, notify a 2286 * regular worker; otherwise, we end up with 0 concurrency 2287 * and stalling the execution. 2288 */ 2289 if (keep_working(pool)) 2290 wake_up_worker(pool); 2291 2292 spin_unlock_irq(&gcwq->lock); 2293 } 2294 2295 schedule(); 2296 goto repeat; 2297 } 2298 2299 struct wq_barrier { 2300 struct work_struct work; 2301 struct completion done; 2302 }; 2303 2304 static void wq_barrier_func(struct work_struct *work) 2305 { 2306 struct wq_barrier *barr = container_of(work, struct wq_barrier, work); 2307 complete(&barr->done); 2308 } 2309 2310 /** 2311 * insert_wq_barrier - insert a barrier work 2312 * @cwq: cwq to insert barrier into 2313 * @barr: wq_barrier to insert 2314 * @target: target work to attach @barr to 2315 * @worker: worker currently executing @target, NULL if @target is not executing 2316 * 2317 * @barr is linked to @target such that @barr is completed only after 2318 * @target finishes execution. Please note that the ordering 2319 * guarantee is observed only with respect to @target and on the local 2320 * cpu. 2321 * 2322 * Currently, a queued barrier can't be canceled. This is because 2323 * try_to_grab_pending() can't determine whether the work to be 2324 * grabbed is at the head of the queue and thus can't clear LINKED 2325 * flag of the previous work while there must be a valid next work 2326 * after a work with LINKED flag set. 2327 * 2328 * Note that when @worker is non-NULL, @target may be modified 2329 * underneath us, so we can't reliably determine cwq from @target. 2330 * 2331 * CONTEXT: 2332 * spin_lock_irq(gcwq->lock). 
2333 */ 2334 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, 2335 struct wq_barrier *barr, 2336 struct work_struct *target, struct worker *worker) 2337 { 2338 struct list_head *head; 2339 unsigned int linked = 0; 2340 2341 /* 2342 * debugobject calls are safe here even with gcwq->lock locked 2343 * as we know for sure that this will not trigger any of the 2344 * checks and call back into the fixup functions where we 2345 * might deadlock. 2346 */ 2347 INIT_WORK_ONSTACK(&barr->work, wq_barrier_func); 2348 __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work)); 2349 init_completion(&barr->done); 2350 2351 /* 2352 * If @target is currently being executed, schedule the 2353 * barrier to the worker; otherwise, put it after @target. 2354 */ 2355 if (worker) 2356 head = worker->scheduled.next; 2357 else { 2358 unsigned long *bits = work_data_bits(target); 2359 2360 head = target->entry.next; 2361 /* there can already be other linked works, inherit and set */ 2362 linked = *bits & WORK_STRUCT_LINKED; 2363 __set_bit(WORK_STRUCT_LINKED_BIT, bits); 2364 } 2365 2366 debug_work_activate(&barr->work); 2367 insert_work(cwq, &barr->work, head, 2368 work_color_to_flags(WORK_NO_COLOR) | linked); 2369 } 2370 2371 /** 2372 * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing 2373 * @wq: workqueue being flushed 2374 * @flush_color: new flush color, < 0 for no-op 2375 * @work_color: new work color, < 0 for no-op 2376 * 2377 * Prepare cwqs for workqueue flushing. 2378 * 2379 * If @flush_color is non-negative, flush_color on all cwqs should be 2380 * -1. If no cwq has in-flight commands at the specified color, all 2381 * cwq->flush_color's stay at -1 and %false is returned. If any cwq 2382 * has in flight commands, its cwq->flush_color is set to 2383 * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq 2384 * wakeup logic is armed and %true is returned. 2385 * 2386 * The caller should have initialized @wq->first_flusher prior to 2387 * calling this function with non-negative @flush_color. If 2388 * @flush_color is negative, no flush color update is done and %false 2389 * is returned. 2390 * 2391 * If @work_color is non-negative, all cwqs should have the same 2392 * work_color which is previous to @work_color and all will be 2393 * advanced to @work_color. 2394 * 2395 * CONTEXT: 2396 * mutex_lock(wq->flush_mutex). 2397 * 2398 * RETURNS: 2399 * %true if @flush_color >= 0 and there's something to flush. %false 2400 * otherwise. 
2401 */ 2402 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq, 2403 int flush_color, int work_color) 2404 { 2405 bool wait = false; 2406 unsigned int cpu; 2407 2408 if (flush_color >= 0) { 2409 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush)); 2410 atomic_set(&wq->nr_cwqs_to_flush, 1); 2411 } 2412 2413 for_each_cwq_cpu(cpu, wq) { 2414 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 2415 struct global_cwq *gcwq = cwq->pool->gcwq; 2416 2417 spin_lock_irq(&gcwq->lock); 2418 2419 if (flush_color >= 0) { 2420 BUG_ON(cwq->flush_color != -1); 2421 2422 if (cwq->nr_in_flight[flush_color]) { 2423 cwq->flush_color = flush_color; 2424 atomic_inc(&wq->nr_cwqs_to_flush); 2425 wait = true; 2426 } 2427 } 2428 2429 if (work_color >= 0) { 2430 BUG_ON(work_color != work_next_color(cwq->work_color)); 2431 cwq->work_color = work_color; 2432 } 2433 2434 spin_unlock_irq(&gcwq->lock); 2435 } 2436 2437 if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush)) 2438 complete(&wq->first_flusher->done); 2439 2440 return wait; 2441 } 2442 2443 /** 2444 * flush_workqueue - ensure that any scheduled work has run to completion. 2445 * @wq: workqueue to flush 2446 * 2447 * Forces execution of the workqueue and blocks until its completion. 2448 * This is typically used in driver shutdown handlers. 2449 * 2450 * We sleep until all works which were queued on entry have been handled, 2451 * but we are not livelocked by new incoming ones. 2452 */ 2453 void flush_workqueue(struct workqueue_struct *wq) 2454 { 2455 struct wq_flusher this_flusher = { 2456 .list = LIST_HEAD_INIT(this_flusher.list), 2457 .flush_color = -1, 2458 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done), 2459 }; 2460 int next_color; 2461 2462 lock_map_acquire(&wq->lockdep_map); 2463 lock_map_release(&wq->lockdep_map); 2464 2465 mutex_lock(&wq->flush_mutex); 2466 2467 /* 2468 * Start-to-wait phase 2469 */ 2470 next_color = work_next_color(wq->work_color); 2471 2472 if (next_color != wq->flush_color) { 2473 /* 2474 * Color space is not full. The current work_color 2475 * becomes our flush_color and work_color is advanced 2476 * by one. 2477 */ 2478 BUG_ON(!list_empty(&wq->flusher_overflow)); 2479 this_flusher.flush_color = wq->work_color; 2480 wq->work_color = next_color; 2481 2482 if (!wq->first_flusher) { 2483 /* no flush in progress, become the first flusher */ 2484 BUG_ON(wq->flush_color != this_flusher.flush_color); 2485 2486 wq->first_flusher = &this_flusher; 2487 2488 if (!flush_workqueue_prep_cwqs(wq, wq->flush_color, 2489 wq->work_color)) { 2490 /* nothing to flush, done */ 2491 wq->flush_color = next_color; 2492 wq->first_flusher = NULL; 2493 goto out_unlock; 2494 } 2495 } else { 2496 /* wait in queue */ 2497 BUG_ON(wq->flush_color == this_flusher.flush_color); 2498 list_add_tail(&this_flusher.list, &wq->flusher_queue); 2499 flush_workqueue_prep_cwqs(wq, -1, wq->work_color); 2500 } 2501 } else { 2502 /* 2503 * Oops, color space is full, wait on overflow queue. 2504 * The next flush completion will assign us 2505 * flush_color and transfer to flusher_queue. 2506 */ 2507 list_add_tail(&this_flusher.list, &wq->flusher_overflow); 2508 } 2509 2510 mutex_unlock(&wq->flush_mutex); 2511 2512 wait_for_completion(&this_flusher.done); 2513 2514 /* 2515 * Wake-up-and-cascade phase 2516 * 2517 * First flushers are responsible for cascading flushes and 2518 * handling overflow. Non-first flushers can simply return. 
	 */
	if (wq->first_flusher != &this_flusher)
		return;

	mutex_lock(&wq->flush_mutex);

	/* we might have raced, check again with mutex held */
	if (wq->first_flusher != &this_flusher)
		goto out_unlock;

	wq->first_flusher = NULL;

	BUG_ON(!list_empty(&this_flusher.list));
	BUG_ON(wq->flush_color != this_flusher.flush_color);

	while (true) {
		struct wq_flusher *next, *tmp;

		/* complete all the flushers sharing the current flush color */
		list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
			if (next->flush_color != wq->flush_color)
				break;
			list_del_init(&next->list);
			complete(&next->done);
		}

		BUG_ON(!list_empty(&wq->flusher_overflow) &&
		       wq->flush_color != work_next_color(wq->work_color));

		/* this flush_color is finished, advance by one */
		wq->flush_color = work_next_color(wq->flush_color);

		/* one color has been freed, handle overflow queue */
		if (!list_empty(&wq->flusher_overflow)) {
			/*
			 * Assign the same color to all overflowed
			 * flushers, advance work_color and append to
			 * flusher_queue.  This is the start-to-wait
			 * phase for these overflowed flushers.
			 */
			list_for_each_entry(tmp, &wq->flusher_overflow, list)
				tmp->flush_color = wq->work_color;

			wq->work_color = work_next_color(wq->work_color);

			list_splice_tail_init(&wq->flusher_overflow,
					      &wq->flusher_queue);
			flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
		}

		if (list_empty(&wq->flusher_queue)) {
			BUG_ON(wq->flush_color != wq->work_color);
			break;
		}

		/*
		 * Need to flush more colors.  Make the next flusher
		 * the new first flusher and arm cwqs.
		 */
		BUG_ON(wq->flush_color == wq->work_color);
		BUG_ON(wq->flush_color != next->flush_color);

		list_del_init(&next->list);
		wq->first_flusher = next;

		if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
			break;

		/*
		 * Meh... this color is already done, clear first
		 * flusher and repeat cascading.
		 */
		wq->first_flusher = NULL;
	}

out_unlock:
	mutex_unlock(&wq->flush_mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);

/**
 * drain_workqueue - drain a workqueue
 * @wq: workqueue to drain
 *
 * Wait until the workqueue becomes empty.  While draining is in progress,
 * only chain queueing is allowed.  IOW, only currently pending or running
 * work items on @wq can queue further work items on it.  @wq is flushed
 * repeatedly until it becomes empty.  The number of flushes is determined
 * by the depth of chaining and should be relatively short.  Whine if it
 * takes too long.
 */
void drain_workqueue(struct workqueue_struct *wq)
{
	unsigned int flush_cnt = 0;
	unsigned int cpu;

	/*
	 * __queue_work() needs to test whether there are drainers; it is
	 * much hotter than drain_workqueue() and already looks at @wq->flags.
	 * Use WQ_DRAINING so that queue doesn't have to check nr_drainers.
2619 */ 2620 spin_lock(&workqueue_lock); 2621 if (!wq->nr_drainers++) 2622 wq->flags |= WQ_DRAINING; 2623 spin_unlock(&workqueue_lock); 2624 reflush: 2625 flush_workqueue(wq); 2626 2627 for_each_cwq_cpu(cpu, wq) { 2628 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 2629 bool drained; 2630 2631 spin_lock_irq(&cwq->pool->gcwq->lock); 2632 drained = !cwq->nr_active && list_empty(&cwq->delayed_works); 2633 spin_unlock_irq(&cwq->pool->gcwq->lock); 2634 2635 if (drained) 2636 continue; 2637 2638 if (++flush_cnt == 10 || 2639 (flush_cnt % 100 == 0 && flush_cnt <= 1000)) 2640 pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n", 2641 wq->name, flush_cnt); 2642 goto reflush; 2643 } 2644 2645 spin_lock(&workqueue_lock); 2646 if (!--wq->nr_drainers) 2647 wq->flags &= ~WQ_DRAINING; 2648 spin_unlock(&workqueue_lock); 2649 } 2650 EXPORT_SYMBOL_GPL(drain_workqueue); 2651 2652 static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr, 2653 bool wait_executing) 2654 { 2655 struct worker *worker = NULL; 2656 struct global_cwq *gcwq; 2657 struct cpu_workqueue_struct *cwq; 2658 2659 might_sleep(); 2660 gcwq = get_work_gcwq(work); 2661 if (!gcwq) 2662 return false; 2663 2664 spin_lock_irq(&gcwq->lock); 2665 if (!list_empty(&work->entry)) { 2666 /* 2667 * See the comment near try_to_grab_pending()->smp_rmb(). 2668 * If it was re-queued to a different gcwq under us, we 2669 * are not going to wait. 2670 */ 2671 smp_rmb(); 2672 cwq = get_work_cwq(work); 2673 if (unlikely(!cwq || gcwq != cwq->pool->gcwq)) 2674 goto already_gone; 2675 } else if (wait_executing) { 2676 worker = find_worker_executing_work(gcwq, work); 2677 if (!worker) 2678 goto already_gone; 2679 cwq = worker->current_cwq; 2680 } else 2681 goto already_gone; 2682 2683 insert_wq_barrier(cwq, barr, work, worker); 2684 spin_unlock_irq(&gcwq->lock); 2685 2686 /* 2687 * If @max_active is 1 or rescuer is in use, flushing another work 2688 * item on the same workqueue may lead to deadlock. Make sure the 2689 * flusher is not running on the same workqueue by verifying write 2690 * access. 2691 */ 2692 if (cwq->wq->saved_max_active == 1 || cwq->wq->flags & WQ_RESCUER) 2693 lock_map_acquire(&cwq->wq->lockdep_map); 2694 else 2695 lock_map_acquire_read(&cwq->wq->lockdep_map); 2696 lock_map_release(&cwq->wq->lockdep_map); 2697 2698 return true; 2699 already_gone: 2700 spin_unlock_irq(&gcwq->lock); 2701 return false; 2702 } 2703 2704 /** 2705 * flush_work - wait for a work to finish executing the last queueing instance 2706 * @work: the work to flush 2707 * 2708 * Wait until @work has finished execution. This function considers 2709 * only the last queueing instance of @work. If @work has been 2710 * enqueued across different CPUs on a non-reentrant workqueue or on 2711 * multiple workqueues, @work might still be executing on return on 2712 * some of the CPUs from earlier queueing. 2713 * 2714 * If @work was queued only on a non-reentrant, ordered or unbound 2715 * workqueue, @work is guaranteed to be idle on return if it hasn't 2716 * been requeued since flush started. 2717 * 2718 * RETURNS: 2719 * %true if flush_work() waited for the work to finish execution, 2720 * %false if it was already idle. 
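 *
 * A minimal usage sketch from a hypothetical caller (the "frob" names
 * below are illustrative and not part of this file):
 *
 *	static void frob_fn(struct work_struct *work)
 *	{
 *		pr_info("frobbing\n");
 *	}
 *	static DECLARE_WORK(frob_work, frob_fn);
 *
 *	schedule_work(&frob_work);
 *	...
 *	flush_work(&frob_work);
 *
 * After flush_work() returns, the queueing of frob_work done by the
 * schedule_work() above is guaranteed to have finished executing.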
2721 */ 2722 bool flush_work(struct work_struct *work) 2723 { 2724 struct wq_barrier barr; 2725 2726 lock_map_acquire(&work->lockdep_map); 2727 lock_map_release(&work->lockdep_map); 2728 2729 if (start_flush_work(work, &barr, true)) { 2730 wait_for_completion(&barr.done); 2731 destroy_work_on_stack(&barr.work); 2732 return true; 2733 } else 2734 return false; 2735 } 2736 EXPORT_SYMBOL_GPL(flush_work); 2737 2738 static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work) 2739 { 2740 struct wq_barrier barr; 2741 struct worker *worker; 2742 2743 spin_lock_irq(&gcwq->lock); 2744 2745 worker = find_worker_executing_work(gcwq, work); 2746 if (unlikely(worker)) 2747 insert_wq_barrier(worker->current_cwq, &barr, work, worker); 2748 2749 spin_unlock_irq(&gcwq->lock); 2750 2751 if (unlikely(worker)) { 2752 wait_for_completion(&barr.done); 2753 destroy_work_on_stack(&barr.work); 2754 return true; 2755 } else 2756 return false; 2757 } 2758 2759 static bool wait_on_work(struct work_struct *work) 2760 { 2761 bool ret = false; 2762 int cpu; 2763 2764 might_sleep(); 2765 2766 lock_map_acquire(&work->lockdep_map); 2767 lock_map_release(&work->lockdep_map); 2768 2769 for_each_gcwq_cpu(cpu) 2770 ret |= wait_on_cpu_work(get_gcwq(cpu), work); 2771 return ret; 2772 } 2773 2774 /** 2775 * flush_work_sync - wait until a work has finished execution 2776 * @work: the work to flush 2777 * 2778 * Wait until @work has finished execution. On return, it's 2779 * guaranteed that all queueing instances of @work which happened 2780 * before this function is called are finished. In other words, if 2781 * @work hasn't been requeued since this function was called, @work is 2782 * guaranteed to be idle on return. 2783 * 2784 * RETURNS: 2785 * %true if flush_work_sync() waited for the work to finish execution, 2786 * %false if it was already idle. 2787 */ 2788 bool flush_work_sync(struct work_struct *work) 2789 { 2790 struct wq_barrier barr; 2791 bool pending, waited; 2792 2793 /* we'll wait for executions separately, queue barr only if pending */ 2794 pending = start_flush_work(work, &barr, false); 2795 2796 /* wait for executions to finish */ 2797 waited = wait_on_work(work); 2798 2799 /* wait for the pending one */ 2800 if (pending) { 2801 wait_for_completion(&barr.done); 2802 destroy_work_on_stack(&barr.work); 2803 } 2804 2805 return pending || waited; 2806 } 2807 EXPORT_SYMBOL_GPL(flush_work_sync); 2808 2809 /* 2810 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit, 2811 * so this work can't be re-armed in any way. 2812 */ 2813 static int try_to_grab_pending(struct work_struct *work) 2814 { 2815 struct global_cwq *gcwq; 2816 int ret = -1; 2817 2818 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) 2819 return 0; 2820 2821 /* 2822 * The queueing is in progress, or it is already queued. Try to 2823 * steal it from ->worklist without clearing WORK_STRUCT_PENDING. 2824 */ 2825 gcwq = get_work_gcwq(work); 2826 if (!gcwq) 2827 return ret; 2828 2829 spin_lock_irq(&gcwq->lock); 2830 if (!list_empty(&work->entry)) { 2831 /* 2832 * This work is queued, but perhaps we locked the wrong gcwq. 2833 * In that case we must see the new value after rmb(), see 2834 * insert_work()->wmb(). 
2835 */ 2836 smp_rmb(); 2837 if (gcwq == get_work_gcwq(work)) { 2838 debug_work_deactivate(work); 2839 list_del_init(&work->entry); 2840 cwq_dec_nr_in_flight(get_work_cwq(work), 2841 get_work_color(work), 2842 *work_data_bits(work) & WORK_STRUCT_DELAYED); 2843 ret = 1; 2844 } 2845 } 2846 spin_unlock_irq(&gcwq->lock); 2847 2848 return ret; 2849 } 2850 2851 static bool __cancel_work_timer(struct work_struct *work, 2852 struct timer_list* timer) 2853 { 2854 int ret; 2855 2856 do { 2857 ret = (timer && likely(del_timer(timer))); 2858 if (!ret) 2859 ret = try_to_grab_pending(work); 2860 wait_on_work(work); 2861 } while (unlikely(ret < 0)); 2862 2863 clear_work_data(work); 2864 return ret; 2865 } 2866 2867 /** 2868 * cancel_work_sync - cancel a work and wait for it to finish 2869 * @work: the work to cancel 2870 * 2871 * Cancel @work and wait for its execution to finish. This function 2872 * can be used even if the work re-queues itself or migrates to 2873 * another workqueue. On return from this function, @work is 2874 * guaranteed to be not pending or executing on any CPU. 2875 * 2876 * cancel_work_sync(&delayed_work->work) must not be used for 2877 * delayed_work's. Use cancel_delayed_work_sync() instead. 2878 * 2879 * The caller must ensure that the workqueue on which @work was last 2880 * queued can't be destroyed before this function returns. 2881 * 2882 * RETURNS: 2883 * %true if @work was pending, %false otherwise. 2884 */ 2885 bool cancel_work_sync(struct work_struct *work) 2886 { 2887 return __cancel_work_timer(work, NULL); 2888 } 2889 EXPORT_SYMBOL_GPL(cancel_work_sync); 2890 2891 /** 2892 * flush_delayed_work - wait for a dwork to finish executing the last queueing 2893 * @dwork: the delayed work to flush 2894 * 2895 * Delayed timer is cancelled and the pending work is queued for 2896 * immediate execution. Like flush_work(), this function only 2897 * considers the last queueing instance of @dwork. 2898 * 2899 * RETURNS: 2900 * %true if flush_work() waited for the work to finish execution, 2901 * %false if it was already idle. 2902 */ 2903 bool flush_delayed_work(struct delayed_work *dwork) 2904 { 2905 if (del_timer_sync(&dwork->timer)) 2906 __queue_work(raw_smp_processor_id(), 2907 get_work_cwq(&dwork->work)->wq, &dwork->work); 2908 return flush_work(&dwork->work); 2909 } 2910 EXPORT_SYMBOL(flush_delayed_work); 2911 2912 /** 2913 * flush_delayed_work_sync - wait for a dwork to finish 2914 * @dwork: the delayed work to flush 2915 * 2916 * Delayed timer is cancelled and the pending work is queued for 2917 * execution immediately. Other than timer handling, its behavior 2918 * is identical to flush_work_sync(). 2919 * 2920 * RETURNS: 2921 * %true if flush_work_sync() waited for the work to finish execution, 2922 * %false if it was already idle. 2923 */ 2924 bool flush_delayed_work_sync(struct delayed_work *dwork) 2925 { 2926 if (del_timer_sync(&dwork->timer)) 2927 __queue_work(raw_smp_processor_id(), 2928 get_work_cwq(&dwork->work)->wq, &dwork->work); 2929 return flush_work_sync(&dwork->work); 2930 } 2931 EXPORT_SYMBOL(flush_delayed_work_sync); 2932 2933 /** 2934 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish 2935 * @dwork: the delayed work cancel 2936 * 2937 * This is cancel_work_sync() for delayed works. 2938 * 2939 * RETURNS: 2940 * %true if @dwork was pending, %false otherwise. 
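 *
 * A minimal usage sketch (hypothetical caller code; the "poll" names
 * are illustrative only):
 *
 *	static void poll_fn(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(poll_dwork, poll_fn);
 *
 *	schedule_delayed_work(&poll_dwork, HZ);
 *	...
 *	cancel_delayed_work_sync(&poll_dwork);
 *
 * On return, neither the timer nor the work item is pending or
 * executing, even if poll_fn() had re-armed poll_dwork from within
 * itself.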
2941 */ 2942 bool cancel_delayed_work_sync(struct delayed_work *dwork) 2943 { 2944 return __cancel_work_timer(&dwork->work, &dwork->timer); 2945 } 2946 EXPORT_SYMBOL(cancel_delayed_work_sync); 2947 2948 /** 2949 * schedule_work - put work task in global workqueue 2950 * @work: job to be done 2951 * 2952 * Returns zero if @work was already on the kernel-global workqueue and 2953 * non-zero otherwise. 2954 * 2955 * This puts a job in the kernel-global workqueue if it was not already 2956 * queued and leaves it in the same position on the kernel-global 2957 * workqueue otherwise. 2958 */ 2959 int schedule_work(struct work_struct *work) 2960 { 2961 return queue_work(system_wq, work); 2962 } 2963 EXPORT_SYMBOL(schedule_work); 2964 2965 /* 2966 * schedule_work_on - put work task on a specific cpu 2967 * @cpu: cpu to put the work task on 2968 * @work: job to be done 2969 * 2970 * This puts a job on a specific cpu 2971 */ 2972 int schedule_work_on(int cpu, struct work_struct *work) 2973 { 2974 return queue_work_on(cpu, system_wq, work); 2975 } 2976 EXPORT_SYMBOL(schedule_work_on); 2977 2978 /** 2979 * schedule_delayed_work - put work task in global workqueue after delay 2980 * @dwork: job to be done 2981 * @delay: number of jiffies to wait or 0 for immediate execution 2982 * 2983 * After waiting for a given time this puts a job in the kernel-global 2984 * workqueue. 2985 */ 2986 int schedule_delayed_work(struct delayed_work *dwork, 2987 unsigned long delay) 2988 { 2989 return queue_delayed_work(system_wq, dwork, delay); 2990 } 2991 EXPORT_SYMBOL(schedule_delayed_work); 2992 2993 /** 2994 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 2995 * @cpu: cpu to use 2996 * @dwork: job to be done 2997 * @delay: number of jiffies to wait 2998 * 2999 * After waiting for a given time this puts a job in the kernel-global 3000 * workqueue on the specified CPU. 3001 */ 3002 int schedule_delayed_work_on(int cpu, 3003 struct delayed_work *dwork, unsigned long delay) 3004 { 3005 return queue_delayed_work_on(cpu, system_wq, dwork, delay); 3006 } 3007 EXPORT_SYMBOL(schedule_delayed_work_on); 3008 3009 /** 3010 * schedule_on_each_cpu - execute a function synchronously on each online CPU 3011 * @func: the function to call 3012 * 3013 * schedule_on_each_cpu() executes @func on each online CPU using the 3014 * system workqueue and blocks until all CPUs have completed. 3015 * schedule_on_each_cpu() is very slow. 3016 * 3017 * RETURNS: 3018 * 0 on success, -errno on failure. 3019 */ 3020 int schedule_on_each_cpu(work_func_t func) 3021 { 3022 int cpu; 3023 struct work_struct __percpu *works; 3024 3025 works = alloc_percpu(struct work_struct); 3026 if (!works) 3027 return -ENOMEM; 3028 3029 get_online_cpus(); 3030 3031 for_each_online_cpu(cpu) { 3032 struct work_struct *work = per_cpu_ptr(works, cpu); 3033 3034 INIT_WORK(work, func); 3035 schedule_work_on(cpu, work); 3036 } 3037 3038 for_each_online_cpu(cpu) 3039 flush_work(per_cpu_ptr(works, cpu)); 3040 3041 put_online_cpus(); 3042 free_percpu(works); 3043 return 0; 3044 } 3045 3046 /** 3047 * flush_scheduled_work - ensure that any scheduled work has run to completion. 3048 * 3049 * Forces execution of the kernel-global workqueue and blocks until its 3050 * completion. 3051 * 3052 * Think twice before calling this function! It's very easy to get into 3053 * trouble if you don't take great care. 
Either of the following situations 3054 * will lead to deadlock: 3055 * 3056 * One of the work items currently on the workqueue needs to acquire 3057 * a lock held by your code or its caller. 3058 * 3059 * Your code is running in the context of a work routine. 3060 * 3061 * They will be detected by lockdep when they occur, but the first might not 3062 * occur very often. It depends on what work items are on the workqueue and 3063 * what locks they need, which you have no control over. 3064 * 3065 * In most situations flushing the entire workqueue is overkill; you merely 3066 * need to know that a particular work item isn't queued and isn't running. 3067 * In such cases you should use cancel_delayed_work_sync() or 3068 * cancel_work_sync() instead. 3069 */ 3070 void flush_scheduled_work(void) 3071 { 3072 flush_workqueue(system_wq); 3073 } 3074 EXPORT_SYMBOL(flush_scheduled_work); 3075 3076 /** 3077 * execute_in_process_context - reliably execute the routine with user context 3078 * @fn: the function to execute 3079 * @ew: guaranteed storage for the execute work structure (must 3080 * be available when the work executes) 3081 * 3082 * Executes the function immediately if process context is available, 3083 * otherwise schedules the function for delayed execution. 3084 * 3085 * Returns: 0 - function was executed 3086 * 1 - function was scheduled for execution 3087 */ 3088 int execute_in_process_context(work_func_t fn, struct execute_work *ew) 3089 { 3090 if (!in_interrupt()) { 3091 fn(&ew->work); 3092 return 0; 3093 } 3094 3095 INIT_WORK(&ew->work, fn); 3096 schedule_work(&ew->work); 3097 3098 return 1; 3099 } 3100 EXPORT_SYMBOL_GPL(execute_in_process_context); 3101 3102 int keventd_up(void) 3103 { 3104 return system_wq != NULL; 3105 } 3106 3107 static int alloc_cwqs(struct workqueue_struct *wq) 3108 { 3109 /* 3110 * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS. 3111 * Make sure that the alignment isn't lower than that of 3112 * unsigned long long. 3113 */ 3114 const size_t size = sizeof(struct cpu_workqueue_struct); 3115 const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS, 3116 __alignof__(unsigned long long)); 3117 3118 if (!(wq->flags & WQ_UNBOUND)) 3119 wq->cpu_wq.pcpu = __alloc_percpu(size, align); 3120 else { 3121 void *ptr; 3122 3123 /* 3124 * Allocate enough room to align cwq and put an extra 3125 * pointer at the end pointing back to the originally 3126 * allocated pointer which will be used for free. 3127 */ 3128 ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL); 3129 if (ptr) { 3130 wq->cpu_wq.single = PTR_ALIGN(ptr, align); 3131 *(void **)(wq->cpu_wq.single + 1) = ptr; 3132 } 3133 } 3134 3135 /* just in case, make sure it's actually aligned */ 3136 BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align)); 3137 return wq->cpu_wq.v ? 0 : -ENOMEM; 3138 } 3139 3140 static void free_cwqs(struct workqueue_struct *wq) 3141 { 3142 if (!(wq->flags & WQ_UNBOUND)) 3143 free_percpu(wq->cpu_wq.pcpu); 3144 else if (wq->cpu_wq.single) { 3145 /* the pointer to free is stored right after the cwq */ 3146 kfree(*(void **)(wq->cpu_wq.single + 1)); 3147 } 3148 } 3149 3150 static int wq_clamp_max_active(int max_active, unsigned int flags, 3151 const char *name) 3152 { 3153 int lim = flags & WQ_UNBOUND ? 
WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE; 3154 3155 if (max_active < 1 || max_active > lim) 3156 printk(KERN_WARNING "workqueue: max_active %d requested for %s " 3157 "is out of range, clamping between %d and %d\n", 3158 max_active, name, 1, lim); 3159 3160 return clamp_val(max_active, 1, lim); 3161 } 3162 3163 struct workqueue_struct *__alloc_workqueue_key(const char *fmt, 3164 unsigned int flags, 3165 int max_active, 3166 struct lock_class_key *key, 3167 const char *lock_name, ...) 3168 { 3169 va_list args, args1; 3170 struct workqueue_struct *wq; 3171 unsigned int cpu; 3172 size_t namelen; 3173 3174 /* determine namelen, allocate wq and format name */ 3175 va_start(args, lock_name); 3176 va_copy(args1, args); 3177 namelen = vsnprintf(NULL, 0, fmt, args) + 1; 3178 3179 wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL); 3180 if (!wq) 3181 goto err; 3182 3183 vsnprintf(wq->name, namelen, fmt, args1); 3184 va_end(args); 3185 va_end(args1); 3186 3187 /* 3188 * Workqueues which may be used during memory reclaim should 3189 * have a rescuer to guarantee forward progress. 3190 */ 3191 if (flags & WQ_MEM_RECLAIM) 3192 flags |= WQ_RESCUER; 3193 3194 max_active = max_active ?: WQ_DFL_ACTIVE; 3195 max_active = wq_clamp_max_active(max_active, flags, wq->name); 3196 3197 /* init wq */ 3198 wq->flags = flags; 3199 wq->saved_max_active = max_active; 3200 mutex_init(&wq->flush_mutex); 3201 atomic_set(&wq->nr_cwqs_to_flush, 0); 3202 INIT_LIST_HEAD(&wq->flusher_queue); 3203 INIT_LIST_HEAD(&wq->flusher_overflow); 3204 3205 lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); 3206 INIT_LIST_HEAD(&wq->list); 3207 3208 if (alloc_cwqs(wq) < 0) 3209 goto err; 3210 3211 for_each_cwq_cpu(cpu, wq) { 3212 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3213 struct global_cwq *gcwq = get_gcwq(cpu); 3214 int pool_idx = (bool)(flags & WQ_HIGHPRI); 3215 3216 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); 3217 cwq->pool = &gcwq->pools[pool_idx]; 3218 cwq->wq = wq; 3219 cwq->flush_color = -1; 3220 cwq->max_active = max_active; 3221 INIT_LIST_HEAD(&cwq->delayed_works); 3222 } 3223 3224 if (flags & WQ_RESCUER) { 3225 struct worker *rescuer; 3226 3227 if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL)) 3228 goto err; 3229 3230 wq->rescuer = rescuer = alloc_worker(); 3231 if (!rescuer) 3232 goto err; 3233 3234 rescuer->task = kthread_create(rescuer_thread, wq, "%s", 3235 wq->name); 3236 if (IS_ERR(rescuer->task)) 3237 goto err; 3238 3239 rescuer->task->flags |= PF_THREAD_BOUND; 3240 wake_up_process(rescuer->task); 3241 } 3242 3243 /* 3244 * workqueue_lock protects global freeze state and workqueues 3245 * list. Grab it, set max_active accordingly and add the new 3246 * workqueue to workqueues list. 3247 */ 3248 spin_lock(&workqueue_lock); 3249 3250 if (workqueue_freezing && wq->flags & WQ_FREEZABLE) 3251 for_each_cwq_cpu(cpu, wq) 3252 get_cwq(cpu, wq)->max_active = 0; 3253 3254 list_add(&wq->list, &workqueues); 3255 3256 spin_unlock(&workqueue_lock); 3257 3258 return wq; 3259 err: 3260 if (wq) { 3261 free_cwqs(wq); 3262 free_mayday_mask(wq->mayday_mask); 3263 kfree(wq->rescuer); 3264 kfree(wq); 3265 } 3266 return NULL; 3267 } 3268 EXPORT_SYMBOL_GPL(__alloc_workqueue_key); 3269 3270 /** 3271 * destroy_workqueue - safely terminate a workqueue 3272 * @wq: target workqueue 3273 * 3274 * Safely destroy a workqueue. All work currently pending will be done first. 
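 *
 * A minimal lifetime sketch (hypothetical caller code; the "frob"
 * names are illustrative only):
 *
 *	static void frob_fn(struct work_struct *work);
 *	static DECLARE_WORK(frob_work, frob_fn);
 *	static struct workqueue_struct *frob_wq;
 *
 *	frob_wq = alloc_workqueue("frob", WQ_MEM_RECLAIM, 0);
 *	if (!frob_wq)
 *		return -ENOMEM;
 *	queue_work(frob_wq, &frob_work);
 *	...
 *	destroy_workqueue(frob_wq);
 *
 * destroy_workqueue() drains @wq first, so anything still pending or
 * chain-queued on frob_wq runs before the workqueue is freed.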
3275 */ 3276 void destroy_workqueue(struct workqueue_struct *wq) 3277 { 3278 unsigned int cpu; 3279 3280 /* drain it before proceeding with destruction */ 3281 drain_workqueue(wq); 3282 3283 /* 3284 * wq list is used to freeze wq, remove from list after 3285 * flushing is complete in case freeze races us. 3286 */ 3287 spin_lock(&workqueue_lock); 3288 list_del(&wq->list); 3289 spin_unlock(&workqueue_lock); 3290 3291 /* sanity check */ 3292 for_each_cwq_cpu(cpu, wq) { 3293 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3294 int i; 3295 3296 for (i = 0; i < WORK_NR_COLORS; i++) 3297 BUG_ON(cwq->nr_in_flight[i]); 3298 BUG_ON(cwq->nr_active); 3299 BUG_ON(!list_empty(&cwq->delayed_works)); 3300 } 3301 3302 if (wq->flags & WQ_RESCUER) { 3303 kthread_stop(wq->rescuer->task); 3304 free_mayday_mask(wq->mayday_mask); 3305 kfree(wq->rescuer); 3306 } 3307 3308 free_cwqs(wq); 3309 kfree(wq); 3310 } 3311 EXPORT_SYMBOL_GPL(destroy_workqueue); 3312 3313 /** 3314 * workqueue_set_max_active - adjust max_active of a workqueue 3315 * @wq: target workqueue 3316 * @max_active: new max_active value. 3317 * 3318 * Set max_active of @wq to @max_active. 3319 * 3320 * CONTEXT: 3321 * Don't call from IRQ context. 3322 */ 3323 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active) 3324 { 3325 unsigned int cpu; 3326 3327 max_active = wq_clamp_max_active(max_active, wq->flags, wq->name); 3328 3329 spin_lock(&workqueue_lock); 3330 3331 wq->saved_max_active = max_active; 3332 3333 for_each_cwq_cpu(cpu, wq) { 3334 struct global_cwq *gcwq = get_gcwq(cpu); 3335 3336 spin_lock_irq(&gcwq->lock); 3337 3338 if (!(wq->flags & WQ_FREEZABLE) || 3339 !(gcwq->flags & GCWQ_FREEZING)) 3340 get_cwq(gcwq->cpu, wq)->max_active = max_active; 3341 3342 spin_unlock_irq(&gcwq->lock); 3343 } 3344 3345 spin_unlock(&workqueue_lock); 3346 } 3347 EXPORT_SYMBOL_GPL(workqueue_set_max_active); 3348 3349 /** 3350 * workqueue_congested - test whether a workqueue is congested 3351 * @cpu: CPU in question 3352 * @wq: target workqueue 3353 * 3354 * Test whether @wq's cpu workqueue for @cpu is congested. There is 3355 * no synchronization around this function and the test result is 3356 * unreliable and only useful as advisory hints or for debugging. 3357 * 3358 * RETURNS: 3359 * %true if congested, %false otherwise. 3360 */ 3361 bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq) 3362 { 3363 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3364 3365 return !list_empty(&cwq->delayed_works); 3366 } 3367 EXPORT_SYMBOL_GPL(workqueue_congested); 3368 3369 /** 3370 * work_cpu - return the last known associated cpu for @work 3371 * @work: the work of interest 3372 * 3373 * RETURNS: 3374 * CPU number if @work was ever queued. WORK_CPU_NONE otherwise. 3375 */ 3376 unsigned int work_cpu(struct work_struct *work) 3377 { 3378 struct global_cwq *gcwq = get_work_gcwq(work); 3379 3380 return gcwq ? gcwq->cpu : WORK_CPU_NONE; 3381 } 3382 EXPORT_SYMBOL_GPL(work_cpu); 3383 3384 /** 3385 * work_busy - test whether a work is currently pending or running 3386 * @work: the work to be tested 3387 * 3388 * Test whether @work is currently pending or running. There is no 3389 * synchronization around this function and the test result is 3390 * unreliable and only useful as advisory hints or for debugging. 3391 * Especially for reentrant wqs, the pending state might hide the 3392 * running state. 3393 * 3394 * RETURNS: 3395 * OR'd bitmask of WORK_BUSY_* bits. 
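 *
 * A minimal usage sketch (hypothetical caller code; frob_work is an
 * illustrative, previously initialized work item):
 *
 *	unsigned int busy = work_busy(&frob_work);
 *
 *	if (busy & WORK_BUSY_PENDING)
 *		pr_debug("frob_work is still queued\n");
 *	if (busy & WORK_BUSY_RUNNING)
 *		pr_debug("frob_work is executing on some CPU\n");
 *
 * As noted above, the result is advisory only and may already be stale
 * by the time it is examined.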
3396 */ 3397 unsigned int work_busy(struct work_struct *work) 3398 { 3399 struct global_cwq *gcwq = get_work_gcwq(work); 3400 unsigned long flags; 3401 unsigned int ret = 0; 3402 3403 if (!gcwq) 3404 return false; 3405 3406 spin_lock_irqsave(&gcwq->lock, flags); 3407 3408 if (work_pending(work)) 3409 ret |= WORK_BUSY_PENDING; 3410 if (find_worker_executing_work(gcwq, work)) 3411 ret |= WORK_BUSY_RUNNING; 3412 3413 spin_unlock_irqrestore(&gcwq->lock, flags); 3414 3415 return ret; 3416 } 3417 EXPORT_SYMBOL_GPL(work_busy); 3418 3419 /* 3420 * CPU hotplug. 3421 * 3422 * There are two challenges in supporting CPU hotplug. Firstly, there 3423 * are a lot of assumptions on strong associations among work, cwq and 3424 * gcwq which make migrating pending and scheduled works very 3425 * difficult to implement without impacting hot paths. Secondly, 3426 * gcwqs serve mix of short, long and very long running works making 3427 * blocked draining impractical. 3428 * 3429 * This is solved by allowing a gcwq to be disassociated from the CPU 3430 * running as an unbound one and allowing it to be reattached later if the 3431 * cpu comes back online. 3432 */ 3433 3434 /* claim manager positions of all pools */ 3435 static void gcwq_claim_management_and_lock(struct global_cwq *gcwq) 3436 { 3437 struct worker_pool *pool; 3438 3439 for_each_worker_pool(pool, gcwq) 3440 mutex_lock_nested(&pool->manager_mutex, pool - gcwq->pools); 3441 spin_lock_irq(&gcwq->lock); 3442 } 3443 3444 /* release manager positions */ 3445 static void gcwq_release_management_and_unlock(struct global_cwq *gcwq) 3446 { 3447 struct worker_pool *pool; 3448 3449 spin_unlock_irq(&gcwq->lock); 3450 for_each_worker_pool(pool, gcwq) 3451 mutex_unlock(&pool->manager_mutex); 3452 } 3453 3454 static void gcwq_unbind_fn(struct work_struct *work) 3455 { 3456 struct global_cwq *gcwq = get_gcwq(smp_processor_id()); 3457 struct worker_pool *pool; 3458 struct worker *worker; 3459 struct hlist_node *pos; 3460 int i; 3461 3462 BUG_ON(gcwq->cpu != smp_processor_id()); 3463 3464 gcwq_claim_management_and_lock(gcwq); 3465 3466 /* 3467 * We've claimed all manager positions. Make all workers unbound 3468 * and set DISASSOCIATED. Before this, all workers except for the 3469 * ones which are still executing works from before the last CPU 3470 * down must be on the cpu. After this, they may become diasporas. 3471 */ 3472 for_each_worker_pool(pool, gcwq) 3473 list_for_each_entry(worker, &pool->idle_list, entry) 3474 worker->flags |= WORKER_UNBOUND; 3475 3476 for_each_busy_worker(worker, i, pos, gcwq) 3477 worker->flags |= WORKER_UNBOUND; 3478 3479 gcwq->flags |= GCWQ_DISASSOCIATED; 3480 3481 gcwq_release_management_and_unlock(gcwq); 3482 3483 /* 3484 * Call schedule() so that we cross rq->lock and thus can guarantee 3485 * sched callbacks see the %WORKER_UNBOUND flag. This is necessary 3486 * as scheduler callbacks may be invoked from other cpus. 3487 */ 3488 schedule(); 3489 3490 /* 3491 * Sched callbacks are disabled now. Zap nr_running. After this, 3492 * nr_running stays zero and need_more_worker() and keep_working() 3493 * are always true as long as the worklist is not empty. @gcwq now 3494 * behaves as unbound (in terms of concurrency management) gcwq 3495 * which is served by workers tied to the CPU. 3496 * 3497 * On return from this function, the current worker would trigger 3498 * unbound chain execution of pending work items if other workers 3499 * didn't already. 
3500 */ 3501 for_each_worker_pool(pool, gcwq) 3502 atomic_set(get_pool_nr_running(pool), 0); 3503 } 3504 3505 /* 3506 * Workqueues should be brought up before normal priority CPU notifiers. 3507 * This will be registered high priority CPU notifier. 3508 */ 3509 static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb, 3510 unsigned long action, 3511 void *hcpu) 3512 { 3513 unsigned int cpu = (unsigned long)hcpu; 3514 struct global_cwq *gcwq = get_gcwq(cpu); 3515 struct worker_pool *pool; 3516 3517 switch (action & ~CPU_TASKS_FROZEN) { 3518 case CPU_UP_PREPARE: 3519 for_each_worker_pool(pool, gcwq) { 3520 struct worker *worker; 3521 3522 if (pool->nr_workers) 3523 continue; 3524 3525 worker = create_worker(pool); 3526 if (!worker) 3527 return NOTIFY_BAD; 3528 3529 spin_lock_irq(&gcwq->lock); 3530 start_worker(worker); 3531 spin_unlock_irq(&gcwq->lock); 3532 } 3533 break; 3534 3535 case CPU_DOWN_FAILED: 3536 case CPU_ONLINE: 3537 gcwq_claim_management_and_lock(gcwq); 3538 gcwq->flags &= ~GCWQ_DISASSOCIATED; 3539 rebind_workers(gcwq); 3540 gcwq_release_management_and_unlock(gcwq); 3541 break; 3542 } 3543 return NOTIFY_OK; 3544 } 3545 3546 /* 3547 * Workqueues should be brought down after normal priority CPU notifiers. 3548 * This will be registered as low priority CPU notifier. 3549 */ 3550 static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb, 3551 unsigned long action, 3552 void *hcpu) 3553 { 3554 unsigned int cpu = (unsigned long)hcpu; 3555 struct work_struct unbind_work; 3556 3557 switch (action & ~CPU_TASKS_FROZEN) { 3558 case CPU_DOWN_PREPARE: 3559 /* unbinding should happen on the local CPU */ 3560 INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn); 3561 schedule_work_on(cpu, &unbind_work); 3562 flush_work(&unbind_work); 3563 break; 3564 } 3565 return NOTIFY_OK; 3566 } 3567 3568 #ifdef CONFIG_SMP 3569 3570 struct work_for_cpu { 3571 struct completion completion; 3572 long (*fn)(void *); 3573 void *arg; 3574 long ret; 3575 }; 3576 3577 static int do_work_for_cpu(void *_wfc) 3578 { 3579 struct work_for_cpu *wfc = _wfc; 3580 wfc->ret = wfc->fn(wfc->arg); 3581 complete(&wfc->completion); 3582 return 0; 3583 } 3584 3585 /** 3586 * work_on_cpu - run a function in user context on a particular cpu 3587 * @cpu: the cpu to run on 3588 * @fn: the function to run 3589 * @arg: the function arg 3590 * 3591 * This will return the value @fn returns. 3592 * It is up to the caller to ensure that the cpu doesn't go offline. 3593 * The caller must not hold any locks which would prevent @fn from completing. 3594 */ 3595 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg) 3596 { 3597 struct task_struct *sub_thread; 3598 struct work_for_cpu wfc = { 3599 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion), 3600 .fn = fn, 3601 .arg = arg, 3602 }; 3603 3604 sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu"); 3605 if (IS_ERR(sub_thread)) 3606 return PTR_ERR(sub_thread); 3607 kthread_bind(sub_thread, cpu); 3608 wake_up_process(sub_thread); 3609 wait_for_completion(&wfc.completion); 3610 return wfc.ret; 3611 } 3612 EXPORT_SYMBOL_GPL(work_on_cpu); 3613 #endif /* CONFIG_SMP */ 3614 3615 #ifdef CONFIG_FREEZER 3616 3617 /** 3618 * freeze_workqueues_begin - begin freezing workqueues 3619 * 3620 * Start freezing workqueues. After this function returns, all freezable 3621 * workqueues will queue new works to their frozen_works list instead of 3622 * gcwq->worklist. 3623 * 3624 * CONTEXT: 3625 * Grabs and releases workqueue_lock and gcwq->lock's. 
3626 */ 3627 void freeze_workqueues_begin(void) 3628 { 3629 unsigned int cpu; 3630 3631 spin_lock(&workqueue_lock); 3632 3633 BUG_ON(workqueue_freezing); 3634 workqueue_freezing = true; 3635 3636 for_each_gcwq_cpu(cpu) { 3637 struct global_cwq *gcwq = get_gcwq(cpu); 3638 struct workqueue_struct *wq; 3639 3640 spin_lock_irq(&gcwq->lock); 3641 3642 BUG_ON(gcwq->flags & GCWQ_FREEZING); 3643 gcwq->flags |= GCWQ_FREEZING; 3644 3645 list_for_each_entry(wq, &workqueues, list) { 3646 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3647 3648 if (cwq && wq->flags & WQ_FREEZABLE) 3649 cwq->max_active = 0; 3650 } 3651 3652 spin_unlock_irq(&gcwq->lock); 3653 } 3654 3655 spin_unlock(&workqueue_lock); 3656 } 3657 3658 /** 3659 * freeze_workqueues_busy - are freezable workqueues still busy? 3660 * 3661 * Check whether freezing is complete. This function must be called 3662 * between freeze_workqueues_begin() and thaw_workqueues(). 3663 * 3664 * CONTEXT: 3665 * Grabs and releases workqueue_lock. 3666 * 3667 * RETURNS: 3668 * %true if some freezable workqueues are still busy. %false if freezing 3669 * is complete. 3670 */ 3671 bool freeze_workqueues_busy(void) 3672 { 3673 unsigned int cpu; 3674 bool busy = false; 3675 3676 spin_lock(&workqueue_lock); 3677 3678 BUG_ON(!workqueue_freezing); 3679 3680 for_each_gcwq_cpu(cpu) { 3681 struct workqueue_struct *wq; 3682 /* 3683 * nr_active is monotonically decreasing. It's safe 3684 * to peek without lock. 3685 */ 3686 list_for_each_entry(wq, &workqueues, list) { 3687 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3688 3689 if (!cwq || !(wq->flags & WQ_FREEZABLE)) 3690 continue; 3691 3692 BUG_ON(cwq->nr_active < 0); 3693 if (cwq->nr_active) { 3694 busy = true; 3695 goto out_unlock; 3696 } 3697 } 3698 } 3699 out_unlock: 3700 spin_unlock(&workqueue_lock); 3701 return busy; 3702 } 3703 3704 /** 3705 * thaw_workqueues - thaw workqueues 3706 * 3707 * Thaw workqueues. Normal queueing is restored and all collected 3708 * frozen works are transferred to their respective gcwq worklists. 3709 * 3710 * CONTEXT: 3711 * Grabs and releases workqueue_lock and gcwq->lock's. 
3712 */ 3713 void thaw_workqueues(void) 3714 { 3715 unsigned int cpu; 3716 3717 spin_lock(&workqueue_lock); 3718 3719 if (!workqueue_freezing) 3720 goto out_unlock; 3721 3722 for_each_gcwq_cpu(cpu) { 3723 struct global_cwq *gcwq = get_gcwq(cpu); 3724 struct worker_pool *pool; 3725 struct workqueue_struct *wq; 3726 3727 spin_lock_irq(&gcwq->lock); 3728 3729 BUG_ON(!(gcwq->flags & GCWQ_FREEZING)); 3730 gcwq->flags &= ~GCWQ_FREEZING; 3731 3732 list_for_each_entry(wq, &workqueues, list) { 3733 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); 3734 3735 if (!cwq || !(wq->flags & WQ_FREEZABLE)) 3736 continue; 3737 3738 /* restore max_active and repopulate worklist */ 3739 cwq->max_active = wq->saved_max_active; 3740 3741 while (!list_empty(&cwq->delayed_works) && 3742 cwq->nr_active < cwq->max_active) 3743 cwq_activate_first_delayed(cwq); 3744 } 3745 3746 for_each_worker_pool(pool, gcwq) 3747 wake_up_worker(pool); 3748 3749 spin_unlock_irq(&gcwq->lock); 3750 } 3751 3752 workqueue_freezing = false; 3753 out_unlock: 3754 spin_unlock(&workqueue_lock); 3755 } 3756 #endif /* CONFIG_FREEZER */ 3757 3758 static int __init init_workqueues(void) 3759 { 3760 unsigned int cpu; 3761 int i; 3762 3763 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); 3764 cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); 3765 3766 /* initialize gcwqs */ 3767 for_each_gcwq_cpu(cpu) { 3768 struct global_cwq *gcwq = get_gcwq(cpu); 3769 struct worker_pool *pool; 3770 3771 spin_lock_init(&gcwq->lock); 3772 gcwq->cpu = cpu; 3773 gcwq->flags |= GCWQ_DISASSOCIATED; 3774 3775 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) 3776 INIT_HLIST_HEAD(&gcwq->busy_hash[i]); 3777 3778 for_each_worker_pool(pool, gcwq) { 3779 pool->gcwq = gcwq; 3780 INIT_LIST_HEAD(&pool->worklist); 3781 INIT_LIST_HEAD(&pool->idle_list); 3782 3783 init_timer_deferrable(&pool->idle_timer); 3784 pool->idle_timer.function = idle_worker_timeout; 3785 pool->idle_timer.data = (unsigned long)pool; 3786 3787 setup_timer(&pool->mayday_timer, gcwq_mayday_timeout, 3788 (unsigned long)pool); 3789 3790 mutex_init(&pool->manager_mutex); 3791 ida_init(&pool->worker_ida); 3792 } 3793 3794 init_waitqueue_head(&gcwq->rebind_hold); 3795 } 3796 3797 /* create the initial worker */ 3798 for_each_online_gcwq_cpu(cpu) { 3799 struct global_cwq *gcwq = get_gcwq(cpu); 3800 struct worker_pool *pool; 3801 3802 if (cpu != WORK_CPU_UNBOUND) 3803 gcwq->flags &= ~GCWQ_DISASSOCIATED; 3804 3805 for_each_worker_pool(pool, gcwq) { 3806 struct worker *worker; 3807 3808 worker = create_worker(pool); 3809 BUG_ON(!worker); 3810 spin_lock_irq(&gcwq->lock); 3811 start_worker(worker); 3812 spin_unlock_irq(&gcwq->lock); 3813 } 3814 } 3815 3816 system_wq = alloc_workqueue("events", 0, 0); 3817 system_long_wq = alloc_workqueue("events_long", 0, 0); 3818 system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0); 3819 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, 3820 WQ_UNBOUND_MAX_ACTIVE); 3821 system_freezable_wq = alloc_workqueue("events_freezable", 3822 WQ_FREEZABLE, 0); 3823 system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable", 3824 WQ_NON_REENTRANT | WQ_FREEZABLE, 0); 3825 BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq || 3826 !system_unbound_wq || !system_freezable_wq || 3827 !system_nrt_freezable_wq); 3828 return 0; 3829 } 3830 early_initcall(init_workqueues); 3831
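
/*
 * A minimal sketch of the schedule_*() convenience wrappers defined
 * above (hypothetical caller code; the "frob" names are illustrative
 * and not part of this file):
 *
 *	static void frob_fn(struct work_struct *work);
 *	static DECLARE_WORK(frob_work, frob_fn);
 *	static DECLARE_DELAYED_WORK(frob_dwork, frob_fn);
 *
 *	schedule_work(&frob_work);
 *	schedule_delayed_work(&frob_dwork, msecs_to_jiffies(100));
 *
 * Both queue onto system_wq; schedule_work_on() and
 * schedule_delayed_work_on() behave the same but put the work item on
 * the given CPU.
 */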