xref: /illumos-gate/usr/src/uts/common/io/mac/mac_soft_ring.c (revision 08e8465ea9de8f93d6ca817333b2ea217df7e3b2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*
27  * General Soft rings - Simulating Rx rings in S/W.
28  *
29  * Soft ring is a data abstraction containing a queue and a worker
30  * thread and represents a hardware Rx ring in software. Each soft
31  * ring set can have a collection of soft rings for separating
32  * L3/L4 specific traffic (IPv4 from IPv6 or TCP from UDP) or for
33  * allowing a higher degree of parallelism by sending traffic to
34  * one of the soft rings for a SRS (using a hash on src IP or port).
35  * Each soft ring worker thread can be bound to a different CPU
36  * allowing the processing for each soft ring to happen in parallel
37  * and independent from each other.
38  *
39  * Protocol soft rings:
40  *
41  * Each SRS has at an minimum 3 softrings. One each for IPv4 TCP,
42  * IPv4 UDP and rest (OTH - for IPv6 and everything else). The
43  * SRS does dynamic polling and enforces link level bandwidth but
44  * it does so for all traffic (IPv4 and IPv6 and all protocols) on
45  * that link. However, each protocol layer wants a different
46  * behaviour. For instance IPv4 TCP has per CPU squeues which
47  * enforce their own polling and flow control so IPv4 TCP traffic
48  * needs to go to a separate soft ring which can be polled by the
49  * TCP squeue. It also allows TCP squeue to push back flow control
50  * all the way to NIC hardware (if it puts its corresponding soft
51  * ring in the poll mode and soft ring queue builds up, the
52  * shared srs_poll_pkt_cnt goes up and SRS automatically stops
53  * more packets from entering the system).
54  *
55  * Similarly, the UDP benefits from a DLS bypass and packet chaining
56  * so sending it to a separate soft ring is desired. All the rest of
57  * the traffic (including IPv6 is sent to OTH softring). The IPv6
58  * traffic current goes through OTH softring and via DLS because
59  * it need more processing to be done. Irrespective of the sap
60  * (IPv4 or IPv6) or the transport, the dynamic polling, B/W enforcement,
61  * cpu assignment, fanout, etc apply to all traffic since they
62  * are implement by the SRS which is agnostic to sap or transport.
63  *
64  * Fanout soft rings:
65  *
66  * On a multithreaded system, we can assign more CPU and multi thread
67  * the stack by creating a soft ring per CPU and spreading traffic
68  * based on a hash computed on src IP etc. Since we still need to
69  * keep the protocol separation, we create a set of 3 soft ring per
70  * CPU (specified by cpu list or degree of fanout).
71  *
72  * NOTE: See the block level comment on top of mac_sched.c
73  */
74 
75 #include <sys/types.h>
76 #include <sys/callb.h>
77 #include <sys/sdt.h>
78 #include <sys/strsubr.h>
79 #include <sys/strsun.h>
80 #include <sys/vlan.h>
81 #include <inet/ipsec_impl.h>
82 #include <inet/ip_impl.h>
83 #include <inet/sadb.h>
84 #include <inet/ipsecesp.h>
85 #include <inet/ipsecah.h>
86 
87 #include <sys/mac_impl.h>
88 #include <sys/mac_client_impl.h>
89 #include <sys/mac_soft_ring.h>
90 #include <sys/mac_flow_impl.h>
91 #include <sys/mac_stat.h>
92 
93 static void mac_rx_soft_ring_drain(mac_soft_ring_t *);
94 static void mac_soft_ring_fire(void *);
95 static void mac_soft_ring_worker(mac_soft_ring_t *);
96 static void mac_tx_soft_ring_drain(mac_soft_ring_t *);
97 
98 uint32_t mac_tx_soft_ring_max_q_cnt = 100000;
99 uint32_t mac_tx_soft_ring_hiwat = 1000;
100 
101 extern kmem_cache_t *mac_soft_ring_cache;
102 
103 #define	ADD_SOFTRING_TO_SET(mac_srs, softring) {			\
104 	if (mac_srs->srs_soft_ring_head == NULL) {			\
105 		mac_srs->srs_soft_ring_head = softring;			\
106 		mac_srs->srs_soft_ring_tail = softring;			\
107 	} else {							\
108 		/* ADD to the list */					\
109 		softring->s_ring_prev =					\
110 			mac_srs->srs_soft_ring_tail;			\
111 		mac_srs->srs_soft_ring_tail->s_ring_next = softring;	\
112 		mac_srs->srs_soft_ring_tail = softring;			\
113 	}								\
114 	mac_srs->srs_soft_ring_count++;					\
115 }
116 
117 /*
118  * mac_soft_ring_worker_wakeup
119  *
120  * Wake up the soft ring worker thread to process the queue as long
121  * as no one else is processing it and upper layer (client) is still
122  * ready to receive packets.
123  */
124 void
125 mac_soft_ring_worker_wakeup(mac_soft_ring_t *ringp)
126 {
127 	ASSERT(MUTEX_HELD(&ringp->s_ring_lock));
128 	if (!(ringp->s_ring_state & S_RING_PROC) &&
129 	    !(ringp->s_ring_state & S_RING_BLANK) &&
130 	    (ringp->s_ring_tid == NULL)) {
131 		if (ringp->s_ring_wait != 0) {
132 			ringp->s_ring_tid =
133 			    timeout(mac_soft_ring_fire, ringp,
134 			    ringp->s_ring_wait);
135 		} else {
136 			/* Schedule the worker thread. */
137 			cv_signal(&ringp->s_ring_async);
138 		}
139 	}
140 }
141 
142 /*
143  * mac_soft_ring_create
144  *
145  * Create a soft ring, do the necessary setup and bind the worker
146  * thread to the assigned CPU.
147  */
148 mac_soft_ring_t *
149 mac_soft_ring_create(int id, clock_t wait, uint16_t type,
150     pri_t pri, mac_client_impl_t *mcip, mac_soft_ring_set_t *mac_srs,
151     processorid_t cpuid, mac_direct_rx_t rx_func, void *x_arg1,
152     mac_resource_handle_t x_arg2)
153 {
154 	mac_soft_ring_t 	*ringp;
155 	char 			name[S_RING_NAMELEN];
156 
157 	bzero(name, 64);
158 	ringp = kmem_cache_alloc(mac_soft_ring_cache, KM_SLEEP);
159 
160 	if (type & ST_RING_TCP) {
161 		(void) snprintf(name, sizeof (name),
162 		    "mac_tcp_soft_ring_%d_%p", id, (void *)mac_srs);
163 	} else if (type & ST_RING_UDP) {
164 		(void) snprintf(name, sizeof (name),
165 		    "mac_udp_soft_ring_%d_%p", id, (void *)mac_srs);
166 	} else if (type & ST_RING_OTH) {
167 		(void) snprintf(name, sizeof (name),
168 		    "mac_oth_soft_ring_%d_%p", id, (void *)mac_srs);
169 	} else {
170 		ASSERT(type & ST_RING_TX);
171 		(void) snprintf(name, sizeof (name),
172 		    "mac_tx_soft_ring_%d_%p", id, (void *)mac_srs);
173 	}
174 
175 	bzero(ringp, sizeof (mac_soft_ring_t));
176 	(void) strncpy(ringp->s_ring_name, name, S_RING_NAMELEN + 1);
177 	ringp->s_ring_name[S_RING_NAMELEN] = '\0';
178 	mutex_init(&ringp->s_ring_lock, NULL, MUTEX_DEFAULT, NULL);
179 	ringp->s_ring_notify_cb_info.mcbi_lockp = &ringp->s_ring_lock;
180 
181 	ringp->s_ring_type = type;
182 	ringp->s_ring_wait = MSEC_TO_TICK(wait);
183 	ringp->s_ring_mcip = mcip;
184 	ringp->s_ring_set = mac_srs;
185 
186 	/*
187 	 * Protect against access from DR callbacks (mac_walk_srs_bind/unbind)
188 	 * which can't grab the mac perimeter
189 	 */
190 	mutex_enter(&mac_srs->srs_lock);
191 	ADD_SOFTRING_TO_SET(mac_srs, ringp);
192 	mutex_exit(&mac_srs->srs_lock);
193 
194 	/*
195 	 * set the bind CPU to -1 to indicate
196 	 * no thread affinity set
197 	 */
198 	ringp->s_ring_cpuid = ringp->s_ring_cpuid_save = -1;
199 	ringp->s_ring_worker = thread_create(NULL, 0,
200 	    mac_soft_ring_worker, ringp, 0, &p0, TS_RUN, pri);
201 	if (type & ST_RING_TX) {
202 		ringp->s_ring_drain_func = mac_tx_soft_ring_drain;
203 		ringp->s_ring_tx_arg1 = x_arg1;
204 		ringp->s_ring_tx_arg2 = x_arg2;
205 		ringp->s_ring_tx_max_q_cnt = mac_tx_soft_ring_max_q_cnt;
206 		ringp->s_ring_tx_hiwat =
207 		    (mac_tx_soft_ring_hiwat > mac_tx_soft_ring_max_q_cnt) ?
208 		    mac_tx_soft_ring_max_q_cnt : mac_tx_soft_ring_hiwat;
209 		if (mcip->mci_state_flags & MCIS_IS_AGGR) {
210 			mac_srs_tx_t *tx = &mac_srs->srs_tx;
211 
212 			ASSERT(tx->st_soft_rings[
213 			    ((mac_ring_t *)x_arg2)->mr_index] == NULL);
214 			tx->st_soft_rings[((mac_ring_t *)x_arg2)->mr_index] =
215 			    ringp;
216 		}
217 	} else {
218 		ringp->s_ring_drain_func = mac_rx_soft_ring_drain;
219 		ringp->s_ring_rx_func = rx_func;
220 		ringp->s_ring_rx_arg1 = x_arg1;
221 		ringp->s_ring_rx_arg2 = x_arg2;
222 		if (mac_srs->srs_state & SRS_SOFTRING_QUEUE)
223 			ringp->s_ring_type |= ST_RING_WORKER_ONLY;
224 	}
225 	if (cpuid != -1)
226 		(void) mac_soft_ring_bind(ringp, cpuid);
227 
228 	mac_soft_ring_stat_create(ringp);
229 
230 	return (ringp);
231 }
232 
233 /*
234  * mac_soft_ring_free
235  *
236  * Free the soft ring once we are done with it.
237  */
238 void
239 mac_soft_ring_free(mac_soft_ring_t *softring)
240 {
241 	ASSERT((softring->s_ring_state &
242 	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE | S_RING_PROC)) ==
243 	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE));
244 	mac_pkt_drop(NULL, NULL, softring->s_ring_first, B_FALSE);
245 	softring->s_ring_tx_arg2 = NULL;
246 	mac_soft_ring_stat_delete(softring);
247 	mac_callback_free(softring->s_ring_notify_cb_list);
248 	kmem_cache_free(mac_soft_ring_cache, softring);
249 }
250 
251 int mac_soft_ring_thread_bind = 1;
252 
253 /*
254  * mac_soft_ring_bind
255  *
256  * Bind a soft ring worker thread to supplied CPU.
257  */
258 cpu_t *
259 mac_soft_ring_bind(mac_soft_ring_t *ringp, processorid_t cpuid)
260 {
261 	cpu_t *cp;
262 	boolean_t clear = B_FALSE;
263 
264 	ASSERT(MUTEX_HELD(&cpu_lock));
265 
266 	if (mac_soft_ring_thread_bind == 0) {
267 		DTRACE_PROBE1(mac__soft__ring__no__cpu__bound,
268 		    mac_soft_ring_t *, ringp);
269 		return (NULL);
270 	}
271 
272 	cp = cpu_get(cpuid);
273 	if (cp == NULL || !cpu_is_online(cp))
274 		return (NULL);
275 
276 	mutex_enter(&ringp->s_ring_lock);
277 	ringp->s_ring_state |= S_RING_BOUND;
278 	if (ringp->s_ring_cpuid != -1)
279 		clear = B_TRUE;
280 	ringp->s_ring_cpuid = cpuid;
281 	mutex_exit(&ringp->s_ring_lock);
282 
283 	if (clear)
284 		thread_affinity_clear(ringp->s_ring_worker);
285 
286 	DTRACE_PROBE2(mac__soft__ring__cpu__bound, mac_soft_ring_t *,
287 	    ringp, processorid_t, cpuid);
288 
289 	thread_affinity_set(ringp->s_ring_worker, cpuid);
290 
291 	return (cp);
292 }
293 
294 /*
295  * mac_soft_ring_unbind
296  *
297  * Un Bind a soft ring worker thread.
298  */
299 void
300 mac_soft_ring_unbind(mac_soft_ring_t *ringp)
301 {
302 	ASSERT(MUTEX_HELD(&cpu_lock));
303 
304 	mutex_enter(&ringp->s_ring_lock);
305 	if (!(ringp->s_ring_state & S_RING_BOUND)) {
306 		ASSERT(ringp->s_ring_cpuid == -1);
307 		mutex_exit(&ringp->s_ring_lock);
308 		return;
309 	}
310 
311 	ringp->s_ring_cpuid = -1;
312 	ringp->s_ring_state &= ~S_RING_BOUND;
313 	thread_affinity_clear(ringp->s_ring_worker);
314 	mutex_exit(&ringp->s_ring_lock);
315 }
316 
317 /*
318  * PRIVATE FUNCTIONS
319  */
320 
321 static void
322 mac_soft_ring_fire(void *arg)
323 {
324 	mac_soft_ring_t	*ringp = arg;
325 
326 	mutex_enter(&ringp->s_ring_lock);
327 	if (ringp->s_ring_tid == 0) {
328 		mutex_exit(&ringp->s_ring_lock);
329 		return;
330 	}
331 
332 	ringp->s_ring_tid = 0;
333 
334 	if (!(ringp->s_ring_state & S_RING_PROC)) {
335 		cv_signal(&ringp->s_ring_async);
336 	}
337 	mutex_exit(&ringp->s_ring_lock);
338 }
339 
340 /*
341  * mac_rx_soft_ring_drain
342  *
343  * Called when worker thread model (ST_RING_WORKER_ONLY) of processing
344  * incoming packets is used. s_ring_first contain the queued packets.
345  * s_ring_rx_func contains the upper level (client) routine where the
346  * packets are destined and s_ring_rx_arg1/s_ring_rx_arg2 are the
347  * cookie meant for the client.
348  */
349 /* ARGSUSED */
350 static void
351 mac_rx_soft_ring_drain(mac_soft_ring_t *ringp)
352 {
353 	mblk_t		*mp;
354 	void		*arg1;
355 	mac_resource_handle_t arg2;
356 	timeout_id_t 	tid;
357 	mac_direct_rx_t	proc;
358 	size_t		sz;
359 	int		cnt;
360 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
361 
362 	ringp->s_ring_run = curthread;
363 	ASSERT(mutex_owned(&ringp->s_ring_lock));
364 	ASSERT(!(ringp->s_ring_state & S_RING_PROC));
365 
366 	if ((tid = ringp->s_ring_tid) != 0)
367 		ringp->s_ring_tid = 0;
368 
369 	ringp->s_ring_state |= S_RING_PROC;
370 
371 	proc = ringp->s_ring_rx_func;
372 	arg1 = ringp->s_ring_rx_arg1;
373 	arg2 = ringp->s_ring_rx_arg2;
374 
375 	while ((ringp->s_ring_first != NULL) &&
376 	    !(ringp->s_ring_state & S_RING_PAUSE)) {
377 		mp = ringp->s_ring_first;
378 		ringp->s_ring_first = NULL;
379 		ringp->s_ring_last = NULL;
380 		cnt = ringp->s_ring_count;
381 		ringp->s_ring_count = 0;
382 		sz = ringp->s_ring_size;
383 		ringp->s_ring_size = 0;
384 		mutex_exit(&ringp->s_ring_lock);
385 
386 		if (tid != 0) {
387 			(void) untimeout(tid);
388 			tid = 0;
389 		}
390 
391 		(*proc)(arg1, arg2, mp, NULL);
392 
393 		/*
394 		 * If we have a soft ring set which is doing
395 		 * bandwidth control, we need to decrement its
396 		 * srs_size so it can have a accurate idea of
397 		 * what is the real data queued between SRS and
398 		 * its soft rings. We decrement the size for a
399 		 * packet only when it gets processed by both
400 		 * SRS and the soft ring.
401 		 */
402 		mutex_enter(&mac_srs->srs_lock);
403 		MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
404 		MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
405 		mutex_exit(&mac_srs->srs_lock);
406 
407 		mutex_enter(&ringp->s_ring_lock);
408 	}
409 	ringp->s_ring_state &= ~S_RING_PROC;
410 	if (ringp->s_ring_state & S_RING_CLIENT_WAIT)
411 		cv_signal(&ringp->s_ring_client_cv);
412 	ringp->s_ring_run = NULL;
413 }
414 
415 /*
416  * mac_soft_ring_worker
417  *
418  * The soft ring worker routine to process any queued packets. In
419  * normal case, the worker thread is bound to a CPU. It the soft
420  * ring is dealing with TCP packets, then the worker thread will
421  * be bound to the same CPU as the TCP squeue.
422  */
423 static void
424 mac_soft_ring_worker(mac_soft_ring_t *ringp)
425 {
426 	kmutex_t *lock = &ringp->s_ring_lock;
427 	kcondvar_t *async = &ringp->s_ring_async;
428 	mac_soft_ring_set_t *srs = ringp->s_ring_set;
429 	callb_cpr_t cprinfo;
430 
431 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "mac_soft_ring");
432 	mutex_enter(lock);
433 start:
434 	for (;;) {
435 		while (((ringp->s_ring_first == NULL ||
436 		    (ringp->s_ring_state & (S_RING_BLOCK|S_RING_BLANK))) &&
437 		    !(ringp->s_ring_state & S_RING_PAUSE)) ||
438 		    (ringp->s_ring_state & S_RING_PROC)) {
439 
440 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
441 			cv_wait(async, lock);
442 			CALLB_CPR_SAFE_END(&cprinfo, lock);
443 		}
444 
445 		/*
446 		 * Either we have work to do, or we have been asked to
447 		 * shutdown temporarily or permanently
448 		 */
449 		if (ringp->s_ring_state & S_RING_PAUSE)
450 			goto done;
451 
452 		ringp->s_ring_drain_func(ringp);
453 	}
454 done:
455 	mutex_exit(lock);
456 	mutex_enter(&srs->srs_lock);
457 	mutex_enter(lock);
458 
459 	ringp->s_ring_state |= S_RING_QUIESCE_DONE;
460 	if (!(ringp->s_ring_state & S_RING_CONDEMNED)) {
461 		srs->srs_soft_ring_quiesced_count++;
462 		cv_broadcast(&srs->srs_async);
463 		mutex_exit(&srs->srs_lock);
464 		while (!(ringp->s_ring_state &
465 		    (S_RING_RESTART | S_RING_CONDEMNED)))
466 			cv_wait(&ringp->s_ring_async, &ringp->s_ring_lock);
467 		mutex_exit(lock);
468 		mutex_enter(&srs->srs_lock);
469 		mutex_enter(lock);
470 		srs->srs_soft_ring_quiesced_count--;
471 		if (ringp->s_ring_state & S_RING_RESTART) {
472 			ASSERT(!(ringp->s_ring_state & S_RING_CONDEMNED));
473 			ringp->s_ring_state &= ~(S_RING_RESTART |
474 			    S_RING_QUIESCE | S_RING_QUIESCE_DONE);
475 			cv_broadcast(&srs->srs_async);
476 			mutex_exit(&srs->srs_lock);
477 			goto start;
478 		}
479 	}
480 	ASSERT(ringp->s_ring_state & S_RING_CONDEMNED);
481 	ringp->s_ring_state |= S_RING_CONDEMNED_DONE;
482 	CALLB_CPR_EXIT(&cprinfo);
483 	srs->srs_soft_ring_condemned_count++;
484 	cv_broadcast(&srs->srs_async);
485 	mutex_exit(&srs->srs_lock);
486 	thread_exit();
487 }
488 
489 /*
490  * mac_soft_ring_intr_enable and mac_soft_ring_intr_disable
491  *
492  * these functions are called to toggle the sending of packets to the
493  * client. They are called by the client. the client gets the name
494  * of these routine and corresponding cookie (pointing to softring)
495  * during capability negotiation at setup time.
496  *
497  * Enabling is allow the processing thread to send packets to the
498  * client while disabling does the opposite.
499  */
500 void
501 mac_soft_ring_intr_enable(void *arg)
502 {
503 	mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
504 	mutex_enter(&ringp->s_ring_lock);
505 	ringp->s_ring_state &= ~S_RING_BLANK;
506 	if (ringp->s_ring_first != NULL)
507 		mac_soft_ring_worker_wakeup(ringp);
508 	mutex_exit(&ringp->s_ring_lock);
509 }
510 
511 boolean_t
512 mac_soft_ring_intr_disable(void *arg)
513 {
514 	mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
515 	boolean_t sring_blanked = B_FALSE;
516 	/*
517 	 * Stop worker thread from sending packets above.
518 	 * Squeue will poll soft ring when it needs packets.
519 	 */
520 	mutex_enter(&ringp->s_ring_lock);
521 	if (!(ringp->s_ring_state & S_RING_PROC)) {
522 		ringp->s_ring_state |= S_RING_BLANK;
523 		sring_blanked = B_TRUE;
524 	}
525 	mutex_exit(&ringp->s_ring_lock);
526 	return (sring_blanked);
527 }
528 
529 /*
530  * mac_soft_ring_poll
531  *
532  * This routine is called by the client to poll for packets from
533  * the soft ring. The function name and cookie corresponding to
534  * the soft ring is exchanged during capability negotiation during
535  * setup.
536  */
537 mblk_t *
538 mac_soft_ring_poll(mac_soft_ring_t *ringp, int bytes_to_pickup)
539 {
540 	mblk_t	*head, *tail;
541 	mblk_t	*mp;
542 	size_t	sz = 0;
543 	int	cnt = 0;
544 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
545 
546 	ASSERT(mac_srs != NULL);
547 
548 	mutex_enter(&ringp->s_ring_lock);
549 	head = tail = mp = ringp->s_ring_first;
550 	if (head == NULL) {
551 		mutex_exit(&ringp->s_ring_lock);
552 		return (NULL);
553 	}
554 
555 	if (ringp->s_ring_size <= bytes_to_pickup) {
556 		head = ringp->s_ring_first;
557 		ringp->s_ring_first = NULL;
558 		ringp->s_ring_last = NULL;
559 		cnt = ringp->s_ring_count;
560 		ringp->s_ring_count = 0;
561 		sz = ringp->s_ring_size;
562 		ringp->s_ring_size = 0;
563 	} else {
564 		while (mp && sz <= bytes_to_pickup) {
565 			sz += msgdsize(mp);
566 			cnt++;
567 			tail = mp;
568 			mp = mp->b_next;
569 		}
570 		ringp->s_ring_count -= cnt;
571 		ringp->s_ring_size -= sz;
572 		tail->b_next = NULL;
573 		if (mp == NULL) {
574 			ringp->s_ring_first = NULL;
575 			ringp->s_ring_last = NULL;
576 			ASSERT(ringp->s_ring_count == 0);
577 		} else {
578 			ringp->s_ring_first = mp;
579 		}
580 	}
581 
582 	mutex_exit(&ringp->s_ring_lock);
583 	/*
584 	 * Update the shared count and size counters so
585 	 * that SRS has a accurate idea of queued packets.
586 	 */
587 	mutex_enter(&mac_srs->srs_lock);
588 	MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
589 	MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
590 	mutex_exit(&mac_srs->srs_lock);
591 	return (head);
592 }
593 
594 /*
595  * mac_soft_ring_dls_bypass
596  *
597  * Enable direct client (IP) callback function from the softrings.
598  * Callers need to make sure they don't need any DLS layer processing
599  */
600 void
601 mac_soft_ring_dls_bypass(void *arg, mac_direct_rx_t rx_func, void *rx_arg1)
602 {
603 	mac_soft_ring_t		*softring = arg;
604 	mac_soft_ring_set_t	*srs;
605 
606 	ASSERT(rx_func != NULL);
607 
608 	mutex_enter(&softring->s_ring_lock);
609 	softring->s_ring_rx_func = rx_func;
610 	softring->s_ring_rx_arg1 = rx_arg1;
611 	mutex_exit(&softring->s_ring_lock);
612 
613 	srs = softring->s_ring_set;
614 	mutex_enter(&srs->srs_lock);
615 	srs->srs_type |= SRST_DLS_BYPASS;
616 	mutex_exit(&srs->srs_lock);
617 }
618 
619 /*
620  * mac_soft_ring_signal
621  *
622  * Typically used to set the soft ring state to QUIESCE, CONDEMNED, or
623  * RESTART.
624  *
625  * In the Rx side, the quiescing is done bottom up. After the Rx upcalls
626  * from the driver are done, then the Rx SRS is quiesced and only then can
627  * we signal the soft rings. Thus this function can't be called arbitrarily
628  * without satisfying the prerequisites. On the Tx side, the threads from
629  * top need to quiesced, then the Tx SRS and only then can we signal the
630  * Tx soft rings.
631  */
632 void
633 mac_soft_ring_signal(mac_soft_ring_t *softring, uint_t sr_flag)
634 {
635 	mutex_enter(&softring->s_ring_lock);
636 	softring->s_ring_state |= sr_flag;
637 	cv_signal(&softring->s_ring_async);
638 	mutex_exit(&softring->s_ring_lock);
639 }
640 
641 /*
642  * mac_tx_soft_ring_drain
643  *
644  * The transmit side drain routine in case the soft ring was being
645  * used to transmit packets.
646  */
647 static void
648 mac_tx_soft_ring_drain(mac_soft_ring_t *ringp)
649 {
650 	mblk_t 			*mp;
651 	void 			*arg1;
652 	void 			*arg2;
653 	mblk_t 			*tail;
654 	uint_t			saved_pkt_count, saved_size;
655 	mac_tx_stats_t		stats;
656 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
657 
658 	saved_pkt_count = saved_size = 0;
659 	ringp->s_ring_run = curthread;
660 	ASSERT(mutex_owned(&ringp->s_ring_lock));
661 	ASSERT(!(ringp->s_ring_state & S_RING_PROC));
662 
663 	ringp->s_ring_state |= S_RING_PROC;
664 	arg1 = ringp->s_ring_tx_arg1;
665 	arg2 = ringp->s_ring_tx_arg2;
666 
667 	while (ringp->s_ring_first != NULL) {
668 		mp = ringp->s_ring_first;
669 		tail = ringp->s_ring_last;
670 		saved_pkt_count = ringp->s_ring_count;
671 		saved_size = ringp->s_ring_size;
672 		ringp->s_ring_first = NULL;
673 		ringp->s_ring_last = NULL;
674 		ringp->s_ring_count = 0;
675 		ringp->s_ring_size = 0;
676 		mutex_exit(&ringp->s_ring_lock);
677 
678 		mp = mac_tx_send(arg1, arg2, mp, &stats);
679 
680 		mutex_enter(&ringp->s_ring_lock);
681 		if (mp != NULL) {
682 			/* Device out of tx desc, set block */
683 			tail->b_next = ringp->s_ring_first;
684 			ringp->s_ring_first = mp;
685 			ringp->s_ring_count +=
686 			    (saved_pkt_count - stats.mts_opackets);
687 			ringp->s_ring_size += (saved_size - stats.mts_obytes);
688 			if (ringp->s_ring_last == NULL)
689 				ringp->s_ring_last = tail;
690 
691 			if (ringp->s_ring_tx_woken_up) {
692 				ringp->s_ring_tx_woken_up = B_FALSE;
693 			} else {
694 				ringp->s_ring_state |= S_RING_BLOCK;
695 				ringp->s_st_stat.mts_blockcnt++;
696 			}
697 
698 			ringp->s_ring_state &= ~S_RING_PROC;
699 			ringp->s_ring_run = NULL;
700 			return;
701 		} else {
702 			ringp->s_ring_tx_woken_up = B_FALSE;
703 			SRS_TX_STATS_UPDATE(mac_srs, &stats);
704 			SOFTRING_TX_STATS_UPDATE(ringp, &stats);
705 		}
706 	}
707 
708 	if (ringp->s_ring_count == 0 && ringp->s_ring_state &
709 	    (S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED)) {
710 		mac_client_impl_t *mcip =  ringp->s_ring_mcip;
711 		boolean_t wakeup_required = B_FALSE;
712 
713 		if (ringp->s_ring_state &
714 		    (S_RING_TX_HIWAT|S_RING_WAKEUP_CLIENT)) {
715 			wakeup_required = B_TRUE;
716 		}
717 		ringp->s_ring_state &=
718 		    ~(S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED);
719 		mutex_exit(&ringp->s_ring_lock);
720 		if (wakeup_required) {
721 			mac_tx_invoke_callbacks(mcip, (mac_tx_cookie_t)ringp);
722 			/*
723 			 * If the client is not the primary MAC client, then we
724 			 * need to send the notification to the clients upper
725 			 * MAC, i.e. mci_upper_mip.
726 			 */
727 			mac_tx_notify(mcip->mci_upper_mip != NULL ?
728 			    mcip->mci_upper_mip : mcip->mci_mip);
729 		}
730 		mutex_enter(&ringp->s_ring_lock);
731 	}
732 	ringp->s_ring_state &= ~S_RING_PROC;
733 	ringp->s_ring_run = NULL;
734 }
735