xref: /freebsd/sys/dev/liquidio/base/lio_droq.c (revision c6989859ae9388eeb46a24fe88f9b8d07101c710)
1 /*
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2017 Cavium, Inc.. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Cavium, Inc. nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER(S) OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 /*$FreeBSD$*/
34 
35 #include "lio_bsd.h"
36 #include "lio_common.h"
37 #include "lio_droq.h"
38 #include "lio_iq.h"
39 #include "lio_response_manager.h"
40 #include "lio_device.h"
41 #include "lio_main.h"
42 #include "cn23xx_pf_device.h"
43 #include "lio_network.h"
44 
45 struct __dispatch {
46 	struct lio_stailq_node	node;
47 	struct lio_recv_info	*rinfo;
48 	lio_dispatch_fn_t	disp_fn;
49 };
50 
51 void	*lio_get_dispatch_arg(struct octeon_device *oct,
52 			      uint16_t opcode, uint16_t subcode);
53 
54 /*
55  *  Get the argument that the user set when registering dispatch
56  *  function for a given opcode/subcode.
57  *  @param  octeon_dev - the octeon device pointer.
58  *  @param  opcode     - the opcode for which the dispatch argument
59  *                       is to be checked.
60  *  @param  subcode    - the subcode for which the dispatch argument
61  *                       is to be checked.
62  *  @return  Success: void * (argument to the dispatch function)
63  *  @return  Failure: NULL
64  *
65  */
66 void   *
67 lio_get_dispatch_arg(struct octeon_device *octeon_dev,
68 		     uint16_t opcode, uint16_t subcode)
69 {
70 	struct lio_stailq_node	*dispatch;
71 	void			*fn_arg = NULL;
72 	int			idx;
73 	uint16_t		combined_opcode;
74 
75 	combined_opcode = LIO_OPCODE_SUBCODE(opcode, subcode);
76 
77 	idx = combined_opcode & LIO_OPCODE_MASK;
78 
79 	mtx_lock(&octeon_dev->dispatch.lock);
80 
81 	if (octeon_dev->dispatch.count == 0) {
82 		mtx_unlock(&octeon_dev->dispatch.lock);
83 		return (NULL);
84 	}
85 
86 	if (octeon_dev->dispatch.dlist[idx].opcode == combined_opcode) {
87 		fn_arg = octeon_dev->dispatch.dlist[idx].arg;
88 	} else {
89 		STAILQ_FOREACH(dispatch,
90 			       &octeon_dev->dispatch.dlist[idx].head, entries) {
91 			if (((struct lio_dispatch *)dispatch)->opcode ==
92 			    combined_opcode) {
93 				fn_arg = ((struct lio_dispatch *)dispatch)->arg;
94 				break;
95 			}
96 		}
97 	}
98 
99 	mtx_unlock(&octeon_dev->dispatch.lock);
100 	return (fn_arg);
101 }
102 
103 /*
104  *  Check for packets on Droq. This function should be called with lock held.
105  *  @param  droq - Droq on which count is checked.
106  *  @return Returns packet count.
107  */
108 uint32_t
109 lio_droq_check_hw_for_pkts(struct lio_droq *droq)
110 {
111 	struct octeon_device	*oct = droq->oct_dev;
112 	uint32_t		last_count;
113 	uint32_t		pkt_count = 0;
114 
115 	pkt_count = lio_read_csr32(oct, droq->pkts_sent_reg);
116 
117 	last_count = pkt_count - droq->pkt_count;
118 	droq->pkt_count = pkt_count;
119 
120 	/* we shall write to cnts at the end of processing */
121 	if (last_count)
122 		atomic_add_int(&droq->pkts_pending, last_count);
123 
124 	return (last_count);
125 }
126 
127 static void
128 lio_droq_compute_max_packet_bufs(struct lio_droq *droq)
129 {
130 	uint32_t	count = 0;
131 
132 	/*
133 	 * max_empty_descs is the max. no. of descs that can have no buffers.
134 	 * If the empty desc count goes beyond this value, we cannot safely
135 	 * read in a 64K packet sent by Octeon
136 	 * (64K is max pkt size from Octeon)
137 	 */
138 	droq->max_empty_descs = 0;
139 
140 	do {
141 		droq->max_empty_descs++;
142 		count += droq->buffer_size;
143 	} while (count < (64 * 1024));
144 
145 	droq->max_empty_descs = droq->max_count - droq->max_empty_descs;
146 }
147 
148 static void
149 lio_droq_reset_indices(struct lio_droq *droq)
150 {
151 
152 	droq->read_idx = 0;
153 	droq->refill_idx = 0;
154 	droq->refill_count = 0;
155 	atomic_store_rel_int(&droq->pkts_pending, 0);
156 }
157 
158 static void
159 lio_droq_destroy_ring_buffers(struct octeon_device *oct,
160 			      struct lio_droq *droq)
161 {
162 	uint32_t	i;
163 
164 	for (i = 0; i < droq->max_count; i++) {
165 		if (droq->recv_buf_list[i].buffer != NULL) {
166 			lio_recv_buffer_free(droq->recv_buf_list[i].buffer);
167 			droq->recv_buf_list[i].buffer = NULL;
168 		}
169 	}
170 
171 	lio_droq_reset_indices(droq);
172 }
173 
174 static int
175 lio_droq_setup_ring_buffers(struct octeon_device *oct,
176 			    struct lio_droq *droq)
177 {
178 	struct lio_droq_desc	*desc_ring = droq->desc_ring;
179 	void			*buf;
180 	uint32_t		i;
181 
182 	for (i = 0; i < droq->max_count; i++) {
183 		buf = lio_recv_buffer_alloc(droq->buffer_size);
184 
185 		if (buf == NULL) {
186 			lio_dev_err(oct, "%s buffer alloc failed\n",
187 				    __func__);
188 			droq->stats.rx_alloc_failure++;
189 			return (-ENOMEM);
190 		}
191 
192 		droq->recv_buf_list[i].buffer = buf;
193 		droq->recv_buf_list[i].data = ((struct mbuf *)buf)->m_data;
194 		desc_ring[i].info_ptr = 0;
195 		desc_ring[i].buffer_ptr =
196 			lio_map_ring(oct->device, droq->recv_buf_list[i].buffer,
197 				     droq->buffer_size);
198 	}
199 
200 	lio_droq_reset_indices(droq);
201 
202 	lio_droq_compute_max_packet_bufs(droq);
203 
204 	return (0);
205 }
206 
207 int
208 lio_delete_droq(struct octeon_device *oct, uint32_t q_no)
209 {
210 	struct lio_droq	*droq = oct->droq[q_no];
211 
212 	lio_dev_dbg(oct, "%s[%d]\n", __func__, q_no);
213 
214 	while (taskqueue_cancel(droq->droq_taskqueue, &droq->droq_task, NULL))
215 		taskqueue_drain(droq->droq_taskqueue, &droq->droq_task);
216 
217 	taskqueue_free(droq->droq_taskqueue);
218 	droq->droq_taskqueue = NULL;
219 
220 	lio_droq_destroy_ring_buffers(oct, droq);
221 	free(droq->recv_buf_list, M_DEVBUF);
222 
223 	if (droq->desc_ring != NULL)
224 		lio_dma_free((droq->max_count * LIO_DROQ_DESC_SIZE),
225 			     droq->desc_ring);
226 
227 	oct->io_qmask.oq &= ~(1ULL << q_no);
228 	bzero(oct->droq[q_no], sizeof(struct lio_droq));
229 	oct->num_oqs--;
230 
231 	return (0);
232 }
233 
234 void
235 lio_droq_bh(void *ptr, int pending __unused)
236 {
237 	struct lio_droq		*droq = ptr;
238 	struct octeon_device	*oct = droq->oct_dev;
239 	struct lio_instr_queue	*iq = oct->instr_queue[droq->q_no];
240 	int	reschedule, tx_done = 1;
241 
242 	reschedule = lio_droq_process_packets(oct, droq, oct->rx_budget);
243 
244 	if (atomic_load_acq_int(&iq->instr_pending))
245 		tx_done = lio_flush_iq(oct, iq, oct->tx_budget);
246 
247 	if (reschedule || !tx_done)
248 		taskqueue_enqueue(droq->droq_taskqueue, &droq->droq_task);
249 	else
250 		lio_enable_irq(droq, iq);
251 }
252 
253 int
254 lio_init_droq(struct octeon_device *oct, uint32_t q_no,
255 	      uint32_t num_descs, uint32_t desc_size, void *app_ctx)
256 {
257 	struct lio_droq	*droq;
258 	unsigned long	size;
259 	uint32_t	c_buf_size = 0, c_num_descs = 0, c_pkts_per_intr = 0;
260 	uint32_t	c_refill_threshold = 0, desc_ring_size = 0;
261 
262 	lio_dev_dbg(oct, "%s[%d]\n", __func__, q_no);
263 
264 	droq = oct->droq[q_no];
265 	bzero(droq, LIO_DROQ_SIZE);
266 
267 	droq->oct_dev = oct;
268 	droq->q_no = q_no;
269 	if (app_ctx != NULL)
270 		droq->app_ctx = app_ctx;
271 	else
272 		droq->app_ctx = (void *)(size_t)q_no;
273 
274 	c_num_descs = num_descs;
275 	c_buf_size = desc_size;
276 	if (LIO_CN23XX_PF(oct)) {
277 		struct lio_config *conf23 = LIO_CHIP_CONF(oct, cn23xx_pf);
278 
279 		c_pkts_per_intr =
280 			(uint32_t)LIO_GET_OQ_PKTS_PER_INTR_CFG(conf23);
281 		c_refill_threshold =
282 			(uint32_t)LIO_GET_OQ_REFILL_THRESHOLD_CFG(conf23);
283 	} else {
284 		return (1);
285 	}
286 
287 	droq->max_count = c_num_descs;
288 	droq->buffer_size = c_buf_size;
289 
290 	desc_ring_size = droq->max_count * LIO_DROQ_DESC_SIZE;
291 	droq->desc_ring = lio_dma_alloc(desc_ring_size, &droq->desc_ring_dma);
292 	if (droq->desc_ring == NULL) {
293 		lio_dev_err(oct, "Output queue %d ring alloc failed\n", q_no);
294 		return (1);
295 	}
296 
297 	lio_dev_dbg(oct, "droq[%d]: desc_ring: virt: 0x%p, dma: %llx\n", q_no,
298 		    droq->desc_ring, LIO_CAST64(droq->desc_ring_dma));
299 	lio_dev_dbg(oct, "droq[%d]: num_desc: %d\n", q_no, droq->max_count);
300 
301 	size = droq->max_count * LIO_DROQ_RECVBUF_SIZE;
302 	droq->recv_buf_list =
303 		(struct lio_recv_buffer *)malloc(size, M_DEVBUF,
304 						 M_NOWAIT | M_ZERO);
305 	if (droq->recv_buf_list == NULL) {
306 		lio_dev_err(oct, "Output queue recv buf list alloc failed\n");
307 		goto init_droq_fail;
308 	}
309 
310 	if (lio_droq_setup_ring_buffers(oct, droq))
311 		goto init_droq_fail;
312 
313 	droq->pkts_per_intr = c_pkts_per_intr;
314 	droq->refill_threshold = c_refill_threshold;
315 
316 	lio_dev_dbg(oct, "DROQ INIT: max_empty_descs: %d\n",
317 		    droq->max_empty_descs);
318 
319 	mtx_init(&droq->lock, "droq_lock", NULL, MTX_DEF);
320 
321 	STAILQ_INIT(&droq->dispatch_stq_head);
322 
323 	oct->fn_list.setup_oq_regs(oct, q_no);
324 
325 	oct->io_qmask.oq |= BIT_ULL(q_no);
326 
327 	/*
328 	 * Initialize the taskqueue that handles
329 	 * output queue packet processing.
330 	 */
331 	lio_dev_dbg(oct, "Initializing droq%d taskqueue\n", q_no);
332 	NET_TASK_INIT(&droq->droq_task, 0, lio_droq_bh, (void *)droq);
333 
334 	droq->droq_taskqueue = taskqueue_create_fast("lio_droq_task", M_NOWAIT,
335 						     taskqueue_thread_enqueue,
336 						     &droq->droq_taskqueue);
337 	taskqueue_start_threads_cpuset(&droq->droq_taskqueue, 1, PI_NET,
338 				       &oct->ioq_vector[q_no].affinity_mask,
339 				       "lio%d_droq%d_task", oct->octeon_id,
340 				       q_no);
341 
342 	return (0);
343 
344 init_droq_fail:
345 	lio_delete_droq(oct, q_no);
346 	return (1);
347 }
348 
349 /*
350  * lio_create_recv_info
351  * Parameters:
352  *  octeon_dev - pointer to the octeon device structure
353  *  droq       - droq in which the packet arrived.
354  *  buf_cnt    - no. of buffers used by the packet.
355  *  idx        - index in the descriptor for the first buffer in the packet.
356  * Description:
357  *  Allocates a recv_info_t and copies the buffer addresses for packet data
358  *  into the recv_pkt space which starts at an 8B offset from recv_info_t.
359  *  Flags the descriptors for refill later. If available descriptors go
360  *  below the threshold to receive a 64K pkt, new buffers are first allocated
361  *  before the recv_pkt_t is created.
362  *  This routine will be called in interrupt context.
363  * Returns:
364  *  Success: Pointer to recv_info_t
365  *  Failure: NULL.
366  * Locks:
367  *  The droq->lock is held when this routine is called.
368  */
369 static inline struct lio_recv_info *
370 lio_create_recv_info(struct octeon_device *octeon_dev, struct lio_droq *droq,
371 		     uint32_t buf_cnt, uint32_t idx)
372 {
373 	struct lio_droq_info	*info;
374 	struct lio_recv_pkt	*recv_pkt;
375 	struct lio_recv_info	*recv_info;
376 	uint32_t		bytes_left, i;
377 
378 	info = (struct lio_droq_info *)droq->recv_buf_list[idx].data;
379 
380 	recv_info = lio_alloc_recv_info(sizeof(struct __dispatch));
381 	if (recv_info == NULL)
382 		return (NULL);
383 
384 	recv_pkt = recv_info->recv_pkt;
385 	recv_pkt->rh = info->rh;
386 	recv_pkt->length = (uint32_t)info->length;
387 	recv_pkt->buffer_count = (uint16_t)buf_cnt;
388 	recv_pkt->octeon_id = (uint16_t)octeon_dev->octeon_id;
389 
390 	i = 0;
391 	bytes_left = (uint32_t)info->length;
392 
393 	while (buf_cnt) {
394 		recv_pkt->buffer_size[i] = (bytes_left >= droq->buffer_size) ?
395 			droq->buffer_size : bytes_left;
396 
397 		recv_pkt->buffer_ptr[i] = droq->recv_buf_list[idx].buffer;
398 		droq->recv_buf_list[idx].buffer = NULL;
399 
400 		idx = lio_incr_index(idx, 1, droq->max_count);
401 		bytes_left -= droq->buffer_size;
402 		i++;
403 		buf_cnt--;
404 	}
405 
406 	return (recv_info);
407 }
408 
409 /*
410  * If we were not able to refill all buffers, try to move around
411  * the buffers that were not dispatched.
412  */
413 static inline uint32_t
414 lio_droq_refill_pullup_descs(struct lio_droq *droq,
415 			     struct lio_droq_desc *desc_ring)
416 {
417 	uint32_t	desc_refilled = 0;
418 	uint32_t	refill_index = droq->refill_idx;
419 
420 	while (refill_index != droq->read_idx) {
421 		if (droq->recv_buf_list[refill_index].buffer != NULL) {
422 			droq->recv_buf_list[droq->refill_idx].buffer =
423 				droq->recv_buf_list[refill_index].buffer;
424 			droq->recv_buf_list[droq->refill_idx].data =
425 				droq->recv_buf_list[refill_index].data;
426 			desc_ring[droq->refill_idx].buffer_ptr =
427 				desc_ring[refill_index].buffer_ptr;
428 			droq->recv_buf_list[refill_index].buffer = NULL;
429 			desc_ring[refill_index].buffer_ptr = 0;
430 			do {
431 				droq->refill_idx =
432 					lio_incr_index(droq->refill_idx, 1,
433 						       droq->max_count);
434 				desc_refilled++;
435 				droq->refill_count--;
436 			} while (droq->recv_buf_list[droq->refill_idx].buffer !=
437 				 NULL);
438 		}
439 		refill_index = lio_incr_index(refill_index, 1, droq->max_count);
440 	}	/* while */
441 	return (desc_refilled);
442 }
443 
444 /*
445  * lio_droq_refill
446  * Parameters:
447  *  droq       - droq in which descriptors require new buffers.
448  * Description:
449  *  Called during normal DROQ processing in interrupt mode or by the poll
450  *  thread to refill the descriptors from which buffers were dispatched
451  *  to upper layers. Attempts to allocate new buffers. If that fails, moves
452  *  up buffers (that were not dispatched) to form a contiguous ring.
453  * Returns:
454  *  No of descriptors refilled.
455  * Locks:
456  *  This routine is called with droq->lock held.
457  */
458 uint32_t
459 lio_droq_refill(struct octeon_device *octeon_dev, struct lio_droq *droq)
460 {
461 	struct lio_droq_desc	*desc_ring;
462 	void			*buf = NULL;
463 	uint32_t		desc_refilled = 0;
464 	uint8_t			*data;
465 
466 	desc_ring = droq->desc_ring;
467 
468 	while (droq->refill_count && (desc_refilled < droq->max_count)) {
469 		/*
470 		 * If a valid buffer exists (happens if there is no dispatch),
471 		 * reuse
472 		 * the buffer, else allocate.
473 		 */
474 		if (droq->recv_buf_list[droq->refill_idx].buffer == NULL) {
475 			buf = lio_recv_buffer_alloc(droq->buffer_size);
476 			/*
477 			 * If a buffer could not be allocated, no point in
478 			 * continuing
479 			 */
480 			if (buf == NULL) {
481 				droq->stats.rx_alloc_failure++;
482 				break;
483 			}
484 
485 			droq->recv_buf_list[droq->refill_idx].buffer = buf;
486 			data = ((struct mbuf *)buf)->m_data;
487 		} else {
488 			data = ((struct mbuf *)droq->recv_buf_list
489 				[droq->refill_idx].buffer)->m_data;
490 		}
491 
492 		droq->recv_buf_list[droq->refill_idx].data = data;
493 
494 		desc_ring[droq->refill_idx].buffer_ptr =
495 		    lio_map_ring(octeon_dev->device,
496 				 droq->recv_buf_list[droq->refill_idx].buffer,
497 				 droq->buffer_size);
498 
499 		droq->refill_idx = lio_incr_index(droq->refill_idx, 1,
500 						  droq->max_count);
501 		desc_refilled++;
502 		droq->refill_count--;
503 	}
504 
505 	if (droq->refill_count)
506 		desc_refilled += lio_droq_refill_pullup_descs(droq, desc_ring);
507 
508 	/*
509 	 * if droq->refill_count
510 	 * The refill count would not change in pass two. We only moved buffers
511 	 * to close the gap in the ring, but we would still have the same no. of
512 	 * buffers to refill.
513 	 */
514 	return (desc_refilled);
515 }
516 
/*
 * Number of receive buffers of buf_size bytes needed to hold total_len
 * bytes, i.e. total_len / buf_size rounded up.
 *
 * The rounding is done in 64-bit arithmetic so that a total_len close to
 * UINT32_MAX cannot wrap and yield a too-small count. buf_size must be
 * non-zero.
 */
