xref: /illumos-gate/usr/src/uts/common/io/mac/mac_soft_ring.c (revision a63fed2a0384be5aa3f2ff7a38aac1153c549e87)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  * Copyright 2017 Joyent, Inc.
25  */
26 
27 /*
28  * General Soft rings - Simulating Rx rings in S/W.
29  *
30  * Soft ring is a data abstraction containing a queue and a worker
31  * thread and represents a hardware Rx ring in software. Each soft
32  * ring set can have a collection of soft rings for separating
33  * L3/L4 specific traffic (IPv4 from IPv6 or TCP from UDP) or for
34  * allowing a higher degree of parallelism by sending traffic to
35  * one of the soft rings for a SRS (using a hash on src IP or port).
36  * Each soft ring worker thread can be bound to a different CPU
37  * allowing the processing for each soft ring to happen in parallel
38  * and independent from each other.
39  *
40  * Protocol soft rings:
41  *
 * Each SRS has at a minimum 3 softrings. One each for IPv4 TCP,
43  * IPv4 UDP and rest (OTH - for IPv6 and everything else). The
44  * SRS does dynamic polling and enforces link level bandwidth but
45  * it does so for all traffic (IPv4 and IPv6 and all protocols) on
46  * that link. However, each protocol layer wants a different
47  * behaviour. For instance IPv4 TCP has per CPU squeues which
48  * enforce their own polling and flow control so IPv4 TCP traffic
49  * needs to go to a separate soft ring which can be polled by the
50  * TCP squeue. It also allows TCP squeue to push back flow control
51  * all the way to NIC hardware (if it puts its corresponding soft
52  * ring in the poll mode and soft ring queue builds up, the
53  * shared srs_poll_pkt_cnt goes up and SRS automatically stops
54  * more packets from entering the system).
55  *
56  * Similarly, the UDP benefits from a DLS bypass and packet chaining
 * so sending it to a separate soft ring is desired. All the rest of
 * the traffic (including IPv6) is sent to the OTH softring. The IPv6
 * traffic currently goes through the OTH softring and via DLS because
 * it needs more processing to be done. Irrespective of the sap
 * (IPv4 or IPv6) or the transport, the dynamic polling, B/W enforcement,
 * cpu assignment, fanout, etc. apply to all traffic since they
 * are implemented by the SRS which is agnostic to sap or transport.
64  *
65  * Fanout soft rings:
66  *
 * On a multithreaded system, we can assign more CPUs and multithread
 * the stack by creating a soft ring per CPU and spreading traffic
 * based on a hash computed on src IP etc. Since we still need to
 * keep the protocol separation, we create a set of 3 soft rings per
 * CPU (specified by cpu list or degree of fanout).
72  *
73  * NOTE: See the block level comment on top of mac_sched.c
74  */
75 
76 #include <sys/types.h>
77 #include <sys/callb.h>
78 #include <sys/sdt.h>
79 #include <sys/strsubr.h>
80 #include <sys/strsun.h>
81 #include <sys/vlan.h>
82 #include <inet/ipsec_impl.h>
83 #include <inet/ip_impl.h>
84 #include <inet/sadb.h>
85 #include <inet/ipsecesp.h>
86 #include <inet/ipsecah.h>
87 
88 #include <sys/mac_impl.h>
89 #include <sys/mac_client_impl.h>
90 #include <sys/mac_soft_ring.h>
91 #include <sys/mac_flow_impl.h>
92 #include <sys/mac_stat.h>
93 
/* Forward declarations for the file-private routines defined below. */
static void mac_rx_soft_ring_drain(mac_soft_ring_t *);
static void mac_soft_ring_fire(void *);
static void mac_soft_ring_worker(mac_soft_ring_t *);
static void mac_tx_soft_ring_drain(mac_soft_ring_t *);

