1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * FUSE: Filesystem in Userspace
4 * Copyright (c) 2023-2024 DataDirect Networks.
5 */
6
7 #include "fuse_i.h"
8 #include "dev_uring_i.h"
9 #include "fuse_dev_i.h"
10 #include "fuse_trace.h"
11
12 #include <linux/fs.h>
13 #include <linux/io_uring/cmd.h>
14
/* Opt-in switch for the fuse-io-uring transport; disabled by default */
static bool __read_mostly enable_uring;
module_param(enable_uring, bool, 0644);
MODULE_PARM_DESC(enable_uring,
		 "Enable userspace communication through io-uring");

#define FUSE_URING_IOV_SEGS 2 /* header and payload */
21
22
fuse_uring_enabled(void)23 bool fuse_uring_enabled(void)
24 {
25 return enable_uring;
26 }
27
/* Per-command private data, stored in the io_uring_cmd pdu area */
struct fuse_uring_pdu {
	struct fuse_ring_ent *ent;
};

/* Forward declaration; installed as fiq->ops once the ring becomes ready */
static const struct fuse_iqueue_ops fuse_io_uring_ops;
33
uring_cmd_set_ring_ent(struct io_uring_cmd * cmd,struct fuse_ring_ent * ring_ent)34 static void uring_cmd_set_ring_ent(struct io_uring_cmd *cmd,
35 struct fuse_ring_ent *ring_ent)
36 {
37 struct fuse_uring_pdu *pdu =
38 io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);
39
40 pdu->ent = ring_ent;
41 }
42
uring_cmd_to_ring_ent(struct io_uring_cmd * cmd)43 static struct fuse_ring_ent *uring_cmd_to_ring_ent(struct io_uring_cmd *cmd)
44 {
45 struct fuse_uring_pdu *pdu =
46 io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);
47
48 return pdu->ent;
49 }
50
/*
 * Move queued background requests of this queue onto its active request
 * list, updating the per-connection and per-queue background accounting.
 *
 * Caller must hold queue->lock and fc->bg_lock (asserted below).
 */
static void fuse_uring_flush_bg(struct fuse_ring_queue *queue)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;

	lockdep_assert_held(&queue->lock);
	lockdep_assert_held(&fc->bg_lock);

	/*
	 * Allow one bg request per queue, ignoring global fc limits.
	 * This prevents a single queue from consuming all resources and
	 * eliminates the need for remote queue wake-ups when global
	 * limits are met but this queue has no more waiting requests.
	 */
	while ((fc->active_background < fc->max_background ||
		!queue->active_background) &&
	       (!list_empty(&queue->fuse_req_bg_queue))) {
		struct fuse_req *req;

		req = list_first_entry(&queue->fuse_req_bg_queue,
				       struct fuse_req, list);
		fc->active_background++;
		queue->active_background++;

		list_move_tail(&req->list, &queue->fuse_req_queue);
	}
}
78
/*
 * Complete a request handled through the ring: detach it from the ring
 * entry, update background accounting and end it towards the fuse core.
 *
 * @error: 0 on success, negative errno stored into the out header.
 */
static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req,
			       int error)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;

	lockdep_assert_not_held(&queue->lock);
	spin_lock(&queue->lock);
	ent->fuse_req = NULL;
	/* remove the request from the queue's processing list */
	list_del_init(&req->list);
	if (test_bit(FR_BACKGROUND, &req->flags)) {
		queue->active_background--;
		spin_lock(&fc->bg_lock);
		/* a bg slot freed up - maybe further bg requests can start */
		fuse_uring_flush_bg(queue);
		spin_unlock(&fc->bg_lock);
	}

	spin_unlock(&queue->lock);

	if (error)
		req->out.h.error = error;

	clear_bit(FR_SENT, &req->flags);
	fuse_request_end(req);
}
105
/* Abort all list queued request on the given ring queue */
static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
{
	struct fuse_req *req;
	LIST_HEAD(req_list);

	spin_lock(&queue->lock);
	list_for_each_entry(req, &queue->fuse_req_queue, list)
		clear_bit(FR_PENDING, &req->flags);
	/* take all queued requests onto a private list for ending them */
	list_splice_init(&queue->fuse_req_queue, &req_list);
	spin_unlock(&queue->lock);

	/* must not hold queue lock to avoid order issues with fi->lock */
	fuse_dev_end_requests(&req_list);
}
121
/*
 * Abort all queued requests of all ring queues; flushes each queue's
 * background list first so those requests get aborted as well.
 */
void fuse_uring_abort_end_requests(struct fuse_ring *ring)
{
	int qid;
	struct fuse_ring_queue *queue;
	struct fuse_conn *fc = ring->fc;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		queue = READ_ONCE(ring->queues[qid]);
		if (!queue)
			continue;

		queue->stopped = true;

		/* abort is expected to have lifted max_background */
		WARN_ON_ONCE(ring->fc->max_background != UINT_MAX);
		spin_lock(&queue->lock);
		spin_lock(&fc->bg_lock);
		fuse_uring_flush_bg(queue);
		spin_unlock(&fc->bg_lock);
		spin_unlock(&queue->lock);
		fuse_uring_abort_end_queue_requests(queue);
	}
}
144
ent_list_request_expired(struct fuse_conn * fc,struct list_head * list)145 static bool ent_list_request_expired(struct fuse_conn *fc, struct list_head *list)
146 {
147 struct fuse_ring_ent *ent;
148 struct fuse_req *req;
149
150 ent = list_first_entry_or_null(list, struct fuse_ring_ent, list);
151 if (!ent)
152 return false;
153
154 req = ent->fuse_req;
155
156 return time_is_before_jiffies(req->create_time +
157 fc->timeout.req_timeout);
158 }
159
/*
 * Scan all ring queues for a timed-out request; used by the fuse request
 * timeout mechanism. Returns true as soon as any queue has an expired
 * request in one of its request or entry lists.
 */
