xref: /linux/fs/fuse/dev_uring.c (revision 9587fde0da0365d300ea1c967e63ad3fe09b883e)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * FUSE: Filesystem in Userspace
4  * Copyright (c) 2023-2024 DataDirect Networks.
5  */
6 
7 #include "fuse_i.h"
8 #include "dev_uring_i.h"
9 #include "fuse_dev_i.h"
10 #include "fuse_trace.h"
11 
12 #include <linux/fs.h>
13 #include <linux/io_uring/cmd.h>
14 
/*
 * Module-wide switch for the fuse-io-uring transport. Writable at runtime
 * (0644); note that a connection that already enabled io-uring keeps it
 * enabled even if the parameter is later cleared (see fuse_uring_cmd()).
 */
static bool __read_mostly enable_uring;
module_param(enable_uring, bool, 0644);
MODULE_PARM_DESC(enable_uring,
		 "Enable userspace communication through io-uring");

#define FUSE_URING_IOV_SEGS 2 /* header and payload */
21 
22 
23 bool fuse_uring_enabled(void)
24 {
25 	return enable_uring;
26 }
27 
/*
 * Private data stored in the io_uring_cmd pdu area; links an SQE-based
 * command back to its ring entry (needed for IO_URING_F_CANCEL handling).
 */
struct fuse_uring_pdu {
	struct fuse_ring_ent *ent;
};

/* Installed as fiq->ops once all queues have a registered entry */
static const struct fuse_iqueue_ops fuse_io_uring_ops;
34 static void uring_cmd_set_ring_ent(struct io_uring_cmd *cmd,
35 				   struct fuse_ring_ent *ring_ent)
36 {
37 	struct fuse_uring_pdu *pdu =
38 		io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);
39 
40 	pdu->ent = ring_ent;
41 }
42 
43 static struct fuse_ring_ent *uring_cmd_to_ring_ent(struct io_uring_cmd *cmd)
44 {
45 	struct fuse_uring_pdu *pdu =
46 		io_uring_cmd_to_pdu(cmd, struct fuse_uring_pdu);
47 
48 	return pdu->ent;
49 }
50 
/*
 * Move queued background requests over to the foreground request queue,
 * honoring the global background limit but always permitting at least one
 * active background request per queue.
 *
 * Caller must hold both queue->lock and fc->bg_lock (asserted below).
 */
static void fuse_uring_flush_bg(struct fuse_ring_queue *queue)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;

	lockdep_assert_held(&queue->lock);
	lockdep_assert_held(&fc->bg_lock);

	/*
	 * Allow one bg request per queue, ignoring global fc limits.
	 * This prevents a single queue from consuming all resources and
	 * eliminates the need for remote queue wake-ups when global
	 * limits are met but this queue has no more waiting requests.
	 */
	while ((fc->active_background < fc->max_background ||
		!queue->active_background) &&
	       (!list_empty(&queue->fuse_req_bg_queue))) {
		struct fuse_req *req;

		req = list_first_entry(&queue->fuse_req_bg_queue,
				       struct fuse_req, list);
		/* account the request both globally and per queue */
		fc->active_background++;
		queue->active_background++;

		list_move_tail(&req->list, &queue->fuse_req_queue);
	}
}
78 
/*
 * Finish a ring-handled fuse request: detach it from the ring entry,
 * update background accounting and complete it towards the fuse core.
 * @error, when non-zero, is stored in req->out.h.error.
 *
 * Must be called without queue->lock held (asserted).
 */
static void fuse_uring_req_end(struct fuse_ring_ent *ent, struct fuse_req *req,
			       int error)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;

	lockdep_assert_not_held(&queue->lock);
	spin_lock(&queue->lock);
	ent->fuse_req = NULL;
	/* removes the request from queue->fpq.processing */
	list_del_init(&req->list);
	if (test_bit(FR_BACKGROUND, &req->flags)) {
		queue->active_background--;
		spin_lock(&fc->bg_lock);
		/* a bg slot freed up - maybe more bg requests can proceed */
		fuse_uring_flush_bg(queue);
		spin_unlock(&fc->bg_lock);
	}

	spin_unlock(&queue->lock);

	if (error)
		req->out.h.error = error;

	clear_bit(FR_SENT, &req->flags);
	fuse_request_end(req);
}
105 
/* Abort all list queued request on the given ring queue */
static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
{
	struct fuse_req *req;
	LIST_HEAD(req_list);

	/* detach all pending requests onto a local list under the lock */
	spin_lock(&queue->lock);
	list_for_each_entry(req, &queue->fuse_req_queue, list)
		clear_bit(FR_PENDING, &req->flags);
	list_splice_init(&queue->fuse_req_queue, &req_list);
	spin_unlock(&queue->lock);

	/* must not hold queue lock to avoid order issues with fi->lock */
	fuse_dev_end_requests(&req_list);
}
121 
/*
 * Abort and end queued requests on all ring queues (connection abort
 * path). Expects fc->max_background to have been lifted to UINT_MAX
 * beforehand (see WARN below) so fuse_uring_flush_bg() drains the
 * entire bg queue and those requests get aborted as well.
 */
void fuse_uring_abort_end_requests(struct fuse_ring *ring)
{
	int qid;
	struct fuse_ring_queue *queue;
	struct fuse_conn *fc = ring->fc;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		queue = READ_ONCE(ring->queues[qid]);
		if (!queue)
			continue;

		queue->stopped = true;

		WARN_ON_ONCE(ring->fc->max_background != UINT_MAX);
		spin_lock(&queue->lock);
		spin_lock(&fc->bg_lock);
		/* move remaining bg requests over so they get aborted too */
		fuse_uring_flush_bg(queue);
		spin_unlock(&fc->bg_lock);
		spin_unlock(&queue->lock);
		fuse_uring_abort_end_queue_requests(queue);
	}
}
144 
145 static bool ent_list_request_expired(struct fuse_conn *fc, struct list_head *list)
146 {
147 	struct fuse_ring_ent *ent;
148 	struct fuse_req *req;
149 
150 	ent = list_first_entry_or_null(list, struct fuse_ring_ent, list);
151 	if (!ent)
152 		return false;
153 
154 	req = ent->fuse_req;
155 
156 	return time_is_before_jiffies(req->create_time +
157 				      fc->timeout.req_timeout);
158 }
159 
/*
 * Scan all ring queues for an expired request (request-timeout handling).
 * Returns true as soon as one expired request is found on any of the
 * request or entry lists of any queue.
 */