static inline uint32_t
lio_droq_get_bufcount(uint32_t buf_size, uint32_t total_len)
{

	return ((uint32_t)(((uint64_t)total_len + buf_size - 1) / buf_size));
}
523 
524 static int
525 lio_droq_dispatch_pkt(struct octeon_device *oct, struct lio_droq *droq,
526 		      union octeon_rh *rh, struct lio_droq_info *info)
527 {
528 	struct lio_recv_info	*rinfo;
529 	lio_dispatch_fn_t	disp_fn;
530 	uint32_t		cnt;
531 
532 	cnt = lio_droq_get_bufcount(droq->buffer_size, (uint32_t)info->length);
533 
534 	disp_fn = lio_get_dispatch(oct, (uint16_t)rh->r.opcode,
535 				   (uint16_t)rh->r.subcode);
536 	if (disp_fn) {
537 		rinfo = lio_create_recv_info(oct, droq, cnt, droq->read_idx);
538 		if (rinfo != NULL) {
539 			struct __dispatch *rdisp = rinfo->rsvd;
540 
541 			rdisp->rinfo = rinfo;
542 			rdisp->disp_fn = disp_fn;
543 			rinfo->recv_pkt->rh = *rh;
544 			STAILQ_INSERT_TAIL(&droq->dispatch_stq_head,
545 					   &rdisp->node, entries);
546 		} else {
547 			droq->stats.dropped_nomem++;
548 		}
549 	} else {
550 		lio_dev_err(oct, "DROQ: No dispatch function (opcode %u/%u)\n",
551 			    (unsigned int)rh->r.opcode,
552 			    (unsigned int)rh->r.subcode);
553 		droq->stats.dropped_nodispatch++;
554 	}
555 
556 	return (cnt);
557 }
558 
559 static inline void
560 lio_droq_drop_packets(struct octeon_device *oct, struct lio_droq *droq,
561 		      uint32_t cnt)
562 {
563 	struct lio_droq_info	*info;
564 	uint32_t		i = 0, buf_cnt;
565 
566 	for (i = 0; i < cnt; i++) {
567 		info = (struct lio_droq_info *)
568 			droq->recv_buf_list[droq->read_idx].data;
569 
570 		lio_swap_8B_data((uint64_t *)info, 2);
571 
572 		if (info->length) {
573 			info->length += 8;
574 			droq->stats.bytes_received += info->length;
575 			buf_cnt = lio_droq_get_bufcount(droq->buffer_size,
576 							(uint32_t)info->length);
577 		} else {
578 			lio_dev_err(oct, "DROQ: In drop: pkt with len 0\n");
579 			buf_cnt = 1;
580 		}
581 
582 		droq->read_idx = lio_incr_index(droq->read_idx, buf_cnt,
583 						droq->max_count);
584 		droq->refill_count += buf_cnt;
585 	}
586 }
587 
588 static uint32_t
589 lio_droq_fast_process_packets(struct octeon_device *oct, struct lio_droq *droq,
590 			      uint32_t pkts_to_process)
591 {
592 	struct lio_droq_info	*info;
593 	union			octeon_rh *rh;
594 	uint32_t		pkt, pkt_count, total_len = 0;
595 
596 	pkt_count = pkts_to_process;
597 
598 	for (pkt = 0; pkt < pkt_count; pkt++) {
599 		struct mbuf	*nicbuf = NULL;
600 		uint32_t	pkt_len = 0;
601 
602 		info = (struct lio_droq_info *)
603 		    droq->recv_buf_list[droq->read_idx].data;
604 
605 		lio_swap_8B_data((uint64_t *)info, 2);
606 
607 		if (!info->length) {
608 			lio_dev_err(oct,
609 				    "DROQ[%d] idx: %d len:0, pkt_cnt: %d\n",
610 				    droq->q_no, droq->read_idx, pkt_count);
611 			hexdump((uint8_t *)info, LIO_DROQ_INFO_SIZE, NULL,
612 				HD_OMIT_CHARS);
613 			pkt++;
614 			lio_incr_index(droq->read_idx, 1, droq->max_count);
615 			droq->refill_count++;
616 			break;
617 		}
618 
619 		rh = &info->rh;
620 
621 		info->length += 8;
622 		rh->r_dh.len += (LIO_DROQ_INFO_SIZE + 7) / 8;
623 
624 		total_len += (uint32_t)info->length;
625 		if (lio_opcode_slow_path(rh)) {
626 			uint32_t	buf_cnt;
627 
628 			buf_cnt = lio_droq_dispatch_pkt(oct, droq, rh, info);
629 			droq->read_idx = lio_incr_index(droq->read_idx,	buf_cnt,
630 							droq->max_count);
631 			droq->refill_count += buf_cnt;
632 		} else {
633 			if (info->length <= droq->buffer_size) {
634 				pkt_len = (uint32_t)info->length;
635 				nicbuf = droq->recv_buf_list[
636 						       droq->read_idx].buffer;
637 				nicbuf->m_len = pkt_len;
638 				droq->recv_buf_list[droq->read_idx].buffer =
639 					NULL;
640 
641 				droq->read_idx =
642 					lio_incr_index(droq->read_idx,
643 						       1, droq->max_count);
644 				droq->refill_count++;
645 			} else {
646 				bool	secondary_frag = false;
647 
648 				pkt_len = 0;
649 
650 				while (pkt_len < info->length) {
651 					int	frag_len, idx = droq->read_idx;
652 					struct mbuf	*buffer;
653 
654 					frag_len =
655 						((pkt_len + droq->buffer_size) >
656 						 info->length) ?
657 						((uint32_t)info->length -
658 						 pkt_len) : droq->buffer_size;
659 
660 					buffer = ((struct mbuf *)
661 						  droq->recv_buf_list[idx].
662 						  buffer);
663 					buffer->m_len = frag_len;
664 					if (__predict_true(secondary_frag)) {
665 						m_cat(nicbuf, buffer);
666 					} else {
667 						nicbuf = buffer;
668 						secondary_frag = true;
669 					}
670 
671 					droq->recv_buf_list[droq->read_idx].
672 						buffer = NULL;
673 
674 					pkt_len += frag_len;
675 					droq->read_idx =
676 						lio_incr_index(droq->read_idx,
677 							       1,
678 							       droq->max_count);
679 					droq->refill_count++;
680 				}
681 			}
682 
683 			if (nicbuf != NULL) {
684 				if (droq->ops.fptr != NULL) {
685 					droq->ops.fptr(nicbuf, pkt_len, rh,
686 						       droq, droq->ops.farg);
687 				} else {
688 					lio_recv_buffer_free(nicbuf);
689 				}
690 			}
691 		}
692 
693 		if (droq->refill_count >= droq->refill_threshold) {
694 			int desc_refilled = lio_droq_refill(oct, droq);
695 
696 			/*
697 			 * Flush the droq descriptor data to memory to be sure
698 			 * that when we update the credits the data in memory
699 			 * is accurate.
700 			 */
701 			wmb();
702 			lio_write_csr32(oct, droq->pkts_credit_reg,
703 					desc_refilled);
704 			/* make sure mmio write completes */
705 			__compiler_membar();
706 		}
707 	}	/* for (each packet)... */
708 
709 	/* Increment refill_count by the number of buffers processed. */
710 	droq->stats.pkts_received += pkt;
711 	droq->stats.bytes_received += total_len;
712 
713 	tcp_lro_flush_all(&droq->lro);
714 
715 	if ((droq->ops.drop_on_max) && (pkts_to_process - pkt)) {
716 		lio_droq_drop_packets(oct, droq, (pkts_to_process - pkt));
717 
718 		droq->stats.dropped_toomany += (pkts_to_process - pkt);
719 		return (pkts_to_process);
720 	}
721 
722 	return (pkt);
723 }
724 
725 int
726 lio_droq_process_packets(struct octeon_device *oct, struct lio_droq *droq,
727 			 uint32_t budget)
728 {
729 	struct lio_stailq_node	*tmp, *tmp2;
730 	uint32_t		pkt_count = 0, pkts_processed = 0;
731 
732 	/* Grab the droq lock */
733 	mtx_lock(&droq->lock);
734 
735 	lio_droq_check_hw_for_pkts(droq);
736 	pkt_count = atomic_load_acq_int(&droq->pkts_pending);
737 
738 	if (!pkt_count) {
739 		mtx_unlock(&droq->lock);
740 		return (0);
741 	}
742 	if (pkt_count > budget)
743 		pkt_count = budget;
744 
745 	pkts_processed = lio_droq_fast_process_packets(oct, droq, pkt_count);
746 
747 	atomic_subtract_int(&droq->pkts_pending, pkts_processed);
748 
749 	/* Release the lock */
750 	mtx_unlock(&droq->lock);
751 
752 	STAILQ_FOREACH_SAFE(tmp, &droq->dispatch_stq_head, entries, tmp2) {
753 		struct __dispatch *rdisp = (struct __dispatch *)tmp;
754 
755 		STAILQ_REMOVE_HEAD(&droq->dispatch_stq_head, entries);
756 		rdisp->disp_fn(rdisp->rinfo, lio_get_dispatch_arg(oct,
757 			(uint16_t)rdisp->rinfo->recv_pkt->rh.r.opcode,
758 			(uint16_t)rdisp->rinfo->recv_pkt->rh.r.subcode));
759 	}
760 
761 	/* If there are packets pending. schedule tasklet again */
762 	if (atomic_load_acq_int(&droq->pkts_pending))
763 		return (1);
764 
765 	return (0);
766 }
767 
768 int
769 lio_register_droq_ops(struct octeon_device *oct, uint32_t q_no,
770 		      struct lio_droq_ops *ops)
771 {
772 	struct lio_droq		*droq;
773 	struct lio_config	*lio_cfg = NULL;
774 
775 	lio_cfg = lio_get_conf(oct);
776 
777 	if (lio_cfg == NULL)
778 		return (-EINVAL);
779 
780 	if (ops == NULL) {
781 		lio_dev_err(oct, "%s: droq_ops pointer is NULL\n", __func__);
782 		return (-EINVAL);
783 	}
784 
785 	if (q_no >= LIO_GET_OQ_MAX_Q_CFG(lio_cfg)) {
786 		lio_dev_err(oct, "%s: droq id (%d) exceeds MAX (%d)\n",
787 			    __func__, q_no, (oct->num_oqs - 1));
788 		return (-EINVAL);
789 	}
790 	droq = oct->droq[q_no];
791 
792 	mtx_lock(&droq->lock);
793 
794 	memcpy(&droq->ops, ops, sizeof(struct lio_droq_ops));
795 
796 	mtx_unlock(&droq->lock);
797 
798 	return (0);
799 }
800 
801 int
802 lio_unregister_droq_ops(struct octeon_device *oct, uint32_t q_no)
803 {
804 	struct lio_droq		*droq;
805 	struct lio_config	*lio_cfg = NULL;
806 
807 	lio_cfg = lio_get_conf(oct);
808 
809 	if (lio_cfg == NULL)
810 		return (-EINVAL);
811 
812 	if (q_no >= LIO_GET_OQ_MAX_Q_CFG(lio_cfg)) {
813 		lio_dev_err(oct, "%s: droq id (%d) exceeds MAX (%d)\n",
814 			    __func__, q_no, oct->num_oqs - 1);
815 		return (-EINVAL);
816 	}
817 
818 	droq = oct->droq[q_no];
819 
820 	if (droq == NULL) {
821 		lio_dev_info(oct, "Droq id (%d) not available.\n", q_no);
822 		return (0);
823 	}
824 
825 	mtx_lock(&droq->lock);
826 
827 	droq->ops.fptr = NULL;
828 	droq->ops.farg = NULL;
829 	droq->ops.drop_on_max = 0;
830 
831 	mtx_unlock(&droq->lock);
832 
833 	return (0);
834 }
835 
836 int
837 lio_create_droq(struct octeon_device *oct, uint32_t q_no, uint32_t num_descs,
838 		uint32_t desc_size, void *app_ctx)
839 {
840 
841 	if (oct->droq[q_no]->oct_dev != NULL) {
842 		lio_dev_dbg(oct, "Droq already in use. Cannot create droq %d again\n",
843 			    q_no);
844 		return (1);
845 	}
846 
847 	/* Initialize the Droq */
848 	if (lio_init_droq(oct, q_no, num_descs, desc_size, app_ctx)) {
849 		bzero(oct->droq[q_no], sizeof(struct lio_droq));
850 		goto create_droq_fail;
851 	}
852 
853 	oct->num_oqs++;
854 
855 	lio_dev_dbg(oct, "%s: Total number of OQ: %d\n", __func__,
856 		    oct->num_oqs);
857 
858 	/* Global Droq register settings */
859 
860 	/*
861 	 * As of now not required, as setting are done for all 32 Droqs at
862 	 * the same time.
863 	 */
864 	return (0);
865 
866 create_droq_fail:
867 	return (-ENOMEM);
868 }
869