bool fuse_uring_request_expired(struct fuse_conn *fc)
{
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	int qid;

	if (!ring)
		return false;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		queue = READ_ONCE(ring->queues[qid]);
		if (!queue)
			continue;

		spin_lock(&queue->lock);
		if (fuse_request_expired(fc, &queue->fuse_req_queue) ||
		    fuse_request_expired(fc, &queue->fuse_req_bg_queue) ||
		    ent_list_request_expired(fc, &queue->ent_w_req_queue) ||
		    ent_list_request_expired(fc, &queue->ent_in_userspace)) {
			spin_unlock(&queue->lock);
			return true;
		}
		spin_unlock(&queue->lock);
	}

	return false;
}
187
/*
 * Free all ring queues and the ring itself on connection destruction.
 * All entry lists except ent_released are expected to be empty here.
 */
void fuse_uring_destruct(struct fuse_conn *fc)
{
	struct fuse_ring *ring = fc->ring;
	int qid;

	if (!ring)
		return;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = ring->queues[qid];
		struct fuse_ring_ent *ent, *next;

		if (!queue)
			continue;

		WARN_ON(!list_empty(&queue->ent_avail_queue));
		WARN_ON(!list_empty(&queue->ent_w_req_queue));
		WARN_ON(!list_empty(&queue->ent_commit_queue));
		WARN_ON(!list_empty(&queue->ent_in_userspace));

		/* released entries are only freed here, not at teardown */
		list_for_each_entry_safe(ent, next, &queue->ent_released,
					 list) {
			list_del_init(&ent->list);
			kfree(ent);
		}

		kfree(queue->fpq.processing);
		kfree(queue);
		ring->queues[qid] = NULL;
	}

	kfree(ring->queues);
	kfree(ring);
	fc->ring = NULL;
}
223
/*
 * Basic ring setup for this connection based on the provided configuration
 */
static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
{
	struct fuse_ring *ring;
	size_t nr_queues = num_possible_cpus();
	/* the winning racer's ring, returned if another thread was faster */
	struct fuse_ring *res = NULL;
	size_t max_payload_size;

	ring = kzalloc(sizeof(*fc->ring), GFP_KERNEL_ACCOUNT);
	if (!ring)
		return NULL;

	ring->queues = kcalloc(nr_queues, sizeof(struct fuse_ring_queue *),
			       GFP_KERNEL_ACCOUNT);
	if (!ring->queues)
		goto out_err;

	/* payload buffer must hold the largest possible request/reply */
	max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write);
	max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE);

	spin_lock(&fc->lock);
	if (fc->ring) {
		/* race, another thread created the ring in the meantime */
		spin_unlock(&fc->lock);
		res = fc->ring;
		goto out_err;
	}

	init_waitqueue_head(&ring->stop_waitq);

	ring->nr_queues = nr_queues;
	ring->fc = fc;
	ring->max_payload_sz = max_payload_size;
	/* publish; pairs with smp_load_acquire() in fuse_uring_register() */
	smp_store_release(&fc->ring, ring);

	spin_unlock(&fc->lock);
	return ring;

out_err:
	kfree(ring->queues);
	kfree(ring);
	return res;
}
269
/*
 * Allocate and initialize the ring queue for @qid and install it into the
 * ring. On a creation race the already-installed queue is returned and
 * the local allocation is freed. Returns NULL on allocation failure.
 */
static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
						       int qid)
{
	struct fuse_conn *fc = ring->fc;
	struct fuse_ring_queue *queue;
	struct list_head *pq;

	queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
	if (!queue)
		return NULL;
	pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL);
	if (!pq) {
		kfree(queue);
		return NULL;
	}

	queue->qid = qid;
	queue->ring = ring;
	spin_lock_init(&queue->lock);

	INIT_LIST_HEAD(&queue->ent_avail_queue);
	INIT_LIST_HEAD(&queue->ent_commit_queue);
	INIT_LIST_HEAD(&queue->ent_w_req_queue);
	INIT_LIST_HEAD(&queue->ent_in_userspace);
	INIT_LIST_HEAD(&queue->fuse_req_queue);
	INIT_LIST_HEAD(&queue->fuse_req_bg_queue);
	INIT_LIST_HEAD(&queue->ent_released);

	queue->fpq.processing = pq;
	fuse_pqueue_init(&queue->fpq);

	spin_lock(&fc->lock);
	if (ring->queues[qid]) {
		/* lost the race - free ours, return the existing queue */
		spin_unlock(&fc->lock);
		kfree(queue->fpq.processing);
		kfree(queue);
		return ring->queues[qid];
	}

	/*
	 * write_once and lock as the caller mostly doesn't take the lock at all
	 */
	WRITE_ONCE(ring->queues[qid], queue);
	spin_unlock(&fc->lock);

	return queue;
}
317
fuse_uring_stop_fuse_req_end(struct fuse_req * req)318 static void fuse_uring_stop_fuse_req_end(struct fuse_req *req)
319 {
320 clear_bit(FR_SENT, &req->flags);
321 req->out.h.error = -ECONNABORTED;
322 fuse_request_end(req);
323 }
324
/*
 * Release a request/entry on connection tear down
 */