bool fuse_uring_request_expired(struct fuse_conn *fc)
{
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	int qid;

	if (!ring)
		return false;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		queue = READ_ONCE(ring->queues[qid]);
		if (!queue)
			continue;

		spin_lock(&queue->lock);
		if (fuse_request_expired(fc, &queue->fuse_req_queue) ||
		    fuse_request_expired(fc, &queue->fuse_req_bg_queue) ||
		    ent_list_request_expired(fc, &queue->ent_w_req_queue) ||
		    ent_list_request_expired(fc, &queue->ent_in_userspace)) {
			spin_unlock(&queue->lock);
			return true;
		}
		spin_unlock(&queue->lock);
	}

	return false;
}
187 
/*
 * Free all ring resources on connection destruction. At this point only
 * the ent_released list may still hold entries; all other entry lists
 * are expected to have been drained by teardown (WARNed otherwise).
 */
void fuse_uring_destruct(struct fuse_conn *fc)
{
	struct fuse_ring *ring = fc->ring;
	int qid;

	if (!ring)
		return;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = ring->queues[qid];
		struct fuse_ring_ent *ent, *next;

		if (!queue)
			continue;

		WARN_ON(!list_empty(&queue->ent_avail_queue));
		WARN_ON(!list_empty(&queue->ent_w_req_queue));
		WARN_ON(!list_empty(&queue->ent_commit_queue));
		WARN_ON(!list_empty(&queue->ent_in_userspace));

		/* entries parked by fuse_uring_entry_teardown() */
		list_for_each_entry_safe(ent, next, &queue->ent_released,
					 list) {
			list_del_init(&ent->list);
			kfree(ent);
		}

		kfree(queue->fpq.processing);
		kfree(queue);
		ring->queues[qid] = NULL;
	}

	kfree(ring->queues);
	kfree(ring);
	fc->ring = NULL;
}
223 
/*
 * Basic ring setup for this connection based on the provided configuration
 */
static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc)
{
	struct fuse_ring *ring;
	size_t nr_queues = num_possible_cpus();
	/* NULL on failure, the concurrent creator's ring on a lost race */
	struct fuse_ring *res = NULL;
	size_t max_payload_size;

	ring = kzalloc_obj(*fc->ring, GFP_KERNEL_ACCOUNT);
	if (!ring)
		return NULL;

	ring->queues = kzalloc_objs(struct fuse_ring_queue *, nr_queues,
				    GFP_KERNEL_ACCOUNT);
	if (!ring->queues)
		goto out_err;

	/* payload buffer must fit reads, writes and the page-based max */
	max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write);
	max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE);

	spin_lock(&fc->lock);
	if (fc->ring) {
		/* race, another thread created the ring in the meantime */
		spin_unlock(&fc->lock);
		res = fc->ring;
		goto out_err;
	}

	init_waitqueue_head(&ring->stop_waitq);

	ring->nr_queues = nr_queues;
	ring->fc = fc;
	ring->max_payload_sz = max_payload_size;
	/* pairs with smp_load_acquire() in fuse_uring_register() */
	smp_store_release(&fc->ring, ring);

	spin_unlock(&fc->lock);
	return ring;

out_err:
	/* kfree(NULL) is a no-op for the failed-queues-allocation case */
	kfree(ring->queues);
	kfree(ring);
	return res;
}
269 
270 static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
271 						       int qid)
272 {
273 	struct fuse_conn *fc = ring->fc;
274 	struct fuse_ring_queue *queue;
275 	struct list_head *pq;
276 
277 	queue = kzalloc_obj(*queue, GFP_KERNEL_ACCOUNT);
278 	if (!queue)
279 		return NULL;
280 	pq = kzalloc_objs(struct list_head, FUSE_PQ_HASH_SIZE);
281 	if (!pq) {
282 		kfree(queue);
283 		return NULL;
284 	}
285 
286 	queue->qid = qid;
287 	queue->ring = ring;
288 	spin_lock_init(&queue->lock);
289 
290 	INIT_LIST_HEAD(&queue->ent_avail_queue);
291 	INIT_LIST_HEAD(&queue->ent_commit_queue);
292 	INIT_LIST_HEAD(&queue->ent_w_req_queue);
293 	INIT_LIST_HEAD(&queue->ent_in_userspace);
294 	INIT_LIST_HEAD(&queue->fuse_req_queue);
295 	INIT_LIST_HEAD(&queue->fuse_req_bg_queue);
296 	INIT_LIST_HEAD(&queue->ent_released);
297 
298 	queue->fpq.processing = pq;
299 	fuse_pqueue_init(&queue->fpq);
300 
301 	spin_lock(&fc->lock);
302 	if (ring->queues[qid]) {
303 		spin_unlock(&fc->lock);
304 		kfree(queue->fpq.processing);
305 		kfree(queue);
306 		return ring->queues[qid];
307 	}
308 
309 	/*
310 	 * write_once and lock as the caller mostly doesn't take the lock at all
311 	 */
312 	WRITE_ONCE(ring->queues[qid], queue);
313 	spin_unlock(&fc->lock);
314 
315 	return queue;
316 }
317 
318 static void fuse_uring_stop_fuse_req_end(struct fuse_req *req)
319 {
320 	clear_bit(FR_SENT, &req->flags);
321 	req->out.h.error = -ECONNABORTED;
322 	fuse_request_end(req);
323 }
324 
/*
 * Release a request/entry on connection tear down
 */
