// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "alloc_foreground.h"
#include "btree_iter.h"
#include "buckets.h"
#include "clock.h"
#include "compress.h"
#include "disk_groups.h"
#include "errcode.h"
#include "move.h"
#include "rebalance.h"
#include "super-io.h"
#include "trace.h"

#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/sched/cputime.h>

/*
 * Check if an extent should be rewritten by the rebalance thread: returns true
 * and fills in @data_opts if any pointer needs rewriting, either because it is
 * compressed with the wrong compression type or because it is not on the
 * background target.
 */
static bool rebalance_pred(struct bch_fs *c, void *arg,
			   struct bkey_s_c k,
			   struct bch_io_opts *io_opts,
			   struct data_update_opts *data_opts)
{
	struct bkey_ptrs_c ptrs = bch2_bkey_ptrs_c(k);
	unsigned i;

	data_opts->rewrite_ptrs		= 0;
	data_opts->target		= io_opts->background_target;
	data_opts->extra_replicas	= 0;
	data_opts->btree_insert_flags	= 0;

	if (io_opts->background_compression &&
	    !bch2_bkey_is_incompressible(k)) {
		const union bch_extent_entry *entry;
		struct extent_ptr_decoded p;

		i = 0;
		bkey_for_each_ptr_decode(k.k, ptrs, p, entry) {
			if (!p.ptr.cached &&
			    p.crc.compression_type !=
			    bch2_compression_opt_to_type(io_opts->background_compression))
				data_opts->rewrite_ptrs |= 1U << i;
			i++;
		}
	}

	if (io_opts->background_target) {
		const struct bch_extent_ptr *ptr;

		i = 0;
		bkey_for_each_ptr(ptrs, ptr) {
			if (!ptr->cached &&
			    !bch2_dev_in_target(c, ptr->dev, io_opts->background_target) &&
			    bch2_target_accepts_data(c, BCH_DATA_user, io_opts->background_target))
				data_opts->rewrite_ptrs |= 1U << i;
			i++;
		}
	}

	return data_opts->rewrite_ptrs != 0;
}

void bch2_rebalance_add_key(struct bch_fs *c,
			    struct bkey_s_c k,
			    struct bch_io_opts *io_opts)
{
	struct data_update_opts update_opts = { 0 };
	struct bkey_ptrs_c ptrs;
	const struct bch_extent_ptr *ptr;
	unsigned i;

	if (!rebalance_pred(c, NULL, k, io_opts, &update_opts))
		return;

	i = 0;
	ptrs = bch2_bkey_ptrs_c(k);
	bkey_for_each_ptr(ptrs, ptr) {
		if ((1U << i) & update_opts.rewrite_ptrs)
			if (atomic64_add_return(k.k->size,
						&bch_dev_bkey_exists(c, ptr->dev)->rebalance_work) ==
			    k.k->size)
				rebalance_wakeup(c);
		i++;
	}
}

void bch2_rebalance_add_work(struct bch_fs *c, u64 sectors)
{
	if (atomic64_add_return(sectors, &c->rebalance.work_unknown_dev) ==
	    sectors)
		rebalance_wakeup(c);
}

struct rebalance_work {
	int		dev_most_full_idx;
	unsigned	dev_most_full_percent;
	u64		dev_most_full_work;
	u64		dev_most_full_capacity;
	u64		total_work;
};

static void rebalance_work_accumulate(struct rebalance_work *w,
		u64 dev_work, u64 unknown_dev, u64 capacity, int idx)
{
	unsigned percent_full;
	u64 work = dev_work + unknown_dev;

	/* avoid divide by 0 */
	if (!capacity)
		return;

	if (work < dev_work || work < unknown_dev)
		work = U64_MAX;
	work = min(work, capacity);

	percent_full = div64_u64(work * 100, capacity);

	if (percent_full >= w->dev_most_full_percent) {
		w->dev_most_full_idx		= idx;
		w->dev_most_full_percent	= percent_full;
		w->dev_most_full_work		= work;
		w->dev_most_full_capacity	= capacity;
	}

	if (w->total_work + dev_work >= w->total_work &&
	    w->total_work + dev_work >= dev_work)
		w->total_work += dev_work;
}

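/*
 * Sum up pending rebalance work across all online devices, tracking whichever
 * device has the highest percentage of pending work relative to its capacity.
 * Work that isn't attributed to a specific device (work_unknown_dev) is
 * included in every device's fullness calculation, and is also accumulated
 * once against the whole filesystem's capacity.
 */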
static struct rebalance_work
rebalance_work(struct bch_fs *c)
{
	struct bch_dev *ca;
	struct rebalance_work ret = { .dev_most_full_idx = -1 };
	u64 unknown_dev = atomic64_read(&c->rebalance.work_unknown_dev);
	unsigned i;

	for_each_online_member(ca, c, i)
		rebalance_work_accumulate(&ret,
			atomic64_read(&ca->rebalance_work),
			unknown_dev,
			bucket_to_sector(ca, ca->mi.nbuckets -
					 ca->mi.first_bucket),
			i);

	rebalance_work_accumulate(&ret,
		unknown_dev, 0, c->capacity, -1);

	return ret;
}

static void rebalance_work_reset(struct bch_fs *c)
{
	struct bch_dev *ca;
	unsigned i;

	for_each_online_member(ca, c, i)
		atomic64_set(&ca->rebalance_work, 0);

	atomic64_set(&c->rebalance.work_unknown_dev, 0);
}

static unsigned long curr_cputime(void)
{
	u64 utime, stime;

	task_cputime_adjusted(current, &utime, &stime);
	return nsecs_to_jiffies(utime + stime);
}

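/*
 * Main rebalance loop: each iteration sums up pending work; if there is none,
 * the thread sleeps until more arrives.  If the fullest device has less than
 * 20% of its capacity worth of pending work, CPU and IO usage may be throttled
 * by waiting on the write IO clock; otherwise the keyspace is walked with
 * bch2_move_data(), rewriting extents that rebalance_pred() flags as being
 * compressed with the wrong compression type or stored outside the background
 * target.  Ratelimiting via r->pd.rate is currently disabled.
 */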
static int bch2_rebalance_thread(void *arg)
{
	struct bch_fs *c = arg;
	struct bch_fs_rebalance *r = &c->rebalance;
	struct io_clock *clock = &c->io_clock[WRITE];
	struct rebalance_work w, p;
	struct bch_move_stats move_stats;
	unsigned long start, prev_start;
	unsigned long prev_run_time, prev_run_cputime;
	unsigned long cputime, prev_cputime;
	u64 io_start;
	long throttle;

	set_freezable();

	io_start	= atomic64_read(&clock->now);
	p		= rebalance_work(c);
	prev_start	= jiffies;
	prev_cputime	= curr_cputime();

	bch2_move_stats_init(&move_stats, "rebalance");
	while (!kthread_wait_freezable(r->enabled)) {
		cond_resched();

		start			= jiffies;
		cputime			= curr_cputime();

		prev_run_time		= start - prev_start;
		prev_run_cputime	= cputime - prev_cputime;

		w = rebalance_work(c);
		BUG_ON(!w.dev_most_full_capacity);

		if (!w.total_work) {
			r->state = REBALANCE_WAITING;
			kthread_wait_freezable(rebalance_work(c).total_work);
			continue;
		}

		/*
		 * If there isn't much work to do, throttle cpu usage:
		 */
		throttle = prev_run_cputime * 100 /
			max(1U, w.dev_most_full_percent) -
			prev_run_time;

		if (w.dev_most_full_percent < 20 && throttle > 0) {
			r->throttled_until_iotime = io_start +
				div_u64(w.dev_most_full_capacity *
					(20 - w.dev_most_full_percent),
					50);

			if (atomic64_read(&clock->now) + clock->max_slop <
			    r->throttled_until_iotime) {
				r->throttled_until_cputime = start + throttle;
				r->state = REBALANCE_THROTTLED;

				bch2_kthread_io_clock_wait(clock,
					r->throttled_until_iotime,
					throttle);
				continue;
			}
		}

		/* minimum 1 mb/sec: */
		r->pd.rate.rate =
			max_t(u64, 1 << 11,
			      r->pd.rate.rate *
			      max(p.dev_most_full_percent, 1U) /
			      max(w.dev_most_full_percent, 1U));

		io_start	= atomic64_read(&clock->now);
		p		= w;
		prev_start	= start;
		prev_cputime	= cputime;

		r->state = REBALANCE_RUNNING;
		memset(&move_stats, 0, sizeof(move_stats));
		rebalance_work_reset(c);

		bch2_move_data(c,
			       0,		POS_MIN,
			       BTREE_ID_NR,	POS_MAX,
			       /* ratelimiting disabled for now */
			       NULL, /* &r->pd.rate, */
			       &move_stats,
			       writepoint_ptr(&c->rebalance_write_point),
			       true,
			       rebalance_pred, NULL);
	}

	return 0;
}

void bch2_rebalance_work_to_text(struct printbuf *out, struct bch_fs *c)
{
	struct bch_fs_rebalance *r = &c->rebalance;
	struct rebalance_work w = rebalance_work(c);

	if (!out->nr_tabstops)
		printbuf_tabstop_push(out, 20);

	prt_printf(out, "fullest_dev (%i):", w.dev_most_full_idx);
	prt_tab(out);

	prt_human_readable_u64(out, w.dev_most_full_work << 9);
	prt_printf(out, "/");
	prt_human_readable_u64(out, w.dev_most_full_capacity << 9);
	prt_newline(out);

	prt_printf(out, "total work:");
	prt_tab(out);

	prt_human_readable_u64(out, w.total_work << 9);
	prt_printf(out, "/");
	prt_human_readable_u64(out, c->capacity << 9);
	prt_newline(out);

	prt_printf(out, "rate:");
	prt_tab(out);
	prt_printf(out, "%u", r->pd.rate.rate);
	prt_newline(out);

	switch (r->state) {
	case REBALANCE_WAITING:
		prt_printf(out, "waiting");
		break;
	case REBALANCE_THROTTLED:
		prt_printf(out, "throttled for %lu sec or ",
			   (r->throttled_until_cputime - jiffies) / HZ);
		prt_human_readable_u64(out,
				       (r->throttled_until_iotime -
					atomic64_read(&c->io_clock[WRITE].now)) << 9);
		prt_printf(out, " io");
		break;
	case REBALANCE_RUNNING:
		prt_printf(out, "running");
		break;
	}
	prt_newline(out);
}

void bch2_rebalance_stop(struct bch_fs *c)
{
	struct task_struct *p;

	c->rebalance.pd.rate.rate = UINT_MAX;
	bch2_ratelimit_reset(&c->rebalance.pd.rate);

	p = rcu_dereference_protected(c->rebalance.thread, 1);
	c->rebalance.thread = NULL;

	if (p) {
		/* for synchronizing with rebalance_wakeup() */
		synchronize_rcu();

		kthread_stop(p);
		put_task_struct(p);
	}
}

int bch2_rebalance_start(struct bch_fs *c)
{
	struct task_struct *p;
	int ret;

	if (c->rebalance.thread)
		return 0;

	if (c->opts.nochanges)
		return 0;

	p = kthread_create(bch2_rebalance_thread, c, "bch-rebalance/%s", c->name);
	ret = PTR_ERR_OR_ZERO(p);
	if (ret) {
		bch_err_msg(c, ret, "creating rebalance thread");
		return ret;
	}

	get_task_struct(p);
	rcu_assign_pointer(c->rebalance.thread, p);
	wake_up_process(p);
	return 0;
}

void bch2_fs_rebalance_init(struct bch_fs *c)
{
	bch2_pd_controller_init(&c->rebalance.pd);

	atomic64_set(&c->rebalance.work_unknown_dev, S64_MAX);
}