static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent)
{
	struct fuse_req *req;
	struct io_uring_cmd *cmd;

	struct fuse_ring_queue *queue = ent->queue;

	spin_lock(&queue->lock);
	/* take over the command and request under the lock */
	cmd = ent->cmd;
	ent->cmd = NULL;
	req = ent->fuse_req;
	ent->fuse_req = NULL;
	if (req) {
		/* remove entry from queue->fpq->processing */
		list_del_init(&req->list);
	}

	/*
	 * The entry must not be freed immediately, due to access of direct
	 * pointer access of entries through IO_URING_F_CANCEL - there is a risk
	 * of race between daemon termination (which triggers IO_URING_F_CANCEL
	 * and accesses entries without checking the list state first
	 */
	list_move(&ent->list, &queue->ent_released);
	ent->state = FRRS_RELEASED;
	spin_unlock(&queue->lock);

	/* complete command and request outside of the queue lock */
	if (cmd)
		io_uring_cmd_done(cmd, -ENOTCONN, IO_URING_F_UNLOCKED);

	if (req)
		fuse_uring_stop_fuse_req_end(req);
}
361
fuse_uring_stop_list_entries(struct list_head * head,struct fuse_ring_queue * queue,enum fuse_ring_req_state exp_state)362 static void fuse_uring_stop_list_entries(struct list_head *head,
363 struct fuse_ring_queue *queue,
364 enum fuse_ring_req_state exp_state)
365 {
366 struct fuse_ring *ring = queue->ring;
367 struct fuse_ring_ent *ent, *next;
368 ssize_t queue_refs = SSIZE_MAX;
369 LIST_HEAD(to_teardown);
370
371 spin_lock(&queue->lock);
372 list_for_each_entry_safe(ent, next, head, list) {
373 if (ent->state != exp_state) {
374 pr_warn("entry teardown qid=%d state=%d expected=%d",
375 queue->qid, ent->state, exp_state);
376 continue;
377 }
378
379 ent->state = FRRS_TEARDOWN;
380 list_move(&ent->list, &to_teardown);
381 }
382 spin_unlock(&queue->lock);
383
384 /* no queue lock to avoid lock order issues */
385 list_for_each_entry_safe(ent, next, &to_teardown, list) {
386 fuse_uring_entry_teardown(ent);
387 queue_refs = atomic_dec_return(&ring->queue_refs);
388 WARN_ON_ONCE(queue_refs < 0);
389 }
390 }
391
/*
 * Tear down the entries of one queue: first those handed out to
 * userspace, then the idle (available) ones.
 */
static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue)
{
	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue,
				     FRRS_USERSPACE);
	fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue,
				     FRRS_AVAILABLE);
}
399
/*
 * Log state debug info
 */
static void fuse_uring_log_ent_state(struct fuse_ring *ring)
{
	int qid;
	struct fuse_ring_ent *ent;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = ring->queues[qid];

		if (!queue)
			continue;

		spin_lock(&queue->lock);
		/*
		 * Log entries from the intermediate queue, the other queues
		 * should be empty
		 */
		list_for_each_entry(ent, &queue->ent_w_req_queue, list) {
			pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",
				ring, qid, ent, ent->state);
		}
		list_for_each_entry(ent, &queue->ent_commit_queue, list) {
			pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n",
				ring, qid, ent, ent->state);
		}
		spin_unlock(&queue->lock);
	}
	/* mark that the state was logged; presumably checked by the caller
	 * to avoid repeated logging - reader not visible in this chunk */
	ring->stop_debug_log = 1;
}
431
/*
 * Delayed-work handler that keeps retrying queue-entry teardown until all
 * queue references are gone, then wakes up waiters on stop_waitq. Logs
 * entry state once the teardown timeout has passed.
 */
static void fuse_uring_async_stop_queues(struct work_struct *work)
{
	int qid;
	struct fuse_ring *ring =
		container_of(work, struct fuse_ring, async_teardown_work.work);

	/* XXX code dup */
	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);

		if (!queue)
			continue;

		fuse_uring_teardown_entries(queue);
	}

	/*
	 * Some ring entries might be in the middle of IO operations,
	 * i.e. in process to get handled by file_operations::uring_cmd
	 * or on the way to userspace - we could handle that with conditions in
	 * run time code, but easier/cleaner to have an async tear down handler
	 * If there are still queue references left
	 */
	if (atomic_read(&ring->queue_refs) > 0) {
		if (time_after(jiffies,
			       ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
			fuse_uring_log_ent_state(ring);

		schedule_delayed_work(&ring->async_teardown_work,
				      FUSE_URING_TEARDOWN_INTERVAL);
	} else {
		wake_up_all(&ring->stop_waitq);
	}
}
466
/*
 * Stop the ring queues
 */
void fuse_uring_stop_queues(struct fuse_ring *ring)
{
	int qid;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);

		if (!queue)
			continue;

		fuse_uring_teardown_entries(queue);
	}

	/*
	 * In-flight entries cannot be torn down synchronously - retry
	 * from delayed work until all queue references are dropped.
	 */
	if (atomic_read(&ring->queue_refs) > 0) {
		ring->teardown_time = jiffies;
		INIT_DELAYED_WORK(&ring->async_teardown_work,
				  fuse_uring_async_stop_queues);
		schedule_delayed_work(&ring->async_teardown_work,
				      FUSE_URING_TEARDOWN_INTERVAL);
	} else {
		wake_up_all(&ring->stop_waitq);
	}
}
493
/*
 * Handle IO_URING_F_CANCEL, typically should come on daemon termination.
 *
 * Releasing the last entry should trigger fuse_dev_release() if
 * the daemon was terminated
 */
