// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 * Copyright (C) 2014 Fujitsu. All rights reserved.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <trace/events/btrfs.h>
#include "async-thread.h"
#include "ctree.h"

enum {
	WORK_DONE_BIT,
	WORK_ORDER_DONE_BIT,
};

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

struct btrfs_workqueue {
	struct workqueue_struct *normal_wq;

	/* File system this workqueue services */
	struct btrfs_fs_info *fs_info;

	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variables */
	atomic_t pending;

	/* Upper limit of concurrent workers */
	int limit_active;

	/* Current number of concurrent workers */
	int current_active;

	/* Threshold to change current_active */
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

struct btrfs_fs_info * __pure btrfs_workqueue_owner(const struct btrfs_workqueue *wq)
{
	return wq->fs_info;
}

struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
{
	return work->wq->fs_info;
}

bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
	/*
	 * We could compare wq->pending with num_online_cpus() to support the
	 * "thresh == NO_THRESHOLD" case, but that requires moving the
	 * atomic_inc/dec up into thresh_queue/exec_hook. Postpone it until
	 * someone needs support for that case.
	 */
	if (wq->thresh == NO_THRESHOLD)
		return false;

	return atomic_read(&wq->pending) > wq->thresh * 2;
}

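/*
 * Illustrative sketch (not part of the original file): how a caller might use
 * the congestion check above to avoid piling more background work onto an
 * already-backlogged thresholded workqueue. The names
 * example_maybe_queue_background, some_workers and example_work are
 * hypothetical and only stand in for a real submission path elsewhere in
 * fs/btrfs/.
 */
#if 0	/* usage sketch only, never compiled */
static void example_maybe_queue_background(struct btrfs_fs_info *fs_info,
					   struct btrfs_work *example_work)
{
	/* Back off while pending work exceeds twice the threshold. */
	if (btrfs_workqueue_normal_congested(fs_info->some_workers))
		return;

	btrfs_queue_work(fs_info->some_workers, example_work);
}
#endif
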
static void btrfs_init_workqueue(struct btrfs_workqueue *wq,
				 struct btrfs_fs_info *fs_info)
{
	wq->fs_info = fs_info;
	atomic_set(&wq->pending, 0);
	INIT_LIST_HEAD(&wq->ordered_list);
	spin_lock_init(&wq->list_lock);
	spin_lock_init(&wq->thres_lock);
}

struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
					      const char *name, unsigned int flags,
					      int limit_active, int thresh)
{
	struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	ret->limit_active = limit_active;
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For a low threshold, disabling thresholding is the better choice. */
	if (thresh < DFT_THRESHOLD) {
		ret->current_active = limit_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		/*
		 * For a thresholded wq, let its concurrency grow on demand.
		 * Use a minimal max_active at alloc time to reduce resource
		 * usage.
		 */
		ret->current_active = 1;
		ret->thresh = thresh;
	}

	ret->normal_wq = alloc_workqueue("btrfs-%s", flags, ret->current_active,
					 name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}

struct btrfs_workqueue *btrfs_alloc_ordered_workqueue(
		struct btrfs_fs_info *fs_info, const char *name,
		unsigned int flags)
{
	struct btrfs_workqueue *ret;

	ret = kzalloc(sizeof(*ret), GFP_KERNEL);
	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	/* Ordered workqueues don't allow @max_active adjustments. */
	ret->limit_active = 1;
	ret->current_active = 1;
	ret->thresh = NO_THRESHOLD;

	ret->normal_wq = alloc_ordered_workqueue("btrfs-%s", flags, name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}

/*
 * Hook for the threshold, called from btrfs_queue_work.
 * This hook WILL be called in IRQ handler context, so
 * workqueue_set_max_active MUST NOT be called from it.
 */
static inline void thresh_queue_hook(struct btrfs_workqueue *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for the threshold, called before executing the work.
 * This hook is called in kthread context, so workqueue_set_max_active
 * may be called here.
 */
static inline void thresh_exec_hook(struct btrfs_workqueue *wq)
{
	int new_current_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active.
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto out;
	new_current_active = wq->current_active;

	/*
	 * pending may change later, but that's OK since it does not need to be
	 * very accurate to calculate new_current_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_current_active++;
	if (pending < wq->thresh / 2)
		new_current_active--;
	new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
	if (new_current_active != wq->current_active) {
		need_change = 1;
		wq->current_active = new_current_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change)
		workqueue_set_max_active(wq->normal_wq, wq->current_active);
}

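/*
 * Worked example of the thresholding above (illustrative numbers, not from
 * the original file): with thresh = 32 and limit_active = 8, a backlog of
 * pending = 40 (> thresh) bumps current_active by one, while pending = 10
 * (< thresh / 2) lowers it, always clamped to the range [1, limit_active].
 * btrfs_workqueue_normal_congested() reports congestion once pending exceeds
 * 2 * thresh = 64.
 */
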
static void run_ordered_work(struct btrfs_workqueue *wq,
			     struct btrfs_work *self)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;
	bool free_self = false;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;
		/*
		 * Orders all subsequent loads after reading WORK_DONE_BIT.
		 * Paired with the smp_mb__before_atomic in btrfs_work_helper,
		 * this guarantees that the ordered function will see all
		 * updates from the ordinary work function.
		 */
		smp_rmb();

		/*
		 * We are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns.
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		trace_btrfs_ordered_sched(work);
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work, false);

		/* Now take the lock again and drop our item from the list. */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		if (work == self) {
			/*
			 * This is the work item that the worker is currently
			 * executing.
			 *
			 * The kernel workqueue code guarantees non-reentrancy
			 * of work items. I.e., if a work item with the same
			 * address and work function is queued twice, the second
			 * execution is blocked until the first one finishes. A
			 * work item may be freed and recycled with the same
			 * work function; the workqueue code assumes that the
			 * original work item cannot depend on the recycled work
			 * item in that case (see find_worker_executing_work()).
			 *
			 * Note that different types of Btrfs work can depend on
			 * each other, and one type of work on one Btrfs
			 * filesystem may even depend on the same type of work
			 * on another Btrfs filesystem via, e.g., a loop device.
			 * Therefore, we must not allow the current work item to
			 * be recycled until we are really done, otherwise we
			 * break the above assumption and can deadlock.
			 */
			free_self = true;
		} else {
			/*
			 * We don't want to call the ordered free functions with
			 * the lock held.
			 */
			work->ordered_func(work, true);
			/* NB: work must not be dereferenced past this point. */
			trace_btrfs_all_work_done(wq->fs_info, work);
		}
	}
	spin_unlock_irqrestore(lock, flags);

	if (free_self) {
		self->ordered_func(self, true);
		/* NB: self must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, self);
	}
}

static void btrfs_work_helper(struct work_struct *normal_work)
{
	struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
					       normal_work);
	struct btrfs_workqueue *wq = work->wq;
	int need_order = 0;

	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func(), if there is no ordered_func(..., true) to
	 *    free it, since the struct is freed in work->func();
	 * 2) after setting WORK_DONE_BIT, because the work may be freed by
	 *    other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;

	trace_btrfs_work_sched(work);
	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		/*
		 * Ensures all memory accesses done in the work function are
		 * ordered before setting the WORK_DONE_BIT, so that the thread
		 * which is going to execute the ordered work sees them.
		 * Pairs with the smp_rmb in run_ordered_work.
		 */
		smp_mb__before_atomic();
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq, work);
	} else {
		/* NB: work must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, work);
	}
}

void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
		     btrfs_ordered_func_t ordered_func)
{
	work->func = func;
	work->ordered_func = ordered_func;
	INIT_WORK(&work->normal_work, btrfs_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

void btrfs_queue_work(struct btrfs_workqueue *wq, struct btrfs_work *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	trace_btrfs_work_queued(work);
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	if (!wq)
		return;
	destroy_workqueue(wq->normal_wq);
	trace_btrfs_workqueue_destroy(wq);
	kfree(wq);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
	if (wq)
		wq->limit_active = limit_active;
}

void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
{
	flush_workqueue(wq->normal_wq);
}

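/*
 * Illustrative sketch (not part of the original file): a typical life cycle
 * of a thresholded workqueue and a single ordered work item. The names
 * example_fn, example_ordered_fn and example_submit are hypothetical; real
 * callers live in the rest of fs/btrfs/.
 */
#if 0	/* usage sketch only, never compiled */
static void example_fn(struct btrfs_work *work)
{
	/* Heavy lifting; may run concurrently with other work items. */
}

static void example_ordered_fn(struct btrfs_work *work, bool do_free)
{
	/* Runs in queue order once the normal function has finished. */
	if (do_free)
		kfree(work);
}

static int example_submit(struct btrfs_fs_info *fs_info)
{
	struct btrfs_workqueue *wq;
	struct btrfs_work *work;

	/* At most 8 concurrent workers; thresh = 0 selects DFT_THRESHOLD. */
	wq = btrfs_alloc_workqueue(fs_info, "example", 0, 8, 0);
	if (!wq)
		return -ENOMEM;

	work = kzalloc(sizeof(*work), GFP_KERNEL);
	if (work) {
		btrfs_init_work(work, example_fn, example_ordered_fn);
		btrfs_queue_work(wq, work);
	}

	btrfs_flush_workqueue(wq);
	btrfs_destroy_workqueue(wq);
	return 0;
}
#endif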