/* SPDX-License-Identifier: MIT */
/*
 * Description: UBLK_F_BATCH_IO buffer management
 */

#include "kublk.h"

static inline void *ublk_get_commit_buf(struct ublk_thread *t,
					unsigned short buf_idx)
{
	unsigned idx;

	if (buf_idx < t->commit_buf_start ||
	    buf_idx >= t->commit_buf_start + t->nr_commit_buf)
		return NULL;
	idx = buf_idx - t->commit_buf_start;
	return t->commit_buf + idx * t->commit_buf_size;
}

/*
 * Allocate one buffer for UBLK_U_IO_PREP_IO_CMDS or UBLK_U_IO_COMMIT_IO_CMDS
 *
 * Buffer index is returned.
 */
static inline unsigned short ublk_alloc_commit_buf(struct ublk_thread *t)
{
	int idx = allocator_get(&t->commit_buf_alloc);

	if (idx >= 0)
		return idx + t->commit_buf_start;
	return UBLKS_T_COMMIT_BUF_INV_IDX;
}

/*
 * Free one commit buffer which is used by UBLK_U_IO_PREP_IO_CMDS or
 * UBLK_U_IO_COMMIT_IO_CMDS
 */
static inline void ublk_free_commit_buf(struct ublk_thread *t,
					unsigned short i)
{
	unsigned short idx = i - t->commit_buf_start;

	ublk_assert(idx < t->nr_commit_buf);
	ublk_assert(allocator_get_val(&t->commit_buf_alloc, idx) != 0);

	allocator_put(&t->commit_buf_alloc, idx);
}

static unsigned char ublk_commit_elem_buf_size(struct ublk_dev *dev)
{
	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_USER_COPY |
				UBLK_F_AUTO_BUF_REG))
		return 8;

	/* one extra 8 bytes for carrying the buffer address */
	return 16;
}

static unsigned ublk_commit_buf_size(struct ublk_thread *t)
{
	struct ublk_dev *dev = t->dev;
	unsigned elem_size = ublk_commit_elem_buf_size(dev);
	unsigned int total = elem_size * dev->dev_info.queue_depth;
	unsigned int page_sz = getpagesize();

	return round_up(total, page_sz);
}

static void free_batch_commit_buf(struct ublk_thread *t)
{
	if (t->commit_buf) {
		unsigned buf_size = ublk_commit_buf_size(t);
		unsigned int total = buf_size * t->nr_commit_buf;

		munlock(t->commit_buf, total);
		free(t->commit_buf);
	}
	allocator_deinit(&t->commit_buf_alloc);
}

static int alloc_batch_commit_buf(struct ublk_thread *t)
{
	unsigned buf_size = ublk_commit_buf_size(t);
	unsigned int total = buf_size * t->nr_commit_buf;
	unsigned int page_sz = getpagesize();
	void *buf = NULL;
	int ret;

	allocator_init(&t->commit_buf_alloc, t->nr_commit_buf);

	t->commit_buf = NULL;
	ret = posix_memalign(&buf, page_sz, total);
	if (ret || !buf)
		goto fail;

	t->commit_buf = buf;

	/* lock commit buffer pages for fast access */
	if (mlock(t->commit_buf, total))
		ublk_err("%s: can't lock commit buffer %s\n", __func__,
				strerror(errno));

	return 0;

fail:
	free_batch_commit_buf(t);
	return ret;
}

void ublk_batch_prepare(struct ublk_thread *t)
{
	/*
	 * We only handle a single device in this thread context.
	 *
	 * All queues share the same feature flags, so use queue 0's
	 * flags for calculating the uring_cmd flags.
	 *
	 * Not elegant, but it works so far.
	 */
	struct ublk_queue *q = &t->dev->q[0];

	t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev);
	t->commit_buf_size = ublk_commit_buf_size(t);
	t->commit_buf_start = t->nr_bufs;
	t->nr_commit_buf = 2;
	t->nr_bufs += t->nr_commit_buf;

	t->cmd_flags = 0;
	if (ublk_queue_use_auto_zc(q)) {
		if (ublk_queue_auto_zc_fallback(q))
			t->cmd_flags |= UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK;
	} else if (!ublk_queue_no_buf(q))
		t->cmd_flags |= UBLK_BATCH_F_HAS_BUF_ADDR;

	t->state |= UBLKS_T_BATCH_IO;

	ublk_log("%s: thread %d commit(nr_bufs %u, buf_size %u, start %u)\n",
			__func__, t->idx,
			t->nr_commit_buf, t->commit_buf_size,
			t->commit_buf_start);
}
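
/*
 * Fetch buffers: each thread owns UBLKS_T_NR_FETCH_BUF page-aligned, mlocked
 * buffers, and each one is registered as a single-entry provided-buffer ring
 * whose group id equals the buffer index. Multishot UBLK_U_IO_FETCH_IO_CMDS
 * commands deliver the 2-byte tags of incoming I/O commands into these
 * buffers.
 */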
static void free_batch_fetch_buf(struct ublk_thread *t)
{
	int i;

	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
		io_uring_free_buf_ring(&t->ring, t->fetch[i].br, 1, i);
		munlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size);
		free(t->fetch[i].fetch_buf);
	}
}

static int alloc_batch_fetch_buf(struct ublk_thread *t)
{
	/* page-aligned fetch buffers, mlocked to speed up delivery */
	unsigned pg_sz = getpagesize();
	unsigned buf_size = round_up(t->dev->dev_info.queue_depth * 2, pg_sz);
	int ret;
	int i = 0;

	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
		t->fetch[i].fetch_buf_size = buf_size;

		if (posix_memalign((void **)&t->fetch[i].fetch_buf, pg_sz,
					t->fetch[i].fetch_buf_size))
			return -ENOMEM;

		/* lock fetch buffer pages for fast fetching */
		if (mlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size))
			ublk_err("%s: can't lock fetch buffer %s\n", __func__,
					strerror(errno));
		t->fetch[i].br = io_uring_setup_buf_ring(&t->ring, 1,
				i, IOU_PBUF_RING_INC, &ret);
		if (!t->fetch[i].br) {
			ublk_err("Buffer ring register failed %d\n", ret);
			return ret;
		}
	}

	return 0;
}

int ublk_batch_alloc_buf(struct ublk_thread *t)
{
	int ret;

	ublk_assert(t->nr_commit_buf < 16);

	ret = alloc_batch_commit_buf(t);
	if (ret)
		return ret;
	return alloc_batch_fetch_buf(t);
}

void ublk_batch_free_buf(struct ublk_thread *t)
{
	free_batch_commit_buf(t);
	free_batch_fetch_buf(t);
}
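
/*
 * Initialize one batch uring_cmd SQE shared by the FETCH/PREP/COMMIT paths.
 *
 * The SQE's user_data encodes the commit/fetch buffer index in the tag
 * field, the command opcode (_IOC_NR(op)), nr_elem in the target-data field
 * and the queue id, so the completion handlers can recover all of them from
 * the CQE alone.
 */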
static void ublk_init_batch_cmd(struct ublk_thread *t, __u16 q_id,
				struct io_uring_sqe *sqe, unsigned op,
				unsigned short elem_bytes,
				unsigned short nr_elem,
				unsigned short buf_idx)
{
	struct ublk_batch_io *cmd;
	__u64 user_data;

	cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);

	ublk_set_sqe_cmd_op(sqe, op);

	sqe->fd = 0;	/* dev->fds[0] */
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->flags = IOSQE_FIXED_FILE;

	cmd->q_id = q_id;
	cmd->flags = 0;
	cmd->reserved = 0;
	cmd->elem_bytes = elem_bytes;
	cmd->nr_elem = nr_elem;

	user_data = build_user_data(buf_idx, _IOC_NR(op), nr_elem, q_id, 0);
	io_uring_sqe_set_data64(sqe, user_data);

	t->cmd_inflight += 1;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: thread %u qid %d cmd_op %x data %lx "
			"nr_elem %u elem_bytes %u buf_size %u buf_idx %d "
			"cmd_inflight %u\n",
			__func__, t->idx, q_id, op, user_data,
			cmd->nr_elem, cmd->elem_bytes,
			nr_elem * elem_bytes, buf_idx, t->cmd_inflight);
}

static void ublk_setup_commit_sqe(struct ublk_thread *t,
				  struct io_uring_sqe *sqe,
				  unsigned short buf_idx)
{
	struct ublk_batch_io *cmd;

	cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);

	/* Use plain user buffer instead of fixed buffer */
	cmd->flags |= t->cmd_flags;
}

/*
 * (Re)arm one multishot FETCH command: provide the fetch buffer to its
 * buffer ring again, then submit UBLK_U_IO_FETCH_IO_CMDS with buffer
 * selection from that ring.
 */
static void ublk_batch_queue_fetch(struct ublk_thread *t,
				   struct ublk_queue *q,
				   unsigned short buf_idx)
{
	unsigned short nr_elem = t->fetch[buf_idx].fetch_buf_size / 2;
	struct io_uring_sqe *sqe;

	io_uring_buf_ring_add(t->fetch[buf_idx].br, t->fetch[buf_idx].fetch_buf,
			t->fetch[buf_idx].fetch_buf_size,
			0, 0, 0);
	io_uring_buf_ring_advance(t->fetch[buf_idx].br, 1);

	ublk_io_alloc_sqes(t, &sqe, 1);

	ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_FETCH_IO_CMDS, 2, nr_elem,
			buf_idx);

	sqe->rw_flags = IORING_URING_CMD_MULTISHOT;
	sqe->buf_group = buf_idx;
	sqe->flags |= IOSQE_BUFFER_SELECT;

	t->fetch[buf_idx].fetch_buf_off = 0;
}

void ublk_batch_start_fetch(struct ublk_thread *t,
			    struct ublk_queue *q)
{
	int i;

	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++)
		ublk_batch_queue_fetch(t, q, i);
}

/*
 * Handle one FETCH completion: walk the 2-byte tags delivered into the fetch
 * buffer, queue each I/O to the target, then advance the buffer offset.
 * Returns the fetch buffer index.
 */
static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t,
					     struct ublk_queue *q,
					     const struct io_uring_cqe *cqe)
{
	unsigned short buf_idx = user_data_to_tag(cqe->user_data);
	unsigned start = t->fetch[buf_idx].fetch_buf_off;
	unsigned end = start + cqe->res;
	void *buf = t->fetch[buf_idx].fetch_buf;
	int i;

	if (cqe->res < 0)
		return buf_idx;

	if ((end - start) / 2 > q->q_depth) {
		ublk_err("%s: fetch duplicated ios offset %u count %u\n",
				__func__, start, cqe->res);

		for (i = start; i < end; i += 2) {
			unsigned short tag = *(unsigned short *)(buf + i);

			ublk_err("%u ", tag);
		}
		ublk_err("\n");
	}

	for (i = start; i < end; i += 2) {
		unsigned short tag = *(unsigned short *)(buf + i);

		if (tag >= q->q_depth)
			ublk_err("%s: bad tag %u\n", __func__, tag);

		if (q->tgt_ops->queue_io)
			q->tgt_ops->queue_io(t, q, tag);
	}
	t->fetch[buf_idx].fetch_buf_off = end;
	return buf_idx;
}

/*
 * Build and queue one UBLK_U_IO_PREP_IO_CMDS command covering every tag of
 * the queue, carrying the elements in a commit buffer.
 */
int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
{
	unsigned short nr_elem = q->q_depth;
	unsigned short buf_idx = ublk_alloc_commit_buf(t);
	struct io_uring_sqe *sqe;
	void *buf;
	int i;

	ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);

	ublk_io_alloc_sqes(t, &sqe, 1);

	ublk_assert(nr_elem == q->q_depth);
	buf = ublk_get_commit_buf(t, buf_idx);
	for (i = 0; i < nr_elem; i++) {
		struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(
				buf + i * t->commit_buf_elem_size);
		struct ublk_io *io = &q->ios[i];

		elem->tag = i;
		elem->result = 0;

		if (ublk_queue_use_auto_zc(q))
			elem->buf_index = ublk_batch_io_buf_idx(t, q, i);
		else if (!ublk_queue_no_buf(q))
			elem->buf_addr = (__u64)io->buf_addr;
	}

	sqe->addr = (__u64)buf;
	sqe->len = t->commit_buf_elem_size * nr_elem;

	ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_PREP_IO_CMDS,
			t->commit_buf_elem_size, nr_elem, buf_idx);
	ublk_setup_commit_sqe(t, sqe, buf_idx);
	return 0;
}

static void ublk_batch_compl_commit_cmd(struct ublk_thread *t,
					const struct io_uring_cqe *cqe,
					unsigned op)
{
	unsigned short buf_idx = user_data_to_tag(cqe->user_data);

	if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS))
		ublk_assert(cqe->res == 0);
	else if (op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
		int nr_elem = user_data_to_tgt_data(cqe->user_data);

		ublk_assert(cqe->res == t->commit_buf_elem_size * nr_elem);
	} else
		ublk_assert(0);

	ublk_free_commit_buf(t, buf_idx);
}

void ublk_batch_compl_cmd(struct ublk_thread *t,
			  const struct io_uring_cqe *cqe)
{
	unsigned op = user_data_to_op(cqe->user_data);
	struct ublk_queue *q;
	unsigned buf_idx;
	unsigned q_id;

	if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS) ||
	    op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
		t->cmd_inflight--;
		ublk_batch_compl_commit_cmd(t, cqe, op);
		return;
	}

	/* FETCH command is per-queue */
	q_id = user_data_to_q_id(cqe->user_data);
	q = &t->dev->q[q_id];
	buf_idx = ublk_compl_batch_fetch(t, q, cqe);

	if (cqe->res < 0 && cqe->res != -ENOBUFS) {
		t->cmd_inflight--;
		t->state |= UBLKS_T_STOPPING;
	} else if (!(cqe->flags & IORING_CQE_F_MORE) || cqe->res == -ENOBUFS) {
		t->cmd_inflight--;
		ublk_batch_queue_fetch(t, q, buf_idx);
	}
}
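
/*
 * Commit cycle: ublk_batch_prep_commit() grabs a commit buffer,
 * ublk_batch_complete_io() appends one element per completed tag, and
 * ublk_batch_commit_io_cmds() submits a single UBLK_U_IO_COMMIT_IO_CMDS
 * covering all staged elements, or simply releases the buffer when nothing
 * was completed in this round.
 */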
void ublk_batch_commit_io_cmds(struct ublk_thread *t)
{
	struct io_uring_sqe *sqe;
	unsigned short buf_idx;
	unsigned short nr_elem = t->commit.done;

	/* nothing to commit */
	if (!nr_elem) {
		ublk_free_commit_buf(t, t->commit.buf_idx);
		return;
	}

	ublk_io_alloc_sqes(t, &sqe, 1);
	buf_idx = t->commit.buf_idx;
	sqe->addr = (__u64)t->commit.elem;
	sqe->len = nr_elem * t->commit_buf_elem_size;

	/* commit isn't a per-queue command */
	ublk_init_batch_cmd(t, t->commit.q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
			t->commit_buf_elem_size, nr_elem, buf_idx);
	ublk_setup_commit_sqe(t, sqe, buf_idx);
}

static void ublk_batch_init_commit(struct ublk_thread *t,
				   unsigned short buf_idx)
{
	/* so far we only support 1:1 queue/thread mapping */
	t->commit.q_id = t->idx;
	t->commit.buf_idx = buf_idx;
	t->commit.elem = ublk_get_commit_buf(t, buf_idx);
	t->commit.done = 0;
	t->commit.count = t->commit_buf_size /
		t->commit_buf_elem_size;
}

void ublk_batch_prep_commit(struct ublk_thread *t)
{
	unsigned short buf_idx = ublk_alloc_commit_buf(t);

	ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);
	ublk_batch_init_commit(t, buf_idx);
}

/* Stage one completed I/O into the current commit buffer */
void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
			    unsigned tag, int res)
{
	struct batch_commit_buf *cb = &t->commit;
	struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(cb->elem +
			cb->done * t->commit_buf_elem_size);
	struct ublk_io *io = &q->ios[tag];

	ublk_assert(q->q_id == t->commit.q_id);

	elem->tag = tag;
	elem->buf_index = ublk_batch_io_buf_idx(t, q, tag);
	elem->result = res;

	if (!ublk_queue_no_buf(q))
		elem->buf_addr = (__u64) (uintptr_t) io->buf_addr;

	cb->done += 1;
	ublk_assert(cb->done <= cb->count);
}
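
/*
 * Usage sketch (illustrative, not part of this file): a rough outline of how
 * a thread's event loop is expected to drive the helpers above. The loop
 * structure and the ublk_process_cqes() name are assumptions made for the
 * sketch; only the ublk_batch_*() calls are defined here.
 *
 *	ublk_batch_prepare(t);
 *	ublk_batch_alloc_buf(t);
 *	ublk_batch_queue_prep_io_cmds(t, q);
 *	ublk_batch_start_fetch(t, q);
 *
 *	while (!(t->state & UBLKS_T_STOPPING)) {
 *		ublk_batch_prep_commit(t);
 *		ublk_process_cqes(t);	// reaps CQEs, routes batch command
 *					// completions to ublk_batch_compl_cmd();
 *					// the target completes I/Os via
 *					// ublk_batch_complete_io()
 *		ublk_batch_commit_io_cmds(t);
 *	}
 *
 *	ublk_batch_free_buf(t);
 */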