static void fuse_uring_cancel(struct io_uring_cmd *cmd,
			      unsigned int issue_flags)
{
	struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd);
	struct fuse_ring_queue *queue;
	bool need_cmd_done = false;

	/*
	 * direct access on ent - it must not be destructed as long as
	 * IO_URING_F_CANCEL might come up
	 */
	queue = ent->queue;
	spin_lock(&queue->lock);
	if (ent->state == FRRS_AVAILABLE) {
		/* only idle entries get completed here; entries in other
		 * states are handled by their regular code paths */
		ent->state = FRRS_USERSPACE;
		list_move_tail(&ent->list, &queue->ent_in_userspace);
		need_cmd_done = true;
		ent->cmd = NULL;
	}
	spin_unlock(&queue->lock);

	if (need_cmd_done) {
		/* no queue lock to avoid lock order issues */
		io_uring_cmd_done(cmd, -ENOTCONN, issue_flags);
	}
}
526
/*
 * Link @ring_ent to @cmd and mark the command cancelable, so that
 * fuse_uring_cancel() can find the entry on IO_URING_F_CANCEL.
 */
static void fuse_uring_prepare_cancel(struct io_uring_cmd *cmd, int issue_flags,
				      struct fuse_ring_ent *ring_ent)
{
	uring_cmd_set_ring_ent(cmd, ring_ent);
	io_uring_cmd_mark_cancelable(cmd, issue_flags);
}
533
534 /*
535 * Checks for errors and stores it into the request
536 */
fuse_uring_out_header_has_err(struct fuse_out_header * oh,struct fuse_req * req,struct fuse_conn * fc)537 static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
538 struct fuse_req *req,
539 struct fuse_conn *fc)
540 {
541 int err;
542
543 err = -EINVAL;
544 if (oh->unique == 0) {
545 /* Not supported through io-uring yet */
546 pr_warn_once("notify through fuse-io-uring not supported\n");
547 goto err;
548 }
549
550 if (oh->error <= -ERESTARTSYS || oh->error > 0)
551 goto err;
552
553 if (oh->error) {
554 err = oh->error;
555 goto err;
556 }
557
558 err = -ENOENT;
559 if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
560 pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n",
561 req->in.h.unique,
562 oh->unique & ~FUSE_INT_REQ_BIT);
563 goto err;
564 }
565
566 /*
567 * Is it an interrupt reply ID?
568 * XXX: Not supported through fuse-io-uring yet, it should not even
569 * find the request - should not happen.
570 */
571 WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
572
573 err = 0;
574 err:
575 return err;
576 }
577
/*
 * Copy the reply payload the server wrote into the ring buffer back into
 * the request's output args; comparable to the copy done for
 * write(/dev/fuse). Returns 0 on success, negative errno on failure.
 */
static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
				     struct fuse_req *req,
				     struct fuse_ring_ent *ent)
{
	struct fuse_copy_state cs;
	struct fuse_args *args = req->args;
	struct iov_iter iter;
	int err;
	struct fuse_uring_ent_in_out ring_in_out;

	err = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
			     sizeof(ring_in_out));
	if (err)
		return -EFAULT;

	err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz,
			  &iter);
	if (err)
		return err;

	fuse_copy_init(&cs, false, &iter);
	cs.is_uring = true;
	cs.req = req;

	/* payload_sz is the server-reported size of the reply payload */
	err = fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
	fuse_copy_finish(&cs);
	return err;
}
606
/*
 * Copy data from the req to the ring buffer
 */
static int fuse_uring_args_to_ring(struct fuse_ring *ring, struct fuse_req *req,
				   struct fuse_ring_ent *ent)
{
	struct fuse_copy_state cs;
	struct fuse_args *args = req->args;
	struct fuse_in_arg *in_args = args->in_args;
	int num_args = args->in_numargs;
	int err;
	struct iov_iter iter;
	struct fuse_uring_ent_in_out ent_in_out = {
		.flags = 0,
		/* the request's unique id doubles as commit id */
		.commit_id = req->in.h.unique,
	};

	err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter);
	if (err) {
		pr_info_ratelimited("fuse: Import of user buffer failed\n");
		return err;
	}

	fuse_copy_init(&cs, true, &iter);
	cs.is_uring = true;
	cs.req = req;

	if (num_args > 0) {
		/*
		 * Expectation is that the first argument is the per op header.
		 * Some op code have that as zero size.
		 */
		if (args->in_args[0].size > 0) {
			err = copy_to_user(&ent->headers->op_in, in_args->value,
					   in_args->size);
			if (err) {
				pr_info_ratelimited(
					"Copying the header failed.\n");
				return -EFAULT;
			}
		}
		in_args++;
		num_args--;
	}

	/* copy the payload */
	err = fuse_copy_args(&cs, num_args, args->in_pages,
			     (struct fuse_arg *)in_args, 0);
	fuse_copy_finish(&cs);
	if (err) {
		pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
		return err;
	}

	/* report the actually copied payload size back to userspace */
	ent_in_out.payload_sz = cs.ring.copied_sz;
	err = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out,
			   sizeof(ent_in_out));
	return err ? -EFAULT : 0;
}
666
/*
 * Serialize a fuse request into the entry's ring buffers: arguments plus
 * the fuse_in_header. Returns 0 on success, negative errno on failure.
 */
