/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;
		list_move(&worker->worker_list, &worker->workers->idle_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	unsigned long flags;

	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock_irqsave(&workers->lock, flags);

	while (!list_empty(&workers->order_list)) {
		work = list_entry(workers->order_list.next,
				  struct btrfs_work, order_list);

		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock_irqrestore(&workers->lock, flags);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock_irqsave(&workers->lock, flags);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock_irqrestore(&workers->lock, flags);
	return 0;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head *cur;
	struct btrfs_work *work;
	do {
		spin_lock_irq(&worker->lock);
again_locked:
		while (!list_empty(&worker->pending)) {
			cur = worker->pending.next;
			work = list_entry(cur, struct btrfs_work, list);
			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;
			spin_unlock_irq(&worker->lock);

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			spin_lock_irq(&worker->lock);
			check_idle_worker(worker);

		}
		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending))
					goto again_locked;

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop())
					schedule();
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shutdown
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;

	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);
		kthread_stop(worker->task);
		list_del(&worker->worker_list);
		kfree(worker);
	}
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
{
	workers->num_workers = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	spin_lock_init(&workers->lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);
		atomic_set(&worker->num_pending, 0);
		/* the new thread dereferences this, set it before it starts */
		worker->workers = workers;
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		if (IS_ERR(worker->task)) {
			kfree(worker);
			ret = PTR_ERR(worker->task);
			goto fail;
		}

		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min = workers->num_workers < workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	atomic_inc(&worker->num_pending);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);
	spin_unlock_irqrestore(&workers->lock, flags);

	if (!worker) {
		spin_lock_irqsave(&workers->lock, flags);
		if (workers->num_workers >= workers->max_workers) {
			struct list_head *fallback = NULL;
			/*
			 * we have failed to find any idle workers, just
			 * fall back to the first one we can find
			 */
			if (!list_empty(&workers->worker_list))
				fallback = workers->worker_list.next;
			if (!list_empty(&workers->idle_list))
				fallback = workers->idle_list.next;
			BUG_ON(!fallback);
			worker = list_entry(fallback,
				  struct btrfs_worker_thread, worker_list);
			spin_unlock_irqrestore(&workers->lock, flags);
		} else {
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		/*
		 * irqs are already off from the irqsave above, so take the
		 * workers lock without clobbering the saved flags
		 */
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	spin_unlock_irqrestore(&worker->lock, flags);
	if (wake)
		wake_up_process(worker->task);
out:
	return 0;
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		spin_lock_irqsave(&workers->lock, flags);
		list_add_tail(&work->order_list, &workers->order_list);
		spin_unlock_irqrestore(&workers->lock, flags);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	spin_unlock_irqrestore(&worker->lock, flags);

	if (wake)
		wake_up_process(worker->task);
out:
	return 0;
}
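
/*
 * Illustrative usage sketch, not part of the original file: a minimal view of
 * how a pool built from the functions above is typically driven, assuming the
 * struct btrfs_workers / struct btrfs_work definitions from async-thread.h
 * (func, ordered_func, ordered_free, flags, ...).  The names example_pool,
 * example_work and example_func, and the counts below, are made up purely
 * for illustration.
 *
 *	static struct btrfs_workers example_pool;
 *	static struct btrfs_work example_work;
 *
 *	static void example_func(struct btrfs_work *work)
 *	{
 *		// do the real work here; 'work' may be freed or recycled
 *	}
 *
 *	static void example_usage(void)
 *	{
 *		// pool of up to 4 kthreads named btrfs-example-N
 *		btrfs_init_workers(&example_pool, "example", 4);
 *		btrfs_start_workers(&example_pool, 1);
 *
 *		// flags must start out zero so the WORK_QUEUED_BIT
 *		// tracking in btrfs_queue_worker works
 *		example_work.flags = 0;
 *		example_work.func = example_func;
 *		btrfs_queue_worker(&example_pool, &example_work);
 *
 *		// tear down: kthread_stop() waits for each worker to exit
 *		btrfs_stop_workers(&example_pool);
 *	}
 */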