/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright 2012 Joyent, Inc. All rights reserved.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU
 * queue, a worker thread and a polling thread which are bound to the
 * CPU associated with the squeue. The squeue is strictly FIFO for both
 * the read and write side and only one thread can process it at any
 * given time. The design goal of the squeue was to offer a very high
 * degree of parallelization (on a per H/W execution pipeline basis)
 * with at most one queuing.
 *
 * A module needing protection typically calls the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where specific
 * connections are assigned to a specific squeue (based on various policies)
 * at connection creation time. Once assigned, the connection-to-squeue
 * mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of a connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed a "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker thread.
 * Callers can choose to just queue the packet, process
 * the packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing processing behind the squeue, and the last mode (drain
 * and process) is typically employed when a thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * An squeue can control the rate of packet arrival into itself from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, an squeue is created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in the future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain, which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 *
 */
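
/*
 * Illustrative sketch (hypothetical caller, not compiled here) of how a
 * packet typically reaches its squeue.  SQUEUE_ENTER_ONE() is defined in
 * squeue.h; tcp_input_data is one example of an sqproc_t and
 * SQTAG_IP_TCP_INPUT one example of a tag, both borrowed from the TCP/IP
 * code for illustration:
 *
 *	conn_t	*connp = ...;	(from the IP classifier lookup)
 *	mblk_t	*mp = ...;	(the packet to be processed)
 *
 *	CONN_INC_REF(connp);	(squeue processing consumes a conn ref)
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, tcp_input_data, connp,
 *	    ira, SQ_FILL, SQTAG_IP_TCP_INPUT);
 */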

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/stack.h>
#include <sys/archsystm.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/* The values above converted to ticks or nanoseconds */
static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;
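
/*
 * The two tunables above are patchable.  A hedged sketch of raising the
 * drain quantum via /etc/system (assuming this file is delivered as
 * part of the ip module, hence the ip prefix):
 *
 *	set ip:squeue_drain_ms = 40
 */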

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.				\
	 */							\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
								\
}

/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_POLLING)) {		\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (sqp->sq_state & SQS_POLLING) {		\
			sqp->sq_state &= ~SQS_POLLING;		\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}						\
	}							\
}

/* Wakeup poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sqp->sq_state & SQS_POLLING) {			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
			sqp->sq_state |= SQS_GET_PKTS;		\
			cv_signal(&sqp->sq_poll_cv);		\
		}						\
	}							\
}

#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(sqp)->sq_curmp = (mp);					\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_connp = (connp);				\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
}

#define	SQUEUE_DBG_CLEAR(sqp) {					\
	(sqp)->sq_curmp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_connp = NULL;					\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}

/* ARGSUSED */
squeue_t *
squeue_create(clock_t wait, pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_wait = MSEC_TO_TICK(wait);
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}
266
267 /*
268 * Bind squeue worker thread to the specified CPU, given by CPU id.
269 * If the CPU id value is -1, bind the worker thread to the value
270 * specified in sq_bind field. If a thread is already bound to a
271 * different CPU, unbind it from the old CPU and bind to the new one.
272 */
273
274 void
squeue_bind(squeue_t * sqp,processorid_t bind)275 squeue_bind(squeue_t *sqp, processorid_t bind)
276 {
277 mutex_enter(&sqp->sq_lock);
278 ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
279 ASSERT(MUTEX_HELD(&cpu_lock));
280
281 if (sqp->sq_state & SQS_BOUND) {
282 if (sqp->sq_bind == bind) {
283 mutex_exit(&sqp->sq_lock);
284 return;
285 }
286 thread_affinity_clear(sqp->sq_worker);
287 } else {
288 sqp->sq_state |= SQS_BOUND;
289 }
290
291 if (bind != PBIND_NONE)
292 sqp->sq_bind = bind;
293
294 thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
295 mutex_exit(&sqp->sq_lock);
296 }
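
/*
 * A minimal sketch (hypothetical caller; ip_squeue.c does the real
 * work) of creating an squeue and binding its worker to a CPU.  Note
 * that squeue_bind() asserts that cpu_lock is held:
 *
 *	squeue_t *sqp;
 *
 *	sqp = squeue_create(squeue_workerwait_ms, minclsyspri);
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, cp->cpu_id);	(cp is the chosen cpu_t)
 *	mutex_exit(&cpu_lock);
 */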

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_worker_wakeup(squeue_t *sqp)
{
	timeout_id_t tid = (sqp)->sq_tid;

	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	if (sqp->sq_wait == 0) {
		ASSERT(tid == 0);
		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}

	/*
	 * The queue isn't being processed, so take
	 * any post-enqueue actions needed before leaving.
	 */
	if (tid != 0) {
		/*
		 * Waiting for an enter() to process mblk(s).
		 */
		clock_t now = ddi_get_lbolt();
		clock_t waited = now - sqp->sq_awaken;

		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
			/*
			 * Time's up and we have a worker thread
			 * waiting for work, so schedule it.
			 */
			sqp->sq_tid = 0;
			sqp->sq_awaken = now;
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
			(void) untimeout(tid);
			return;
		}
		mutex_exit(&sqp->sq_lock);
		return;
	} else if (sqp->sq_state & SQS_TMO_PROG) {
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		clock_t wait = sqp->sq_wait;
		/*
		 * Wait up to sqp->sq_wait ms for an
		 * enter() to process this queue. We
		 * don't want to contend on timeout locks
		 * with sq_lock held for performance reasons,
		 * so drop the sq_lock before calling timeout,
		 * but we need to check if a timeout is still
		 * required after reacquiring the sq_lock. Once
		 * the sq_lock is dropped, someone else could
		 * have processed the packet or the timeout could
		 * have already fired.
		 */
		sqp->sq_state |= SQS_TMO_PROG;
		mutex_exit(&sqp->sq_lock);
		tid = timeout(squeue_fire, sqp, wait);
		mutex_enter(&sqp->sq_lock);
		/* Check again if we still need the timeout */
		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
		    (sqp->sq_first != NULL)) {
			sqp->sq_state &= ~SQS_TMO_PROG;
			sqp->sq_tid = tid;
			mutex_exit(&sqp->sq_lock);
			return;
		} else {
			if (sqp->sq_state & SQS_TMO_PROG) {
				sqp->sq_state &= ~SQS_TMO_PROG;
				mutex_exit(&sqp->sq_lock);
				(void) untimeout(tid);
			} else {
				/*
				 * The timer fired before we could
				 * reacquire the sq_lock. squeue_fire
				 * removes the SQS_TMO_PROG flag
				 * and we don't need to do anything
				 * else.
				 */
				mutex_exit(&sqp->sq_lock);
			}
		}
	}

	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t		*connp;
	sqproc_t	proc;
	hrtime_t	now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if the SQ_FILL flag is not set and
	 * we are allowed to process the squeue. SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing, else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We have a chain of one packet, so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				if (sqp->sq_first != NULL) {
					squeue_worker_wakeup(sqp);
					return;
				}
				/*
				 * We processed inline our packet and nothing
				 * new has arrived. We are done. In case any
				 * control actions are pending, wake up the
				 * worker.
				 */
				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
					cv_signal(&sqp->sq_worker_cv);
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t	*attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of an incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if the listener and
		 * eager are on the same squeue. It also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for fear of
		 * the stack getting blown with multiple traversals.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * The queue is already being processed or there are
		 * already one or more packets on the queue. Enqueue the
		 * packet and wake up the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t	*attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		if (!(sqp->sq_state & SQS_PROC)) {
			squeue_worker_wakeup(sqp);
			return;
		}
		/*
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}
}
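
/*
 * A hedged illustration of the three entry modes for a single-mblk
 * chain (flags from squeue.h; sqp, mp, ira and tag assumed set up as
 * described for squeue_enter() above):
 *
 *	(queue only; the worker is signaled if no one is processing)
 *	squeue_enter(sqp, mp, mp, 1, ira, SQ_FILL, tag);
 *
 *	(process inline only if nothing is queued; never drain)
 *	squeue_enter(sqp, mp, mp, 1, ira, SQ_NODRAIN, tag);
 *
 *	(process inline and drain the backlog, up to the time limit)
 *	squeue_enter(sqp, mp, mp, 1, ira, SQ_PROCESS, tag);
 */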

/*
 * PRIVATE FUNCTIONS
 */

static void
squeue_fire(void *arg)
{
	squeue_t	*sqp = arg;
	uint_t		state;

	mutex_enter(&sqp->sq_lock);

	state = sqp->sq_state;
	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_tid = 0;
	/*
	 * The timeout fired before we got a chance to set it.
	 * Process it anyway but remove the SQS_TMO_PROG so that
	 * the guy trying to set the timeout knows that it has
	 * already been processed.
	 */
	if (state & SQS_TMO_PROG)
		sqp->sq_state &= ~SQS_TMO_PROG;

	if (!(state & SQS_PROC)) {
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
	}
	mutex_exit(&sqp->sq_lock);
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	timeout_id_t	tid;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t	now;
	boolean_t	did_wakeup = B_FALSE;
	boolean_t	sq_poll_capable;
	ip_recv_attr_t	*ira, iras;

	/*
	 * Before doing any work, check our stack depth; if we're not a
	 * worker thread for this squeue and we're beginning to get tight
	 * on stack, kick the worker, bump a counter and return.
	 */
	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
		ASSERT(mutex_owned(&sqp->sq_lock));
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
		squeue_drain_stack_toodeep++;
		return;
	}

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	if ((tid = sqp->sq_tid) != 0)
		sqp->sq_tid = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have backlog built up. Switch to polling mode if the
	 * device underneath allows it. Need to do it so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or a higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling while we
	 * just disable the interrupts for drain by non-worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	if (tid != 0)
		(void) untimeout(tid);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t	*attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}

		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or timer
	 * expired). If we are the worker thread and we are polling capable,
	 * continue doing the work since no one else is around to do the
	 * work anyway (but signal the poll thread to retrieve some packets
	 * in the meanwhile). If we are not the worker thread, just
	 * signal the worker thread to take up the work if processing time
	 * has expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If the time quantum has not
		 * expired, we should let the drain go on. The worker thread
		 * is allowed to drain as long as there is anything left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If time has not expired or we are the worker thread
			 * and this squeue is polling capable, continue to do
			 * the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * the worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up the poll thread
			 * if the SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		} else {
			did_wakeup = B_TRUE;
			sqp->sq_awaken = ddi_get_lbolt();
			cv_signal(&sqp->sq_worker_cv);
		}
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that the poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and the poll thread comes
		 * up empty handed, it cannot safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It's a lot simpler for the drain to hand the SQS_PROC
		 * to the poll thread (if running) and let the poll thread
		 * finish without worrying about racing with any other
		 * thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		if (!did_wakeup && sqp->sq_first != NULL) {
			squeue_worker_wakeup(sqp);
			mutex_enter(&sqp->sq_lock);
		}
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
			cv_signal(&sqp->sq_worker_cv);
	}
}
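
/*
 * The squeue__proc__start/end SDT probes fired in squeue_drain() and
 * squeue_enter() make per-connection processing latency observable.
 * A hedged DTrace sketch (the probe module is assumed to be ip, where
 * this file is delivered):
 *
 *	sdt:ip::squeue-proc-start { self->ts = timestamp; }
 *	sdt:ip::squeue-proc-end /self->ts/ {
 *		@lat = quantize(timestamp - self->ts);
 *		self->ts = 0;
 *	}
 */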

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}
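
/*
 * A hedged sketch of how a requestor (the IP/mac control path, e.g.
 * ip_squeue.c) might drive a quiesce through the worker thread; the
 * exact flag protocol is defined in squeue_impl.h:
 *
 *	mutex_enter(&sqp->sq_lock);
 *	sqp->sq_state |= SQS_POLL_QUIESCE;
 *	cv_signal(&sqp->sq_worker_cv);
 *	while (!(sqp->sq_state & SQS_POLL_QUIESCE_DONE))
 *		cv_wait(&sqp->sq_ctrlop_done_cv, &sqp->sq_lock);
 *	mutex_exit(&sqp->sq_lock);
 */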

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is when the worker thread keeps
 * doing work all the time in polling mode and writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at a lower priority) down to the NIC to get a chain of packets
 * to feed to the worker. Sending the poll thread down to the NIC depends
 * on three criteria:
 *
 * 1) It's always driven from squeue_drain and only if the worker thread is
 *	doing the drain.
 * 2) We cleared the backlog once and more packets arrived in between.
 *	Before starting the drain again, send the poll thread down if
 *	the drain is being done by the worker thread.
 * 3) Before exiting squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *	to do the drain till their time quantum expires, provided
 *	SQS_GET_PKTS wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */

/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and the worker thread
			 * is not running. Check to see if the poll thread is
			 * allowed to process. Let it do processing only if it
			 * picked up some packets from the NIC; otherwise
			 * wake up the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, so let the
				 * worker thread take over.
				 */
			}

			sqp->sq_awaken = ddi_get_lbolt();
			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where it's called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing but we get better performance this way
			 * and save the cost of turning polling off and
			 * possibly on again as soon as we start draining
			 * again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			cv_signal(&sqp->sq_worker_cv);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and the worker thread is not
			 * running. Since we hold the proc, no other thread is
			 * processing the squeue. This means that there
			 * is no work to be done and nothing is queued
			 * in the squeue or in the NIC. Turn polling off and
			 * go back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
				cv_signal(&sqp->sq_worker_cv);
		} else {
			/*
			 * The worker thread is already running. We don't
			 * need to do anything. Indicate that the poll
			 * thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}

/*
 * The squeue worker thread acts on any control requests to quiesce, cleanup
 * or restart an ill_rx_ring_t by calling this function. The worker thread
 * synchronizes with the squeue poll thread to complete the request and
 * finally wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t	*ill;
	ill_rx_ring_t	*rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC.
	 * But allow ourselves to reenter by setting SQS_WORKER
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp, sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock* and sq_lock. Holding any
		 * of them will ensure that the ring to squeue mapping does
		 * not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}

static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we either
			 * have messages to drain or some thread has signaled
			 * some control activity, we need to break out.
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then check
			 * for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the squeue
			 * and break out.
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}
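
/*
 * For example (hedged; see tcp_squeue_add() in the TCP code for the
 * real usage), TCP parks its per-squeue TIME_WAIT bookkeeping in the
 * SQPRIVATE_TCP slot:
 *
 *	*squeue_getprivate(sqp, SQPRIVATE_TCP) = (uintptr_t)tcp_time_wait;
 */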

/* ARGSUSED */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}

int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
	squeue_t *sqp;

again:
	sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from the
		 * application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

		/*
		 * Handle squeue switching. The conn's squeue can only change
		 * while there is a thread in the squeue, which is why we do
		 * the check after entering the squeue. If it has changed, exit
		 * this squeue and redo everything with the new squeue.
		 */
		if (sqp != connp->conn_sqp) {
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_PROC;
			sqp->sq_run = NULL;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}
#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t	*mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk gets processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}

void
squeue_synch_exit(conn_t *connp)
{
	squeue_t *sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run == curthread) {
		ASSERT(sqp->sq_state & SQS_PROC);

		sqp->sq_state &= ~SQS_PROC;
		sqp->sq_run = NULL;
		connp->conn_on_sqp = B_FALSE;

		if (sqp->sq_first == NULL) {
			mutex_exit(&sqp->sq_lock);
		} else {
			/*
			 * If this was a normal thread, then it would
			 * (most likely) continue processing the pending
			 * requests. Since the just completed operation
			 * was executed synchronously, the thread should
			 * not be delayed. To compensate, wake up the
			 * worker thread right away when there are outstanding
			 * requests.
			 */
			sqp->sq_awaken = ddi_get_lbolt();
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
		}
	} else {
		/*
		 * The caller doesn't own the squeue; clear the SQS_PAUSE flag
		 * and wake up the squeue owner so that the owner can continue
		 * processing.
		 */
		ASSERT(sqp->sq_state & SQS_PAUSE);
		sqp->sq_state &= ~SQS_PAUSE;

		/* There should be only one thread blocking on sq_synch_cv. */
		cv_signal(&sqp->sq_synch_cv);
		mutex_exit(&sqp->sq_lock);
	}
}
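
/*
 * A hedged sketch of the synchronous-access pattern that
 * squeue_synch_enter()/squeue_synch_exit() implement (caller context
 * assumed, e.g. a sockfs-driven operation on connp):
 *
 *	if (squeue_synch_enter(connp, NULL) == ENOMEM)
 *		return (ENOMEM);
 *	... single-threaded access to the conn, as if on the squeue ...
 *	squeue_synch_exit(connp);
 */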