static int fuse_uring_copy_to_ring(struct fuse_ring_ent *ent,
				   struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	int err;

	err = -EIO;
	if (WARN_ON(ent->state != FRRS_FUSE_REQ)) {
		pr_err("qid=%d ring-req=%p invalid state %d on send\n",
		       queue->qid, ent, ent->state);
		return err;
	}

	err = -EINVAL;
	if (WARN_ON(req->in.h.unique == 0))
		return err;

	/* copy the request */
	err = fuse_uring_args_to_ring(ring, req, ent);
	if (unlikely(err)) {
		pr_info_ratelimited("Copy to ring failed: %d\n", err);
		return err;
	}

	/* copy fuse_in_header */
	err = copy_to_user(&ent->headers->in_out, &req->in.h,
			   sizeof(req->in.h));
	if (err) {
		err = -EFAULT;
		return err;
	}

	return 0;
}
702
fuse_uring_prepare_send(struct fuse_ring_ent * ent,struct fuse_req * req)703 static int fuse_uring_prepare_send(struct fuse_ring_ent *ent,
704 struct fuse_req *req)
705 {
706 int err;
707
708 err = fuse_uring_copy_to_ring(ent, req);
709 if (!err)
710 set_bit(FR_SENT, &req->flags);
711 else
712 fuse_uring_req_end(ent, req, err);
713
714 return err;
715 }
716
/*
 * Write data to the ring buffer and send the request to userspace,
 * userspace will read it
 * This is comparable with classical read(/dev/fuse)
 */
static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent,
					struct fuse_req *req,
					unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;
	int err;
	struct io_uring_cmd *cmd;

	err = fuse_uring_prepare_send(ent, req);
	if (err)
		return err;

	spin_lock(&queue->lock);
	/* hand the entry over to userspace */
	cmd = ent->cmd;
	ent->cmd = NULL;
	ent->state = FRRS_USERSPACE;
	list_move_tail(&ent->list, &queue->ent_in_userspace);
	spin_unlock(&queue->lock);

	/* completing the command wakes the server; done without the lock */
	io_uring_cmd_done(cmd, 0, issue_flags);
	return 0;
}
744
745 /*
746 * Make a ring entry available for fuse_req assignment
747 */
fuse_uring_ent_avail(struct fuse_ring_ent * ent,struct fuse_ring_queue * queue)748 static void fuse_uring_ent_avail(struct fuse_ring_ent *ent,
749 struct fuse_ring_queue *queue)
750 {
751 WARN_ON_ONCE(!ent->cmd);
752 list_move(&ent->list, &queue->ent_avail_queue);
753 ent->state = FRRS_AVAILABLE;
754 }
755
756 /* Used to find the request on SQE commit */
fuse_uring_add_to_pq(struct fuse_ring_ent * ent,struct fuse_req * req)757 static void fuse_uring_add_to_pq(struct fuse_ring_ent *ent,
758 struct fuse_req *req)
759 {
760 struct fuse_ring_queue *queue = ent->queue;
761 struct fuse_pqueue *fpq = &queue->fpq;
762 unsigned int hash;
763
764 req->ring_entry = ent;
765 hash = fuse_req_hash(req->in.h.unique);
766 list_move_tail(&req->list, &fpq->processing[hash]);
767 }
768
/*
 * Assign a fuse queue entry to the given entry
 */
static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent,
					   struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;

	lockdep_assert_held(&queue->lock);

	if (WARN_ON_ONCE(ent->state != FRRS_AVAILABLE &&
			 ent->state != FRRS_COMMIT)) {
		pr_warn("%s qid=%d state=%d\n", __func__, ent->queue->qid,
			ent->state);
	}

	/* the request is bound to this entry now, no longer pending */
	clear_bit(FR_PENDING, &req->flags);
	ent->fuse_req = req;
	ent->state = FRRS_FUSE_REQ;
	list_move_tail(&ent->list, &queue->ent_w_req_queue);
	fuse_uring_add_to_pq(ent, req);
}
791
/* Fetch the next fuse request if available */
static struct fuse_req *fuse_uring_ent_assign_req(struct fuse_ring_ent *ent)
	__must_hold(&queue->lock)
{
	struct fuse_req *req;
	struct fuse_ring_queue *queue = ent->queue;
	struct list_head *req_queue = &queue->fuse_req_queue;

	lockdep_assert_held(&queue->lock);

	/* get and assign the next entry while it is still holding the lock */
	req = list_first_entry_or_null(req_queue, struct fuse_req, list);
	if (req)
		fuse_uring_add_req_to_ring_ent(ent, req);

	return req;
}
809
/*
 * Read data from the ring buffer, which user space has written to
 * This is comparible with handling of classical write(/dev/fuse).
 * Also make the ring request available again for new fuse requests.
 */
static void fuse_uring_commit(struct fuse_ring_ent *ent, struct fuse_req *req,
			      unsigned int issue_flags)
{
	struct fuse_ring *ring = ent->queue->ring;
	struct fuse_conn *fc = ring->fc;
	ssize_t err = 0;

	err = copy_from_user(&req->out.h, &ent->headers->in_out,
			     sizeof(req->out.h));
	if (err) {
		req->out.h.error = -EFAULT;
		goto out;
	}

	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
	if (err) {
		/* req->out.h.error already set */
		goto out;
	}

	err = fuse_uring_copy_from_ring(ring, req, ent);
out:
	/* end the request with the (possibly zero) result in any case */
	fuse_uring_req_end(ent, req, err);
}
839
/*
 * Get the next fuse req and send it
 */
static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ent,
				     struct fuse_ring_queue *queue,
				     unsigned int issue_flags)
{
	int err;
	struct fuse_req *req;

retry:
	spin_lock(&queue->lock);
	fuse_uring_ent_avail(ent, queue);
	req = fuse_uring_ent_assign_req(ent);
	spin_unlock(&queue->lock);

	if (req) {
		err = fuse_uring_send_next_to_ring(ent, req, issue_flags);
		/* on send failure the request was ended - try the next one */
		if (err)
			goto retry;
	}
}
862
fuse_ring_ent_set_commit(struct fuse_ring_ent * ent)863 static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
864 {
865 struct fuse_ring_queue *queue = ent->queue;
866
867 lockdep_assert_held(&queue->lock);
868
869 if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
870 return -EIO;
871
872 ent->state = FRRS_COMMIT;
873 list_move(&ent->list, &queue->ent_commit_queue);
874
875 return 0;
876 }
877
/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
				   struct fuse_conn *fc)
{
	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
	struct fuse_ring_ent *ent;
	int err;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
	unsigned int qid = READ_ONCE(cmd_req->qid);
	struct fuse_pqueue *fpq;
	struct fuse_req *req;