static void fuse_uring_entry_teardown(struct fuse_ring_ent *ent)
{
	struct fuse_req *req;
	struct io_uring_cmd *cmd;

	struct fuse_ring_queue *queue = ent->queue;

	spin_lock(&queue->lock);
	/* take over cmd and req so they can be completed outside the lock */
	cmd = ent->cmd;
	ent->cmd = NULL;
	req = ent->fuse_req;
	ent->fuse_req = NULL;
	if (req) {
		/* remove entry from queue->fpq->processing */
		list_del_init(&req->list);
	}

	/*
	 * The entry must not be freed immediately, due to access of direct
	 * pointer access of entries through IO_URING_F_CANCEL - there is a risk
	 * of race between daemon termination (which triggers IO_URING_F_CANCEL
	 * and accesses entries without checking the list state first
	 */
	list_move(&ent->list, &queue->ent_released);
	ent->state = FRRS_RELEASED;
	spin_unlock(&queue->lock);

	if (cmd)
		io_uring_cmd_done(cmd, -ENOTCONN, IO_URING_F_UNLOCKED);

	if (req)
		fuse_uring_stop_fuse_req_end(req);
}
361 
/*
 * Move all entries in the expected state from @head to a local teardown
 * list, then release each one, dropping a ring queue reference per entry.
 * Entries in an unexpected state are logged and skipped.
 */
static void fuse_uring_stop_list_entries(struct list_head *head,
					 struct fuse_ring_queue *queue,
					 enum fuse_ring_req_state exp_state)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_ring_ent *ent, *next;
	ssize_t queue_refs = SSIZE_MAX;
	LIST_HEAD(to_teardown);

	spin_lock(&queue->lock);
	list_for_each_entry_safe(ent, next, head, list) {
		if (ent->state != exp_state) {
			pr_warn("entry teardown qid=%d state=%d expected=%d",
				queue->qid, ent->state, exp_state);
			continue;
		}

		ent->state = FRRS_TEARDOWN;
		list_move(&ent->list, &to_teardown);
	}
	spin_unlock(&queue->lock);

	/* no queue lock to avoid lock order issues */
	list_for_each_entry_safe(ent, next, &to_teardown, list) {
		fuse_uring_entry_teardown(ent);
		queue_refs = atomic_dec_return(&ring->queue_refs);
		WARN_ON_ONCE(queue_refs < 0);
	}
}
391 
/* Tear down a queue's idle entries: in-userspace first, then available */
static void fuse_uring_teardown_entries(struct fuse_ring_queue *queue)
{
	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue,
				     FRRS_USERSPACE);
	fuse_uring_stop_list_entries(&queue->ent_avail_queue, queue,
				     FRRS_AVAILABLE);
}
399 
400 static void fuse_uring_teardown_all_queues(struct fuse_ring *ring)
401 {
402 	int qid;
403 
404 	for (qid = 0; qid < ring->nr_queues; qid++) {
405 		struct fuse_ring_queue *queue = READ_ONCE(ring->queues[qid]);
406 
407 		if (!queue)
408 			continue;
409 
410 		fuse_uring_teardown_entries(queue);
411 	}
412 }
413 
/*
 * Log state debug info
 */
static void fuse_uring_log_ent_state(struct fuse_ring *ring)
{
	int qid;
	struct fuse_ring_ent *ent;

	for (qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = ring->queues[qid];

		if (!queue)
			continue;

		spin_lock(&queue->lock);
		/*
		 * Log entries from the intermediate queue, the other queues
		 * should be empty
		 */
		list_for_each_entry(ent, &queue->ent_w_req_queue, list) {
			pr_info(" ent-req-queue ring=%p qid=%d ent=%p state=%d\n",
				ring, qid, ent, ent->state);
		}
		list_for_each_entry(ent, &queue->ent_commit_queue, list) {
			pr_info(" ent-commit-queue ring=%p qid=%d ent=%p state=%d\n",
				ring, qid, ent, ent->state);
		}
		spin_unlock(&queue->lock);
	}
	/* NOTE(review): presumably suppresses repeated dumps - confirm readers */
	ring->stop_debug_log = 1;
}
445 
/*
 * Delayed-work handler: retry queue teardown until all queue references
 * are gone, dumping entry state once the teardown timeout has passed.
 */
