1 /* SPDX-License-Identifier: GPL-2.0 */ 2 #ifndef IOU_MPSCQ_H 3 #define IOU_MPSCQ_H 4 5 #include <linux/io_uring_types.h> 6 7 /* 8 * mpscq - lockless multi-producer, single-consumer FIFO queue 9 * 10 * Unlike llist, which is LIFO ordered and hence needs an O(n) 11 * llist_reverse_order() pass before entries can be processed in queue order, 12 * this queue hands out nodes in the order they were pushed. 13 * 14 * The consumer cursor is held by the caller rather than in the queue struct 15 * (see below), and with the stub reinsertion done as a single cmpxchg attempt 16 * instead of an unconditional push, keeping tail == stub a reliable empty test 17 * while a producer is in the middle of a push. 18 * 19 * Producers may run in any context (task, softirq, hardirq) and are wait-free: 20 * a push is one xchg() plus one store, with no retry loops. FIFO order between 21 * producers is the order in which the xchg() on ->tail serializes them. 22 * 23 * The price for linked-list FIFO is that a push publishes the node in two 24 * steps: the xchg() makes it the new tail, and the subsequent store links it to 25 * its predecessor. In between, the tail end of the queue is not yet reachable 26 * from the head. mpscq_pop() detects this and returns NULL, while mpscq_empty() 27 * reports false. The consumer must not treat such a NULL as "queue empty" - it 28 * should retry later. The window is two instructions wide, but a producer can 29 * be preempted inside it, so the consumer must not spin on it while holding 30 * resources the producer might need to make progress. 31 * 32 * The consumer side only supports a single consumer at a time, callers must 33 * provide their own serialization for it. The stub node is what allows the 34 * consumer to detach the final node without racing with the link stores of 35 * producers. This scheme also guarantees that the previous tail observed by 36 * mpscq_push() cannot be freed by the consumer until the push has linked it, 37 * which is what makes the deferred link store safe. 38 * 39 * The queue struct only holds the producer side. The consumer keeps its cursor 40 * (the oldest not yet handed out node) externally and passes it to mpscq_pop(), 41 * so that it can be placed on a different cacheline: the cursor is written for 42 * every pop, and having it share a line with ->tail would have the consumer 43 * invalidating the line that producers need for every push. 44 */ 45 static inline void mpscq_init(struct mpscq *q, struct llist_node **headp) 46 { 47 q->tail = *headp = &q->stub; 48 q->stub.next = NULL; 49 } 50 51 /* 52 * Returns true if the queue holds no entries that mpscq_pop() hasn't handed out 53 * yet. May be called from any context. Note that !empty doesn't guarantee that 54 * mpscq_pop() will return an entry yet, see the in-flight producer window 55 * above. 56 */ 57 static inline bool mpscq_empty(struct mpscq *q) 58 { 59 return READ_ONCE(q->tail) == &q->stub; 60 } 61 62 /* 63 * Push a node onto the queue. Safe against concurrent pushes from any context, 64 * and against the (single) consumer. Returns true if the queue was empty 65 * before this push. 66 */ 67 static inline bool mpscq_push(struct mpscq *q, struct llist_node *node) 68 { 69 struct llist_node *prev; 70 71 node->next = NULL; 72 /* 73 * xchg() implies a full barrier, so the initialization of the 74 * entry (including ->next above) is visible before the node can 75 * be reached, either via ->tail or via ->next chasing from the 76 * head once the store below has linked it. 77 */ 78 prev = xchg(&q->tail, node); 79 WRITE_ONCE(prev->next, node); 80 return prev == &q->stub; 81 } 82 83 /* 84 * Pop the oldest node off the queue, or return NULL if no node is available. 85 * NULL is returned both when the queue is empty and when a producer has 86 * published a node via ->tail but hasn't linked it yet; use mpscq_empty() to 87 * tell the two apart. Single consumer only, with headp being the consumer 88 * cursor that mpscq_init() set up. 89 */ 90 static inline struct llist_node *mpscq_pop(struct mpscq *q, 91 struct llist_node **headp) 92 { 93 struct llist_node *head = *headp, *next; 94 95 if (head == &q->stub) { 96 head = READ_ONCE(head->next); 97 if (!head) 98 return NULL; 99 /* 100 * The stub is now detached and stays quiescent until the 101 * consumer reinserts it as the tail, so reset its ->next here, 102 * ready for that. 103 */ 104 q->stub.next = NULL; 105 *headp = head; 106 } 107 next = READ_ONCE(head->next); 108 if (next) { 109 *headp = next; 110 return head; 111 } 112 /* 113 * 'head' is the last linked node, it can only be handed out once the 114 * stub has taken its place as the tail. If the cmpxchg fails, a 115 * producer has made a new node the tail but hasn't linked 'head' to 116 * it yet - bail and let the caller retry. 117 */ 118 if (try_cmpxchg(&q->tail, &head, &q->stub)) { 119 *headp = &q->stub; 120 return head; 121 } 122 return NULL; 123 } 124 125 #endif /* IOU_MPSCQ_H */ 126