/* xref: /linux/fs/bcachefs/rebalance.c (revision 031fba65fc202abf1f193e321be7a2c274fd88ba) */
// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "alloc_foreground.h"
#include "btree_iter.h"
#include "buckets.h"
#include "clock.h"
#include "compress.h"
#include "disk_groups.h"
#include "errcode.h"
#include "move.h"
#include "rebalance.h"
#include "super-io.h"
#include "trace.h"

#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/sched/cputime.h>

/*
 * Check if an extent should be moved: returns true if any of its pointers
 * need rewriting, either to apply the background compression option or to
 * move data to the background target, with the bitmap of pointers to
 * rewrite returned in data_opts->rewrite_ptrs.
 */
static bool rebalance_pred(struct bch_fs *c, void *arg,
			   struct bkey_s_c k,
			   struct bch_io_opts *io_opts,
			   struct data_update_opts *data_opts)
{
	struct bkey_ptrs_c ptrs = bch2_bkey_ptrs_c(k);
	unsigned i;

	data_opts->rewrite_ptrs		= 0;
	data_opts->target		= io_opts->background_target;
	data_opts->extra_replicas	= 0;
	data_opts->btree_insert_flags	= 0;

	if (io_opts->background_compression &&
	    !bch2_bkey_is_incompressible(k)) {
		const union bch_extent_entry *entry;
		struct extent_ptr_decoded p;

		i = 0;
		bkey_for_each_ptr_decode(k.k, ptrs, p, entry) {
			if (!p.ptr.cached &&
			    p.crc.compression_type !=
			    bch2_compression_opt_to_type(io_opts->background_compression))
				data_opts->rewrite_ptrs |= 1U << i;
			i++;
		}
	}

	if (io_opts->background_target) {
		const struct bch_extent_ptr *ptr;

		i = 0;
		bkey_for_each_ptr(ptrs, ptr) {
			if (!ptr->cached &&
			    !bch2_dev_in_target(c, ptr->dev, io_opts->background_target) &&
			    bch2_target_accepts_data(c, BCH_DATA_user, io_opts->background_target))
				data_opts->rewrite_ptrs |= 1U << i;
			i++;
		}
	}

	return data_opts->rewrite_ptrs != 0;
}

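/*
 * Account the size of any pointers that rebalance_pred() flags for rewriting
 * against the owning device's pending rebalance work, waking the rebalance
 * thread if that counter was previously empty.
 */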
void bch2_rebalance_add_key(struct bch_fs *c,
			    struct bkey_s_c k,
			    struct bch_io_opts *io_opts)
{
	struct data_update_opts update_opts = { 0 };
	struct bkey_ptrs_c ptrs;
	const struct bch_extent_ptr *ptr;
	unsigned i;

	if (!rebalance_pred(c, NULL, k, io_opts, &update_opts))
		return;

	i = 0;
	ptrs = bch2_bkey_ptrs_c(k);
	bkey_for_each_ptr(ptrs, ptr) {
		if ((1U << i) & update_opts.rewrite_ptrs)
			if (atomic64_add_return(k.k->size,
					&bch_dev_bkey_exists(c, ptr->dev)->rebalance_work) ==
			    k.k->size)
				rebalance_wakeup(c);
		i++;
	}
}

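/*
 * Account rebalance work that isn't attributed to any particular device,
 * waking the rebalance thread if the counter was previously empty.
 */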
void bch2_rebalance_add_work(struct bch_fs *c, u64 sectors)
{
	if (atomic64_add_return(sectors, &c->rebalance.work_unknown_dev) ==
	    sectors)
		rebalance_wakeup(c);
}

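/*
 * Snapshot of pending rebalance work, in sectors: tracks the device with the
 * highest proportion of pending work relative to its capacity, plus the
 * total outstanding across the filesystem.
 */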
struct rebalance_work {
	int		dev_most_full_idx;
	unsigned	dev_most_full_percent;
	u64		dev_most_full_work;
	u64		dev_most_full_capacity;
	u64		total_work;
};

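/*
 * Fold one device's pending work into @w, saturating rather than
 * overflowing, and track whichever device is proportionally the fullest.
 */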
static void rebalance_work_accumulate(struct rebalance_work *w,
		u64 dev_work, u64 unknown_dev, u64 capacity, int idx)
{
	unsigned percent_full;
	u64 work = dev_work + unknown_dev;

	/* avoid divide by 0 */
	if (!capacity)
		return;

	if (work < dev_work || work < unknown_dev)
		work = U64_MAX;
	work = min(work, capacity);

	percent_full = div64_u64(work * 100, capacity);

	if (percent_full >= w->dev_most_full_percent) {
		w->dev_most_full_idx		= idx;
		w->dev_most_full_percent	= percent_full;
		w->dev_most_full_work		= work;
		w->dev_most_full_capacity	= capacity;
	}

	if (w->total_work + dev_work >= w->total_work &&
	    w->total_work + dev_work >= dev_work)
		w->total_work += dev_work;
}

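/*
 * Walk all online members and accumulate their pending work, plus work not
 * attributed to any device, into a rebalance_work snapshot.
 */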
static struct rebalance_work rebalance_work(struct bch_fs *c)
{
	struct bch_dev *ca;
	struct rebalance_work ret = { .dev_most_full_idx = -1 };
	u64 unknown_dev = atomic64_read(&c->rebalance.work_unknown_dev);
	unsigned i;

	for_each_online_member(ca, c, i)
		rebalance_work_accumulate(&ret,
			atomic64_read(&ca->rebalance_work),
			unknown_dev,
			bucket_to_sector(ca, ca->mi.nbuckets -
					 ca->mi.first_bucket),
			i);

	rebalance_work_accumulate(&ret,
		unknown_dev, 0, c->capacity, -1);

	return ret;
}

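/*
 * Clear the pending work counters; the rebalance thread does this before it
 * rescans and processes the work.
 */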
static void rebalance_work_reset(struct bch_fs *c)
{
	struct bch_dev *ca;
	unsigned i;

	for_each_online_member(ca, c, i)
		atomic64_set(&ca->rebalance_work, 0);

	atomic64_set(&c->rebalance.work_unknown_dev, 0);
}

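/* Total user + system CPU time consumed by the current task, in jiffies: */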
static unsigned long curr_cputime(void)
{
	u64 utime, stime;

	task_cputime_adjusted(current, &utime, &stime);
	return nsecs_to_jiffies(utime + stime);
}

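/*
 * Main rebalance loop: sleep while there's no pending work, throttle CPU and
 * IO based on how full the busiest device is, then walk the filesystem with
 * bch2_move_data(), using rebalance_pred() to pick the extents to rewrite.
 */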
static int bch2_rebalance_thread(void *arg)
{
	struct bch_fs *c = arg;
	struct bch_fs_rebalance *r = &c->rebalance;
	struct io_clock *clock = &c->io_clock[WRITE];
	struct rebalance_work w, p;
	struct bch_move_stats move_stats;
	unsigned long start, prev_start;
	unsigned long prev_run_time, prev_run_cputime;
	unsigned long cputime, prev_cputime;
	u64 io_start;
	long throttle;

	set_freezable();

	io_start	= atomic64_read(&clock->now);
	p		= rebalance_work(c);
	prev_start	= jiffies;
	prev_cputime	= curr_cputime();

	bch2_move_stats_init(&move_stats, "rebalance");
	while (!kthread_wait_freezable(r->enabled)) {
		cond_resched();

		start			= jiffies;
		cputime			= curr_cputime();

		prev_run_time		= start - prev_start;
		prev_run_cputime	= cputime - prev_cputime;

		w			= rebalance_work(c);
		BUG_ON(!w.dev_most_full_capacity);

		if (!w.total_work) {
			r->state = REBALANCE_WAITING;
			kthread_wait_freezable(rebalance_work(c).total_work);
			continue;
		}

		/*
		 * If there isn't much work to do, throttle cpu usage:
		 */
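		/*
		 * throttle is how much longer the previous interval would have
		 * needed to be, in jiffies, for our CPU time to stay
		 * proportional to how full the fullest device is:
		 */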
		throttle = prev_run_cputime * 100 /
			max(1U, w.dev_most_full_percent) -
			prev_run_time;

		if (w.dev_most_full_percent < 20 && throttle > 0) {
			r->throttled_until_iotime = io_start +
				div_u64(w.dev_most_full_capacity *
					(20 - w.dev_most_full_percent),
					50);

			if (atomic64_read(&clock->now) + clock->max_slop <
			    r->throttled_until_iotime) {
				r->throttled_until_cputime = start + throttle;
				r->state = REBALANCE_THROTTLED;

				bch2_kthread_io_clock_wait(clock,
					r->throttled_until_iotime,
					throttle);
				continue;
			}
		}

		/* minimum 1 mb/sec: */
		r->pd.rate.rate =
			max_t(u64, 1 << 11,
			      r->pd.rate.rate *
			      max(p.dev_most_full_percent, 1U) /
			      max(w.dev_most_full_percent, 1U));

		io_start	= atomic64_read(&clock->now);
		p		= w;
		prev_start	= start;
		prev_cputime	= cputime;

		r->state = REBALANCE_RUNNING;
		memset(&move_stats, 0, sizeof(move_stats));
		rebalance_work_reset(c);

		bch2_move_data(c,
			       0,		POS_MIN,
			       BTREE_ID_NR,	POS_MAX,
			       /* ratelimiting disabled for now */
			       NULL, /*  &r->pd.rate, */
			       &move_stats,
			       writepoint_ptr(&c->rebalance_write_point),
			       true,
			       rebalance_pred, NULL);
	}

	return 0;
}

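/*
 * Report pending rebalance work and the rebalance thread's current state in
 * human readable form:
 */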
void bch2_rebalance_work_to_text(struct printbuf *out, struct bch_fs *c)
{
	struct bch_fs_rebalance *r = &c->rebalance;
	struct rebalance_work w = rebalance_work(c);

	if (!out->nr_tabstops)
		printbuf_tabstop_push(out, 20);

	prt_printf(out, "fullest_dev (%i):", w.dev_most_full_idx);
	prt_tab(out);

	prt_human_readable_u64(out, w.dev_most_full_work << 9);
	prt_printf(out, "/");
	prt_human_readable_u64(out, w.dev_most_full_capacity << 9);
	prt_newline(out);

	prt_printf(out, "total work:");
	prt_tab(out);

	prt_human_readable_u64(out, w.total_work << 9);
	prt_printf(out, "/");
	prt_human_readable_u64(out, c->capacity << 9);
	prt_newline(out);

	prt_printf(out, "rate:");
	prt_tab(out);
	prt_printf(out, "%u", r->pd.rate.rate);
	prt_newline(out);

	switch (r->state) {
	case REBALANCE_WAITING:
		prt_printf(out, "waiting");
		break;
	case REBALANCE_THROTTLED:
		prt_printf(out, "throttled for %lu sec or ",
		       (r->throttled_until_cputime - jiffies) / HZ);
		prt_human_readable_u64(out,
			    (r->throttled_until_iotime -
			     atomic64_read(&c->io_clock[WRITE].now)) << 9);
		prt_printf(out, " io");
		break;
	case REBALANCE_RUNNING:
		prt_printf(out, "running");
		break;
	}
	prt_newline(out);
}

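/*
 * Stop the rebalance thread: crank the rate limiter up to maximum first so
 * the thread isn't left sleeping on it, then clear the thread pointer and
 * wait for the thread to exit.
 */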
void bch2_rebalance_stop(struct bch_fs *c)
{
	struct task_struct *p;

	c->rebalance.pd.rate.rate = UINT_MAX;
	bch2_ratelimit_reset(&c->rebalance.pd.rate);

	p = rcu_dereference_protected(c->rebalance.thread, 1);
	c->rebalance.thread = NULL;

	if (p) {
		/* for synchronizing with rebalance_wakeup() */
		synchronize_rcu();

		kthread_stop(p);
		put_task_struct(p);
	}
}

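/*
 * Start the rebalance thread, unless it's already running or the nochanges
 * option is set:
 */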
int bch2_rebalance_start(struct bch_fs *c)
{
	struct task_struct *p;
	int ret;

	if (c->rebalance.thread)
		return 0;

	if (c->opts.nochanges)
		return 0;

	p = kthread_create(bch2_rebalance_thread, c, "bch-rebalance/%s", c->name);
	ret = PTR_ERR_OR_ZERO(p);
	if (ret) {
		bch_err_msg(c, ret, "creating rebalance thread");
		return ret;
	}

	get_task_struct(p);
	rcu_assign_pointer(c->rebalance.thread, p);
	wake_up_process(p);
	return 0;
}

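/*
 * Early init: set up the rate controller, and start work_unknown_dev at
 * S64_MAX so that, until the counters are populated, the rebalance thread
 * sees pending work rather than assuming there's nothing to do.
 */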
void bch2_fs_rebalance_init(struct bch_fs *c)
{
	bch2_pd_controller_init(&c->rebalance.pd);

	atomic64_set(&c->rebalance.work_unknown_dev, S64_MAX);
}
367