static void fuse_uring_async_stop_queues(struct work_struct *work)
{
	struct fuse_ring *ring =
		container_of(work, struct fuse_ring, async_teardown_work.work);

	fuse_uring_teardown_all_queues(ring);

	/*
	 * Some ring entries might be in the middle of IO operations,
	 * i.e. in process to get handled by file_operations::uring_cmd
	 * or on the way to userspace - we could handle that with conditions in
	 * run time code, but easier/cleaner to have an async tear down handler
	 * If there are still queue references left
	 */
	if (atomic_read(&ring->queue_refs) > 0) {
		if (time_after(jiffies,
			       ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
			fuse_uring_log_ent_state(ring);

		schedule_delayed_work(&ring->async_teardown_work,
				      FUSE_URING_TEARDOWN_INTERVAL);
	} else {
		wake_up_all(&ring->stop_waitq);
	}
}
471 
/*
 * Stop the ring queues
 */
void fuse_uring_stop_queues(struct fuse_ring *ring)
{
	fuse_uring_teardown_all_queues(ring);

	/* entries still referenced elsewhere - finish teardown asynchronously */
	if (atomic_read(&ring->queue_refs) > 0) {
		ring->teardown_time = jiffies;
		INIT_DELAYED_WORK(&ring->async_teardown_work,
				  fuse_uring_async_stop_queues);
		schedule_delayed_work(&ring->async_teardown_work,
				      FUSE_URING_TEARDOWN_INTERVAL);
	} else {
		wake_up_all(&ring->stop_waitq);
	}
}
489 
/*
 * Handle IO_URING_F_CANCEL, typically should come on daemon termination.
 *
 * Releasing the last entry should trigger fuse_dev_release() if
 * the daemon was terminated
 */
static void fuse_uring_cancel(struct io_uring_cmd *cmd,
			      unsigned int issue_flags)
{
	struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd);
	struct fuse_ring_queue *queue;
	bool need_cmd_done = false;

	/*
	 * direct access on ent - it must not be destructed as long as
	 * IO_URING_F_CANCEL might come up
	 */
	queue = ent->queue;
	spin_lock(&queue->lock);
	if (ent->state == FRRS_AVAILABLE) {
		/* entry is treated as being in userspace from here on */
		ent->state = FRRS_USERSPACE;
		list_move_tail(&ent->list, &queue->ent_in_userspace);
		need_cmd_done = true;
		ent->cmd = NULL;
	}
	spin_unlock(&queue->lock);

	if (need_cmd_done) {
		/* no queue lock to avoid lock order issues */
		io_uring_cmd_done(cmd, -ENOTCONN, issue_flags);
	}
}
522 
/*
 * Store the entry in the cmd pdu and mark the command cancelable, so
 * fuse_uring_cancel() can look the entry up on IO_URING_F_CANCEL.
 */
static void fuse_uring_prepare_cancel(struct io_uring_cmd *cmd, int issue_flags,
				      struct fuse_ring_ent *ring_ent)
{
	uring_cmd_set_ring_ent(cmd, ring_ent);
	io_uring_cmd_mark_cancelable(cmd, issue_flags);
}
529 
530 /*
531  * Checks for errors and stores it into the request
532  */
533 static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
534 					 struct fuse_req *req,
535 					 struct fuse_conn *fc)
536 {
537 	int err;
538 
539 	err = -EINVAL;
540 	if (oh->unique == 0) {
541 		/* Not supported through io-uring yet */
542 		pr_warn_once("notify through fuse-io-uring not supported\n");
543 		goto err;
544 	}
545 
546 	if (oh->error <= -ERESTARTSYS || oh->error > 0)
547 		goto err;
548 
549 	if (oh->error) {
550 		err = oh->error;
551 		goto err;
552 	}
553 
554 	err = -ENOENT;
555 	if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
556 		pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n",
557 				    req->in.h.unique,
558 				    oh->unique & ~FUSE_INT_REQ_BIT);
559 		goto err;
560 	}
561 
562 	/*
563 	 * Is it an interrupt reply ID?
564 	 * XXX: Not supported through fuse-io-uring yet, it should not even
565 	 *      find the request - should not happen.
566 	 */
567 	WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
568 
569 	err = 0;
570 err:
571 	return err;
572 }
573 
/*
 * Copy the userspace reply payload from the ring entry buffer into the
 * fuse request output args. Returns 0 or a negative error code.
 */
static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
				     struct fuse_req *req,
				     struct fuse_ring_ent *ent)
{
	struct fuse_copy_state cs;
	struct fuse_args *args = req->args;
	struct iov_iter iter;
	int err;
	struct fuse_uring_ent_in_out ring_in_out;

	err = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
			     sizeof(ring_in_out));
	if (err)
		return -EFAULT;

	err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz,
			  &iter);
	if (err)
		return err;

	fuse_copy_init(&cs, false, &iter);
	cs.is_uring = true;
	cs.req = req;

	err = fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
	fuse_copy_finish(&cs);
	return err;
}
602 
/*
 * Copy data from the req to the ring buffer
 */
static int fuse_uring_args_to_ring(struct fuse_ring *ring, struct fuse_req *req,
				   struct fuse_ring_ent *ent)
{
	struct fuse_copy_state cs;
	struct fuse_args *args = req->args;
	struct fuse_in_arg *in_args = args->in_args;
	int num_args = args->in_numargs;
	int err;
	struct iov_iter iter;
	struct fuse_uring_ent_in_out ent_in_out = {
		.flags = 0,
		/* the request unique doubles as the commit id */
		.commit_id = req->in.h.unique,
	};

	err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter);
	if (err) {
		pr_info_ratelimited("fuse: Import of user buffer failed\n");
		return err;
	}

	fuse_copy_init(&cs, true, &iter);
	cs.is_uring = true;
	cs.req = req;

	if (num_args > 0) {
		/*
		 * Expectation is that the first argument is the per op header.
		 * Some op code have that as zero size.
		 */
		if (args->in_args[0].size > 0) {
			err = copy_to_user(&ent->headers->op_in, in_args->value,
					   in_args->size);
			if (err) {
				pr_info_ratelimited(
					"Copying the header failed.\n");
				return -EFAULT;
			}
		}
		in_args++;
		num_args--;
	}

	/* copy the payload */
	err = fuse_copy_args(&cs, num_args, args->in_pages,
			     (struct fuse_arg *)in_args, 0);
	fuse_copy_finish(&cs);
	if (err) {
		pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
		return err;
	}

	/* tell userspace how much payload was written */
	ent_in_out.payload_sz = cs.ring.copied_sz;
	err = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out,
			   sizeof(ent_in_out));
	return err ? -EFAULT : 0;
}
662 
/*
 * Copy the fuse request (args and fuse_in_header) into the ring entry's
 * userspace buffers. The entry must be in FRRS_FUSE_REQ state.
 */