	err = -ENOTCONN;
	if (!ring)
		return err;

	if (qid >= ring->nr_queues)
		return -EINVAL;

	queue = ring->queues[qid];
	if (!queue)
		return err;
	fpq = &queue->fpq;

	if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped))
		return err;

	spin_lock(&queue->lock);
	/* Find a request based on the unique ID of the fuse request
	 * This should get revised, as it needs a hash calculation and list
	 * search. And full struct fuse_pqueue is needed (memory overhead).
	 * As well as the link from req to ring_ent.
	 */
	req = fuse_request_find(fpq, commit_id);
	err = -ENOENT;
	if (!req) {
		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
			commit_id);
		spin_unlock(&queue->lock);
		return err;
	}
	/* detach the request from the processing table and its ring entry */
	list_del_init(&req->list);
	ent = req->ring_entry;
	req->ring_entry = NULL;

	err = fuse_ring_ent_set_commit(ent);
	if (err != 0) {
		pr_info_ratelimited("qid=%d commit_id %llu state %d",
				    queue->qid, commit_id, ent->state);
		spin_unlock(&queue->lock);
		/* end the request manually, the entry is unusable */
		req->out.h.error = err;
		clear_bit(FR_SENT, &req->flags);
		fuse_request_end(req);
		return err;
	}

	ent->cmd = cmd;
	spin_unlock(&queue->lock);

	/* without the queue lock, as other locks are taken */
	fuse_uring_prepare_cancel(cmd, issue_flags, ent);
	fuse_uring_commit(ent, req, issue_flags);

	/*
	 * Fetching the next request is absolutely required as queued
	 * fuse requests would otherwise not get processed - committing
	 * and fetching is done in one step vs legacy fuse, which has separated
	 * read (fetch request) and write (commit result).
	 */
	fuse_uring_next_fuse_req(ent, queue, issue_flags);
	return 0;
}
952
is_ring_ready(struct fuse_ring * ring,int current_qid)953 static bool is_ring_ready(struct fuse_ring *ring, int current_qid)
954 {
955 int qid;
956 struct fuse_ring_queue *queue;
957 bool ready = true;
958
959 for (qid = 0; qid < ring->nr_queues && ready; qid++) {
960 if (current_qid == qid)
961 continue;
962
963 queue = ring->queues[qid];
964 if (!queue) {
965 ready = false;
966 break;
967 }
968
969 spin_lock(&queue->lock);
970 if (list_empty(&queue->ent_avail_queue))
971 ready = false;
972 spin_unlock(&queue->lock);
973 }
974
975 return ready;
976 }
977
/*
 * fuse_uring_req_fetch command handling
 */
static void fuse_uring_do_register(struct fuse_ring_ent *ent,
				   struct io_uring_cmd *cmd,
				   unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;
	struct fuse_iqueue *fiq = &fc->iq;

	fuse_uring_prepare_cancel(cmd, issue_flags, ent);

	spin_lock(&queue->lock);
	ent->cmd = cmd;
	fuse_uring_ent_avail(ent, queue);
	spin_unlock(&queue->lock);

	/* switch the connection to io-uring ops once all queues are ready */
	if (!ring->ready) {
		bool ready = is_ring_ready(ring, queue->qid);

		if (ready) {
			WRITE_ONCE(fiq->ops, &fuse_io_uring_ops);
			WRITE_ONCE(ring->ready, true);
			wake_up_all(&fc->blocked_waitq);
		}
	}
}
1007
/*
 * sqe->addr is a ptr to an iovec array, iov[0] has the headers, iov[1]
 * the payload
 */
static int fuse_uring_get_iovec_from_sqe(const struct io_uring_sqe *sqe,
					 struct iovec iov[FUSE_URING_IOV_SEGS])
{
	struct iovec __user *uiov = u64_to_user_ptr(READ_ONCE(sqe->addr));
	struct iov_iter iter;
	ssize_t ret;

	if (sqe->len != FUSE_URING_IOV_SEGS)
		return -EINVAL;

	/*
	 * Direction for buffer access will actually be READ and WRITE,
	 * using write for the import should include READ access as well.
	 */
	ret = import_iovec(WRITE, uiov, FUSE_URING_IOV_SEGS,
			   FUSE_URING_IOV_SEGS, &iov, &iter);
	if (ret < 0)
		return ret;

	return 0;
}
1033
/*
 * Allocate a ring entry for the given queue from the header/payload iovec
 * carried in the command's SQE. Takes a ring queue reference on success;
 * returns an ERR_PTR on invalid buffers or allocation failure.
 */
