xref: /illumos-gate/usr/src/uts/common/io/mac/mac_soft_ring.c (revision 8bab47abcb471dffa36ddbf409a8ef5303398ddf)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*
27  * General Soft rings - Simulating Rx rings in S/W.
28  *
29  * Soft ring is a data abstraction containing a queue and a worker
30  * thread and represents a hardware Rx ring in software. Each soft
31  * ring set can have a collection of soft rings for separating
32  * L3/L4 specific traffic (IPv4 from IPv6 or TCP from UDP) or for
33  * allowing a higher degree of parallelism by sending traffic to
34  * one of the soft rings for a SRS (using a hash on src IP or port).
35  * Each soft ring worker thread can be bound to a different CPU
36  * allowing the processing for each soft ring to happen in parallel
37  * and independent from each other.
38  *
39  * Protocol soft rings:
40  *
41  * Each SRS has at an minimum 3 softrings. One each for IPv4 TCP,
42  * IPv4 UDP and rest (OTH - for IPv6 and everything else). The
43  * SRS does dynamic polling and enforces link level bandwidth but
44  * it does so for all traffic (IPv4 and IPv6 and all protocols) on
45  * that link. However, each protocol layer wants a different
46  * behaviour. For instance IPv4 TCP has per CPU squeues which
47  * enforce their own polling and flow control so IPv4 TCP traffic
48  * needs to go to a separate soft ring which can be polled by the
49  * TCP squeue. It also allows TCP squeue to push back flow control
50  * all the way to NIC hardware (if it puts its corresponding soft
51  * ring in the poll mode and soft ring queue builds up, the
52  * shared srs_poll_pkt_cnt goes up and SRS automatically stops
53  * more packets from entering the system).
54  *
55  * Similarly, the UDP benefits from a DLS bypass and packet chaining
56  * so sending it to a separate soft ring is desired. All the rest of
57  * the traffic (including IPv6 is sent to OTH softring). The IPv6
58  * traffic current goes through OTH softring and via DLS because
59  * it need more processing to be done. Irrespective of the sap
60  * (IPv4 or IPv6) or the transport, the dynamic polling, B/W enforcement,
61  * cpu assignment, fanout, etc apply to all traffic since they
62  * are implement by the SRS which is agnostic to sap or transport.
63  *
64  * Fanout soft rings:
65  *
66  * On a multithreaded system, we can assign more CPU and multi thread
67  * the stack by creating a soft ring per CPU and spreading traffic
68  * based on a hash computed on src IP etc. Since we still need to
69  * keep the protocol separation, we create a set of 3 soft ring per
70  * CPU (specified by cpu list or degree of fanout).
71  *
72  * NOTE: See the block level comment on top of mac_sched.c
73  */
74 
75 #include <sys/types.h>
76 #include <sys/callb.h>
77 #include <sys/sdt.h>
78 #include <sys/strsubr.h>
79 #include <sys/strsun.h>
80 #include <sys/vlan.h>
81 #include <inet/ipsec_impl.h>
82 #include <inet/ip_impl.h>
83 #include <inet/sadb.h>
84 #include <inet/ipsecesp.h>
85 #include <inet/ipsecah.h>
86 
87 #include <sys/mac_impl.h>
88 #include <sys/mac_client_impl.h>
89 #include <sys/mac_soft_ring.h>
90 #include <sys/mac_flow_impl.h>
91 
92 static void mac_rx_soft_ring_drain(mac_soft_ring_t *);
93 static void mac_soft_ring_fire(void *);
94 static void mac_soft_ring_worker(mac_soft_ring_t *);
95 static void mac_tx_soft_ring_drain(mac_soft_ring_t *);
96 
97 uint32_t mac_tx_soft_ring_max_q_cnt = 100000;
98 uint32_t mac_tx_soft_ring_hiwat = 1000;
99 
100 extern kmem_cache_t *mac_soft_ring_cache;
101 
102 #define	ADD_SOFTRING_TO_SET(mac_srs, softring) {			\
103 	if (mac_srs->srs_soft_ring_head == NULL) {			\
104 		mac_srs->srs_soft_ring_head = softring;			\
105 		mac_srs->srs_soft_ring_tail = softring;			\
106 	} else {							\
107 		/* ADD to the list */					\
108 		softring->s_ring_prev =					\
109 			mac_srs->srs_soft_ring_tail;			\
110 		mac_srs->srs_soft_ring_tail->s_ring_next = softring;	\
111 		mac_srs->srs_soft_ring_tail = softring;			\
112 	}								\
113 	mac_srs->srs_soft_ring_count++;					\
114 }
115 
116 /*
117  * mac_soft_ring_worker_wakeup
118  *
119  * Wake up the soft ring worker thread to process the queue as long
120  * as no one else is processing it and upper layer (client) is still
121  * ready to receive packets.
122  */
123 void
124 mac_soft_ring_worker_wakeup(mac_soft_ring_t *ringp)
125 {
126 	ASSERT(MUTEX_HELD(&ringp->s_ring_lock));
127 	if (!(ringp->s_ring_state & S_RING_PROC) &&
128 	    !(ringp->s_ring_state & S_RING_BLANK) &&
129 	    (ringp->s_ring_tid == NULL)) {
130 		if (ringp->s_ring_wait != 0) {
131 			ringp->s_ring_tid =
132 			    timeout(mac_soft_ring_fire, ringp,
133 			    ringp->s_ring_wait);
134 		} else {
135 			/* Schedule the worker thread. */
136 			cv_signal(&ringp->s_ring_async);
137 		}
138 	}
139 }
140 
141 /*
142  * mac_soft_ring_create
143  *
144  * Create a soft ring, do the necessary setup and bind the worker
145  * thread to the assigned CPU.
146  */
147 mac_soft_ring_t *
148 mac_soft_ring_create(int id, clock_t wait, void *flent, uint16_t type,
149     pri_t pri, mac_client_impl_t *mcip, mac_soft_ring_set_t *mac_srs,
150     processorid_t cpuid, mac_direct_rx_t rx_func, void *x_arg1,
151     mac_resource_handle_t x_arg2)
152 {
153 	mac_soft_ring_t 	*ringp;
154 	char 			name[S_RING_NAMELEN];
155 
156 	bzero(name, 64);
157 	ringp = kmem_cache_alloc(mac_soft_ring_cache, KM_SLEEP);
158 
159 	if (type & ST_RING_TCP) {
160 		(void) snprintf(name, sizeof (name),
161 		    "mac_tcp_soft_ring_%d_%p", id, (void *)mac_srs);
162 	} else if (type & ST_RING_UDP) {
163 		(void) snprintf(name, sizeof (name),
164 		    "mac_udp_soft_ring_%d_%p", id, (void *)mac_srs);
165 	} else {
166 		(void) snprintf(name, sizeof (name),
167 		    "mac_oth_soft_ring_%d_%p", id, (void *)mac_srs);
168 	}
169 
170 	bzero(ringp, sizeof (mac_soft_ring_t));
171 	(void) strncpy(ringp->s_ring_name, name, S_RING_NAMELEN + 1);
172 	ringp->s_ring_name[S_RING_NAMELEN] = '\0';
173 	mutex_init(&ringp->s_ring_lock, NULL, MUTEX_DEFAULT, NULL);
174 	ringp->s_ring_notify_cb_info.mcbi_lockp = &ringp->s_ring_lock;
175 
176 	ringp->s_ring_type = type;
177 	ringp->s_ring_wait = MSEC_TO_TICK(wait);
178 	ringp->s_ring_mcip = mcip;
179 	ringp->s_ring_set = mac_srs;
180 	ringp->s_ring_flent = flent;
181 
182 	/*
183 	 * Protect against access from DR callbacks (mac_walk_srs_bind/unbind)
184 	 * which can't grab the mac perimeter
185 	 */
186 	mutex_enter(&mac_srs->srs_lock);
187 	ADD_SOFTRING_TO_SET(mac_srs, ringp);
188 	mutex_exit(&mac_srs->srs_lock);
189 
190 	/*
191 	 * set the bind CPU to -1 to indicate
192 	 * no thread affinity set
193 	 */
194 	ringp->s_ring_cpuid = ringp->s_ring_cpuid_save = -1;
195 	ringp->s_ring_worker = thread_create(NULL, 0,
196 	    mac_soft_ring_worker, ringp, 0, &p0, TS_RUN, pri);
197 	if (type & ST_RING_TX) {
198 		ringp->s_ring_drain_func = mac_tx_soft_ring_drain;
199 		ringp->s_ring_tx_arg1 = x_arg1;
200 		ringp->s_ring_tx_arg2 = x_arg2;
201 		ringp->s_ring_tx_max_q_cnt = mac_tx_soft_ring_max_q_cnt;
202 		ringp->s_ring_tx_hiwat =
203 		    (mac_tx_soft_ring_hiwat > mac_tx_soft_ring_max_q_cnt) ?
204 		    mac_tx_soft_ring_max_q_cnt : mac_tx_soft_ring_hiwat;
205 	} else {
206 		ringp->s_ring_drain_func = mac_rx_soft_ring_drain;
207 		ringp->s_ring_rx_func = rx_func;
208 		ringp->s_ring_rx_arg1 = x_arg1;
209 		ringp->s_ring_rx_arg2 = x_arg2;
210 		if (mac_srs->srs_state & SRS_SOFTRING_QUEUE)
211 			ringp->s_ring_type |= ST_RING_WORKER_ONLY;
212 	}
213 	if (cpuid != -1)
214 		(void) mac_soft_ring_bind(ringp, cpuid);
215 
216 	return (ringp);
217 }
218 
219 /*
220  * mac_soft_ring_free
221  *
222  * Free the soft ring once we are done with it.
223  */
224 void
225 mac_soft_ring_free(mac_soft_ring_t *softring, boolean_t release_tx_ring)
226 {
227 	ASSERT((softring->s_ring_state &
228 	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE | S_RING_PROC)) ==
229 	    (S_RING_CONDEMNED | S_RING_CONDEMNED_DONE));
230 	mac_pkt_drop(NULL, NULL, softring->s_ring_first, B_FALSE);
231 	if (release_tx_ring && softring->s_ring_tx_arg2 != NULL) {
232 		ASSERT(softring->s_ring_type & ST_RING_TX);
233 		mac_release_tx_ring(softring->s_ring_tx_arg2);
234 	}
235 	if (softring->s_ring_ksp)
236 		kstat_delete(softring->s_ring_ksp);
237 	mac_callback_free(softring->s_ring_notify_cb_list);
238 	kmem_cache_free(mac_soft_ring_cache, softring);
239 }
240 
241 int mac_soft_ring_thread_bind = 1;
242 
243 /*
244  * mac_soft_ring_bind
245  *
246  * Bind a soft ring worker thread to supplied CPU.
247  */
248 cpu_t *
249 mac_soft_ring_bind(mac_soft_ring_t *ringp, processorid_t cpuid)
250 {
251 	cpu_t *cp;
252 	boolean_t clear = B_FALSE;
253 
254 	ASSERT(MUTEX_HELD(&cpu_lock));
255 
256 	if (mac_soft_ring_thread_bind == 0) {
257 		DTRACE_PROBE1(mac__soft__ring__no__cpu__bound,
258 		    mac_soft_ring_t *, ringp);
259 		return (NULL);
260 	}
261 
262 	cp = cpu_get(cpuid);
263 	if (cp == NULL || !cpu_is_online(cp))
264 		return (NULL);
265 
266 	mutex_enter(&ringp->s_ring_lock);
267 	ringp->s_ring_state |= S_RING_BOUND;
268 	if (ringp->s_ring_cpuid != -1)
269 		clear = B_TRUE;
270 	ringp->s_ring_cpuid = cpuid;
271 	mutex_exit(&ringp->s_ring_lock);
272 
273 	if (clear)
274 		thread_affinity_clear(ringp->s_ring_worker);
275 
276 	DTRACE_PROBE2(mac__soft__ring__cpu__bound, mac_soft_ring_t *,
277 	    ringp, processorid_t, cpuid);
278 
279 	thread_affinity_set(ringp->s_ring_worker, cpuid);
280 
281 	return (cp);
282 }
283 
284 /*
285  * mac_soft_ring_unbind
286  *
287  * Un Bind a soft ring worker thread.
288  */
289 void
290 mac_soft_ring_unbind(mac_soft_ring_t *ringp)
291 {
292 	ASSERT(MUTEX_HELD(&cpu_lock));
293 
294 	mutex_enter(&ringp->s_ring_lock);
295 	if (!(ringp->s_ring_state & S_RING_BOUND)) {
296 		ASSERT(ringp->s_ring_cpuid == -1);
297 		mutex_exit(&ringp->s_ring_lock);
298 		return;
299 	}
300 
301 	ringp->s_ring_cpuid = -1;
302 	ringp->s_ring_state &= ~S_RING_BOUND;
303 	thread_affinity_clear(ringp->s_ring_worker);
304 	mutex_exit(&ringp->s_ring_lock);
305 }
306 
307 /*
308  * PRIVATE FUNCTIONS
309  */
310 
311 static void
312 mac_soft_ring_fire(void *arg)
313 {
314 	mac_soft_ring_t	*ringp = arg;
315 
316 	mutex_enter(&ringp->s_ring_lock);
317 	if (ringp->s_ring_tid == 0) {
318 		mutex_exit(&ringp->s_ring_lock);
319 		return;
320 	}
321 
322 	ringp->s_ring_tid = 0;
323 
324 	if (!(ringp->s_ring_state & S_RING_PROC)) {
325 		cv_signal(&ringp->s_ring_async);
326 	}
327 	mutex_exit(&ringp->s_ring_lock);
328 }
329 
330 /*
331  * mac_rx_soft_ring_drain
332  *
333  * Called when worker thread model (ST_RING_WORKER_ONLY) of processing
334  * incoming packets is used. s_ring_first contain the queued packets.
335  * s_ring_rx_func contains the upper level (client) routine where the
336  * packets are destined and s_ring_rx_arg1/s_ring_rx_arg2 are the
337  * cookie meant for the client.
338  */
339 /* ARGSUSED */
340 static void
341 mac_rx_soft_ring_drain(mac_soft_ring_t *ringp)
342 {
343 	mblk_t		*mp;
344 	void		*arg1;
345 	mac_resource_handle_t arg2;
346 	timeout_id_t 	tid;
347 	mac_direct_rx_t	proc;
348 	size_t		sz;
349 	int		cnt;
350 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
351 
352 	ringp->s_ring_run = curthread;
353 	ASSERT(mutex_owned(&ringp->s_ring_lock));
354 	ASSERT(!(ringp->s_ring_state & S_RING_PROC));
355 
356 	if ((tid = ringp->s_ring_tid) != 0)
357 		ringp->s_ring_tid = 0;
358 
359 	ringp->s_ring_state |= S_RING_PROC;
360 
361 	proc = ringp->s_ring_rx_func;
362 	arg1 = ringp->s_ring_rx_arg1;
363 	arg2 = ringp->s_ring_rx_arg2;
364 
365 	while ((ringp->s_ring_first != NULL) &&
366 	    !(ringp->s_ring_state & S_RING_PAUSE)) {
367 		mp = ringp->s_ring_first;
368 		ringp->s_ring_first = NULL;
369 		ringp->s_ring_last = NULL;
370 		cnt = ringp->s_ring_count;
371 		ringp->s_ring_count = 0;
372 		sz = ringp->s_ring_size;
373 		ringp->s_ring_size = 0;
374 		mutex_exit(&ringp->s_ring_lock);
375 
376 		if (tid != 0) {
377 			(void) untimeout(tid);
378 			tid = 0;
379 		}
380 
381 		(*proc)(arg1, arg2, mp, NULL);
382 
383 		/*
384 		 * If we have a soft ring set which is doing
385 		 * bandwidth control, we need to decrement its
386 		 * srs_size so it can have a accurate idea of
387 		 * what is the real data queued between SRS and
388 		 * its soft rings. We decrement the size for a
389 		 * packet only when it gets processed by both
390 		 * SRS and the soft ring.
391 		 */
392 		mutex_enter(&mac_srs->srs_lock);
393 		MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
394 		MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
395 		mutex_exit(&mac_srs->srs_lock);
396 
397 		mutex_enter(&ringp->s_ring_lock);
398 	}
399 	ringp->s_ring_state &= ~S_RING_PROC;
400 	if (ringp->s_ring_state & S_RING_CLIENT_WAIT)
401 		cv_signal(&ringp->s_ring_client_cv);
402 	ringp->s_ring_run = NULL;
403 }
404 
405 /*
406  * mac_soft_ring_worker
407  *
408  * The soft ring worker routine to process any queued packets. In
409  * normal case, the worker thread is bound to a CPU. It the soft
410  * ring is dealing with TCP packets, then the worker thread will
411  * be bound to the same CPU as the TCP squeue.
412  */
413 static void
414 mac_soft_ring_worker(mac_soft_ring_t *ringp)
415 {
416 	kmutex_t *lock = &ringp->s_ring_lock;
417 	kcondvar_t *async = &ringp->s_ring_async;
418 	mac_soft_ring_set_t *srs = ringp->s_ring_set;
419 	callb_cpr_t cprinfo;
420 
421 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "mac_soft_ring");
422 	mutex_enter(lock);
423 start:
424 	for (;;) {
425 		while (((ringp->s_ring_first == NULL ||
426 		    (ringp->s_ring_state & (S_RING_BLOCK|S_RING_BLANK))) &&
427 		    !(ringp->s_ring_state & S_RING_PAUSE)) ||
428 		    (ringp->s_ring_state & S_RING_PROC)) {
429 
430 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
431 			cv_wait(async, lock);
432 			CALLB_CPR_SAFE_END(&cprinfo, lock);
433 		}
434 
435 		/*
436 		 * Either we have work to do, or we have been asked to
437 		 * shutdown temporarily or permanently
438 		 */
439 		if (ringp->s_ring_state & S_RING_PAUSE)
440 			goto done;
441 
442 		ringp->s_ring_drain_func(ringp);
443 	}
444 done:
445 	mutex_exit(lock);
446 	mutex_enter(&srs->srs_lock);
447 	mutex_enter(lock);
448 
449 	ringp->s_ring_state |= S_RING_QUIESCE_DONE;
450 	if (!(ringp->s_ring_state & S_RING_CONDEMNED)) {
451 		srs->srs_soft_ring_quiesced_count++;
452 		cv_broadcast(&srs->srs_async);
453 		mutex_exit(&srs->srs_lock);
454 		while (!(ringp->s_ring_state &
455 		    (S_RING_RESTART | S_RING_CONDEMNED)))
456 			cv_wait(&ringp->s_ring_async, &ringp->s_ring_lock);
457 		mutex_exit(lock);
458 		mutex_enter(&srs->srs_lock);
459 		mutex_enter(lock);
460 		srs->srs_soft_ring_quiesced_count--;
461 		if (ringp->s_ring_state & S_RING_RESTART) {
462 			ASSERT(!(ringp->s_ring_state & S_RING_CONDEMNED));
463 			ringp->s_ring_state &= ~(S_RING_RESTART |
464 			    S_RING_QUIESCE | S_RING_QUIESCE_DONE);
465 			cv_broadcast(&srs->srs_async);
466 			mutex_exit(&srs->srs_lock);
467 			goto start;
468 		}
469 	}
470 	ASSERT(ringp->s_ring_state & S_RING_CONDEMNED);
471 	ringp->s_ring_state |= S_RING_CONDEMNED_DONE;
472 	CALLB_CPR_EXIT(&cprinfo);
473 	srs->srs_soft_ring_condemned_count++;
474 	cv_broadcast(&srs->srs_async);
475 	mutex_exit(&srs->srs_lock);
476 	thread_exit();
477 }
478 
479 /*
480  * mac_soft_ring_intr_enable and mac_soft_ring_intr_disable
481  *
482  * these functions are called to toggle the sending of packets to the
483  * client. They are called by the client. the client gets the name
484  * of these routine and corresponding cookie (pointing to softring)
485  * during capability negotiation at setup time.
486  *
487  * Enabling is allow the processing thread to send packets to the
488  * client while disabling does the opposite.
489  */
490 void
491 mac_soft_ring_intr_enable(void *arg)
492 {
493 	mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
494 	mutex_enter(&ringp->s_ring_lock);
495 	ringp->s_ring_state &= ~S_RING_BLANK;
496 	if (ringp->s_ring_first != NULL)
497 		mac_soft_ring_worker_wakeup(ringp);
498 	mutex_exit(&ringp->s_ring_lock);
499 }
500 
501 boolean_t
502 mac_soft_ring_intr_disable(void *arg)
503 {
504 	mac_soft_ring_t *ringp = (mac_soft_ring_t *)arg;
505 	boolean_t sring_blanked = B_FALSE;
506 	/*
507 	 * Stop worker thread from sending packets above.
508 	 * Squeue will poll soft ring when it needs packets.
509 	 */
510 	mutex_enter(&ringp->s_ring_lock);
511 	if (!(ringp->s_ring_state & S_RING_PROC)) {
512 		ringp->s_ring_state |= S_RING_BLANK;
513 		sring_blanked = B_TRUE;
514 	}
515 	mutex_exit(&ringp->s_ring_lock);
516 	return (sring_blanked);
517 }
518 
519 /*
520  * mac_soft_ring_poll
521  *
522  * This routine is called by the client to poll for packets from
523  * the soft ring. The function name and cookie corresponding to
524  * the soft ring is exchanged during capability negotiation during
525  * setup.
526  */
527 mblk_t *
528 mac_soft_ring_poll(mac_soft_ring_t *ringp, int bytes_to_pickup)
529 {
530 	mblk_t	*head, *tail;
531 	mblk_t	*mp;
532 	size_t	sz = 0;
533 	int	cnt = 0;
534 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
535 
536 	ASSERT(mac_srs != NULL);
537 
538 	mutex_enter(&ringp->s_ring_lock);
539 	head = tail = mp = ringp->s_ring_first;
540 	if (head == NULL) {
541 		mutex_exit(&ringp->s_ring_lock);
542 		return (NULL);
543 	}
544 
545 	if (ringp->s_ring_size <= bytes_to_pickup) {
546 		head = ringp->s_ring_first;
547 		ringp->s_ring_first = NULL;
548 		ringp->s_ring_last = NULL;
549 		cnt = ringp->s_ring_count;
550 		ringp->s_ring_count = 0;
551 		sz = ringp->s_ring_size;
552 		ringp->s_ring_size = 0;
553 	} else {
554 		while (mp && sz <= bytes_to_pickup) {
555 			sz += msgdsize(mp);
556 			cnt++;
557 			tail = mp;
558 			mp = mp->b_next;
559 		}
560 		ringp->s_ring_count -= cnt;
561 		ringp->s_ring_size -= sz;
562 		tail->b_next = NULL;
563 		if (mp == NULL) {
564 			ringp->s_ring_first = NULL;
565 			ringp->s_ring_last = NULL;
566 			ASSERT(ringp->s_ring_count == 0);
567 		} else {
568 			ringp->s_ring_first = mp;
569 		}
570 	}
571 
572 	mutex_exit(&ringp->s_ring_lock);
573 	/*
574 	 * Update the shared count and size counters so
575 	 * that SRS has a accurate idea of queued packets.
576 	 */
577 	mutex_enter(&mac_srs->srs_lock);
578 	MAC_UPDATE_SRS_COUNT_LOCKED(mac_srs, cnt);
579 	MAC_UPDATE_SRS_SIZE_LOCKED(mac_srs, sz);
580 	mutex_exit(&mac_srs->srs_lock);
581 	return (head);
582 }
583 
584 /*
585  * mac_soft_ring_dls_bypass
586  *
587  * Enable direct client (IP) callback function from the softrings.
588  * Callers need to make sure they don't need any DLS layer processing
589  */
590 void
591 mac_soft_ring_dls_bypass(void *arg, mac_direct_rx_t rx_func, void *rx_arg1)
592 {
593 	mac_soft_ring_t		*softring = arg;
594 	mac_soft_ring_set_t	*srs;
595 
596 	ASSERT(rx_func != NULL);
597 
598 	mutex_enter(&softring->s_ring_lock);
599 	softring->s_ring_rx_func = rx_func;
600 	softring->s_ring_rx_arg1 = rx_arg1;
601 	mutex_exit(&softring->s_ring_lock);
602 
603 	srs = softring->s_ring_set;
604 	mutex_enter(&srs->srs_lock);
605 	srs->srs_type |= SRST_DLS_BYPASS;
606 	mutex_exit(&srs->srs_lock);
607 }
608 
609 /*
610  * mac_soft_ring_signal
611  *
612  * Typically used to set the soft ring state to QUIESCE, CONDEMNED, or
613  * RESTART.
614  *
615  * In the Rx side, the quiescing is done bottom up. After the Rx upcalls
616  * from the driver are done, then the Rx SRS is quiesced and only then can
617  * we signal the soft rings. Thus this function can't be called arbitrarily
618  * without satisfying the prerequisites. On the Tx side, the threads from
619  * top need to quiesced, then the Tx SRS and only then can we signal the
620  * Tx soft rings.
621  */
622 void
623 mac_soft_ring_signal(mac_soft_ring_t *softring, uint_t sr_flag)
624 {
625 	mutex_enter(&softring->s_ring_lock);
626 	softring->s_ring_state |= sr_flag;
627 	cv_signal(&softring->s_ring_async);
628 	mutex_exit(&softring->s_ring_lock);
629 }
630 
631 /*
632  * mac_tx_soft_ring_drain
633  *
634  * The transmit side drain routine in case the soft ring was being
635  * used to transmit packets.
636  */
637 static void
638 mac_tx_soft_ring_drain(mac_soft_ring_t *ringp)
639 {
640 	mblk_t 			*mp;
641 	void 			*arg1;
642 	void 			*arg2;
643 	mblk_t 			*tail;
644 	uint_t			saved_pkt_count, saved_size;
645 	boolean_t		is_subflow;
646 	mac_tx_stats_t		stats;
647 	mac_soft_ring_set_t	*mac_srs = ringp->s_ring_set;
648 
649 	saved_pkt_count = saved_size = 0;
650 	ringp->s_ring_run = curthread;
651 	ASSERT(mutex_owned(&ringp->s_ring_lock));
652 	ASSERT(!(ringp->s_ring_state & S_RING_PROC));
653 
654 	ringp->s_ring_state |= S_RING_PROC;
655 	is_subflow = ((mac_srs->srs_type & SRST_FLOW) != 0);
656 	arg1 = ringp->s_ring_tx_arg1;
657 	arg2 = ringp->s_ring_tx_arg2;
658 
659 	while (ringp->s_ring_first != NULL) {
660 		mp = ringp->s_ring_first;
661 		tail = ringp->s_ring_last;
662 		saved_pkt_count = ringp->s_ring_count;
663 		saved_size = ringp->s_ring_size;
664 		ringp->s_ring_first = NULL;
665 		ringp->s_ring_last = NULL;
666 		ringp->s_ring_count = 0;
667 		ringp->s_ring_size = 0;
668 		mutex_exit(&ringp->s_ring_lock);
669 
670 		mp = mac_tx_send(arg1, arg2, mp, &stats);
671 
672 		mutex_enter(&ringp->s_ring_lock);
673 		if (mp != NULL) {
674 			/* Device out of tx desc, set block */
675 			tail->b_next = ringp->s_ring_first;
676 			ringp->s_ring_first = mp;
677 			ringp->s_ring_count +=
678 			    (saved_pkt_count - stats.ts_opackets);
679 			ringp->s_ring_size += (saved_size - stats.ts_obytes);
680 			if (ringp->s_ring_last == NULL)
681 				ringp->s_ring_last = tail;
682 
683 			if (ringp->s_ring_tx_woken_up) {
684 				ringp->s_ring_tx_woken_up = B_FALSE;
685 			} else {
686 				ringp->s_ring_state |= S_RING_BLOCK;
687 				ringp->s_ring_blocked_cnt++;
688 			}
689 
690 			ringp->s_ring_state &= ~S_RING_PROC;
691 			ringp->s_ring_run = NULL;
692 			return;
693 		} else {
694 			ringp->s_ring_tx_woken_up = B_FALSE;
695 			if (is_subflow) {
696 				FLOW_TX_STATS_UPDATE(
697 				    mac_srs->srs_flent, &stats);
698 			}
699 		}
700 	}
701 
702 	if (ringp->s_ring_count == 0 && ringp->s_ring_state &
703 	    (S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED)) {
704 		mac_tx_notify_cb_t *mtnfp;
705 		mac_cb_t *mcb;
706 		mac_client_impl_t *mcip =  ringp->s_ring_mcip;
707 		boolean_t wakeup_required = B_FALSE;
708 
709 		if (ringp->s_ring_state &
710 		    (S_RING_TX_HIWAT|S_RING_WAKEUP_CLIENT)) {
711 			wakeup_required = B_TRUE;
712 		}
713 		ringp->s_ring_state &=
714 		    ~(S_RING_TX_HIWAT | S_RING_WAKEUP_CLIENT | S_RING_ENQUEUED);
715 		mutex_exit(&ringp->s_ring_lock);
716 		if (wakeup_required) {
717 			/* Wakeup callback registered clients */
718 			MAC_CALLBACK_WALKER_INC(&mcip->mci_tx_notify_cb_info);
719 			for (mcb = mcip->mci_tx_notify_cb_list; mcb != NULL;
720 			    mcb = mcb->mcb_nextp) {
721 				mtnfp = (mac_tx_notify_cb_t *)mcb->mcb_objp;
722 				mtnfp->mtnf_fn(mtnfp->mtnf_arg,
723 				    (mac_tx_cookie_t)ringp);
724 			}
725 			MAC_CALLBACK_WALKER_DCR(&mcip->mci_tx_notify_cb_info,
726 			    &mcip->mci_tx_notify_cb_list);
727 			/*
728 			 * If the client is not the primary MAC client, then we
729 			 * need to send the notification to the clients upper
730 			 * MAC, i.e. mci_upper_mip.
731 			 */
732 			mac_tx_notify(mcip->mci_upper_mip != NULL ?
733 			    mcip->mci_upper_mip : mcip->mci_mip);
734 		}
735 		mutex_enter(&ringp->s_ring_lock);
736 	}
737 	ringp->s_ring_state &= ~S_RING_PROC;
738 	ringp->s_ring_run = NULL;
739 }
740