static int fuse_uring_copy_to_ring(struct fuse_ring_ent *ent,
				   struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	int err;

	err = -EIO;
	if (WARN_ON(ent->state != FRRS_FUSE_REQ)) {
		pr_err("qid=%d ring-req=%p invalid state %d on send\n",
		       queue->qid, ent, ent->state);
		return err;
	}

	err = -EINVAL;
	if (WARN_ON(req->in.h.unique == 0))
		return err;

	/* copy the request */
	err = fuse_uring_args_to_ring(ring, req, ent);
	if (unlikely(err)) {
		pr_info_ratelimited("Copy to ring failed: %d\n", err);
		return err;
	}

	/* copy fuse_in_header */
	err = copy_to_user(&ent->headers->in_out, &req->in.h,
			   sizeof(req->in.h));
	if (err) {
		err = -EFAULT;
		return err;
	}

	return 0;
}
698 
699 static int fuse_uring_prepare_send(struct fuse_ring_ent *ent,
700 				   struct fuse_req *req)
701 {
702 	int err;
703 
704 	err = fuse_uring_copy_to_ring(ent, req);
705 	if (!err)
706 		set_bit(FR_SENT, &req->flags);
707 	else
708 		fuse_uring_req_end(ent, req, err);
709 
710 	return err;
711 }
712 
/*
 * Write data to the ring buffer and send the request to userspace,
 * userspace will read it
 * This is comparable with classical read(/dev/fuse)
 */
static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent,
					struct fuse_req *req,
					unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;
	int err;
	struct io_uring_cmd *cmd;

	err = fuse_uring_prepare_send(ent, req);
	if (err)
		return err;

	/* hand the entry over to userspace; detach the cmd under the lock */
	spin_lock(&queue->lock);
	cmd = ent->cmd;
	ent->cmd = NULL;
	ent->state = FRRS_USERSPACE;
	list_move_tail(&ent->list, &queue->ent_in_userspace);
	spin_unlock(&queue->lock);

	/* completes the SQE so userspace can process the request */
	io_uring_cmd_done(cmd, 0, issue_flags);
	return 0;
}
740 
/*
 * Make a ring entry available for fuse_req assignment
 */
static void fuse_uring_ent_avail(struct fuse_ring_ent *ent,
				 struct fuse_ring_queue *queue)
{
	/* an available entry must carry a cmd to complete later on */
	WARN_ON_ONCE(!ent->cmd);
	list_move(&ent->list, &queue->ent_avail_queue);
	ent->state = FRRS_AVAILABLE;
}
751 
/* Used to find the request on SQE commit */
static void fuse_uring_add_to_pq(struct fuse_ring_ent *ent,
				 struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_pqueue *fpq = &queue->fpq;
	unsigned int hash;

	/* commit lookup (fuse_request_find) uses the hashed processing list */
	req->ring_entry = ent;
	hash = fuse_req_hash(req->in.h.unique);
	list_move_tail(&req->list, &fpq->processing[hash]);
}
764 
/*
 * Assign a fuse queue entry to the given entry
 */
static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent,
					   struct fuse_req *req)
{
	struct fuse_ring_queue *queue = ent->queue;

	lockdep_assert_held(&queue->lock);

	/* only available or just-committed entries may take a request */
	if (WARN_ON_ONCE(ent->state != FRRS_AVAILABLE &&
			 ent->state != FRRS_COMMIT)) {
		pr_warn("%s qid=%d state=%d\n", __func__, ent->queue->qid,
			ent->state);
	}

	clear_bit(FR_PENDING, &req->flags);
	ent->fuse_req = req;
	ent->state = FRRS_FUSE_REQ;
	list_move_tail(&ent->list, &queue->ent_w_req_queue);
	fuse_uring_add_to_pq(ent, req);
}
787 
/* Fetch the next fuse request if available */
static struct fuse_req *fuse_uring_ent_assign_req(struct fuse_ring_ent *ent)
	__must_hold(&queue->lock)
{
	struct fuse_req *req;
	struct fuse_ring_queue *queue = ent->queue;
	struct list_head *req_queue = &queue->fuse_req_queue;

	lockdep_assert_held(&queue->lock);

	/* get and assign the next entry while it is still holding the lock */
	req = list_first_entry_or_null(req_queue, struct fuse_req, list);
	if (req)
		fuse_uring_add_req_to_ring_ent(ent, req);

	return req;
}
805 
/*
 * Read data from the ring buffer, which user space has written to
 * This is comparible with handling of classical write(/dev/fuse).
 * Also make the ring request available again for new fuse requests.
 */
static void fuse_uring_commit(struct fuse_ring_ent *ent, struct fuse_req *req,
			      unsigned int issue_flags)
{
	struct fuse_ring *ring = ent->queue->ring;
	struct fuse_conn *fc = ring->fc;
	ssize_t err = 0;

	err = copy_from_user(&req->out.h, &ent->headers->in_out,
			     sizeof(req->out.h));
	if (err) {
		req->out.h.error = -EFAULT;
		goto out;
	}

	/* validate the header userspace wrote before copying the payload */
	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
	if (err) {
		/* req->out.h.error already set */
		goto out;
	}

	err = fuse_uring_copy_from_ring(ring, req, ent);
out:
	fuse_uring_req_end(ent, req, err);
}
835 
/*
 * Get the next fuse req and send it
 */
static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ent,
				     struct fuse_ring_queue *queue,
				     unsigned int issue_flags)
{
	int err;
	struct fuse_req *req;

retry:
	spin_lock(&queue->lock);
	fuse_uring_ent_avail(ent, queue);
	req = fuse_uring_ent_assign_req(ent);
	spin_unlock(&queue->lock);

	if (req) {
		/* on send failure the req was already ended; try the next one */
		err = fuse_uring_send_next_to_ring(ent, req, issue_flags);
		if (err)
			goto retry;
	}
}
858 
/*
 * Transition an entry from FRRS_USERSPACE to FRRS_COMMIT and move it to
 * the commit queue. Caller must hold queue->lock (asserted).
 */