static struct fuse_ring_ent *
fuse_uring_create_ring_ent(struct io_uring_cmd *cmd,
			   struct fuse_ring_queue *queue)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_ring_ent *ent;
	size_t payload_size;
	struct iovec iov[FUSE_URING_IOV_SEGS];
	int err;

	err = fuse_uring_get_iovec_from_sqe(cmd->sqe, iov);
	if (err) {
		pr_info_ratelimited("Failed to get iovec from sqe, err=%d\n",
				    err);
		return ERR_PTR(err);
	}

	err = -EINVAL;
	if (iov[0].iov_len < sizeof(struct fuse_uring_req_header)) {
		pr_info_ratelimited("Invalid header len %zu\n", iov[0].iov_len);
		return ERR_PTR(err);
	}

	/* the payload buffer must be able to hold the biggest request */
	payload_size = iov[1].iov_len;
	if (payload_size < ring->max_payload_sz) {
		pr_info_ratelimited("Invalid req payload len %zu\n",
				    payload_size);
		return ERR_PTR(err);
	}

	err = -ENOMEM;
	ent = kzalloc(sizeof(*ent), GFP_KERNEL_ACCOUNT);
	if (!ent)
		return ERR_PTR(err);

	INIT_LIST_HEAD(&ent->list);

	ent->queue = queue;
	ent->headers = iov[0].iov_base;
	ent->payload = iov[1].iov_base;

	/* reference dropped again at entry teardown */
	atomic_inc(&ring->queue_refs);
	return ent;
}
1078
/*
 * Register header and payload buffer with the kernel and puts the
 * entry as "ready to get fuse requests" on the queue
 */
static int fuse_uring_register(struct io_uring_cmd *cmd,
			       unsigned int issue_flags, struct fuse_conn *fc)
{
	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
	/* pairs with smp_store_release() in fuse_uring_create() */
	struct fuse_ring *ring = smp_load_acquire(&fc->ring);
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent;
	int err;
	unsigned int qid = READ_ONCE(cmd_req->qid);

	err = -ENOMEM;
	if (!ring) {
		ring = fuse_uring_create(fc);
		if (!ring)
			return err;
	}

	if (qid >= ring->nr_queues) {
		pr_info_ratelimited("fuse: Invalid ring qid %u\n", qid);
		return -EINVAL;
	}

	queue = ring->queues[qid];
	if (!queue) {
		queue = fuse_uring_create_queue(ring, qid);
		if (!queue)
			return err;
	}

	/*
	 * The created queue above does not need to be destructed in
	 * case of entry errors below, will be done at ring destruction time.
	 */

	ent = fuse_uring_create_ring_ent(cmd, queue);
	if (IS_ERR(ent))
		return PTR_ERR(ent);

	fuse_uring_do_register(ent, cmd, issue_flags);

	return 0;
}
1125
/*
 * Entry function from io_uring to handle the given passthrough command
 * (op code IORING_OP_URING_CMD)
 *
 * Returns -EIOCBQUEUED when the command was accepted (completion is
 * posted asynchronously), 0 for a handled cancellation, or a negative
 * errno on immediate failure.
 */
int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
{
	struct fuse_dev *fud;
	struct fuse_conn *fc;
	u32 cmd_op = cmd->cmd_op;
	int err;

	/* io_uring is tearing down this cmd; no fuse work to do */
	if ((unlikely(issue_flags & IO_URING_F_CANCEL))) {
		fuse_uring_cancel(cmd, issue_flags);
		return 0;
	}

	/* This extra SQE size holds struct fuse_uring_cmd_req */
	if (!(issue_flags & IO_URING_F_SQE128))
		return -EINVAL;

	fud = fuse_get_dev(cmd->file);
	if (IS_ERR(fud)) {
		pr_info_ratelimited("No fuse device found\n");
		return PTR_ERR(fud);
	}
	fc = fud->fc;

	/* Once a connection has io-uring enabled on it, it can't be disabled */
	if (!enable_uring && !fc->io_uring) {
		pr_info_ratelimited("fuse-io-uring is disabled\n");
		return -EOPNOTSUPP;
	}

	if (fc->aborted)
		return -ECONNABORTED;
	if (!fc->connected)
		return -ENOTCONN;

	/*
	 * fuse_uring_register() needs the ring to be initialized,
	 * we need to know the max payload size
	 */
	if (!fc->initialized)
		return -EAGAIN;

	switch (cmd_op) {
	case FUSE_IO_URING_CMD_REGISTER:
		err = fuse_uring_register(cmd, issue_flags, fc);
		if (err) {
			pr_info_once("FUSE_IO_URING_CMD_REGISTER failed err=%d\n",
				     err);
			/*
			 * io-uring mode is off for this connection; wake
			 * tasks that might be waiting on it
			 */
			fc->io_uring = 0;
			wake_up_all(&fc->blocked_waitq);
			return err;
		}
		break;
	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
		err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
		if (err) {
			pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n",
				     err);
			return err;
		}
		break;
	default:
		return -EINVAL;
	}

	/* CQE will be posted later; keep the command queued */
	return -EIOCBQUEUED;
}
1196
fuse_uring_send(struct fuse_ring_ent * ent,struct io_uring_cmd * cmd,ssize_t ret,unsigned int issue_flags)1197 static void fuse_uring_send(struct fuse_ring_ent *ent, struct io_uring_cmd *cmd,
1198 ssize_t ret, unsigned int issue_flags)
1199 {
1200 struct fuse_ring_queue *queue = ent->queue;
1201
1202 spin_lock(&queue->lock);
1203 ent->state = FRRS_USERSPACE;
1204 list_move_tail(&ent->list, &queue->ent_in_userspace);
1205 ent->cmd = NULL;
1206 spin_unlock(&queue->lock);
1207
1208 io_uring_cmd_done(cmd, ret, issue_flags);
1209 }
1210
/*
 * This prepares and sends the ring request in fuse-uring task context.
 * User buffers are not mapped yet - the application does not have permission
 * to write to it - this has to be executed in ring task context.
 */
