/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright 2017 Joyent, Inc.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose, high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread, which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both the read
 * and write side, and only one thread can process it at any given time.
 * The design goal of the squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * A module needing protection typically calls the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and a pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue while it is executing.
 *
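 * As an illustrative sketch (not a verbatim caller from this file; the
 * processing function and tag below are placeholders borrowed from the
 * TCP input path), a module that has classified an inbound packet to a
 * conn_t would tag the mblk and hand it to that connection's squeue
 * roughly as follows:
 *
 *	CONN_INC_REF(connp);
 *	SET_SQUEUE(mp, tcp_input_data, connp);
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, tcp_input_data, connp,
 *	    ira, SQ_FILL, SQTAG_IP_TCP_INPUT);
 *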
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where specific
 * connections are assigned to specific squeues (based on various policies)
 * at connection creation time. Once assigned, the connection to
 * squeue mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed a "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * An squeue doesn't necessarily process packets with its own worker thread.
 * Callers can pick whether they just want to queue the packet, process
 * their packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue, and the last mode (drain
 * and process) is typically employed when the thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * An squeue can control the rate of packet arrival into itself from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, squeues are created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain, which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 *
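 * As a condensed sketch of that hand-off (these are the calls this file
 * itself makes via SQS_POLLING_ON()/SQS_POLLING_OFF() and the poll thread;
 * the ill_rx_ring_t layout is defined elsewhere in IP):
 *
 *	head = rx_ring->rr_rx(rx_ring->rr_rx_handle, bytes_to_pickup);
 *	rx_ring->rr_intr_disable(rx_ring->rr_intr_handle);	(poll mode)
 *	rx_ring->rr_intr_enable(rx_ring->rr_intr_handle);	(interrupt mode)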
 */

#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/stack.h>
#include <sys/archsystm.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);
static void squeue_worker_wakeup(squeue_t *sqp);
static void squeue_try_drain_one(squeue_t *, conn_t *);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;

/* The value above converted to nanoseconds */
static uint_t squeue_drain_ns = 0;

uintptr_t squeue_drain_stack_needed = 10240;
uint_t squeue_drain_stack_toodeep;

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \
	/* \
	 * Enqueue our mblk chain. \
	 */ \
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
	\
	if ((sqp)->sq_last != NULL) \
		(sqp)->sq_last->b_next = (mp); \
	else \
		(sqp)->sq_first = (mp); \
	(sqp)->sq_last = (tail); \
	(sqp)->sq_count += (cnt); \
	ASSERT((sqp)->sq_count > 0); \
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \
	    mblk_t *, mp, mblk_t *, tail, int, cnt); \
	\
}

/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
	if (sq_poll_capable) { \
		ASSERT(rx_ring != NULL); \
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
		if (!(sqp->sq_state & SQS_POLLING)) { \
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING; \
		} \
	} \
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
	if (sq_poll_capable) { \
		ASSERT(rx_ring != NULL); \
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
		if (sqp->sq_state & SQS_POLLING) { \
			sqp->sq_state &= ~SQS_POLLING; \
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		} \
	} \
}

/* Wake up the poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) { \
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
	if (sqp->sq_state & SQS_POLLING) { \
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
		if (!(sqp->sq_state & SQS_GET_PKTS)) { \
			sqp->sq_state |= SQS_GET_PKTS; \
			cv_signal(&sqp->sq_poll_cv); \
		} \
	} \
}

#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \
	(sqp)->sq_curmp = (mp); \
	(sqp)->sq_curproc = (proc); \
	(sqp)->sq_connp = (connp); \
	(mp)->b_tag = (sqp)->sq_tag = (tag); \
}

#define	SQUEUE_DBG_CLEAR(sqp) { \
	(sqp)->sq_curmp = NULL; \
	(sqp)->sq_curproc = NULL; \
	(sqp)->sq_connp = NULL; \
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
}

squeue_t *
squeue_create(pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}
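
/*
 * A minimal usage sketch (hypothetical caller; in practice squeues are
 * created and managed by the IP squeue code, e.g. ip_squeue.c): create an
 * squeue at a given priority and bind its worker thread to a chosen CPU
 * while holding cpu_lock, as squeue_bind() asserts:
 *
 *	squeue_t *sqp = squeue_create(minclsyspri);
 *
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, some_cpu_id);		(some_cpu_id is a placeholder)
 *	mutex_exit(&cpu_lock);
 */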

/*
 * Bind the squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in the sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind it to the new one.
 */

void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	mutex_enter(&sqp->sq_lock);
	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
	ASSERT(MUTEX_HELD(&cpu_lock));

	if (sqp->sq_state & SQS_BOUND) {
		if (sqp->sq_bind == bind) {
			mutex_exit(&sqp->sq_lock);
			return;
		}
		thread_affinity_clear(sqp->sq_worker);
	} else {
		sqp->sq_state |= SQS_BOUND;
	}

	if (bind != PBIND_NONE)
		sqp->sq_bind = bind;

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), where tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * the appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
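/*
 * A condensed sketch of the three entry modes described above. The calls
 * are hypothetical; real callers normally go through the SQUEUE_ENTER*()
 * macros, which are assumed to dispatch through sq_enter:
 *
 *	SQUEUE_ENTER_ONE(sqp, mp, proc, connp, ira, SQ_FILL, tag);
 *		always just queue; the owner or worker drains later
 *	SQUEUE_ENTER_ONE(sqp, mp, proc, connp, ira, SQ_NODRAIN, tag);
 *		process mp inline if the squeue is idle and empty
 *	SQUEUE_ENTER_ONE(sqp, mp, proc, connp, ira, SQ_PROCESS, tag);
 *		process inline and drain the backlog within the time quantum
 */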
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t *connp;
	sqproc_t proc;
	hrtime_t now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if the SQ_FILL flag is not set and
	 * we are allowed to process the squeue. SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing, else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We have a chain of exactly one packet, so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file.
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				/*
				 * Even if SQ_NODRAIN was specified, it may
				 * still be best to process a single queued
				 * item if it matches the active connection.
				 */
				if (sqp->sq_first != NULL) {
					squeue_try_drain_one(sqp, connp);
				}

				/*
				 * If work or control actions are pending, wake
				 * up the worker thread.
				 */
				if (sqp->sq_first != NULL ||
				    sqp->sq_state & SQS_WORKER_THR_CONTROL) {
					squeue_worker_wakeup(sqp);
				}
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t *attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_wakeup(sqp);
		}
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of an incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if the listener and
		 * eager are on the same squeue. It also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for fear
		 * of the stack getting blown with multiple traversals.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file.
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * The queue is already being processed or there are already
		 * one or more packets on the queue. Enqueue the
		 * packet and wake up the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t *attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		/*
		 * If the worker isn't running or control actions are pending,
		 * wake it up now.
		 */
		if ((sqp->sq_state & SQS_PROC) == 0 ||
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
			squeue_worker_wakeup(sqp);
		}
	}
	mutex_exit(&sqp->sq_lock);
}

/*
 * PRIVATE FUNCTIONS
 */


/*
 * Wake up the worker thread for the squeue to process queued work.
 */
static void
squeue_worker_wakeup(squeue_t *sqp)
{
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	cv_signal(&sqp->sq_worker_cv);
	sqp->sq_awoken = gethrtime();
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t *mp;
	mblk_t *head;
	sqproc_t proc;
	conn_t *connp;
	ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t now;
	boolean_t sq_poll_capable;
	ip_recv_attr_t *ira, iras;

	/*
	 * Before doing any work, check our stack depth; if we're not a
	 * worker thread for this squeue and we're beginning to get tight
	 * on stack, kick the worker, bump a counter and return.
	 */
	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
		ASSERT(mutex_owned(&sqp->sq_lock));
		squeue_worker_wakeup(sqp);
		squeue_drain_stack_toodeep++;
		return;
	}

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have a backlog built up. Switch to polling mode if the
	 * device underneath allows it. We need to do it so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or a higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling, while we
	 * just disable the interrupts for drain by non-worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during the time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t *attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}


		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file.
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or the
	 * timer expired). If we are the worker thread and we are polling
	 * capable, continue doing the work since no one else is around to
	 * do the work anyway (but signal the poll thread to retrieve some
	 * packets in the meanwhile). If we are not the worker thread, just
	 * signal the worker thread to take up the work if processing time
	 * has expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If the time quantum has not expired,
		 * we should let the drain go on. The worker thread is allowed
		 * to drain as long as there is anything left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If time has not expired, or we are the worker
			 * thread and this squeue is polling capable,
			 * continue to do the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * the worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up the poll thread
			 * if the SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		}

		squeue_worker_wakeup(sqp);
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 *
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that the poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and the poll thread comes
		 * up empty handed, it can not safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It's a lot simpler for the drain to hand the SQS_PROC to
		 * the poll thread (if running) and let the poll thread finish
		 * without worrying about racing with any other thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling, or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful, or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker.
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
			squeue_worker_wakeup(sqp);
		}
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted, say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is when the worker thread keeps
 * doing work all the time in polling mode and writers etc. keep dumping
 * packets to the worker thread. Occasionally, we send the poll thread
 * (running at a lower priority) down to the NIC to get a chain of packets
 * to feed to the worker. Sending the poll thread down to the NIC depends
 * on three criteria:
 *
 * 1) It is always driven from squeue_drain and only if the worker thread
 *	is doing the drain.
 * 2) We clear the backlog once and more packets arrived in between.
 *	Before starting the drain again, send the poll thread down if
 *	the drain is being done by the worker thread.
 * 3) Before exiting the squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *	to do the drain till their time quantum expires provided SQS_GET_PKTS
 *	wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */

/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue.
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and the worker thread
			 * is not running. Check to see if the poll thread is
			 * allowed to process. Let it do processing only if it
			 * picked up some packets from the NIC, otherwise
			 * wake up the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where it's called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing, but we get better performance this way
			 * and save the cost of turning polling off and
			 * possibly on again as soon as we start draining
			 * again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			squeue_worker_wakeup(sqp);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and the worker thread is not
			 * running. Since we hold the proc, no other thread is
			 * processing the squeue. This means that there
			 * is no work to be done and nothing is queued
			 * in the squeue or in the NIC. Turn polling off and
			 * go back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
				squeue_worker_wakeup(sqp);
			}
		} else {
			/*
			 * The worker thread is already running. We don't
			 * need to do anything. Indicate that the poll
			 * thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue.
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}

/*
 * The squeue worker thread acts on any control requests to quiesce, cleanup
 * or restart an ill_rx_ring_t by calling this function. The worker thread
 * synchronizes with the squeue poll thread to complete the request and
 * finally wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t *ill;
	ill_rx_ring_t *rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete.
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC.
	 * But allow ourselves to reenter by setting SQS_WORKER.
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp, sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock* and sq_lock. Holding any
		 * of them will ensure that the ring to squeue mapping does
		 * not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}

static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we either
			 * have messages to drain or some thread has signaled
			 * some control activity we need to break.
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then check
			 * for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the squeue
			 * and break out.
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}

/* ARGSUSED */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}

int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
	squeue_t *sqp;

again:
	sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from the
		 * application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

		/*
		 * Handle squeue switching. The conn's squeue can only change
		 * while there is a thread in the squeue, which is why we do
		 * the check after entering the squeue. If it has changed, exit
		 * this squeue and redo everything with the new squeue.
		 */
		if (sqp != connp->conn_sqp) {
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_PROC;
			sqp->sq_run = NULL;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}
#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t *mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk gets processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}
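
/*
 * A minimal sketch (hypothetical caller, e.g. a socket-layer operation) of
 * pairing squeue_synch_enter() with squeue_synch_exit() to get exclusive,
 * squeue-protected access to a connection from process context:
 *
 *	if (squeue_synch_enter(connp, NULL) == 0) {
 *		(manipulate conn_t state under squeue protection)
 *		squeue_synch_exit(connp, SQ_NODRAIN);
 *	}
 */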

/*
 * If possible, attempt to immediately process a single queued request, should
 * it match the supplied conn_t reference. This is primarily intended to elide
 * squeue worker thread wake-ups during local TCP connect() or close()
 * operations where the response is placed on the squeue during processing.
 */
static void
squeue_try_drain_one(squeue_t *sqp, conn_t *compare_conn)
{
	mblk_t *next, *mp = sqp->sq_first;
	conn_t *connp;
	sqproc_t proc = (sqproc_t)mp->b_queue;
	ip_recv_attr_t iras, *ira = NULL;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));
	ASSERT((sqp->sq_state & SQS_PROC) == 0);
	ASSERT(sqp->sq_run == NULL);
	VERIFY(mp != NULL);

	/*
	 * There is no guarantee that compare_conn references a valid object at
	 * this time, so under no circumstance may it be dereferenced unless it
	 * matches the squeue entry.
	 */
	connp = (conn_t *)mp->b_prev;
	if (connp != compare_conn) {
		return;
	}

	next = mp->b_next;
	proc = (sqproc_t)mp->b_queue;

	ASSERT(proc != NULL);
	ASSERT(sqp->sq_count > 0);

	/* Dequeue item from squeue */
	if (next == NULL) {
		sqp->sq_first = NULL;
		sqp->sq_last = NULL;
	} else {
		sqp->sq_first = next;
	}
	sqp->sq_count--;

	sqp->sq_state |= SQS_PROC;
	sqp->sq_run = curthread;
	mutex_exit(&sqp->sq_lock);

	/* Prep mblk_t and retrieve ira if needed */
	mp->b_prev = NULL;
	mp->b_queue = NULL;
	mp->b_next = NULL;
	if (ip_recv_attr_is_mblk(mp)) {
		mblk_t *attrmp = mp;

		ASSERT(attrmp->b_cont != NULL);

		mp = attrmp->b_cont;
		attrmp->b_cont = NULL;

		ASSERT(mp->b_queue == NULL);
		ASSERT(mp->b_prev == NULL);

		if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
			/* ill_t or ip_stack_t disappeared */
			ip_drop_input("ip_recv_attr_from_mblk", mp, NULL);
			ira_cleanup(&iras, B_TRUE);
			CONN_DEC_REF(connp);
			goto done;
		}
		ira = &iras;
	}

	SQUEUE_DBG_SET(sqp, mp, proc, connp, mp->b_tag);
	connp->conn_on_sqp = B_TRUE;
	DTRACE_PROBE3(squeue__proc__start, squeue_t *, sqp, mblk_t *, mp,
	    conn_t *, connp);
	(*proc)(connp, mp, sqp, ira);
	DTRACE_PROBE2(squeue__proc__end, squeue_t *, sqp, conn_t *, connp);
	connp->conn_on_sqp = B_FALSE;
	CONN_DEC_REF(connp);
	SQUEUE_DBG_CLEAR(sqp);

	if (ira != NULL)
		ira_cleanup(ira, B_TRUE);

done:
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state &= ~(SQS_PROC);
	sqp->sq_run = NULL;
}

void
squeue_synch_exit(conn_t *connp, int flag)
{
	squeue_t *sqp = connp->conn_sqp;

	ASSERT(flag == SQ_NODRAIN || flag == SQ_PROCESS);

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run != curthread) {
		/*
		 * The caller doesn't own the squeue; clear the SQS_PAUSE flag
		 * and wake up the squeue owner, so that the owner can
		 * continue processing.
		 */
		ASSERT(sqp->sq_state & SQS_PAUSE);
		sqp->sq_state &= ~SQS_PAUSE;

		/* There should be only one thread blocking on sq_synch_cv. */
		cv_signal(&sqp->sq_synch_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}

	ASSERT(sqp->sq_state & SQS_PROC);

	sqp->sq_state &= ~SQS_PROC;
	sqp->sq_run = NULL;
	connp->conn_on_sqp = B_FALSE;

	/* If the caller opted in, attempt to process the head squeue item. */
	if (flag == SQ_PROCESS && sqp->sq_first != NULL) {
		squeue_try_drain_one(sqp, connp);
	}

	/* Wake up the worker if further requests are pending. */
	if (sqp->sq_first != NULL) {
		squeue_worker_wakeup(sqp);
	}
	mutex_exit(&sqp->sq_lock);
}