static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
{
	struct fuse_ring_queue *queue = ent->queue;

	lockdep_assert_held(&queue->lock);

	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
		return -EIO;

	ent->state = FRRS_COMMIT;
	list_move(&ent->list, &queue->ent_commit_queue);

	return 0;
}
873 
/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
				   struct fuse_conn *fc)
{
	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe128_cmd(cmd->sqe,
								       struct fuse_uring_cmd_req);
	struct fuse_ring_ent *ent;
	int err;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
	unsigned int qid = READ_ONCE(cmd_req->qid);
	struct fuse_pqueue *fpq;
	struct fuse_req *req;

	err = -ENOTCONN;
	if (!ring)
		return err;

	/* qid comes from userspace - validate before indexing */
	if (qid >= ring->nr_queues)
		return -EINVAL;

	queue = ring->queues[qid];
	if (!queue)
		return err;
	fpq = &queue->fpq;

	if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped))
		return err;

	spin_lock(&queue->lock);
	/* Find a request based on the unique ID of the fuse request
	 * This should get revised, as it needs a hash calculation and list
	 * search. And full struct fuse_pqueue is needed (memory overhead).
	 * As well as the link from req to ring_ent.
	 */
	req = fuse_request_find(fpq, commit_id);
	err = -ENOENT;
	if (!req) {
		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
			commit_id);
		spin_unlock(&queue->lock);
		return err;
	}
	list_del_init(&req->list);
	ent = req->ring_entry;
	req->ring_entry = NULL;

	err = fuse_ring_ent_set_commit(ent);
	if (err != 0) {
		/* entry was not in userspace state - fail the request */
		pr_info_ratelimited("qid=%d commit_id %llu state %d",
				    queue->qid, commit_id, ent->state);
		spin_unlock(&queue->lock);
		req->out.h.error = err;
		clear_bit(FR_SENT, &req->flags);
		fuse_request_end(req);
		return err;
	}

	ent->cmd = cmd;
	spin_unlock(&queue->lock);

	/* without the queue lock, as other locks are taken */
	fuse_uring_prepare_cancel(cmd, issue_flags, ent);
	fuse_uring_commit(ent, req, issue_flags);

	/*
	 * Fetching the next request is absolutely required as queued
	 * fuse requests would otherwise not get processed - committing
	 * and fetching is done in one step vs legacy fuse, which has separated
	 * read (fetch request) and write (commit result).
	 */
	fuse_uring_next_fuse_req(ent, queue, issue_flags);
	return 0;
}
949 
950 static bool is_ring_ready(struct fuse_ring *ring, int current_qid)
951 {
952 	int qid;
953 	struct fuse_ring_queue *queue;
954 	bool ready = true;
955 
956 	for (qid = 0; qid < ring->nr_queues && ready; qid++) {
957 		if (current_qid == qid)
958 			continue;
959 
960 		queue = ring->queues[qid];
961 		if (!queue) {
962 			ready = false;
963 			break;
964 		}
965 
966 		spin_lock(&queue->lock);
967 		if (list_empty(&queue->ent_avail_queue))
968 			ready = false;
969 		spin_unlock(&queue->lock);
970 	}
971 
972 	return ready;
973 }
974 
/*
 * fuse_uring_req_fetch command handling
 */
static void fuse_uring_do_register(struct fuse_ring_ent *ent,
				   struct io_uring_cmd *cmd,
				   unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;
	struct fuse_ring *ring = queue->ring;
	struct fuse_conn *fc = ring->fc;
	struct fuse_iqueue *fiq = &fc->iq;

	fuse_uring_prepare_cancel(cmd, issue_flags, ent);

	spin_lock(&queue->lock);
	ent->cmd = cmd;
	fuse_uring_ent_avail(ent, queue);
	spin_unlock(&queue->lock);

	/* switch iqueue ops to io-uring once every queue has an entry */
	if (!ring->ready) {
		bool ready = is_ring_ready(ring, queue->qid);

		if (ready) {
			WRITE_ONCE(fiq->ops, &fuse_io_uring_ops);
			WRITE_ONCE(ring->ready, true);
			wake_up_all(&fc->blocked_waitq);
		}
	}
}
1004 
/*
 * sqe->addr is a ptr to an iovec array, iov[0] has the headers, iov[1]
 * the payload
 */
static int fuse_uring_get_iovec_from_sqe(const struct io_uring_sqe *sqe,
					 struct iovec iov[FUSE_URING_IOV_SEGS])
{
	struct iovec __user *uiov = u64_to_user_ptr(READ_ONCE(sqe->addr));
	struct iov_iter iter;
	ssize_t ret;

	if (sqe->len != FUSE_URING_IOV_SEGS)
		return -EINVAL;

	/*
	 * Direction for buffer access will actually be READ and WRITE,
	 * using write for the import should include READ access as well.
	 */
	ret = import_iovec(WRITE, uiov, FUSE_URING_IOV_SEGS,
			   FUSE_URING_IOV_SEGS, &iov, &iter);
	if (ret < 0)
		return ret;

	return 0;
}
1030 
/*
 * Allocate a ring entry from the register SQE's iovec pair (header
 * buffer + payload buffer) after validating their sizes. Takes a ring
 * queue reference; returns an ERR_PTR on failure.
 */
static struct fuse_ring_ent *
fuse_uring_create_ring_ent(struct io_uring_cmd *cmd,
			   struct fuse_ring_queue *queue)
{
	struct fuse_ring *ring = queue->ring;
	struct fuse_ring_ent *ent;
	size_t payload_size;
	struct iovec iov[FUSE_URING_IOV_SEGS];
	int err;

	err = fuse_uring_get_iovec_from_sqe(cmd->sqe, iov);
	if (err) {
		pr_info_ratelimited("Failed to get iovec from sqe, err=%d\n",
				    err);
		return ERR_PTR(err);
	}