static void fuse_uring_send_in_task(struct io_tw_req tw_req, io_tw_token_t tw)
{
	unsigned int issue_flags = IO_URING_CMD_TASK_WORK_ISSUE_FLAGS;
	struct io_uring_cmd *cmd = io_uring_cmd_from_tw(tw_req);
	struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd);
	struct fuse_ring_queue *queue = ent->queue;
	int err;

	if (!tw.cancel) {
		err = fuse_uring_prepare_send(ent, ent->fuse_req);
		if (err) {
			/* prepare failed; move on to the next queued request */
			fuse_uring_next_fuse_req(ent, queue, issue_flags);
			return;
		}
	} else {
		/* task work was canceled; complete the cmd with an error */
		err = -ECANCELED;
	}

	fuse_uring_send(ent, cmd, err, issue_flags);
}
1236
fuse_uring_task_to_queue(struct fuse_ring * ring)1237 static struct fuse_ring_queue *fuse_uring_task_to_queue(struct fuse_ring *ring)
1238 {
1239 unsigned int qid;
1240 struct fuse_ring_queue *queue;
1241
1242 qid = task_cpu(current);
1243
1244 if (WARN_ONCE(qid >= ring->nr_queues,
1245 "Core number (%u) exceeds nr queues (%zu)\n", qid,
1246 ring->nr_queues))
1247 qid = 0;
1248
1249 queue = ring->queues[qid];
1250 WARN_ONCE(!queue, "Missing queue for qid %d\n", qid);
1251
1252 return queue;
1253 }
1254
fuse_uring_dispatch_ent(struct fuse_ring_ent * ent)1255 static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent)
1256 {
1257 struct io_uring_cmd *cmd = ent->cmd;
1258
1259 uring_cmd_set_ring_ent(cmd, ent);
1260 io_uring_cmd_complete_in_task(cmd, fuse_uring_send_in_task);
1261 }
1262
/* queue a fuse request and send it if a ring entry is available */
void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req)
{
	struct fuse_conn *fc = req->fm->fc;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent = NULL;
	int err;

	err = -EINVAL;
	queue = fuse_uring_task_to_queue(ring);
	if (!queue)
		goto err;

	fuse_request_assign_unique(fiq, req);

	spin_lock(&queue->lock);
	err = -ENOTCONN;
	if (unlikely(queue->stopped))
		goto err_unlock;

	set_bit(FR_URING, &req->flags);
	req->ring_queue = queue;
	/*
	 * Use an idle ring entry if one exists, otherwise leave the
	 * request queued until an entry becomes available.
	 */
	ent = list_first_entry_or_null(&queue->ent_avail_queue,
				       struct fuse_ring_ent, list);
	if (ent)
		fuse_uring_add_req_to_ring_ent(ent, req);
	else
		list_add_tail(&req->list, &queue->fuse_req_queue);
	spin_unlock(&queue->lock);

	/* dispatch outside the queue lock */
	if (ent)
		fuse_uring_dispatch_ent(ent);

	return;

err_unlock:
	spin_unlock(&queue->lock);
err:
	/* fail the request back to the caller with @err */
	req->out.h.error = err;
	clear_bit(FR_PENDING, &req->flags);
	fuse_request_end(req);
}
1306
fuse_uring_queue_bq_req(struct fuse_req * req)1307 bool fuse_uring_queue_bq_req(struct fuse_req *req)
1308 {
1309 struct fuse_conn *fc = req->fm->fc;
1310 struct fuse_ring *ring = fc->ring;
1311 struct fuse_ring_queue *queue;
1312 struct fuse_ring_ent *ent = NULL;
1313
1314 queue = fuse_uring_task_to_queue(ring);
1315 if (!queue)
1316 return false;
1317
1318 spin_lock(&queue->lock);
1319 if (unlikely(queue->stopped)) {
1320 spin_unlock(&queue->lock);
1321 return false;
1322 }
1323
1324 set_bit(FR_URING, &req->flags);
1325 req->ring_queue = queue;
1326 list_add_tail(&req->list, &queue->fuse_req_bg_queue);
1327
1328 ent = list_first_entry_or_null(&queue->ent_avail_queue,
1329 struct fuse_ring_ent, list);
1330 spin_lock(&fc->bg_lock);
1331 fc->num_background++;
1332 if (fc->num_background == fc->max_background)
1333 fc->blocked = 1;
1334 fuse_uring_flush_bg(queue);
1335 spin_unlock(&fc->bg_lock);
1336
1337 /*
1338 * Due to bg_queue flush limits there might be other bg requests
1339 * in the queue that need to be handled first. Or no further req
1340 * might be available.
1341 */
1342 req = list_first_entry_or_null(&queue->fuse_req_queue, struct fuse_req,
1343 list);
1344 if (ent && req) {
1345 fuse_uring_add_req_to_ring_ent(ent, req);
1346 spin_unlock(&queue->lock);
1347
1348 fuse_uring_dispatch_ent(ent);
1349 } else {
1350 spin_unlock(&queue->lock);
1351 }
1352
1353 return true;
1354 }
1355
fuse_uring_remove_pending_req(struct fuse_req * req)1356 bool fuse_uring_remove_pending_req(struct fuse_req *req)
1357 {
1358 struct fuse_ring_queue *queue = req->ring_queue;
1359
1360 return fuse_remove_pending_req(req, &queue->lock);
1361 }
1362
/* iqueue ops used once a connection runs in io-uring mode */
static const struct fuse_iqueue_ops fuse_io_uring_ops = {
	/* should be sent over io-uring as an enhancement */
	.send_forget = fuse_dev_queue_forget,

	/*
	 * could be sent over io-uring, but interrupts should be rare,
	 * no need to make the code complex
	 */
	.send_interrupt = fuse_dev_queue_interrupt,
	.send_req = fuse_uring_queue_fuse_req,
};
1374