1 /* SPDX-License-Identifier: MIT */ 2 /* 3 * Description: UBLK_F_BATCH_IO buffer management 4 */ 5 6 #include "kublk.h" 7 8 static inline void *ublk_get_commit_buf(struct ublk_thread *t, 9 unsigned short buf_idx) 10 { 11 unsigned idx; 12 13 if (buf_idx < t->commit_buf_start || 14 buf_idx >= t->commit_buf_start + t->nr_commit_buf) 15 return NULL; 16 idx = buf_idx - t->commit_buf_start; 17 return t->commit_buf + idx * t->commit_buf_size; 18 } 19 20 /* 21 * Allocate one buffer for UBLK_U_IO_PREP_IO_CMDS or UBLK_U_IO_COMMIT_IO_CMDS 22 * 23 * Buffer index is returned. 24 */ 25 static inline unsigned short ublk_alloc_commit_buf(struct ublk_thread *t) 26 { 27 int idx = allocator_get(&t->commit_buf_alloc); 28 29 if (idx >= 0) 30 return idx + t->commit_buf_start; 31 return UBLKS_T_COMMIT_BUF_INV_IDX; 32 } 33 34 /* 35 * Free one commit buffer which is used by UBLK_U_IO_PREP_IO_CMDS or 36 * UBLK_U_IO_COMMIT_IO_CMDS 37 */ 38 static inline void ublk_free_commit_buf(struct ublk_thread *t, 39 unsigned short i) 40 { 41 unsigned short idx = i - t->commit_buf_start; 42 43 ublk_assert(idx < t->nr_commit_buf); 44 ublk_assert(allocator_get_val(&t->commit_buf_alloc, idx) != 0); 45 46 allocator_put(&t->commit_buf_alloc, idx); 47 } 48 49 static unsigned char ublk_commit_elem_buf_size(struct ublk_dev *dev) 50 { 51 if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_USER_COPY | 52 UBLK_F_AUTO_BUF_REG)) 53 return 8; 54 55 /* one extra 8bytes for carrying buffer address */ 56 return 16; 57 } 58 59 static unsigned ublk_commit_buf_size(struct ublk_thread *t) 60 { 61 struct ublk_dev *dev = t->dev; 62 unsigned elem_size = ublk_commit_elem_buf_size(dev); 63 unsigned int total = elem_size * dev->dev_info.queue_depth; 64 unsigned int page_sz = getpagesize(); 65 66 return round_up(total, page_sz); 67 } 68 69 static void free_batch_commit_buf(struct ublk_thread *t) 70 { 71 if (t->commit_buf) { 72 unsigned buf_size = ublk_commit_buf_size(t); 73 unsigned int total = buf_size * t->nr_commit_buf; 74 75 munlock(t->commit_buf, total); 76 free(t->commit_buf); 77 } 78 allocator_deinit(&t->commit_buf_alloc); 79 free(t->commit); 80 } 81 82 static int alloc_batch_commit_buf(struct ublk_thread *t) 83 { 84 unsigned buf_size = ublk_commit_buf_size(t); 85 unsigned int total = buf_size * t->nr_commit_buf; 86 unsigned int page_sz = getpagesize(); 87 void *buf = NULL; 88 int i, ret, j = 0; 89 90 t->commit = calloc(t->nr_queues, sizeof(*t->commit)); 91 for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) { 92 if (t->q_map[i]) 93 t->commit[j++].q_id = i; 94 } 95 96 allocator_init(&t->commit_buf_alloc, t->nr_commit_buf); 97 98 t->commit_buf = NULL; 99 ret = posix_memalign(&buf, page_sz, total); 100 if (ret || !buf) 101 goto fail; 102 103 t->commit_buf = buf; 104 105 /* lock commit buffer pages for fast access */ 106 if (mlock(t->commit_buf, total)) 107 ublk_err("%s: can't lock commit buffer %s\n", __func__, 108 strerror(errno)); 109 110 return 0; 111 112 fail: 113 free_batch_commit_buf(t); 114 return ret; 115 } 116 117 static unsigned int ublk_thread_nr_queues(const struct ublk_thread *t) 118 { 119 int i; 120 int ret = 0; 121 122 for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) 123 ret += !!t->q_map[i]; 124 125 return ret; 126 } 127 128 void ublk_batch_prepare(struct ublk_thread *t) 129 { 130 /* 131 * We only handle single device in this thread context. 132 * 133 * All queues have same feature flags, so use queue 0's for 134 * calculate uring_cmd flags. 135 * 136 * This way looks not elegant, but it works so far. 137 */ 138 struct ublk_queue *q = &t->dev->q[0]; 139 140 /* cache nr_queues because we don't support dynamic load-balance yet */ 141 t->nr_queues = ublk_thread_nr_queues(t); 142 143 t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev); 144 t->commit_buf_size = ublk_commit_buf_size(t); 145 t->commit_buf_start = t->nr_bufs; 146 t->nr_commit_buf = 2 * t->nr_queues; 147 t->nr_bufs += t->nr_commit_buf; 148 149 t->cmd_flags = 0; 150 if (ublk_queue_use_auto_zc(q)) { 151 if (ublk_queue_auto_zc_fallback(q)) 152 t->cmd_flags |= UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK; 153 } else if (!ublk_queue_no_buf(q)) 154 t->cmd_flags |= UBLK_BATCH_F_HAS_BUF_ADDR; 155 156 t->state |= UBLKS_T_BATCH_IO; 157 158 ublk_log("%s: thread %d commit(nr_bufs %u, buf_size %u, start %u)\n", 159 __func__, t->idx, 160 t->nr_commit_buf, t->commit_buf_size, 161 t->nr_bufs); 162 } 163 164 static void free_batch_fetch_buf(struct ublk_thread *t) 165 { 166 int i; 167 168 for (i = 0; i < t->nr_fetch_bufs; i++) { 169 io_uring_free_buf_ring(&t->ring, t->fetch[i].br, 1, i); 170 munlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size); 171 free(t->fetch[i].fetch_buf); 172 } 173 free(t->fetch); 174 } 175 176 static int alloc_batch_fetch_buf(struct ublk_thread *t) 177 { 178 /* page aligned fetch buffer, and it is mlocked for speedup delivery */ 179 unsigned pg_sz = getpagesize(); 180 unsigned buf_size = round_up(t->dev->dev_info.queue_depth * 2, pg_sz); 181 int ret; 182 int i = 0; 183 184 /* double fetch buffer for each queue */ 185 t->nr_fetch_bufs = t->nr_queues * 2; 186 t->fetch = calloc(t->nr_fetch_bufs, sizeof(*t->fetch)); 187 188 /* allocate one buffer for each queue */ 189 for (i = 0; i < t->nr_fetch_bufs; i++) { 190 t->fetch[i].fetch_buf_size = buf_size; 191 192 if (posix_memalign((void **)&t->fetch[i].fetch_buf, pg_sz, 193 t->fetch[i].fetch_buf_size)) 194 return -ENOMEM; 195 196 /* lock fetch buffer page for fast fetching */ 197 if (mlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size)) 198 ublk_err("%s: can't lock fetch buffer %s\n", __func__, 199 strerror(errno)); 200 t->fetch[i].br = io_uring_setup_buf_ring(&t->ring, 1, 201 i, IOU_PBUF_RING_INC, &ret); 202 if (!t->fetch[i].br) { 203 ublk_err("Buffer ring register failed %d\n", ret); 204 return ret; 205 } 206 } 207 208 return 0; 209 } 210 211 int ublk_batch_alloc_buf(struct ublk_thread *t) 212 { 213 int ret; 214 215 ublk_assert(t->nr_commit_buf < 2 * UBLK_MAX_QUEUES); 216 217 ret = alloc_batch_commit_buf(t); 218 if (ret) 219 return ret; 220 return alloc_batch_fetch_buf(t); 221 } 222 223 void ublk_batch_free_buf(struct ublk_thread *t) 224 { 225 free_batch_commit_buf(t); 226 free_batch_fetch_buf(t); 227 } 228 229 static void ublk_init_batch_cmd(struct ublk_thread *t, __u16 q_id, 230 struct io_uring_sqe *sqe, unsigned op, 231 unsigned short elem_bytes, 232 unsigned short nr_elem, 233 unsigned short buf_idx) 234 { 235 struct ublk_batch_io *cmd; 236 __u64 user_data; 237 238 cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe); 239 240 ublk_set_sqe_cmd_op(sqe, op); 241 242 sqe->fd = 0; /* dev->fds[0] */ 243 sqe->opcode = IORING_OP_URING_CMD; 244 sqe->flags = IOSQE_FIXED_FILE; 245 246 cmd->q_id = q_id; 247 cmd->flags = 0; 248 cmd->reserved = 0; 249 cmd->elem_bytes = elem_bytes; 250 cmd->nr_elem = nr_elem; 251 252 user_data = build_user_data(buf_idx, _IOC_NR(op), nr_elem, q_id, 0); 253 io_uring_sqe_set_data64(sqe, user_data); 254 255 t->cmd_inflight += 1; 256 257 ublk_dbg(UBLK_DBG_IO_CMD, "%s: thread %u qid %d cmd_op %x data %lx " 258 "nr_elem %u elem_bytes %u buf_size %u buf_idx %d " 259 "cmd_inflight %u\n", 260 __func__, t->idx, q_id, op, user_data, 261 cmd->nr_elem, cmd->elem_bytes, 262 nr_elem * elem_bytes, buf_idx, t->cmd_inflight); 263 } 264 265 static void ublk_setup_commit_sqe(struct ublk_thread *t, 266 struct io_uring_sqe *sqe, 267 unsigned short buf_idx) 268 { 269 struct ublk_batch_io *cmd; 270 271 cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe); 272 273 /* Use plain user buffer instead of fixed buffer */ 274 cmd->flags |= t->cmd_flags; 275 } 276 277 static void ublk_batch_queue_fetch(struct ublk_thread *t, 278 struct ublk_queue *q, 279 unsigned short buf_idx) 280 { 281 unsigned short nr_elem = t->fetch[buf_idx].fetch_buf_size / 2; 282 struct io_uring_sqe *sqe; 283 284 io_uring_buf_ring_add(t->fetch[buf_idx].br, t->fetch[buf_idx].fetch_buf, 285 t->fetch[buf_idx].fetch_buf_size, 286 0, 0, 0); 287 io_uring_buf_ring_advance(t->fetch[buf_idx].br, 1); 288 289 ublk_io_alloc_sqes(t, &sqe, 1); 290 291 ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_FETCH_IO_CMDS, 2, nr_elem, 292 buf_idx); 293 294 sqe->rw_flags= IORING_URING_CMD_MULTISHOT; 295 sqe->buf_group = buf_idx; 296 sqe->flags |= IOSQE_BUFFER_SELECT; 297 298 t->fetch[buf_idx].fetch_buf_off = 0; 299 } 300 301 void ublk_batch_start_fetch(struct ublk_thread *t) 302 { 303 int i; 304 int j = 0; 305 306 for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) { 307 if (t->q_map[i]) { 308 struct ublk_queue *q = &t->dev->q[i]; 309 310 /* submit two fetch commands for each queue */ 311 ublk_batch_queue_fetch(t, q, j++); 312 ublk_batch_queue_fetch(t, q, j++); 313 } 314 } 315 } 316 317 static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t, 318 struct ublk_queue *q, 319 const struct io_uring_cqe *cqe) 320 { 321 unsigned short buf_idx = user_data_to_tag(cqe->user_data); 322 unsigned start = t->fetch[buf_idx].fetch_buf_off; 323 unsigned end = start + cqe->res; 324 void *buf = t->fetch[buf_idx].fetch_buf; 325 int i; 326 327 if (cqe->res < 0) 328 return buf_idx; 329 330 if ((end - start) / 2 > q->q_depth) { 331 ublk_err("%s: fetch duplicated ios offset %u count %u\n", __func__, start, cqe->res); 332 333 for (i = start; i < end; i += 2) { 334 unsigned short tag = *(unsigned short *)(buf + i); 335 336 ublk_err("%u ", tag); 337 } 338 ublk_err("\n"); 339 } 340 341 for (i = start; i < end; i += 2) { 342 unsigned short tag = *(unsigned short *)(buf + i); 343 344 if (tag >= q->q_depth) 345 ublk_err("%s: bad tag %u\n", __func__, tag); 346 347 if (q->tgt_ops->queue_io) 348 q->tgt_ops->queue_io(t, q, tag); 349 } 350 t->fetch[buf_idx].fetch_buf_off = end; 351 return buf_idx; 352 } 353 354 static int __ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q) 355 { 356 unsigned short nr_elem = q->q_depth; 357 unsigned short buf_idx = ublk_alloc_commit_buf(t); 358 struct io_uring_sqe *sqe; 359 void *buf; 360 int i; 361 362 ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX); 363 364 ublk_io_alloc_sqes(t, &sqe, 1); 365 366 ublk_assert(nr_elem == q->q_depth); 367 buf = ublk_get_commit_buf(t, buf_idx); 368 for (i = 0; i < nr_elem; i++) { 369 struct ublk_batch_elem *elem = (struct ublk_batch_elem *)( 370 buf + i * t->commit_buf_elem_size); 371 struct ublk_io *io = &q->ios[i]; 372 373 elem->tag = i; 374 elem->result = 0; 375 376 if (ublk_queue_use_auto_zc(q)) 377 elem->buf_index = ublk_batch_io_buf_idx(t, q, i); 378 else if (!ublk_queue_no_buf(q)) 379 elem->buf_addr = (__u64)io->buf_addr; 380 } 381 382 sqe->addr = (__u64)buf; 383 sqe->len = t->commit_buf_elem_size * nr_elem; 384 385 ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_PREP_IO_CMDS, 386 t->commit_buf_elem_size, nr_elem, buf_idx); 387 ublk_setup_commit_sqe(t, sqe, buf_idx); 388 return 0; 389 } 390 391 int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q) 392 { 393 int ret = 0; 394 395 pthread_spin_lock(&q->lock); 396 if (q->flags & UBLKS_Q_PREPARED) 397 goto unlock; 398 ret = __ublk_batch_queue_prep_io_cmds(t, q); 399 if (!ret) 400 q->flags |= UBLKS_Q_PREPARED; 401 unlock: 402 pthread_spin_unlock(&q->lock); 403 404 return ret; 405 } 406 407 static void ublk_batch_compl_commit_cmd(struct ublk_thread *t, 408 const struct io_uring_cqe *cqe, 409 unsigned op) 410 { 411 unsigned short buf_idx = user_data_to_tag(cqe->user_data); 412 413 if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS)) 414 ublk_assert(cqe->res == 0); 415 else if (op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) { 416 int nr_elem = user_data_to_tgt_data(cqe->user_data); 417 418 ublk_assert(cqe->res == t->commit_buf_elem_size * nr_elem); 419 } else 420 ublk_assert(0); 421 422 ublk_free_commit_buf(t, buf_idx); 423 } 424 425 void ublk_batch_compl_cmd(struct ublk_thread *t, 426 const struct io_uring_cqe *cqe) 427 { 428 unsigned op = user_data_to_op(cqe->user_data); 429 struct ublk_queue *q; 430 unsigned buf_idx; 431 unsigned q_id; 432 433 if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS) || 434 op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) { 435 t->cmd_inflight--; 436 ublk_batch_compl_commit_cmd(t, cqe, op); 437 return; 438 } 439 440 /* FETCH command is per queue */ 441 q_id = user_data_to_q_id(cqe->user_data); 442 q = &t->dev->q[q_id]; 443 buf_idx = ublk_compl_batch_fetch(t, q, cqe); 444 445 if (cqe->res < 0 && cqe->res != -ENOBUFS) { 446 t->cmd_inflight--; 447 t->state |= UBLKS_T_STOPPING; 448 } else if (!(cqe->flags & IORING_CQE_F_MORE) || cqe->res == -ENOBUFS) { 449 t->cmd_inflight--; 450 ublk_batch_queue_fetch(t, q, buf_idx); 451 } 452 } 453 454 static void __ublk_batch_commit_io_cmds(struct ublk_thread *t, 455 struct batch_commit_buf *cb) 456 { 457 struct io_uring_sqe *sqe; 458 unsigned short buf_idx; 459 unsigned short nr_elem = cb->done; 460 461 /* nothing to commit */ 462 if (!nr_elem) { 463 ublk_free_commit_buf(t, cb->buf_idx); 464 return; 465 } 466 467 ublk_io_alloc_sqes(t, &sqe, 1); 468 buf_idx = cb->buf_idx; 469 sqe->addr = (__u64)cb->elem; 470 sqe->len = nr_elem * t->commit_buf_elem_size; 471 472 /* commit isn't per-queue command */ 473 ublk_init_batch_cmd(t, cb->q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS, 474 t->commit_buf_elem_size, nr_elem, buf_idx); 475 ublk_setup_commit_sqe(t, sqe, buf_idx); 476 } 477 478 void ublk_batch_commit_io_cmds(struct ublk_thread *t) 479 { 480 int i; 481 482 for (i = 0; i < t->nr_queues; i++) { 483 struct batch_commit_buf *cb = &t->commit[i]; 484 485 if (cb->buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX) 486 __ublk_batch_commit_io_cmds(t, cb); 487 } 488 489 } 490 491 static void __ublk_batch_init_commit(struct ublk_thread *t, 492 struct batch_commit_buf *cb, 493 unsigned short buf_idx) 494 { 495 /* so far only support 1:1 queue/thread mapping */ 496 cb->buf_idx = buf_idx; 497 cb->elem = ublk_get_commit_buf(t, buf_idx); 498 cb->done = 0; 499 cb->count = t->commit_buf_size / 500 t->commit_buf_elem_size; 501 } 502 503 /* COMMIT_IO_CMDS is per-queue command, so use its own commit buffer */ 504 static void ublk_batch_init_commit(struct ublk_thread *t, 505 struct batch_commit_buf *cb) 506 { 507 unsigned short buf_idx = ublk_alloc_commit_buf(t); 508 509 ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX); 510 ublk_assert(!ublk_batch_commit_prepared(cb)); 511 512 __ublk_batch_init_commit(t, cb, buf_idx); 513 } 514 515 void ublk_batch_prep_commit(struct ublk_thread *t) 516 { 517 int i; 518 519 for (i = 0; i < t->nr_queues; i++) 520 t->commit[i].buf_idx = UBLKS_T_COMMIT_BUF_INV_IDX; 521 } 522 523 void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q, 524 unsigned tag, int res) 525 { 526 unsigned q_t_idx = ublk_queue_idx_in_thread(t, q); 527 struct batch_commit_buf *cb = &t->commit[q_t_idx]; 528 struct ublk_batch_elem *elem; 529 struct ublk_io *io = &q->ios[tag]; 530 531 if (!ublk_batch_commit_prepared(cb)) 532 ublk_batch_init_commit(t, cb); 533 534 ublk_assert(q->q_id == cb->q_id); 535 536 elem = (struct ublk_batch_elem *)(cb->elem + cb->done * t->commit_buf_elem_size); 537 elem->tag = tag; 538 elem->buf_index = ublk_batch_io_buf_idx(t, q, tag); 539 elem->result = res; 540 541 if (!ublk_queue_no_buf(q)) 542 elem->buf_addr = (__u64) (uintptr_t) io->buf_addr; 543 544 cb->done += 1; 545 ublk_assert(cb->done <= cb->count); 546 } 547 548 void ublk_batch_setup_map(unsigned char (*q_thread_map)[UBLK_MAX_QUEUES], 549 int nthreads, int queues) 550 { 551 int i, j; 552 553 /* 554 * Setup round-robin queue-to-thread mapping for arbitrary N:M combinations. 555 * 556 * This algorithm distributes queues across threads (and threads across queues) 557 * in a balanced round-robin fashion to ensure even load distribution. 558 * 559 * Examples: 560 * - 2 threads, 4 queues: T0=[Q0,Q2], T1=[Q1,Q3] 561 * - 4 threads, 2 queues: T0=[Q0], T1=[Q1], T2=[Q0], T3=[Q1] 562 * - 3 threads, 3 queues: T0=[Q0], T1=[Q1], T2=[Q2] (1:1 mapping) 563 * 564 * Phase 1: Mark which queues each thread handles (boolean mapping) 565 */ 566 for (i = 0, j = 0; i < queues || j < nthreads; i++, j++) { 567 q_thread_map[j % nthreads][i % queues] = 1; 568 } 569 570 /* 571 * Phase 2: Convert boolean mapping to sequential indices within each thread. 572 * 573 * Transform from: q_thread_map[thread][queue] = 1 (handles queue) 574 * To: q_thread_map[thread][queue] = N (queue index within thread) 575 * 576 * This allows each thread to know the local index of each queue it handles, 577 * which is essential for buffer allocation and management. For example: 578 * - Thread 0 handling queues [0,2] becomes: q_thread_map[0][0]=1, q_thread_map[0][2]=2 579 * - Thread 1 handling queues [1,3] becomes: q_thread_map[1][1]=1, q_thread_map[1][3]=2 580 */ 581 for (j = 0; j < nthreads; j++) { 582 unsigned char seq = 1; 583 584 for (i = 0; i < queues; i++) { 585 if (q_thread_map[j][i]) 586 q_thread_map[j][i] = seq++; 587 } 588 } 589 590 #if 0 591 for (j = 0; j < nthreads; j++) { 592 printf("thread %0d: ", j); 593 for (i = 0; i < queues; i++) { 594 if (q_thread_map[j][i]) 595 printf("%03u ", i); 596 } 597 printf("\n"); 598 } 599 printf("\n"); 600 for (j = 0; j < nthreads; j++) { 601 for (i = 0; i < queues; i++) { 602 printf("%03u ", q_thread_map[j][i]); 603 } 604 printf("\n"); 605 } 606 #endif 607 } 608