// SPDX-License-Identifier: GPL-2.0
/*
 * FUSE: Filesystem in Userspace
 * Copyright (c) 2023-2024 DataDirect Networks.
 */

#include "fuse_i.h"
#include "dev_uring_i.h"
#include "fuse_dev_i.h"

#include <linux/fs.h>
#include <linux/io_uring/cmd.h>

static bool __read_mostly enable_uring;
module_param(enable_uring, bool, 0644);
MODULE_PARM_DESC(enable_uring,
		 "Enable userspace communication through io-uring");

#define FUSE_URING_IOV_SEGS 2 /* header and payload */

bool fuse_uring_enabled(void)
{
	return enable_uring;
}

struct fuse_uring_pdu {
	struct fuse_ring_ent *ent;
};

static const struct fuse_iqueue_ops fuse_io_uring_ops;

static void uring_cmd_set_ring_ent(struct io_uring_cmd *cmd,
				   struct fuse_ring_ent *ring_ent)
{
	struct fuse_uring_pdu *pdu =
		io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);

	pdu->ent = ring_ent;
}

static struct fuse_ring_ent *uring_cmd_to_ring_ent(struct io_uring_cmd *cmd)
{
	struct fuse_uring_pdu *pdu =
		io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);

	return pdu->ent;
}

static void fuse_uring_flush_bg(struct fuse_ring_queue *queue)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;

	lockdep_assert_held(&queue->lock);
	lockdep_assert_held(&fc->bg_lock);

	/*
	 * Allow one bg request per queue, ignoring global fc limits.
	 * This prevents a single queue from consuming all resources and
	 * eliminates the need for remote queue wake-ups when global
	 * limits are met but this queue has no more waiting requests.
	 */
	while ((fc->active_background < fc->max_background ||
		!queue->active_background) &&
	       (!list_empty(&queue->fuse_req_bg_queue))) {
		struct fuse_req *req;

		req = list_first_entry(&queue->fuse_req_bg_queue,
				       struct fuse_req, list);
		fc->active_background++;
		queue->active_background++;

		list_move_tail(&req->list, &queue->fuse_req_queue);
	}
}

static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req,
			       int error)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;

	lockdep_assert_not_held(&queue->lock);
	spin_lock(&queue->lock);
	ent->fuse_req = NULL;
	if (test_bit(FR_BACKGROUND, &req->flags)) {
		queue->active_background--;
		spin_lock(&fc->bg_lock);
		fuse_uring_flush_bg(queue);
		spin_unlock(&fc->bg_lock);
	}

	spin_unlock(&queue->lock);

	if (error)
		req->out.h.error = error;

	clear_bit(FR_SENT, &req->flags);
	fuse_request_end(req);
}

/* Abort all list-queued requests on the given ring queue */
static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
{
	struct fuse_req *req;
	LIST_HEAD(req_list);

	spin_lock(&queue->lock);
	list_for_each_entry(req, &queue->fuse_req_queue, list)
		clear_bit(FR_PENDING, &req->flags);
	list_splice_init(&queue->fuse_req_queue, &req_list);
	spin_unlock(&queue->lock);

	/* must not hold queue lock to avoid order issues with fi->lock */
	fuse_dev_end_requests(&req_list);
}

void fuse_uring_abort_end_requests(struct fuse_ring *ring)
{
	int qid;
	struct fuse_ring_queue *queue;
	struct fuse_conn *fc = ring->fc;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		queue = READ_ONCE(ring->queues[qid]);
		if (!queue)
			continue;

		queue->stopped = true;

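		/*
		 * fuse_abort_conn() is expected to have raised
		 * fc->max_background to UINT_MAX already, so the
		 * fuse_uring_flush_bg() call below drains all queued
		 * background requests.
		 */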
		WARN_ON_ONCE(ring->fc->max_background != UINT_MAX);
		spin_lock(&queue->lock);
		spin_lock(&fc->bg_lock);
		fuse_uring_flush_bg(queue);
		spin_unlock(&fc->bg_lock);
		spin_unlock(&queue->lock);
		fuse_uring_abort_end_queue_requests(queue);
	}
}

void fuse_uring_destruct(struct fuse_conn *fc)
{
	struct fuse_ring *ring = fc->ring;
	int qid;

	if (!ring)
		return;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = ring->queues[qid];
		struct fuse_ring_ent *ent, *next;

		if (!queue)
			continue;

		WARN_ON(!list_empty(&queue->ent_avail_queue));
		WARN_ON(!list_empty(&queue->ent_w_req_queue));
		WARN_ON(!list_empty(&queue->ent_commit_queue));
		WARN_ON(!list_empty(&queue->ent_in_userspace));

		list_for_each_entry_safe(ent, next, &queue->ent_released,
					 list) {
			list_del_init(&ent->list);
			kfree(ent);
		}

		kfree(queue->fpq.processing);
		kfree(queue);
		ring->queues[qid] = NULL;
	}

	kfree(ring->queues);
	kfree(ring);
	fc->ring = NULL;
}

/*
 * Basic ring setup for this connection based on the provided configuration
 */
static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
{
	struct fuse_ring *ring;
	size_t nr_queues = num_possible_cpus();
	struct fuse_ring *res = NULL;
	size_t max_payload_size;

	ring = kzalloc(sizeof(*fc->ring), GFP_KERNEL_ACCOUNT);
	if (!ring)
		return NULL;

	ring->queues = kcalloc(nr_queues, sizeof(struct fuse_ring_queue *),
			       GFP_KERNEL_ACCOUNT);
	if (!ring->queues)
		goto out_err;

	max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write);
	max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE);

	spin_lock(&fc->lock);
	if (fc->ring) {
		/* race, another thread created the ring in the meantime */
		spin_unlock(&fc->lock);
		res = fc->ring;
		goto out_err;
	}

	init_waitqueue_head(&ring->stop_waitq);

	fc->ring = ring;
	ring->nr_queues = nr_queues;
	ring->fc = fc;
	ring->max_payload_sz = max_payload_size;
	atomic_set(&ring->queue_refs, 0);

	spin_unlock(&fc->lock);
	return ring;

out_err:
	kfree(ring->queues);
	kfree(ring);
	return res;
}

static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
						       int qid)
{
	struct fuse_conn *fc = ring->fc;
	struct fuse_ring_queue *queue;
	struct list_head *pq;

	queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
	if (!queue)
		return NULL;
	pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL);
	if (!pq) {
		kfree(queue);
		return NULL;
	}

	queue->qid = qid;
	queue->ring = ring;
	spin_lock_init(&queue->lock);

	INIT_LIST_HEAD(&queue->ent_avail_queue);
	INIT_LIST_HEAD(&queue->ent_commit_queue);
	INIT_LIST_HEAD(&queue->ent_w_req_queue);
	INIT_LIST_HEAD(&queue->ent_in_userspace);
	INIT_LIST_HEAD(&queue->fuse_req_queue);
	INIT_LIST_HEAD(&queue->fuse_req_bg_queue);
	INIT_LIST_HEAD(&queue->ent_released);

	queue->fpq.processing = pq;
	fuse_pqueue_init(&queue->fpq);

	spin_lock(&fc->lock);
	if (ring->queues[qid]) {
		spin_unlock(&fc->lock);
		kfree(queue->fpq.processing);
		kfree(queue);
		return ring->queues[qid];
	}

	/*
	 * WRITE_ONCE under the lock, as most readers access
	 * ring->queues[qid] without taking the lock at all.
	 */
	WRITE_ONCE(ring->queues[qid], queue);
	spin_unlock(&fc->lock);

	return queue;
}

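/*
 * End a request that was still in flight when its queue got torn down;
 * the daemon will never commit a result for it.
 */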
static void fuse_uring_stop_fuse_req_end(struct fuse_req *req)
{
	clear_bit(FR_SENT, &req->flags);
	req->out.h.error = -ECONNABORTED;
	fuse_request_end(req);
}

/*
 * Release a request/entry on connection tear down
 */
static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent)
{
	struct fuse_req *req;
	struct io_uring_cmd *cmd;

	struct fuse_ring_queue *queue = ent->queue;

	spin_lock(&queue->lock);
	cmd = ent->cmd;
	ent->cmd = NULL;
	req = ent->fuse_req;
	ent->fuse_req = NULL;
	if (req) {
		/* remove entry from queue->fpq->processing */
		list_del_init(&req->list);
	}

	/*
	 * The entry must not be freed immediately, as IO_URING_F_CANCEL
	 * accesses entries through direct pointers - there is a risk of a
	 * race with daemon termination (which triggers IO_URING_F_CANCEL
	 * and accesses entries without checking the list state first).
	 */
	list_move(&ent->list, &queue->ent_released);
	ent->state = FRRS_RELEASED;
	spin_unlock(&queue->lock);

	if (cmd)
		io_uring_cmd_done(cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED);

	if (req)
		fuse_uring_stop_fuse_req_end(req);
}

static void fuse_uring_stop_list_entries(struct list_head *head,
					 struct fuse_ring_queue *queue,
					 enum fuse_ring_req_state exp_state)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_ring_ent *ent, *next;
	ssize_t queue_refs = SSIZE_MAX;
	LIST_HEAD(to_teardown);

	spin_lock(&queue->lock);
	list_for_each_entry_safe(ent, next, head, list) {
		if (ent->state != exp_state) {
			pr_warn("entry teardown qid=%d state=%d expected=%d",
				queue->qid, ent->state, exp_state);
			continue;
		}

		ent->state = FRRS_TEARDOWN;
		list_move(&ent->list, &to_teardown);
	}
	spin_unlock(&queue->lock);

	/* no queue lock to avoid lock order issues */
	list_for_each_entry_safe(ent, next, &to_teardown, list) {
		fuse_uring_entry_teardown(ent);
		queue_refs = atomic_dec_return(&ring->queue_refs);
		WARN_ON_ONCE(queue_refs < 0);
	}
}

static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue)
{
	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue,
				     FRRS_USERSPACE);
	fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue,
				     FRRS_AVAILABLE);
}

/*
 * Log state debug info
 */
static void fuse_uring_log_ent_state(struct fuse_ring *ring)
{
	int qid;
	struct fuse_ring_ent *ent;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = ring->queues[qid];

		if (!queue)
			continue;

		spin_lock(&queue->lock);
		/*
		 * Log entries from the intermediate queues; the other queues
		 * should be empty.
		 */
		list_for_each_entry(ent, &queue->ent_w_req_queue, list) {
			pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",
				ring, qid, ent, ent->state);
		}
		list_for_each_entry(ent, &queue->ent_commit_queue, list) {
			pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n",
				ring, qid, ent, ent->state);
		}
		spin_unlock(&queue->lock);
	}
	ring->stop_debug_log = 1;
}

static void fuse_uring_async_stop_queues(struct work_struct *work)
{
	int qid;
	struct fuse_ring *ring =
		container_of(work, struct fuse_ring, async_teardown_work.work);

	/* XXX code dup */
	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);

		if (!queue)
			continue;

		fuse_uring_teardown_entries(queue);
	}

	/*
	 * Some ring entries might still be in the middle of IO operations,
	 * i.e. about to be handled by file_operations::uring_cmd or on the
	 * way to userspace - handling that with conditions in run-time code
	 * would be possible, but it is easier/cleaner to re-schedule this
	 * async teardown handler while queue references are left.
	 */
	if (atomic_read(&ring->queue_refs) > 0) {
		if (time_after(jiffies,
			       ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
			fuse_uring_log_ent_state(ring);

		schedule_delayed_work(&ring->async_teardown_work,
				      FUSE_URING_TEARDOWN_INTERVAL);
	} else {
		wake_up_all(&ring->stop_waitq);
	}
}

/*
 * Stop the ring queues
 */
void fuse_uring_stop_queues(struct fuse_ring *ring)
{
	int qid;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);

		if (!queue)
			continue;

		fuse_uring_teardown_entries(queue);
	}

	if (atomic_read(&ring->queue_refs) > 0) {
		ring->teardown_time = jiffies;
		INIT_DELAYED_WORK(&ring->async_teardown_work,
				  fuse_uring_async_stop_queues);
		schedule_delayed_work(&ring->async_teardown_work,
				      FUSE_URING_TEARDOWN_INTERVAL);
	} else {
		wake_up_all(&ring->stop_waitq);
	}
}

/*
 * Handle IO_URING_F_CANCEL, which typically comes on daemon termination.
 *
 * Releasing the last entry should trigger fuse_dev_release() if
 * the daemon was terminated
 */
static void fuse_uring_cancel(struct io_uring_cmd *cmd,
			      unsigned int issue_flags)
{
	struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd);
	struct fuse_ring_queue *queue;
	bool need_cmd_done = false;

	/*
	 * direct access on ent - it must not be destroyed as long as
	 * IO_URING_F_CANCEL might still come up
	 */
	queue = ent->queue;
	spin_lock(&queue->lock);
	if (ent->state == FRRS_AVAILABLE) {
		ent->state = FRRS_USERSPACE;
		list_move(&ent->list, &queue->ent_in_userspace);
		need_cmd_done = true;
		ent->cmd = NULL;
	}
	spin_unlock(&queue->lock);

	if (need_cmd_done) {
		/* no queue lock to avoid lock order issues */
		io_uring_cmd_done(cmd, -ENOTCONN, 0, issue_flags);
	}
}

static void fuse_uring_prepare_cancel(struct io_uring_cmd *cmd, int issue_flags,
				      struct fuse_ring_ent *ring_ent)
{
	uring_cmd_set_ring_ent(cmd, ring_ent);
	io_uring_cmd_mark_cancelable(cmd, issue_flags);
}

/*
 * Check the out header, which was already copied into the request, for errors
 */
static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
					 struct fuse_req *req,
					 struct fuse_conn *fc)
{
	int err;

	err = -EINVAL;
	if (oh->unique == 0) {
		/* Not supported through io-uring yet */
		pr_warn_once("notify through fuse-io-uring not supported\n");
		goto err;
	}

	if (oh->error <= -ERESTARTSYS || oh->error > 0)
		goto err;

	if (oh->error) {
		err = oh->error;
		goto err;
	}

	err = -ENOENT;
	if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
		pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n",
				    req->in.h.unique,
				    oh->unique & ~FUSE_INT_REQ_BIT);
		goto err;
	}

	/*
	 * Is it an interrupt reply ID?
	 * XXX: Not supported through fuse-io-uring yet, it should not even
	 * find the request - should not happen.
	 */
	WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);

	err = 0;
err:
	return err;
}

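/*
 * Layout of the per-entry userspace buffers used by the copy helpers below
 * (registered through the two-segment iovec, see fuse_uring_register()):
 *   headers->in_out          fuse_in_header on the way out,
 *                            fuse_out_header on the way back
 *   headers->op_in           per-opcode input header (first in-arg)
 *   headers->ring_ent_in_out struct fuse_uring_ent_in_out, carrying
 *                            commit_id and payload_sz
 *   payload                  data buffer of at least ring->max_payload_sz
 */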
static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
				     struct fuse_req *req,
				     struct fuse_ring_ent *ent)
{
	struct fuse_copy_state cs;
	struct fuse_args *args = req->args;
	struct iov_iter iter;
	int err;
	struct fuse_uring_ent_in_out ring_in_out;

	err = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
			     sizeof(ring_in_out));
	if (err)
		return -EFAULT;

	err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz,
			  &iter);
	if (err)
		return err;

	fuse_copy_init(&cs, 0, &iter);
	cs.is_uring = 1;
	cs.req = req;

	return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
}

/*
 * Copy data from the req to the ring buffer
 */
static int fuse_uring_args_to_ring(struct fuse_ring *ring, struct fuse_req *req,
				   struct fuse_ring_ent *ent)
{
	struct fuse_copy_state cs;
	struct fuse_args *args = req->args;
	struct fuse_in_arg *in_args = args->in_args;
	int num_args = args->in_numargs;
	int err;
	struct iov_iter iter;
	struct fuse_uring_ent_in_out ent_in_out = {
		.flags = 0,
		.commit_id = req->in.h.unique,
	};

	err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter);
	if (err) {
		pr_info_ratelimited("fuse: Import of user buffer failed\n");
		return err;
	}

	fuse_copy_init(&cs, 1, &iter);
	cs.is_uring = 1;
	cs.req = req;

	if (num_args > 0) {
		/*
		 * The expectation is that the first argument is the per-opcode
		 * header. Some opcodes have it with zero size.
		 */
		if (args->in_args[0].size > 0) {
			err = copy_to_user(&ent->headers->op_in, in_args->value,
					   in_args->size);
			if (err) {
				pr_info_ratelimited(
					"Copying the header failed.\n");
				return -EFAULT;
			}
		}
		in_args++;
		num_args--;
	}

	/* copy the payload */
	err = fuse_copy_args(&cs, num_args, args->in_pages,
			     (struct fuse_arg *)in_args, 0);
	if (err) {
		pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
		return err;
	}

	ent_in_out.payload_sz = cs.ring.copied_sz;
	err = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out,
			   sizeof(ent_in_out));
	return err ? -EFAULT : 0;
}

static int fuse_uring_copy_to_ring(struct fuse_ring_ent *ent,
				   struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	int err;

	err = -EIO;
	if (WARN_ON(ent->state != FRRS_FUSE_REQ)) {
		pr_err("qid=%d ring-req=%p invalid state %d on send\n",
		       queue->qid, ent, ent->state);
		return err;
	}

	err = -EINVAL;
	if (WARN_ON(req->in.h.unique == 0))
		return err;

	/* copy the request */
	err = fuse_uring_args_to_ring(ring, req, ent);
	if (unlikely(err)) {
		pr_info_ratelimited("Copy to ring failed: %d\n", err);
		return err;
	}

	/* copy fuse_in_header */
	err = copy_to_user(&ent->headers->in_out, &req->in.h,
			   sizeof(req->in.h));
	if (err) {
		err = -EFAULT;
		return err;
	}

	return 0;
}

static int fuse_uring_prepare_send(struct fuse_ring_ent *ent,
				   struct fuse_req *req)
{
	int err;

	err = fuse_uring_copy_to_ring(ent, req);
	if (!err)
		set_bit(FR_SENT, &req->flags);
	else
		fuse_uring_req_end(ent, req, err);

	return err;
}

/*
 * Write data to the ring buffer and send the request to userspace;
 * userspace will read it.
 * This is comparable to a classical read(/dev/fuse).
 */
static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent,
					struct fuse_req *req,
					unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;
	int err;
	struct io_uring_cmd *cmd;

	err = fuse_uring_prepare_send(ent, req);
	if (err)
		return err;

	spin_lock(&queue->lock);
	cmd = ent->cmd;
	ent->cmd = NULL;
	ent->state = FRRS_USERSPACE;
	list_move(&ent->list, &queue->ent_in_userspace);
	spin_unlock(&queue->lock);

	io_uring_cmd_done(cmd, 0, 0, issue_flags);
	return 0;
}

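/*
 * Ring-entry lifecycle, as driven by the helpers below:
 *   FRRS_AVAILABLE - on ent_avail_queue, ready to take a fuse request
 *   FRRS_FUSE_REQ  - on ent_w_req_queue, a request is being copied to it
 *   FRRS_USERSPACE - on ent_in_userspace, owned by the daemon
 *   FRRS_COMMIT    - on ent_commit_queue, the result is being committed
 *   FRRS_TEARDOWN / FRRS_RELEASED - queue shutdown
 */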
/*
 * Make a ring entry available for fuse_req assignment
 */
static void fuse_uring_ent_avail(struct fuse_ring_ent *ent,
				 struct fuse_ring_queue *queue)
{
	WARN_ON_ONCE(!ent->cmd);
	list_move(&ent->list, &queue->ent_avail_queue);
	ent->state = FRRS_AVAILABLE;
}

/* Used to find the request on SQE commit */
static void fuse_uring_add_to_pq(struct fuse_ring_ent *ent,
				 struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_pqueue *fpq = &queue->fpq;
	unsigned int hash;

	req->ring_entry = ent;
	hash = fuse_req_hash(req->in.h.unique);
	list_move_tail(&req->list, &fpq->processing[hash]);
}

/*
 * Assign the given fuse request to the ring entry
 */
static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent,
					   struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_conn *fc = req->fm->fc;
	struct fuse_iqueue *fiq = &fc->iq;

	lockdep_assert_held(&queue->lock);

	if (WARN_ON_ONCE(ent->state != FRRS_AVAILABLE &&
			 ent->state != FRRS_COMMIT)) {
		pr_warn("%s qid=%d state=%d\n", __func__, ent->queue->qid,
			ent->state);
	}

	spin_lock(&fiq->lock);
	clear_bit(FR_PENDING, &req->flags);
	spin_unlock(&fiq->lock);
	ent->fuse_req = req;
	ent->state = FRRS_FUSE_REQ;
	list_move(&ent->list, &queue->ent_w_req_queue);
	fuse_uring_add_to_pq(ent, req);
}

/* Fetch the next fuse request if available */
static struct fuse_req *fuse_uring_ent_assign_req(struct fuse_ring_ent *ent)
	__must_hold(&queue->lock)
{
	struct fuse_req *req;
	struct fuse_ring_queue *queue = ent->queue;
	struct list_head *req_queue = &queue->fuse_req_queue;

	lockdep_assert_held(&queue->lock);

	/* get and assign the next request while still holding the lock */
	req = list_first_entry_or_null(req_queue, struct fuse_req, list);
	if (req)
		fuse_uring_add_req_to_ring_ent(ent, req);

	return req;
}

/*
 * Read data from the ring buffer, which user space has written to.
 * This is comparable to handling a classical write(/dev/fuse).
 * Also make the ring request available again for new fuse requests.
 */
static void fuse_uring_commit(struct fuse_ring_ent *ent, struct fuse_req *req,
			      unsigned int issue_flags)
{
	struct fuse_ring *ring = ent->queue->ring;
	struct fuse_conn *fc = ring->fc;
	ssize_t err = 0;

	err = copy_from_user(&req->out.h, &ent->headers->in_out,
			     sizeof(req->out.h));
	if (err) {
		req->out.h.error = -EFAULT;
		goto out;
	}

	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
	if (err) {
		/* req->out.h.error already set */
		goto out;
	}

	err = fuse_uring_copy_from_ring(ring, req, ent);
out:
	fuse_uring_req_end(ent, req, err);
}

/*
 * Get the next fuse req and send it
 */
static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ent,
				     struct fuse_ring_queue *queue,
				     unsigned int issue_flags)
{
	int err;
	struct fuse_req *req;

retry:
	spin_lock(&queue->lock);
	fuse_uring_ent_avail(ent, queue);
	req = fuse_uring_ent_assign_req(ent);
	spin_unlock(&queue->lock);

	if (req) {
		err = fuse_uring_send_next_to_ring(ent, req, issue_flags);
		if (err)
			goto retry;
	}
}

static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
{
	struct fuse_ring_queue *queue = ent->queue;

	lockdep_assert_held(&queue->lock);

	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
		return -EIO;

	ent->state = FRRS_COMMIT;
	list_move(&ent->list, &queue->ent_commit_queue);

	return 0;
}

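/*
 * Rough sketch of the daemon-side flow the handler below expects (the real
 * userspace implementation lives in libfuse; only names used in this file
 * are assumed): after processing a request received through a registered
 * entry, the daemon submits another IORING_OP_URING_CMD SQE on the same
 * /dev/fuse fd, e.g.:
 *
 *	struct fuse_uring_cmd_req *cr = (void *)sqe->cmd;  // SQE128 area
 *
 *	sqe->opcode   = IORING_OP_URING_CMD;
 *	sqe->fd       = fuse_dev_fd;
 *	sqe->cmd_op   = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
 *	cr->qid       = qid;        // queue the entry was registered on
 *	cr->commit_id = commit_id;  // from headers->ring_ent_in_out
 *
 * Committing and fetching are one step: the same SQE returns the reply for
 * the previous request and makes the entry available for the next one.
 */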
/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
				   struct fuse_conn *fc)
{
	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
	struct fuse_ring_ent *ent;
	int err;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
	unsigned int qid = READ_ONCE(cmd_req->qid);
	struct fuse_pqueue *fpq;
	struct fuse_req *req;

	err = -ENOTCONN;
	if (!ring)
		return err;

	if (qid >= ring->nr_queues)
		return -EINVAL;

	queue = ring->queues[qid];
	if (!queue)
		return err;
	fpq = &queue->fpq;

	if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped))
		return err;

	spin_lock(&queue->lock);
	/*
	 * Find the request based on the unique ID of the fuse request.
	 * This should get revised, as it needs a hash calculation and list
	 * search, requires the full struct fuse_pqueue (memory overhead)
	 * as well as the link from req to ring_ent.
	 */
	req = fuse_request_find(fpq, commit_id);
	err = -ENOENT;
	if (!req) {
		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
			commit_id);
		spin_unlock(&queue->lock);
		return err;
	}
	list_del_init(&req->list);
	ent = req->ring_entry;
	req->ring_entry = NULL;

	err = fuse_ring_ent_set_commit(ent);
	if (err != 0) {
		pr_info_ratelimited("qid=%d commit_id %llu state %d",
				    queue->qid, commit_id, ent->state);
		spin_unlock(&queue->lock);
		req->out.h.error = err;
		clear_bit(FR_SENT, &req->flags);
		fuse_request_end(req);
		return err;
	}

	ent->cmd = cmd;
	spin_unlock(&queue->lock);

	/* without the queue lock, as other locks are taken */
	fuse_uring_prepare_cancel(cmd, issue_flags, ent);
	fuse_uring_commit(ent, req, issue_flags);

	/*
	 * Fetching the next request is absolutely required as queued
	 * fuse requests would otherwise not get processed - committing
	 * and fetching is done in one step vs legacy fuse, which has separate
	 * read (fetch request) and write (commit result).
	 */
	fuse_uring_next_fuse_req(ent, queue, issue_flags);
	return 0;
}

static bool is_ring_ready(struct fuse_ring *ring, int current_qid)
{
	int qid;
	struct fuse_ring_queue *queue;
	bool ready = true;

	for (qid = 0; qid < ring->nr_queues && ready; qid++) {
		if (current_qid == qid)
			continue;

		queue = ring->queues[qid];
		if (!queue) {
			ready = false;
			break;
		}

		spin_lock(&queue->lock);
		if (list_empty(&queue->ent_avail_queue))
			ready = false;
		spin_unlock(&queue->lock);
	}

	return ready;
}

/*
 * fuse_uring_req_fetch command handling
 */
static void fuse_uring_do_register(struct fuse_ring_ent *ent,
				   struct io_uring_cmd *cmd,
				   unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;
	struct fuse_iqueue *fiq = &fc->iq;

	fuse_uring_prepare_cancel(cmd, issue_flags, ent);

	spin_lock(&queue->lock);
	ent->cmd = cmd;
	fuse_uring_ent_avail(ent, queue);
	spin_unlock(&queue->lock);

	if (!ring->ready) {
		bool ready = is_ring_ready(ring, queue->qid);

		if (ready) {
			WRITE_ONCE(fiq->ops, &fuse_io_uring_ops);
			WRITE_ONCE(ring->ready, true);
			wake_up_all(&fc->blocked_waitq);
		}
	}
}

/*
 * sqe->addr is a ptr to an iovec array, iov[0] has the headers, iov[1]
 * the payload
 */
static int fuse_uring_get_iovec_from_sqe(const struct io_uring_sqe *sqe,
					 struct iovec iov[FUSE_URING_IOV_SEGS])
{
	struct iovec __user *uiov = u64_to_user_ptr(READ_ONCE(sqe->addr));
	struct iov_iter iter;
	ssize_t ret;

	if (sqe->len != FUSE_URING_IOV_SEGS)
		return -EINVAL;

	/*
	 * The buffers will actually be accessed for both READ and WRITE;
	 * importing with WRITE should cover READ access as well.
	 */
	ret = import_iovec(WRITE, uiov, FUSE_URING_IOV_SEGS,
			   FUSE_URING_IOV_SEGS, &iov, &iter);
	if (ret < 0)
		return ret;

	return 0;
}

static struct fuse_ring_ent *
fuse_uring_create_ring_ent(struct io_uring_cmd *cmd,
			   struct fuse_ring_queue *queue)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_ring_ent *ent;
	size_t payload_size;
	struct iovec iov[FUSE_URING_IOV_SEGS];
	int err;

	err = fuse_uring_get_iovec_from_sqe(cmd->sqe, iov);
	if (err) {
		pr_info_ratelimited("Failed to get iovec from sqe, err=%d\n",
				    err);
		return ERR_PTR(err);
	}

	err = -EINVAL;
	if (iov[0].iov_len < sizeof(struct fuse_uring_req_header)) {
		pr_info_ratelimited("Invalid header len %zu\n", iov[0].iov_len);
		return ERR_PTR(err);
	}

	payload_size = iov[1].iov_len;
	if (payload_size < ring->max_payload_sz) {
		pr_info_ratelimited("Invalid req payload len %zu\n",
				    payload_size);
		return ERR_PTR(err);
	}

	err = -ENOMEM;
	ent = kzalloc(sizeof(*ent), GFP_KERNEL_ACCOUNT);
	if (!ent)
		return ERR_PTR(err);

	INIT_LIST_HEAD(&ent->list);

	ent->queue = queue;
	ent->headers = iov[0].iov_base;
	ent->payload = iov[1].iov_base;

	atomic_inc(&ring->queue_refs);
	return ent;
}

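/*
 * Rough sketch of the daemon-side registration the function below expects
 * (the real userspace code lives in libfuse; buffer names are illustrative):
 * one REGISTER command per ring entry, submitted on the /dev/fuse fd of an
 * io_uring created with IORING_SETUP_SQE128:
 *
 *	struct iovec iov[FUSE_URING_IOV_SEGS] = {
 *		{ &req_header, sizeof(struct fuse_uring_req_header) },
 *		{ payload_buf, payload_buf_sz },  // >= ring->max_payload_sz
 *	};
 *	struct fuse_uring_cmd_req *cr = (void *)sqe->cmd;  // SQE128 area
 *
 *	sqe->opcode = IORING_OP_URING_CMD;
 *	sqe->fd     = fuse_dev_fd;
 *	sqe->cmd_op = FUSE_IO_URING_CMD_REGISTER;
 *	sqe->addr   = (unsigned long)iov;
 *	sqe->len    = FUSE_URING_IOV_SEGS;
 *	cr->qid     = qid;  // typically one queue per CPU
 *
 * The command does not complete immediately; the CQE is only posted once a
 * fuse request has been assigned to the entry and copied to the buffers.
 */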
/*
 * Register the header and payload buffer with the kernel and put the
 * entry onto the queue as "ready to get fuse requests"
 */
static int fuse_uring_register(struct io_uring_cmd *cmd,
			       unsigned int issue_flags, struct fuse_conn *fc)
{
	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent;
	int err;
	unsigned int qid = READ_ONCE(cmd_req->qid);

	err = -ENOMEM;
	if (!ring) {
		ring = fuse_uring_create(fc);
		if (!ring)
			return err;
	}

	if (qid >= ring->nr_queues) {
		pr_info_ratelimited("fuse: Invalid ring qid %u\n", qid);
		return -EINVAL;
	}

	queue = ring->queues[qid];
	if (!queue) {
		queue = fuse_uring_create_queue(ring, qid);
		if (!queue)
			return err;
	}

	/*
	 * The queue created above does not need to be destructed if entry
	 * setup fails below; that is done at ring destruction time.
	 */

	ent = fuse_uring_create_ring_ent(cmd, queue);
	if (IS_ERR(ent))
		return PTR_ERR(ent);

	fuse_uring_do_register(ent, cmd, issue_flags);

	return 0;
}

/*
 * Entry function from io_uring to handle the given passthrough command
 * (op code IORING_OP_URING_CMD)
 */
int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
{
	struct fuse_dev *fud;
	struct fuse_conn *fc;
	u32 cmd_op = cmd->cmd_op;
	int err;

	if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
		fuse_uring_cancel(cmd, issue_flags);
		return 0;
	}

	/* This extra SQE size holds struct fuse_uring_cmd_req */
	if (!(issue_flags & IO_URING_F_SQE128))
		return -EINVAL;

	fud = fuse_get_dev(cmd->file);
	if (!fud) {
		pr_info_ratelimited("No fuse device found\n");
		return -ENOTCONN;
	}
	fc = fud->fc;

	/* Once a connection has io-uring enabled on it, it can't be disabled */
	if (!enable_uring && !fc->io_uring) {
		pr_info_ratelimited("fuse-io-uring is disabled\n");
		return -EOPNOTSUPP;
	}

	if (fc->aborted)
		return -ECONNABORTED;
	if (!fc->connected)
		return -ENOTCONN;

	/*
	 * fuse_uring_register() needs the connection to be initialized,
	 * as the max payload size has to be known by then.
	 */
	if (!fc->initialized)
		return -EAGAIN;

	switch (cmd_op) {
	case FUSE_IO_URING_CMD_REGISTER:
		err = fuse_uring_register(cmd, issue_flags, fc);
		if (err) {
			pr_info_once("FUSE_IO_URING_CMD_REGISTER failed err=%d\n",
				     err);
			fc->io_uring = 0;
			wake_up_all(&fc->blocked_waitq);
			return err;
		}
		break;
	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
		err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
		if (err) {
			pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n",
				     err);
			return err;
		}
		break;
	default:
		return -EINVAL;
	}

	return -EIOCBQUEUED;
}

static void fuse_uring_send(struct fuse_ring_ent *ent, struct io_uring_cmd *cmd,
			    ssize_t ret, unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;

	spin_lock(&queue->lock);
	ent->state = FRRS_USERSPACE;
	list_move(&ent->list, &queue->ent_in_userspace);
	ent->cmd = NULL;
	spin_unlock(&queue->lock);

	io_uring_cmd_done(cmd, ret, 0, issue_flags);
}

/*
 * This prepares and sends the ring request in fuse-uring task context.
 * User buffers are not mapped yet - the application does not have permission
 * to write to them - so this has to be executed in ring task context.
 */
static void fuse_uring_send_in_task(struct io_uring_cmd *cmd,
				    unsigned int issue_flags)
{
	struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd);
	struct fuse_ring_queue *queue = ent->queue;
	int err;

	if (!(issue_flags & IO_URING_F_TASK_DEAD)) {
		err = fuse_uring_prepare_send(ent, ent->fuse_req);
		if (err) {
			fuse_uring_next_fuse_req(ent, queue, issue_flags);
			return;
		}
	} else {
		err = -ECANCELED;
	}

	fuse_uring_send(ent, cmd, err, issue_flags);
}

/* Map the current task's CPU to its ring queue */
static struct fuse_ring_queue *fuse_uring_task_to_queue(struct fuse_ring *ring)
{
	unsigned int qid;
	struct fuse_ring_queue *queue;

	qid = task_cpu(current);

	if (WARN_ONCE(qid >= ring->nr_queues,
		      "Core number (%u) exceeds nr queues (%zu)\n", qid,
		      ring->nr_queues))
		qid = 0;

	queue = ring->queues[qid];
	WARN_ONCE(!queue, "Missing queue for qid %d\n", qid);

	return queue;
}

/* Hand the entry to io_uring task context, which does the actual send */
static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent)
{
	struct io_uring_cmd *cmd = ent->cmd;

	uring_cmd_set_ring_ent(cmd, ent);
	io_uring_cmd_complete_in_task(cmd, fuse_uring_send_in_task);
}

/* queue a fuse request and send it if a ring entry is available */
void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req)
{
	struct fuse_conn *fc = req->fm->fc;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent = NULL;
	int err;

	err = -EINVAL;
	queue = fuse_uring_task_to_queue(ring);
	if (!queue)
		goto err;

	if (req->in.h.opcode != FUSE_NOTIFY_REPLY)
		req->in.h.unique = fuse_get_unique(fiq);

	spin_lock(&queue->lock);
	err = -ENOTCONN;
	if (unlikely(queue->stopped))
		goto err_unlock;

	ent = list_first_entry_or_null(&queue->ent_avail_queue,
				       struct fuse_ring_ent, list);
	if (ent)
		fuse_uring_add_req_to_ring_ent(ent, req);
	else
		list_add_tail(&req->list, &queue->fuse_req_queue);
	spin_unlock(&queue->lock);

	if (ent)
		fuse_uring_dispatch_ent(ent);

	return;

err_unlock:
	spin_unlock(&queue->lock);
err:
	req->out.h.error = err;
	clear_bit(FR_PENDING, &req->flags);
	fuse_request_end(req);
}

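/*
 * Queue a background fuse request. Returns false if the per-CPU ring queue
 * is not usable (missing or stopped), true once the request has been queued.
 * The request is sent right away if an entry is available and the per-queue
 * background limit permits, otherwise later, once earlier requests complete
 * (see fuse_uring_flush_bg()).
 */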
bool fuse_uring_queue_bq_req(struct fuse_req *req)
{
	struct fuse_conn *fc = req->fm->fc;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent = NULL;

	queue = fuse_uring_task_to_queue(ring);
	if (!queue)
		return false;

	spin_lock(&queue->lock);
	if (unlikely(queue->stopped)) {
		spin_unlock(&queue->lock);
		return false;
	}

	list_add_tail(&req->list, &queue->fuse_req_bg_queue);

	ent = list_first_entry_or_null(&queue->ent_avail_queue,
				       struct fuse_ring_ent, list);
	spin_lock(&fc->bg_lock);
	fc->num_background++;
	if (fc->num_background == fc->max_background)
		fc->blocked = 1;
	fuse_uring_flush_bg(queue);
	spin_unlock(&fc->bg_lock);

	/*
	 * Due to bg_queue flush limits there might be other bg requests
	 * in the queue that need to be handled first. Or no further req
	 * might be available.
	 */
	req = list_first_entry_or_null(&queue->fuse_req_queue, struct fuse_req,
				       list);
	if (ent && req) {
		fuse_uring_add_req_to_ring_ent(ent, req);
		spin_unlock(&queue->lock);

		fuse_uring_dispatch_ent(ent);
	} else {
		spin_unlock(&queue->lock);
	}

	return true;
}

static const struct fuse_iqueue_ops fuse_io_uring_ops = {
	/* should be sent over io-uring as an enhancement */
	.send_forget = fuse_dev_queue_forget,

	/*
	 * could be sent over io-uring, but interrupts should be rare,
	 * so there is no need to make the code complex
	 */
	.send_interrupt = fuse_dev_queue_interrupt,
	.send_req = fuse_uring_queue_fuse_req,
};