/* SPDX-License-Identifier: MIT */
/*
 * Description: UBLK_F_BATCH_IO buffer management
 */

#include "kublk.h"

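/*
 * Map a commit buffer index back to the start of its slot in this
 * thread's commit buffer area, or return NULL if the index does not
 * belong to this thread.
 */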
static inline void *ublk_get_commit_buf(struct ublk_thread *t,
					unsigned short buf_idx)
{
	unsigned idx;

	if (buf_idx < t->commit_buf_start ||
			buf_idx >= t->commit_buf_start + t->nr_commit_buf)
		return NULL;
	idx = buf_idx - t->commit_buf_start;
	return t->commit_buf + idx * t->commit_buf_size;
}

/*
 * Allocate one buffer for UBLK_U_IO_PREP_IO_CMDS or UBLK_U_IO_COMMIT_IO_CMDS
 *
 * The buffer index is returned, or UBLKS_T_COMMIT_BUF_INV_IDX if no
 * buffer is available.
 */
static inline unsigned short ublk_alloc_commit_buf(struct ublk_thread *t)
{
	int idx = allocator_get(&t->commit_buf_alloc);

	if (idx >= 0)
		return idx + t->commit_buf_start;
	return UBLKS_T_COMMIT_BUF_INV_IDX;
}

/*
 * Free one commit buffer which is used by UBLK_U_IO_PREP_IO_CMDS or
 * UBLK_U_IO_COMMIT_IO_CMDS
 */
static inline void ublk_free_commit_buf(struct ublk_thread *t,
					unsigned short i)
{
	unsigned short idx = i - t->commit_buf_start;

	ublk_assert(idx < t->nr_commit_buf);
	ublk_assert(allocator_get_val(&t->commit_buf_alloc, idx) != 0);

	allocator_put(&t->commit_buf_alloc, idx);
}

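/*
 * Size of one commit element: 8 bytes are enough when no buffer address
 * has to be carried (zero copy, user copy or auto buffer registration),
 * otherwise 16 bytes are needed.
 */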
static unsigned char ublk_commit_elem_buf_size(struct ublk_dev *dev)
{
	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_USER_COPY |
				UBLK_F_AUTO_BUF_REG))
		return 8;

	/* one extra 8 bytes for carrying the buffer address */
	return 16;
}

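/*
 * One commit buffer holds one element per queue-depth entry, rounded up
 * to a whole page so it can be page aligned and mlock()ed.
 */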
static unsigned ublk_commit_buf_size(struct ublk_thread *t)
{
	struct ublk_dev *dev = t->dev;
	unsigned elem_size = ublk_commit_elem_buf_size(dev);
	unsigned int total = elem_size * dev->dev_info.queue_depth;
	unsigned int page_sz = getpagesize();

	return round_up(total, page_sz);
}

static void free_batch_commit_buf(struct ublk_thread *t)
{
	if (t->commit_buf) {
		unsigned buf_size = ublk_commit_buf_size(t);
		unsigned int total = buf_size * t->nr_commit_buf;

		munlock(t->commit_buf, total);
		free(t->commit_buf);
	}
	allocator_deinit(&t->commit_buf_alloc);
}

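/*
 * Allocate one page-aligned area covering all commit buffers of this
 * thread and mlock() it; a failed mlock() is only logged because page
 * locking is an optimization, not a requirement.
 */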
static int alloc_batch_commit_buf(struct ublk_thread *t)
{
	unsigned buf_size = ublk_commit_buf_size(t);
	unsigned int total = buf_size * t->nr_commit_buf;
	unsigned int page_sz = getpagesize();
	void *buf = NULL;
	int ret;

	allocator_init(&t->commit_buf_alloc, t->nr_commit_buf);

	t->commit_buf = NULL;
	ret = posix_memalign(&buf, page_sz, total);
	if (ret || !buf)
		goto fail;

	t->commit_buf = buf;

	/* lock commit buffer pages for fast access */
	if (mlock(t->commit_buf, total))
		ublk_err("%s: can't lock commit buffer: %s\n", __func__,
			strerror(errno));

	return 0;

fail:
	free_batch_commit_buf(t);
	return ret;
}

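/*
 * Set up per-thread batch state: reserve commit buffer indexes right
 * after the normal IO buffers and derive the uring_cmd flags shared by
 * all batch commands issued from this thread.
 */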
void ublk_batch_prepare(struct ublk_thread *t)
{
	/*
	 * We only handle a single device in this thread context.
	 *
	 * All queues have the same feature flags, so use queue 0's flags
	 * for calculating the uring_cmd flags.
	 *
	 * This isn't elegant, but it works so far.
	 */
	struct ublk_queue *q = &t->dev->q[0];

	t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev);
	t->commit_buf_size = ublk_commit_buf_size(t);
	t->commit_buf_start = t->nr_bufs;
	t->nr_commit_buf = 2;
	t->nr_bufs += t->nr_commit_buf;

	t->cmd_flags = 0;
	if (ublk_queue_use_auto_zc(q)) {
		if (ublk_queue_auto_zc_fallback(q))
			t->cmd_flags |= UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK;
	} else if (!ublk_queue_no_buf(q))
		t->cmd_flags |= UBLK_BATCH_F_HAS_BUF_ADDR;

	t->state |= UBLKS_T_BATCH_IO;

	ublk_log("%s: thread %d commit(nr_bufs %u, buf_size %u, start %u)\n",
			__func__, t->idx,
			t->nr_commit_buf, t->commit_buf_size,
			t->commit_buf_start);
}

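/* Unregister the fetch buffer rings and release the mlock()ed fetch buffers. */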
static void free_batch_fetch_buf(struct ublk_thread *t)
{
	int i;

	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
		io_uring_free_buf_ring(&t->ring, t->fetch[i].br, 1, i);
		munlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size);
		free(t->fetch[i].fetch_buf);
	}
}

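/*
 * Each fetch buffer holds one 2-byte tag per queue-depth entry, rounded
 * up to a whole page, and is registered as a single-entry incremental
 * provided buffer ring (IOU_PBUF_RING_INC) with buffer group `i'.
 */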
static int alloc_batch_fetch_buf(struct ublk_thread *t)
{
	/* page aligned fetch buffer, and it is mlocked to speed up delivery */
	unsigned pg_sz = getpagesize();
	unsigned buf_size = round_up(t->dev->dev_info.queue_depth * 2, pg_sz);
	int ret;
	int i = 0;

	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
		t->fetch[i].fetch_buf_size = buf_size;

		if (posix_memalign((void **)&t->fetch[i].fetch_buf, pg_sz,
					t->fetch[i].fetch_buf_size))
			return -ENOMEM;

		/* lock fetch buffer pages for fast fetching */
		if (mlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size))
			ublk_err("%s: can't lock fetch buffer: %s\n", __func__,
				strerror(errno));
		t->fetch[i].br = io_uring_setup_buf_ring(&t->ring, 1,
			i, IOU_PBUF_RING_INC, &ret);
		if (!t->fetch[i].br) {
			ublk_err("Buffer ring register failed %d\n", ret);
			return ret;
		}
	}

	return 0;
}

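/* Allocate the commit and fetch buffers used by one ublk thread. */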
int ublk_batch_alloc_buf(struct ublk_thread *t)
{
	int ret;

	ublk_assert(t->nr_commit_buf < 16);

	ret = alloc_batch_commit_buf(t);
	if (ret)
		return ret;
	return alloc_batch_fetch_buf(t);
}

void ublk_batch_free_buf(struct ublk_thread *t)
{
	free_batch_commit_buf(t);
	free_batch_fetch_buf(t);
}

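/*
 * Fill the common part of a batch uring_cmd: the ublk_batch_io payload,
 * the user_data used to route its completion (buf_idx, op, nr_elem and
 * q_id are encoded into it), and the inflight command accounting.
 */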
static void ublk_init_batch_cmd(struct ublk_thread *t, __u16 q_id,
				struct io_uring_sqe *sqe, unsigned op,
				unsigned short elem_bytes,
				unsigned short nr_elem,
				unsigned short buf_idx)
{
	struct ublk_batch_io *cmd;
	__u64 user_data;

	cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);

	ublk_set_sqe_cmd_op(sqe, op);

	sqe->fd	= 0;	/* dev->fds[0] */
	sqe->opcode	= IORING_OP_URING_CMD;
	sqe->flags	= IOSQE_FIXED_FILE;

	cmd->q_id	= q_id;
	cmd->flags	= 0;
	cmd->reserved	= 0;
	cmd->elem_bytes = elem_bytes;
	cmd->nr_elem	= nr_elem;

	user_data = build_user_data(buf_idx, _IOC_NR(op), nr_elem, q_id, 0);
	io_uring_sqe_set_data64(sqe, user_data);

	t->cmd_inflight += 1;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: thread %u qid %d cmd_op %x data %llx "
			"nr_elem %u elem_bytes %u buf_size %u buf_idx %d "
			"cmd_inflight %u\n",
			__func__, t->idx, q_id, op,
			(unsigned long long)user_data,
			cmd->nr_elem, cmd->elem_bytes,
			nr_elem * elem_bytes, buf_idx, t->cmd_inflight);
}

static void ublk_setup_commit_sqe(struct ublk_thread *t,
				  struct io_uring_sqe *sqe,
				  unsigned short buf_idx)
{
	struct ublk_batch_io *cmd;

	cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);

	/* Use plain user buffer instead of fixed buffer */
	cmd->flags |= t->cmd_flags;
}

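/*
 * (Re)arm one multishot UBLK_U_IO_FETCH_IO_CMDS command: provide the
 * fetch buffer to buffer ring `buf_idx', then issue the uring_cmd with
 * IOSQE_BUFFER_SELECT so completed tags are delivered into that buffer.
 */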
static void ublk_batch_queue_fetch(struct ublk_thread *t,
				   struct ublk_queue *q,
				   unsigned short buf_idx)
{
	unsigned short nr_elem = t->fetch[buf_idx].fetch_buf_size / 2;
	struct io_uring_sqe *sqe;

	io_uring_buf_ring_add(t->fetch[buf_idx].br, t->fetch[buf_idx].fetch_buf,
			t->fetch[buf_idx].fetch_buf_size,
			0, 0, 0);
	io_uring_buf_ring_advance(t->fetch[buf_idx].br, 1);

	ublk_io_alloc_sqes(t, &sqe, 1);

	ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_FETCH_IO_CMDS, 2, nr_elem,
			buf_idx);

	sqe->rw_flags = IORING_URING_CMD_MULTISHOT;
	sqe->buf_group = buf_idx;
	sqe->flags |= IOSQE_BUFFER_SELECT;

	t->fetch[buf_idx].fetch_buf_off = 0;
}

void ublk_batch_start_fetch(struct ublk_thread *t,
			    struct ublk_queue *q)
{
	int i;

	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++)
		ublk_batch_queue_fetch(t, q, i);
}

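/*
 * Handle one FETCH completion: walk the newly delivered part of the
 * fetch buffer (one 2-byte tag per element) and queue each tag to the
 * target; the fetch buffer index is returned so the caller can rearm it.
 */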
static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t,
				   struct ublk_queue *q,
				   const struct io_uring_cqe *cqe)
{
	unsigned short buf_idx = user_data_to_tag(cqe->user_data);
	unsigned start = t->fetch[buf_idx].fetch_buf_off;
	unsigned end = start + cqe->res;
	void *buf = t->fetch[buf_idx].fetch_buf;
	int i;

	if (cqe->res < 0)
		return buf_idx;

	if ((end - start) / 2 > q->q_depth) {
		ublk_err("%s: fetch duplicated ios offset %u count %u\n",
				__func__, start, cqe->res);

		for (i = start; i < end; i += 2) {
			unsigned short tag = *(unsigned short *)(buf + i);

			ublk_err("%u ", tag);
		}
		ublk_err("\n");
	}

	for (i = start; i < end; i += 2) {
		unsigned short tag = *(unsigned short *)(buf + i);

		if (tag >= q->q_depth)
			ublk_err("%s: bad tag %u\n", __func__, tag);

		if (q->tgt_ops->queue_io)
			q->tgt_ops->queue_io(t, q, tag);
	}
	t->fetch[buf_idx].fetch_buf_off = end;
	return buf_idx;
}

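/*
 * Issue one UBLK_U_IO_PREP_IO_CMDS command covering every tag of the
 * queue, with one ublk_batch_elem per tag stored in a commit buffer.
 */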
int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
{
	unsigned short nr_elem = q->q_depth;
	unsigned short buf_idx = ublk_alloc_commit_buf(t);
	struct io_uring_sqe *sqe;
	void *buf;
	int i;

	ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);

	ublk_io_alloc_sqes(t, &sqe, 1);

	ublk_assert(nr_elem == q->q_depth);
	buf = ublk_get_commit_buf(t, buf_idx);
	for (i = 0; i < nr_elem; i++) {
		struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(
				buf + i * t->commit_buf_elem_size);
		struct ublk_io *io = &q->ios[i];

		elem->tag = i;
		elem->result = 0;

		if (ublk_queue_use_auto_zc(q))
			elem->buf_index = ublk_batch_io_buf_idx(t, q, i);
		else if (!ublk_queue_no_buf(q))
			elem->buf_addr = (__u64)(uintptr_t)io->buf_addr;
	}

	sqe->addr = (__u64)(uintptr_t)buf;
	sqe->len = t->commit_buf_elem_size * nr_elem;

	ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_PREP_IO_CMDS,
			t->commit_buf_elem_size, nr_elem, buf_idx);
	ublk_setup_commit_sqe(t, sqe, buf_idx);
	return 0;
}

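/*
 * PREP_IO_CMDS completes with 0, COMMIT_IO_CMDS with the number of
 * committed bytes; in both cases the commit buffer can be freed here.
 */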
static void ublk_batch_compl_commit_cmd(struct ublk_thread *t,
					const struct io_uring_cqe *cqe,
					unsigned op)
{
	unsigned short buf_idx = user_data_to_tag(cqe->user_data);

	if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS))
		ublk_assert(cqe->res == 0);
	else if (op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
		int nr_elem = user_data_to_tgt_data(cqe->user_data);

		ublk_assert(cqe->res == t->commit_buf_elem_size * nr_elem);
	} else
		ublk_assert(0);

	ublk_free_commit_buf(t, buf_idx);
}

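/*
 * Completion dispatcher for batch uring_cmds. FETCH is multishot, so it
 * is only rearmed when the kernel drops the multishot state (no
 * IORING_CQE_F_MORE) or the provided buffer is exhausted (-ENOBUFS).
 */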
void ublk_batch_compl_cmd(struct ublk_thread *t,
			  const struct io_uring_cqe *cqe)
{
	unsigned op = user_data_to_op(cqe->user_data);
	struct ublk_queue *q;
	unsigned buf_idx;
	unsigned q_id;

	if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS) ||
			op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
		t->cmd_inflight--;
		ublk_batch_compl_commit_cmd(t, cqe, op);
		return;
	}

	/* FETCH command is per queue */
	q_id = user_data_to_q_id(cqe->user_data);
	q = &t->dev->q[q_id];
	buf_idx = ublk_compl_batch_fetch(t, q, cqe);

	if (cqe->res < 0 && cqe->res != -ENOBUFS) {
		t->cmd_inflight--;
		t->state |= UBLKS_T_STOPPING;
	} else if (!(cqe->flags & IORING_CQE_F_MORE) || cqe->res == -ENOBUFS) {
		t->cmd_inflight--;
		ublk_batch_queue_fetch(t, q, buf_idx);
	}
}

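/*
 * Flush the commit buffer filled by ublk_batch_complete_io(): either
 * issue one UBLK_U_IO_COMMIT_IO_CMDS command for the completed elements
 * or, if nothing was completed, just return the buffer.
 */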
void ublk_batch_commit_io_cmds(struct ublk_thread *t)
{
	struct io_uring_sqe *sqe;
	unsigned short buf_idx;
	unsigned short nr_elem = t->commit.done;

	/* nothing to commit */
	if (!nr_elem) {
		ublk_free_commit_buf(t, t->commit.buf_idx);
		return;
	}

	ublk_io_alloc_sqes(t, &sqe, 1);
	buf_idx = t->commit.buf_idx;
	sqe->addr = (__u64)(uintptr_t)t->commit.elem;
	sqe->len = nr_elem * t->commit_buf_elem_size;

	/* COMMIT isn't a per-queue command */
	ublk_init_batch_cmd(t, t->commit.q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
			t->commit_buf_elem_size, nr_elem, buf_idx);
	ublk_setup_commit_sqe(t, sqe, buf_idx);
}

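/*
 * Point the per-thread commit state at a freshly allocated commit
 * buffer; `count' is how many elements the buffer can hold.
 */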
static void ublk_batch_init_commit(struct ublk_thread *t,
				   unsigned short buf_idx)
{
	/* so far we only support a 1:1 queue/thread mapping */
	t->commit.q_id = t->idx;
	t->commit.buf_idx = buf_idx;
	t->commit.elem = ublk_get_commit_buf(t, buf_idx);
	t->commit.done = 0;
	t->commit.count = t->commit_buf_size /
		t->commit_buf_elem_size;
}

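/*
 * Typical flow: ublk_batch_prep_commit() once per batch, then
 * ublk_batch_complete_io() for each completed tag, and finally
 * ublk_batch_commit_io_cmds() to submit (or drop) the batch.
 */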
void ublk_batch_prep_commit(struct ublk_thread *t)
{
	unsigned short buf_idx = ublk_alloc_commit_buf(t);

	ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);
	ublk_batch_init_commit(t, buf_idx);
}

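/*
 * Record one completed tag into the current commit buffer; the actual
 * UBLK_U_IO_COMMIT_IO_CMDS command is issued later by
 * ublk_batch_commit_io_cmds().
 */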
void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
			    unsigned tag, int res)
{
	struct batch_commit_buf *cb = &t->commit;
	struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(cb->elem +
			cb->done * t->commit_buf_elem_size);
	struct ublk_io *io = &q->ios[tag];

	ublk_assert(q->q_id == t->commit.q_id);

	elem->tag = tag;
	elem->buf_index = ublk_batch_io_buf_idx(t, q, tag);
	elem->result = res;

	if (!ublk_queue_no_buf(q))
		elem->buf_addr = (__u64)(uintptr_t)io->buf_addr;

	cb->done += 1;
	ublk_assert(cb->done <= cb->count);
}
467