/*
 * Tunables sampled by mac_soft_ring_create() for every Tx soft ring:
 * mac_tx_soft_ring_max_q_cnt is copied into s_ring_tx_max_q_cnt, and
 * mac_tx_soft_ring_hiwat is copied into s_ring_tx_hiwat after being
 * capped at mac_tx_soft_ring_max_q_cnt.
 */
uint32_t mac_tx_soft_ring_max_q_cnt = 100000;
uint32_t mac_tx_soft_ring_hiwat = 1000;

/*
 * Cache that mac_soft_ring_create() allocates soft rings from and that
 * mac_soft_ring_free() returns them to; it is defined outside this file.
 */
extern kmem_cache_t *mac_soft_ring_cache;
103 
/*
 * ADD_SOFTRING_TO_SET
 *
 * Append softring to the tail of mac_srs's soft ring list and bump
 * srs_soft_ring_count. If the list is empty the ring simply becomes both
 * head and tail, and its own link fields are left as they were.
 *
 * Every use of an argument is parenthesized so that any pointer
 * expression (e.g. &ring or array + i) can be passed. Both arguments are
 * evaluated more than once, so they must not have side effects.
 */
#define	ADD_SOFTRING_TO_SET(mac_srs, softring) {			\
	if ((mac_srs)->srs_soft_ring_head == NULL) {			\
		(mac_srs)->srs_soft_ring_head = (softring);		\
		(mac_srs)->srs_soft_ring_tail = (softring);		\
	} else {							\
		/* ADD to the list */					\
		(softring)->s_ring_prev =				\
			(mac_srs)->srs_soft_ring_tail;			\
		(mac_srs)->srs_soft_ring_tail->s_ring_next =		\
			(softring);					\
		(mac_srs)->srs_soft_ring_tail = (softring);		\
	}								\
	(mac_srs)->srs_soft_ring_count++;				\
}
117 
118 /*
119  * mac_soft_ring_worker_wakeup
120  *
121  * Wake up the soft ring worker thread to process the queue as long
122  * as no one else is processing it and upper layer (client) is still
123  * ready to receive packets.
124  */
125 void
126 mac_soft_ring_worker_wakeup(mac_soft_ring_t *ringp)
127 {
128 	ASSERT(MUTEX_HELD(&ringp->s_ring_lock));
129 	if (!(ringp->s_ring_state & S_RING_PROC) &&
130 	    !(ringp->s_ring_state & S_RING_BLANK) &&
131 	    (ringp->s_ring_tid == NULL)) {
132 		if (ringp->s_ring_wait != 0) {
133 			ringp->s_ring_tid =
134 			    timeout(mac_soft_ring_fire, ringp,
135 			    ringp->s_ring_wait);
136 		} else {
137 			/* Schedule the worker thread. */
138 			cv_signal(&ringp->s_ring_async);
139 		}
140 	}
141 }
142 
143 /*
144  * mac_soft_ring_create
145  *
146  * Create a soft ring, do the necessary setup and bind the worker
147  * thread to the assigned CPU.
148  */
149 mac_soft_ring_t *
150 mac_soft_ring_create(int id, clock_t wait, uint16_t type,
151     pri_t pri, mac_client_impl_t *mcip, mac_soft_ring_set_t *mac_srs,
152     processorid_t cpuid, mac_direct_rx_t rx_func, void *x_arg1,
153     mac_resource_handle_t x_arg2)
154 {
155 	mac_soft_ring_t		*ringp;
156 	char			name[S_RING_NAMELEN];
157 
158 	bzero(name, 64);
159 	ringp = kmem_cache_alloc(mac_soft_ring_cache, KM_SLEEP);
160 
161 	if (type & ST_RING_TCP) {
162 		(void) snprintf(name, sizeof (name),
163 		    "mac_tcp_soft_ring_%d_%p", id, (void *)mac_srs);
164 	} else if (type & ST_RING_UDP) {
165 		(void) snprintf(name, sizeof (name),
166 		    "mac_udp_soft_ring_%d_%p", id, (void *)mac_srs);
167 	} else if (type & ST_RING_OTH) {
168 		(void) snprintf(name, sizeof (name),
169 		    "mac_oth_soft_ring_%d_%p", id, (void *)mac_srs);
170 	} else {
171 		ASSERT(type & ST_RING_TX);
172 		(void) snprintf(name, sizeof (name),
173 		    "mac_tx_soft_ring_%d_%p", id, (void *)mac_srs);
174 	}
175 
176 	bzero(ringp, sizeof (mac_soft_ring_t));
177 	(void) strncpy(ringp->s_ring_name, name, S_RING_NAMELEN + 1);
178 	ringp->s_ring_name[S_RING_NAMELEN] = '\0';
179 	mutex_init(&ringp->s_ring_lock, NULL, MUTEX_DEFAULT, NULL);
180 	ringp->s_ring_notify_cb_info.mcbi_lockp = &ringp->s_ring_lock;
181 
182 	ringp->s_ring_type = type;
183 	ringp->s_ring_wait = MSEC_TO_TICK(wait);
184 	ringp->s_ring_mcip = mcip;
185 	ringp->s_ring_set = mac_srs;
186 
187 	/*
188 	 * Protect against access from DR callbacks (mac_walk_srs_bind/unbind)
189 	 * which can't grab the mac perimeter
190 	 */
191 	mutex_enter(&mac_srs->srs_lock);
192 	ADD_SOFTRING_TO_SET(mac_srs, ringp);
193 	mutex_exit(&mac_srs->srs_lock);
194 
195 	/*
196 	 * set the bind CPU to -1 to indicate
197 	 * no thread affinity set
198 	 */
199 	ringp->s_ring_cpuid = ringp->s_ring_cpuid_save = -1;
200 	ringp->s_ring_worker = thread_create(NULL, 0,
201 	    mac_soft_ring_worker, ringp, 0, &p0, TS_RUN, pri);
202 	if (type & ST_RING_TX) {
203 		ringp->s_ring_drain_func = mac_tx_soft_ring_drain;
204 		ringp->s_ring_tx_arg1 = x_arg1;
205 		ringp->s_ring_tx_arg2 = x_arg2;
206 		ringp->s_ring_tx_max_q_cnt = mac_tx_soft_ring_max_q_cnt;
207 		ringp->s_ring_tx_hiwat =
208 		    (mac_tx_soft_ring_hiwat > mac_tx_soft_ring_max_q_cnt) ?
209 		    mac_tx_soft_ring_max_q_cnt : mac_tx_soft_ring_hiwat;
210 		if (mcip->mci_state_flags & MCIS_IS_AGGR) {
211 			mac_srs_tx_t *tx = &mac_srs->srs_tx;
212 
213 			ASSERT(tx->st_soft_rings[
214 			    ((mac_ring_t *)x_arg2)->mr_index] == NULL);
215 			tx->st_soft_rings[((mac_ring_t *)x_arg2)->mr_index] =
216 			    ringp;
217 		}
218 	} else {
219 		ringp->s_ring_drain_func = mac_rx_soft_ring_drain;
220 		ringp->s_ring_rx_func = rx_func;
221 		ringp->s_ring_rx_arg1 = x_arg1;
222 		ringp->s_ring_rx_arg2 = x_arg2;
223 		if (mac_srs->srs_state & SRS_SOFTRING_QUEUE)
224 			ringp->s_ring_type |= ST_RING_WORKER_ONLY;
225 	}
226 	if (cpuid != -1)
227 		(void) mac_soft_ring_bind(ringp, cpuid);
228 
229 	mac_soft_ring_stat_create(ringp);
230 
231 	return (ringp);
232 }
233 
234 /*
235  * mac_soft_ring_free
236  *
237  * Free the soft ring once we are done with it.
238  */
239 void
240 mac_soft_ring_free(mac_soft_ring_t *softring)
241 {
242 	ASSERT((softring->s_ring_state &
243 	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE | S_RING_PROC)) ==
244 	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE));
245 	mac_pkt_drop(NULL, NULL, softring->s_ring_first, B_FALSE);
246 	softring->s_ring_tx_arg2 = NULL;
247 	mac_soft_ring_stat_delete(softring);
248 	mac_callback_free(softring->s_ring_notify_cb_list);
249 	kmem_cache_free(mac_soft_ring_cache, softring);
250 }
251 
252 int mac_soft_ring_thread_bind = 1;
253 
254 /*
255  * mac_soft_ring_bind
256  *
257  * Bind a soft ring worker thread to supplied CPU.
258  */
259 cpu_t *
260 mac_soft_ring_bind(mac_soft_ring_t *ringp, processorid_t cpuid)
261 {
262 	cpu_t *cp;
263 	boolean_t clear = B_FALSE;
264 
265 	ASSERT(MUTEX_HELD(&cpu_lock));
266 
267 	if (mac_soft_ring_thread_bind == 0) {
268 		DTRACE_PROBE1(mac__soft__ring__no__cpu__bound,
269 		    mac_soft_ring_t *, ringp);
270 		return (NULL);
271 	}
272 
273 	cp = cpu_get(cpuid);
274 	if (cp == NULL || !cpu_is_online(cp))
275 		return (NULL);
276 
277 	mutex_enter(&ringp->s_ring_lock);
278 	ringp->s_ring_state |= S_RING_BOUND;
279 	if (ringp->s_ring_cpuid != -1)
280 		clear = B_TRUE;
281 	ringp->s_ring_cpuid = cpuid;
282 	mutex_exit(&ringp->s_ring_lock);
283 
284 	if (clear)
285 		thread_affinity_clear(ringp->s_ring_worker);
286 
287 	DTRACE_PROBE2(mac__soft__ring__cpu__bound, mac_soft_ring_t *,
288 	    ringp, processorid_t, cpuid);
289 
290 	thread_affinity_set(ringp->s_ring_worker, cpuid);
291 
292 	return (cp);
293 }
294 
295 /*
296  * mac_soft_ring_unbind
297  *
298  * Un Bind a soft ring worker thread.
299  */
300 void
301 mac_soft_ring_unbind(mac_soft_ring_t *ringp)
302 {
303 	ASSERT(MUTEX_HELD(&cpu_lock));
304 
305 	mutex_enter(&ringp->s_ring_lock);
306 	if (!(ringp->s_ring_state & S_RING_BOUND)) {
307 		ASSERT(ringp->s_ring_cpuid == -1);
308 		mutex_exit(&ringp->s_ring_lock);
309 		return;
310 	}
311 
312 	ringp->s_ring_cpuid = -1;
313 	ringp->s_ring_state &= ~S_RING_BOUND;
314 	thread_affinity_clear(ringp->s_ring_worker);
315 	mutex_exit(&ringp->s_ring_lock);
316 }
317 
318 /*
319  * PRIVATE FUNCTIONS
320  */
321 
322 static void
323 mac_soft_ring_fire(void *arg)
324 {
325 	mac_soft_ring_t	*ringp = arg;
326 
327 	mutex_enter(&ringp->s_ring_lock);
328 	if (ringp->s_ring_tid == NULL) {
329 		mutex_exit(&ringp->s_ring_lock);
330 		return;
331 	}
332 
333 	ringp->s_ring_tid = NULL;
334 
335 	if (!(ringp->s_ring_state & S_RING_PROC)) {
336 		cv_signal(&ringp->s_ring_async);
337 	}
338 	mutex_exit(&ringp->s_ring_lock);
339 }
340 
341 /*
342  * mac_rx_soft_ring_drain
343  *
344  * Called when worker thread model (ST_RING_WORKER_ONLY) of processing
345  * incoming packets is used. s_ring_first contain the queued packets.
346  * s_ring_rx_func contains the upper level (client) routine where the
347  * packets are destined and s_ring_rx_arg1/s_ring_rx_arg2 are the
348  * cookie meant for the client.
349  */
350 /* ARGSUSED */
351 static void
352 mac_rx_soft_ring_drain(mac_soft_ring_t *ringp)
353 {
354 	mblk_t		*mp;
355 	void		*arg1;
356 	mac_resource_handle_t arg2;
357 	timeout_id_t	tid;
358 	mac_direct_rx_t	proc;
359 	size_t		sz;
360 	int		cnt;
361 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
362 
363 	ringp->s_ring_run = curthread;
364 	ASSERT(mutex_owned(&ringp->s_ring_lock));
365 	ASSERT(!(ringp->s_ring_state & S_RING_PROC));
366 
367 	if ((tid = ringp->s_ring_tid) != NULL)
368 		ringp->s_ring_tid = NULL;
369 
370 	ringp->s_ring_state |= S_RING_PROC;
371 
372 	proc = ringp->s_ring_rx_func;
373 	arg1 = ringp->s_ring_rx_arg1;
374 	arg2 = ringp->s_ring_rx_arg2;
375 
376 	while ((ringp->s_ring_first != NULL) &&
377 	    !(ringp->s_ring_state & S_RING_PAUSE)) {
378 		mp = ringp->s_ring_first;
379 		ringp->s_ring_first = NULL;
380 		ringp->s_ring_last = NULL;
381 		cnt = ringp->s_ring_count;
382 		ringp->s_ring_count = 0;
383 		sz = ringp->s_ring_size;
384 		ringp->s_ring_size = 0;
385 		mutex_exit(&ringp->s_ring_lock);
386 
387 		if (tid != NULL) {
388 			(void) untimeout(tid);
389 			tid = NULL;
390 		}
391 
392 		(*proc)(arg1, arg2, mp, NULL);
393 
394 		/*
395 		 * If we have a soft ring set which is doing
396 		 * bandwidth control, we need to decrement its
397 		 * srs_size so it can have a accurate idea of
398 		 * what is the real data queued between SRS and
399 		 * its soft rings. We decrement the size for a
400 		 * packet only when it gets processed by both
401 		 * SRS and the soft ring.
402 		 */
403 		mutex_enter(&mac_srs->srs_lock);
404 		MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
405 		MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
406 		mutex_exit(&mac_srs->srs_lock);
407 
408 		mutex_enter(&ringp->s_ring_lock);
409 	}
410 	ringp->s_ring_state &= ~S_RING_PROC;
411 	if (ringp->s_ring_state & S_RING_CLIENT_WAIT)
412 		cv_signal(&ringp->s_ring_client_cv);
413 	ringp->s_ring_run = NULL;
414 }
415 
416 /*
417  * mac_soft_ring_worker
418  *
419  * The soft ring worker routine to process any queued packets. In
420  * normal case, the worker thread is bound to a CPU. It the soft
421  * ring is dealing with TCP packets, then the worker thread will
422  * be bound to the same CPU as the TCP squeue.
423  */
424 static void
425 mac_soft_ring_worker(mac_soft_ring_t *ringp)
426 {
427 	kmutex_t *lock = &ringp->s_ring_lock;
428 	kcondvar_t *async = &ringp->s_ring_async;
429 	mac_soft_ring_set_t *srs = ringp->s_ring_set;
430 	callb_cpr_t cprinfo;
431 
432 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "mac_soft_ring");
433 	mutex_enter(lock);
434 start:
435 	for (;;) {
436 		while (((ringp->s_ring_first == NULL ||
437 		    (ringp->s_ring_state & (S_RING_BLOCK|S_RING_BLANK))) &&
438 		    !(ringp->s_ring_state & S_RING_PAUSE)) ||
439 		    (ringp->s_ring_state & S_RING_PROC)) {
440 
441 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
442 			cv_wait(async, lock);
443 			CALLB_CPR_SAFE_END(&cprinfo, lock);
444 		}
445 
446 		/*
447 		 * Either we have work to do, or we have been asked to
448 		 * shutdown temporarily or permanently
449 		 */
450 		if (ringp->s_ring_state & S_RING_PAUSE)
451 			goto done;
452 
453 		ringp->s_ring_drain_func(ringp);
454 	}
455 done:
456 	mutex_exit(lock);
457 	mutex_enter(&srs->srs_lock);
458 	mutex_enter(lock);
459 
460 	ringp->s_ring_state |= S_RING_QUIESCE_DONE;
461 	if (!(ringp->s_ring_state & S_RING_CONDEMNED)) {
462 		srs->srs_soft_ring_quiesced_count++;
463 		cv_broadcast(&srs->srs_async);
464 		mutex_exit(&srs->srs_lock);
465 		while (!(ringp->s_ring_state &
466 		    (S_RING_RESTART | S_RING_CONDEMNED)))
467 			cv_wait(&ringp->s_ring_async, &ringp->s_ring_lock);
468 		mutex_exit(lock);
469 		mutex_enter(&srs->srs_lock);
470 		mutex_enter(lock);
471 		srs->srs_soft_ring_quiesced_count--;
472 		if (ringp->s_ring_state & S_RING_RESTART) {
473 			ASSERT(!(ringp->s_ring_state & S_RING_CONDEMNED));
474 			ringp->s_ring_state &= ~(S_RING_RESTART |
475 			    S_RING_QUIESCE | S_RING_QUIESCE_DONE);
476 			cv_broadcast(&srs->srs_async);
477 			mutex_exit(&srs->srs_lock);
478 			goto start;
479 		}
480 	}
481 	ASSERT(ringp->s_ring_state & S_RING_CONDEMNED);
482 	ringp->s_ring_state |= S_RING_CONDEMNED_DONE;
483 	CALLB_CPR_EXIT(&cprinfo);
484 	srs->srs_soft_ring_condemned_count++;
485 	cv_broadcast(&srs->srs_async);
486 	mutex_exit(&srs->srs_lock);
487 	thread_exit();
488 }
489 
490 /*
491  * mac_soft_ring_intr_enable and mac_soft_ring_intr_disable
492  *
493  * these functions are called to toggle the sending of packets to the
494  * client. They are called by the client. the client gets the name
495  * of these routine and corresponding cookie (pointing to softring)
496  * during capability negotiation at setup time.
497  *
498  * Enabling is allow the processing thread to send packets to the
499  * client while disabling does the opposite.
500  */
501 int
502 mac_soft_ring_intr_enable(void *arg)
503 {
504 	mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
505 	mutex_enter(&ringp->s_ring_lock);
506 	ringp->s_ring_state &= ~S_RING_BLANK;
507 	if (ringp->s_ring_first != NULL)
508 		mac_soft_ring_worker_wakeup(ringp);
509 	mutex_exit(&ringp->s_ring_lock);
510 	return (0);
511 }
512 
513 boolean_t
514 mac_soft_ring_intr_disable(void *arg)
515 {
516 	mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
517 	boolean_t sring_blanked = B_FALSE;
518 	/*
519 	 * Stop worker thread from sending packets above.
520 	 * Squeue will poll soft ring when it needs packets.
521 	 */
522 	mutex_enter(&ringp->s_ring_lock);
523 	if (!(ringp->s_ring_state & S_RING_PROC)) {
524 		ringp->s_ring_state |= S_RING_BLANK;
525 		sring_blanked = B_TRUE;
526 	}
527 	mutex_exit(&ringp->s_ring_lock);
528 	return (sring_blanked);
529 }
530 
531 /*
532  * mac_soft_ring_poll
533  *
534  * This routine is called by the client to poll for packets from
535  * the soft ring. The function name and cookie corresponding to
536  * the soft ring is exchanged during capability negotiation during
537  * setup.
538  */
539 mblk_t *
540 mac_soft_ring_poll(mac_soft_ring_t *ringp, size_t bytes_to_pickup)
541 {
542 	mblk_t	*head, *tail;
543 	mblk_t	*mp;
544 	size_t	sz = 0;
545 	int	cnt = 0;
546 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
547 
548 	ASSERT(mac_srs != NULL);
549 
550 	mutex_enter(&ringp->s_ring_lock);
551 	head = tail = mp = ringp->s_ring_first;
552 	if (head == NULL) {
553 		mutex_exit(&ringp->s_ring_lock);
554 		return (NULL);
555 	}
556 
557 	if (ringp->s_ring_size <= bytes_to_pickup) {
558 		head = ringp->s_ring_first;
559 		ringp->s_ring_first = NULL;
560 		ringp->s_ring_last = NULL;
561 		cnt = ringp->s_ring_count;
562 		ringp->s_ring_count = 0;
563 		sz = ringp->s_ring_size;
564 		ringp->s_ring_size = 0;
565 	} else {
566 		while (mp && sz <= bytes_to_pickup) {
567 			sz += msgdsize(mp);
568 			cnt++;
569 			tail = mp;
570 			mp = mp->b_next;
571 		}
572 		ringp->s_ring_count -= cnt;
573 		ringp->s_ring_size -= sz;
574 		tail->b_next = NULL;
575 		if (mp == NULL) {
576 			ringp->s_ring_first = NULL;
577 			ringp->s_ring_last = NULL;
578 			ASSERT(ringp->s_ring_count == 0);
579 		} else {
580 			ringp->s_ring_first = mp;
581 		}
582 	}
583 
584 	mutex_exit(&ringp->s_ring_lock);
585 	/*
586 	 * Update the shared count and size counters so
587 	 * that SRS has a accurate idea of queued packets.
588 	 */
589 	mutex_enter(&mac_srs->srs_lock);
590 	MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
591 	MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
592 	mutex_exit(&mac_srs->srs_lock);
593 	return (head);
594 }
595 
596 /*
597  * mac_soft_ring_dls_bypass
598  *
599  * Enable direct client (IP) callback function from the softrings.
600  * Callers need to make sure they don't need any DLS layer processing
601  */
602 void
603 mac_soft_ring_dls_bypass(void *arg, mac_direct_rx_t rx_func, void *rx_arg1)
604 {
605 	mac_soft_ring_t		*softring = arg;
606 	mac_soft_ring_set_t	*srs;
607 
608 	ASSERT(rx_func != NULL);
609 
610 	mutex_enter(&softring->s_ring_lock);
611 	softring->s_ring_rx_func = rx_func;
612 	softring->s_ring_rx_arg1 = rx_arg1;
613 	mutex_exit(&softring->s_ring_lock);
614 
615 	srs = softring->s_ring_set;
616 	mutex_enter(&srs->srs_lock);
617 	srs->srs_type |= SRST_DLS_BYPASS;
618 	mutex_exit(&srs->srs_lock);
619 }
620 
621 /*
622  * mac_soft_ring_signal
623  *
624  * Typically used to set the soft ring state to QUIESCE, CONDEMNED, or
625  * RESTART.
626  *
627  * In the Rx side, the quiescing is done bottom up. After the Rx upcalls
628  * from the driver are done, then the Rx SRS is quiesced and only then can
629  * we signal the soft rings. Thus this function can't be called arbitrarily
630  * without satisfying the prerequisites. On the Tx side, the threads from
631  * top need to quiesced, then the Tx SRS and only then can we signal the
632  * Tx soft rings.
633  */
634 void
635 mac_soft_ring_signal(mac_soft_ring_t *softring, uint_t sr_flag)
636 {
637 	mutex_enter(&softring->s_ring_lock);
638 	softring->s_ring_state |= sr_flag;
639 	cv_signal(&softring->s_ring_async);
640 	mutex_exit(&softring->s_ring_lock);
641 }
642 
643 /*
644  * mac_tx_soft_ring_drain
645  *
646  * The transmit side drain routine in case the soft ring was being
647  * used to transmit packets.
648  */
649 static void
650 mac_tx_soft_ring_drain(mac_soft_ring_t *ringp)
651 {
652 	mblk_t			*mp;
653 	void			*arg1;
654 	void			*arg2;
655 	mblk_t			*tail;
656 	uint_t			saved_pkt_count, saved_size;
657 	mac_tx_stats_t		stats;
658 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
659 
660 	saved_pkt_count = saved_size = 0;
661 	ringp->s_ring_run = curthread;
662 	ASSERT(mutex_owned(&ringp->s_ring_lock));
663 	ASSERT(!(ringp->s_ring_state & S_RING_PROC));
664 
665 	ringp->s_ring_state |= S_RING_PROC;
666 	arg1 = ringp->s_ring_tx_arg1;
667 	arg2 = ringp->s_ring_tx_arg2;
668 
669 	while (ringp->s_ring_first != NULL) {
670 		mp = ringp->s_ring_first;
671 		tail = ringp->s_ring_last;
672 		saved_pkt_count = ringp->s_ring_count;
673 		saved_size = ringp->s_ring_size;
674 		ringp->s_ring_first = NULL;
675 		ringp->s_ring_last = NULL;
676 		ringp->s_ring_count = 0;
677 		ringp->s_ring_size = 0;
678 		mutex_exit(&ringp->s_ring_lock);
679 
680 		mp = mac_tx_send(arg1, arg2, mp, &stats);
681 
682 		mutex_enter(&ringp->s_ring_lock);
683 		if (mp != NULL) {
684 			/* Device out of tx desc, set block */
685 			tail->b_next = ringp->s_ring_first;
686 			ringp->s_ring_first = mp;
687 			ringp->s_ring_count +=
688 			    (saved_pkt_count - stats.mts_opackets);
689 			ringp->s_ring_size += (saved_size - stats.mts_obytes);
690 			if (ringp->s_ring_last == NULL)
691 				ringp->s_ring_last = tail;
692 
693 			if (ringp->s_ring_tx_woken_up) {
694 				ringp->s_ring_tx_woken_up = B_FALSE;
695 			} else {
696 				ringp->s_ring_state |= S_RING_BLOCK;
697 				ringp->s_st_stat.mts_blockcnt++;
698 			}
699 
700 			ringp->s_ring_state &= ~S_RING_PROC;
701 			ringp->s_ring_run = NULL;
702 			return;
703 		} else {
704 			ringp->s_ring_tx_woken_up = B_FALSE;
705 			SRS_TX_STATS_UPDATE(mac_srs, &stats);
706 			SOFTRING_TX_STATS_UPDATE(ringp, &stats);
707 		}
708 	}
709 
710 	if (ringp->s_ring_count == 0 && ringp->s_ring_state &
711 	    (S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED)) {
712 		mac_client_impl_t *mcip =  ringp->s_ring_mcip;
713 		boolean_t wakeup_required = B_FALSE;
714 
715 		if (ringp->s_ring_state &
716 		    (S_RING_TX_HIWAT|S_RING_WAKEUP_CLIENT)) {
717 			wakeup_required = B_TRUE;
718 		}
719 		ringp->s_ring_state &=
720 		    ~(S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED);
721 		mutex_exit(&ringp->s_ring_lock);
722 		if (wakeup_required) {
723 			mac_tx_invoke_callbacks(mcip, (mac_tx_cookie_t)ringp);
724 			/*
725 			 * If the client is not the primary MAC client, then we
726 			 * need to send the notification to the clients upper
727 			 * MAC, i.e. mci_upper_mip.
728 			 */
729 			mac_tx_notify(mcip->mci_upper_mip != NULL ?
730 			    mcip->mci_upper_mip : mcip->mci_mip);
731 		}
732 		mutex_enter(&ringp->s_ring_lock);
733 	}
734 	ringp->s_ring_state &= ~S_RING_PROC;
735 	ringp->s_ring_run = NULL;
736 }
737