/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
	u32 flags;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[0] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u64 chunk_mask;
	u64 size;
	u32 ring_mask;
	u32 nentries;
	u32 prod_head;
	u32 prod_tail;
	u32 cons_head;
	u32 cons_tail;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 *    producer                        consumer
 *
 *    if (LOAD ->consumer) {          LOAD ->producer
 *                       (A)          smp_rmb()       (C)
 *       STORE $data                  LOAD $data
 *       smp_wmb()       (B)          smp_mb()        (D)
 *       STORE ->producer             STORE ->consumer
 *    }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it protects the data from being written after
 * the producer pointer. If this barrier was missing, the consumer
 * could observe the producer pointer being set and thus load the data
 * before the producer has written the new data. The consumer would in
 * this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. In case ->consumer indicates there is no
 * room in the buffer to store $data, we do not store it. So no barrier
 * is needed.
 *
 * (D) protects the load of the data from being observed to happen
 * after the store of the consumer pointer. If we did not have this
 * memory barrier, the producer could observe the consumer pointer
 * being set and overwrite the data with a new value before the
 * consumer got the chance to read the old value. The consumer would
 * thus miss reading the old entry and very likely read the new entry
 * twice, once right now and again after circling through the ring.
 */
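
/* Illustrative sketch only (not part of this header's API and not used
 * anywhere in the kernel): a hypothetical one-slot mailbox whose produce
 * and consume paths place the barriers exactly as (A)-(D) above. The
 * names example_mailbox, example_post() and example_fetch() are made up
 * for illustration.
 */
struct example_mailbox {
	u32 producer;
	u32 consumer;
	u64 data;
};

static inline bool example_post(struct example_mailbox *m, u64 val)
{
	/* (A) control dependency: no store of data if there is no room */
	if (READ_ONCE(m->consumer) != m->producer)
		return false;

	m->data = val;					/* STORE $data */
	smp_wmb();					/* B, matches C */
	WRITE_ONCE(m->producer, m->producer + 1);	/* STORE ->producer */
	return true;
}

static inline bool example_fetch(struct example_mailbox *m, u64 *val)
{
	if (READ_ONCE(m->producer) == m->consumer)	/* LOAD ->producer */
		return false;

	smp_rmb();					/* C, matches B */
	*val = m->data;					/* LOAD $data */
	smp_mb();					/* D, matches A */
	WRITE_ONCE(m->consumer, m->consumer + 1);	/* STORE ->consumer */
	return true;
}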

/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries == 0) {
		/* Refresh the local pointer */
		q->prod_tail = READ_ONCE(q->ring->producer);
		entries = q->prod_tail - q->cons_tail;
	}

	return (entries > dcnt) ? dcnt : entries;
}

static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
	u32 free_entries = q->nentries - (producer - q->cons_tail);

	if (free_entries >= dcnt)
		return free_entries;

	/* Refresh the local tail pointer */
	q->cons_tail = READ_ONCE(q->ring->consumer);
	return q->nentries - (producer - q->cons_tail);
}

static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries >= cnt)
		return true;

	/* Refresh the local pointer. */
	q->prod_tail = READ_ONCE(q->ring->producer);
	entries = q->prod_tail - q->cons_tail;

	return entries >= cnt;
}

/* UMEM queue */

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
	if (addr >= q->size) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
		if (xskq_is_valid_addr(q, *addr))
			return addr;

		q->cons_tail++;
	}

	return NULL;
}

static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_addr(q, addr);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_tail++ & q->ring_mask] = addr;

	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, q->prod_tail);
	return 0;
}

static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_head++ & q->ring_mask] = addr;
	return 0;
}

static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
					     u32 nb_entries)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail += nb_entries;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline int xskq_reserve_addr(struct xsk_queue *q)
{
	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	q->prod_head++;
	return 0;
}
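
/* Illustrative sketch only (example_complete_addrs() is a made-up name,
 * not a helper used by the kernel): how a completion path would
 * typically pair xskq_produce_addr_lazy() with a single
 * xskq_produce_flush_addr_n(), so that barrier (B) and the producer
 * pointer update are paid once per batch instead of once per address.
 */
static inline u32 example_complete_addrs(struct xsk_queue *cq,
					 const u64 *addrs, u32 n)
{
	u32 i;

	for (i = 0; i < n; i++)
		if (xskq_produce_addr_lazy(cq, addrs[i]))
			break;	/* ring is full, complete the rest later */

	if (i)
		xskq_produce_flush_addr_n(cq, i);

	return i;	/* number of addresses made visible to user space */
}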

/* Rx/Tx queue */

static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
{
	if (!xskq_is_valid_addr(q, d->addr))
		return false;

	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
	    d->options) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*desc = READ_ONCE(ring->desc[idx]);
		if (xskq_is_valid_desc(q, desc))
			return desc;

		q->cons_tail++;
	}

	return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_desc(q, desc);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_batch_desc(struct xsk_queue *q,
					  u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	unsigned int idx;

	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	idx = (q->prod_head++) & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail = q->prod_head;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
	return xskq_nb_avail(q, q->nentries) == q->nentries;
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */
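
/* Illustrative sketch only, kept inside a comment (and outside the include
 * guard) so it is never compiled: how a Tx consume loop would typically
 * pair xskq_peek_desc() and xskq_discard_desc(). The name
 * example_consume_tx() is made up for illustration; the real Tx path
 * lives in net/xdp/xsk.c.
 *
 *	static inline u32 example_consume_tx(struct xsk_queue *tx)
 *	{
 *		struct xdp_desc desc;
 *		u32 sent = 0;
 *
 *		while (xskq_peek_desc(tx, &desc)) {
 *			// hand desc.addr/desc.len to the driver here
 *			xskq_discard_desc(tx);
 *			sent++;
 *		}
 *		return sent;
 *	}
 */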