	err = -EINVAL;
	if (iov[0].iov_len < sizeof(struct fuse_uring_req_header)) {
		pr_info_ratelimited("Invalid header len %zu\n", iov[0].iov_len);
		return ERR_PTR(err);
	}

	/* the payload buffer must be able to hold the largest request */
	payload_size = iov[1].iov_len;
	if (payload_size < ring->max_payload_sz) {
		pr_info_ratelimited("Invalid req payload len %zu\n",
				    payload_size);
		return ERR_PTR(err);
	}

	err = -ENOMEM;
	ent = kzalloc_obj(*ent, GFP_KERNEL_ACCOUNT);
	if (!ent)
		return ERR_PTR(err);

	INIT_LIST_HEAD(&ent->list);

	ent->queue = queue;
	ent->headers = iov[0].iov_base;
	ent->payload = iov[1].iov_base;

	/* dropped again in fuse_uring_stop_list_entries() */
	atomic_inc(&ring->queue_refs);
	return ent;
}
1075 
/*
 * Register header and payload buffer with the kernel and puts the
 * entry as "ready to get fuse requests" on the queue
 */
static int fuse_uring_register(struct io_uring_cmd *cmd,
			       unsigned int issue_flags, struct fuse_conn *fc)
{
	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe128_cmd(cmd->sqe,
								       struct fuse_uring_cmd_req);
	/* pairs with smp_store_release() in fuse_uring_create() */
	struct fuse_ring *ring = smp_load_acquire(&fc->ring);
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent;
	int err;
	unsigned int qid = READ_ONCE(cmd_req->qid);

	err = -ENOMEM;
	if (!ring) {
		ring = fuse_uring_create(fc);
		if (!ring)
			return err;
	}

	if (qid >= ring->nr_queues) {
		pr_info_ratelimited("fuse: Invalid ring qid %u\n", qid);
		return -EINVAL;
	}

	queue = ring->queues[qid];
	if (!queue) {
		queue = fuse_uring_create_queue(ring, qid);
		if (!queue)
			return err;
	}

	/*
	 * The created queue above does not need to be destructed in
	 * case of entry errors below, will be done at ring destruction time.
	 */

	ent = fuse_uring_create_ring_ent(cmd, queue);
	if (IS_ERR(ent))
		return PTR_ERR(ent);

	fuse_uring_do_register(ent, cmd, issue_flags);

	return 0;
}
1123 
/*
 * Entry function from io_uring to handle the given passthrough command
 * (op code IORING_OP_URING_CMD)
 */
int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
{
	struct fuse_dev *fud;
	struct fuse_conn *fc;
	u32 cmd_op = cmd->cmd_op;
	int err;

	if ((unlikely(issue_flags & IO_URING_F_CANCEL))) {
		fuse_uring_cancel(cmd, issue_flags);
		return 0;
	}

	/* This extra SQE size holds struct fuse_uring_cmd_req */
	if (!(issue_flags & IO_URING_F_SQE128))
		return -EINVAL;

	fud = fuse_get_dev(cmd->file);
	if (IS_ERR(fud)) {
		pr_info_ratelimited("No fuse device found\n");
		return PTR_ERR(fud);
	}
	fc = fud->fc;

	/* Once a connection has io-uring enabled on it, it can't be disabled */
	if (!enable_uring && !fc->io_uring) {
		pr_info_ratelimited("fuse-io-uring is disabled\n");
		return -EOPNOTSUPP;
	}

	if (fc->aborted)
		return -ECONNABORTED;
	if (!fc->connected)
		return -ENOTCONN;

	/*
	 * fuse_uring_register() needs the ring to be initialized,
	 * we need to know the max payload size
	 */
	if (!fc->initialized)
		return -EAGAIN;

	switch (cmd_op) {
	case FUSE_IO_URING_CMD_REGISTER:
		err = fuse_uring_register(cmd, issue_flags, fc);
		if (err) {
			pr_info_once("FUSE_IO_URING_CMD_REGISTER failed err=%d\n",
				     err);
			/* registration failed - fall back to legacy /dev/fuse */
			fc->io_uring = 0;
			wake_up_all(&fc->blocked_waitq);
			return err;
		}
		break;
	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
		err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
		if (err) {
			pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n",
				     err);
			return err;
		}
		break;
	default:
		return -EINVAL;
	}

	/* SQE completion happens asynchronously via io_uring_cmd_done() */
	return -EIOCBQUEUED;
}
1194 
/*
 * Hand the ring entry over to userspace: mark it as being processed
 * there, move it onto the in-userspace list and complete the io_uring
 * command with the given result.
 */
static void fuse_uring_send(struct fuse_ring_ent *ent, struct io_uring_cmd *cmd,
			    ssize_t ret, unsigned int issue_flags)
{
	struct fuse_ring_queue *queue = ent->queue;

	spin_lock(&queue->lock);
	ent->state = FRRS_USERSPACE;
	list_move_tail(&ent->list, &queue->ent_in_userspace);
	/* cmd is consumed by io_uring_cmd_done() below */
	ent->cmd = NULL;
	spin_unlock(&queue->lock);

	io_uring_cmd_done(cmd, ret, issue_flags);
}
1208 
1209 /*
1210  * This prepares and sends the ring request in fuse-uring task context.
1211  * User buffers are not mapped yet - the application does not have permission
1212  * to write to it - this has to be executed in ring task context.
1213  */
1214 static void fuse_uring_send_in_task(struct io_tw_req tw_req, io_tw_token_t tw)
1215 {
1216 	unsigned int issue_flags = IO_URING_CMD_TASK_WORK_ISSUE_FLAGS;
1217 	struct io_uring_cmd *cmd = io_uring_cmd_from_tw(tw_req);
1218 	struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd);
1219 	struct fuse_ring_queue *queue = ent->queue;
1220 	int err;
1221 
1222 	if (!tw.cancel) {
1223 		err = fuse_uring_prepare_send(ent, ent->fuse_req);
1224 		if (err) {
1225 			fuse_uring_next_fuse_req(ent, queue, issue_flags);
1226 			return;
1227 		}
1228 	} else {
1229 		err = -ECANCELED;
1230 	}
1231 
1232 	fuse_uring_send(ent, cmd, err, issue_flags);
1233 }
1234 
/*
 * Map the current task to a ring queue: the CPU the task is running on
 * is used as the queue id.  Falls back to queue 0 (with a one-time
 * warning) if the CPU number exceeds the number of queues.
 */
static struct fuse_ring_queue *fuse_uring_task_to_queue(struct fuse_ring *ring)
{
	unsigned int qid;
	struct fuse_ring_queue *queue;

	qid = task_cpu(current);

	if (WARN_ONCE(qid >= ring->nr_queues,
		      "Core number (%u) exceeds nr queues (%zu)\n", qid,
		      ring->nr_queues))
		qid = 0;

	queue = ring->queues[qid];
	/* May be NULL if userspace never registered this qid; warn once */
	WARN_ONCE(!queue, "Missing queue for qid %d\n", qid);

	return queue;
}
1252 
/*
 * Kick off sending of the entry's assigned request.  The actual send
 * must run in ring task context (see fuse_uring_send_in_task()), so
 * stash the entry in the cmd pdu and punt to task work.
 */
static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent)
{
	struct io_uring_cmd *cmd = ent->cmd;

	uring_cmd_set_ring_ent(cmd, ent);
	io_uring_cmd_complete_in_task(cmd, fuse_uring_send_in_task);
}
1260 
/* queue a fuse request and send it if a ring entry is available */
void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req)
{
	struct fuse_conn *fc = req->fm->fc;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent = NULL;
	int err;

	err = -EINVAL;
	queue = fuse_uring_task_to_queue(ring);
	if (!queue)
		goto err;

	/* presumably assigns the request's unique id - see fuse_dev_i.h */
	fuse_request_assign_unique(fiq, req);

	spin_lock(&queue->lock);
	err = -ENOTCONN;
	if (unlikely(queue->stopped))
		goto err_unlock;

	set_bit(FR_URING, &req->flags);
	req->ring_queue = queue;
	/* Take an idle ring entry if one exists, otherwise park the req */
	ent = list_first_entry_or_null(&queue->ent_avail_queue,
				       struct fuse_ring_ent, list);
	if (ent)
		fuse_uring_add_req_to_ring_ent(ent, req);
	else
		list_add_tail(&req->list, &queue->fuse_req_queue);
	spin_unlock(&queue->lock);

	/* Dispatch outside of queue->lock */
	if (ent)
		fuse_uring_dispatch_ent(ent);

	return;

err_unlock:
	spin_unlock(&queue->lock);
err:
	/* Fail the request back to its originator */
	req->out.h.error = err;
	clear_bit(FR_PENDING, &req->flags);
	fuse_request_end(req);
}
1304 
/*
 * Queue a background fuse request on this task's ring queue.
 * Returns false when queueing over io-uring is not possible (no queue
 * for this CPU or the queue is stopped).
 */
bool fuse_uring_queue_bq_req(struct fuse_req *req)
{
	struct fuse_conn *fc = req->fm->fc;
	struct fuse_ring *ring = fc->ring;
	struct fuse_ring_queue *queue;
	struct fuse_ring_ent *ent = NULL;

	queue = fuse_uring_task_to_queue(ring);
	if (!queue)
		return false;

	spin_lock(&queue->lock);
	if (unlikely(queue->stopped)) {
		spin_unlock(&queue->lock);
		return false;
	}

	set_bit(FR_URING, &req->flags);
	req->ring_queue = queue;
	list_add_tail(&req->list, &queue->fuse_req_bg_queue);

	ent = list_first_entry_or_null(&queue->ent_avail_queue,
				       struct fuse_ring_ent, list);
	/* fc->bg_lock nests inside queue->lock (see fuse_uring_flush_bg) */
	spin_lock(&fc->bg_lock);
	fc->num_background++;
	if (fc->num_background == fc->max_background)
		fc->blocked = 1;
	/* Move bg requests eligible under the bg limits to the main queue */
	fuse_uring_flush_bg(queue);
	spin_unlock(&fc->bg_lock);

	/*
	 * Due to bg_queue flush limits there might be other bg requests
	 * in the queue that need to be handled first. Or no further req
	 * might be available.
	 */
	req = list_first_entry_or_null(&queue->fuse_req_queue, struct fuse_req,
				       list);
	if (ent && req) {
		fuse_uring_add_req_to_ring_ent(ent, req);
		spin_unlock(&queue->lock);

		/* Dispatch outside of queue->lock */
		fuse_uring_dispatch_ent(ent);
	} else {
		spin_unlock(&queue->lock);
	}

	return true;
}
1353 
1354 bool fuse_uring_remove_pending_req(struct fuse_req *req)
1355 {
1356 	struct fuse_ring_queue *queue = req->ring_queue;
1357 
1358 	return fuse_remove_pending_req(req, &queue->lock);
1359 }
1360 
/* Input-queue operations used when the connection runs over io-uring */
static const struct fuse_iqueue_ops fuse_io_uring_ops = {
	/* should be sent over io-uring as enhancement */
	.send_forget = fuse_dev_queue_forget,

	/*
	 * could be sent over io-uring, but interrupts should be rare,
	 * no need to make the code complex
	 */
	.send_interrupt = fuse_dev_queue_interrupt,
	.send_req = fuse_uring_queue_fuse_req,
};
1372