xref: /freebsd/sys/net/netisr.c (revision 38d947b53cbca845926bdd91272ce1c65ba59ecb)
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause
3  *
4  * Copyright (c) 2007-2009 Robert N. M. Watson
5  * Copyright (c) 2010-2011 Juniper Networks, Inc.
6  * All rights reserved.
7  *
8  * This software was developed by Robert N. M. Watson under contract
9  * to Juniper Networks, Inc.
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions
13  * are met:
14  * 1. Redistributions of source code must retain the above copyright
15  *    notice, this list of conditions and the following disclaimer.
16  * 2. Redistributions in binary form must reproduce the above copyright
17  *    notice, this list of conditions and the following disclaimer in the
18  *    documentation and/or other materials provided with the distribution.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
21  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
24  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
26  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
27  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
29  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30  * SUCH DAMAGE.
31  */
32 
33 #include <sys/cdefs.h>
34 /*
35  * netisr is a packet dispatch service, allowing synchronous (directly
36  * dispatched) and asynchronous (deferred dispatch) processing of packets by
37  * registered protocol handlers.  Callers pass a protocol identifier and
38  * packet to netisr, along with a direct dispatch hint, and work will either
39  * be immediately processed by the registered handler, or passed to a
40  * software interrupt (SWI) thread for deferred dispatch.  Callers will
41  * generally select one or the other based on:
42  *
43  * - Whether directly dispatching a netisr handler can lead to code reentrance
44  *   or lock recursion, such as entering the socket code from the socket code.
45  * - Whether directly dispatching a netisr handler can lead to recursive
46  *   processing, such as when decapsulating several wrapped layers of tunnel
47  *   information (IPSEC within IPSEC within ...).
48  *
49  * Maintaining ordering for protocol streams is a critical design concern.
50  * Enforcing ordering limits the opportunity for concurrency, but maintains
51  * the strong ordering requirements found in some protocols, such as TCP.  Of
52  * related concern is CPU affinity--it is desirable to process all data
53  * associated with a particular stream on the same CPU over time in order to
54  * avoid acquiring locks associated with the connection on different CPUs,
55  * keep connection data in one cache, and to generally encourage associated
56  * user threads to live on the same CPU as the stream.  It's also desirable
57  * to avoid lock migration and contention where locks are associated with
58  * more than one flow.
59  *
60  * netisr supports several policy variations, represented by the
61  * NETISR_POLICY_* constants, allowing protocols to play various roles in
62  * identifying flows, assigning work to CPUs, etc.  These are described in
63  * netisr.h.
64  */
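
/*
 * Example (illustrative sketch only; the "foo" names below are hypothetical):
 * a protocol with identifier NETISR_FOO and input routine foo_input() would
 * register with netisr roughly as follows:
 *
 *	static const struct netisr_handler foo_nh = {
 *		.nh_name = "foo",
 *		.nh_handler = foo_input,
 *		.nh_proto = NETISR_FOO,
 *		.nh_policy = NETISR_POLICY_FLOW,
 *		.nh_dispatch = NETISR_DISPATCH_DEFAULT,
 *	};
 *
 *	netisr_register(&foo_nh);
 *
 * Received packets are then handed to netisr_dispatch(NETISR_FOO, m) (direct
 * dispatch permitted) or netisr_queue(NETISR_FOO, m) (always deferred).
 */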
65 
66 #include "opt_ddb.h"
67 #include "opt_device_polling.h"
68 
69 #include <sys/param.h>
70 #include <sys/bus.h>
71 #include <sys/kernel.h>
72 #include <sys/kthread.h>
73 #include <sys/malloc.h>
74 #include <sys/interrupt.h>
75 #include <sys/lock.h>
76 #include <sys/mbuf.h>
77 #include <sys/mutex.h>
78 #include <sys/pcpu.h>
79 #include <sys/proc.h>
80 #include <sys/rmlock.h>
81 #include <sys/sched.h>
82 #include <sys/smp.h>
83 #include <sys/socket.h>
84 #include <sys/sysctl.h>
85 #include <sys/systm.h>
86 
87 #ifdef DDB
88 #include <ddb/ddb.h>
89 #endif
90 
91 #define	_WANT_NETISR_INTERNAL	/* Enable definitions from netisr_internal.h */
92 #include <net/if.h>
93 #include <net/if_var.h>
94 #include <net/if_private.h>
95 #include <net/netisr.h>
96 #include <net/netisr_internal.h>
97 #include <net/vnet.h>
98 
99 /*-
100  * Synchronize use and modification of the registered netisr data structures;
101  * acquire a write lock while modifying the set of registered protocols to
102  * prevent partially registered or unregistered protocols from being run.
103  *
104  * The following data structures and fields are protected by this lock:
105  *
106  * - The netisr_proto array, including all fields of struct netisr_proto.
107  * - The nws array, including all fields of struct netisr_worker.
108  * - The nws_array array.
109  *
110  * Note: the NETISR_LOCKING define controls whether read locks are acquired
111  * in packet processing paths requiring netisr registration stability.  This
112  * is disabled by default as it can lead to measurable performance
113  * degradation even with rmlocks (3%-6% for loopback ping-pong traffic), and
114  * because netisr registration and unregistration is extremely rare at
115  * runtime.  If it becomes more common, this decision should be revisited.
116  *
117  * XXXRW: rmlocks don't support assertions.
118  */
119 static struct rmlock	netisr_rmlock;
120 #define	NETISR_LOCK_INIT()	rm_init_flags(&netisr_rmlock, "netisr", \
121 				    RM_NOWITNESS)
122 #define	NETISR_LOCK_ASSERT()
123 #define	NETISR_RLOCK(tracker)	rm_rlock(&netisr_rmlock, (tracker))
124 #define	NETISR_RUNLOCK(tracker)	rm_runlock(&netisr_rmlock, (tracker))
125 #define	NETISR_WLOCK()		rm_wlock(&netisr_rmlock)
126 #define	NETISR_WUNLOCK()	rm_wunlock(&netisr_rmlock)
127 /* #define	NETISR_LOCKING */
128 
129 static SYSCTL_NODE(_net, OID_AUTO, isr, CTLFLAG_RW | CTLFLAG_MPSAFE, 0,
130     "netisr");
131 
132 /*-
133  * Three global direct dispatch policies are supported:
134  *
135  * NETISR_DISPATCH_DEFERRED: All work is deferred for a netisr, regardless of
136  * context (may be overridden by protocols).
137  *
138  * NETISR_DISPATCH_HYBRID: If the executing context allows direct dispatch,
139  * and we're running on the CPU the work would be performed on, then direct
140  * dispatch it if it wouldn't violate ordering constraints on the workstream.
141  *
142  * NETISR_DISPATCH_DIRECT: If the executing context allows direct dispatch,
143  * always direct dispatch.  (The default.)
144  *
145  * Notice that changing the global policy could lead to short periods of
146  * misordered processing, but this is considered acceptable as compared to
147  * the complexity of enforcing ordering during policy changes.  Protocols can
148  * override the global policy (when they're not doing that, they select
149  * NETISR_DISPATCH_DEFAULT).
150  */
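
/*
 * For example (administrative usage; a sketch, not enforced here), the global
 * policy is selected via the net.isr.dispatch string exported below, either
 * as a loader tunable or at runtime:
 *
 *	net.isr.dispatch="deferred"		(in loader.conf(5))
 *	sysctl net.isr.dispatch=hybrid		(at runtime)
 *
 * Accepted values are "direct", "hybrid" and "deferred"; "default" is
 * reserved for per-protocol use and is rejected by the sysctl handler.
 */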
151 #define	NETISR_DISPATCH_POLICY_DEFAULT	NETISR_DISPATCH_DIRECT
152 #define	NETISR_DISPATCH_POLICY_MAXSTR	20 /* Used for temporary buffers. */
153 static u_int	netisr_dispatch_policy = NETISR_DISPATCH_POLICY_DEFAULT;
154 static int	sysctl_netisr_dispatch_policy(SYSCTL_HANDLER_ARGS);
155 SYSCTL_PROC(_net_isr, OID_AUTO, dispatch,
156     CTLTYPE_STRING | CTLFLAG_RWTUN | CTLFLAG_NEEDGIANT,
157     0, 0, sysctl_netisr_dispatch_policy, "A",
158     "netisr dispatch policy");
159 
160 /*
161  * Allow the administrator to limit the number of threads (CPUs) used for
162  * netisr.  We don't check netisr_maxthreads before creating the thread for
163  * CPU 0, and this limit must be set at boot.  At most one thread is created
164  * per CPU, with one workstream per thread.  By default this is initialized
165  * to 1, so only CPU 0 (and hence a single workstream) is used.  If set to
166  * -1, netisr uses all CPUs (mp_ncpus) and therefore has a matching number
167  * of workstreams.
168  */
169 static int	netisr_maxthreads = 1;		/* Max number of threads. */
170 SYSCTL_INT(_net_isr, OID_AUTO, maxthreads, CTLFLAG_RDTUN,
171     &netisr_maxthreads, 0,
172     "Use at most this many CPUs for netisr processing");
173 
174 static int	netisr_bindthreads = 0;		/* Bind threads to CPUs. */
175 SYSCTL_INT(_net_isr, OID_AUTO, bindthreads, CTLFLAG_RDTUN,
176     &netisr_bindthreads, 0, "Bind netisr threads to CPUs.");
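
/*
 * Example boot-time configuration (illustrative only; both sysctls above are
 * CTLFLAG_RDTUN, so they can only be set as tunables), e.g. in loader.conf(5):
 *
 *	net.isr.maxthreads="-1"		# one netisr thread per CPU
 *	net.isr.bindthreads="1"		# bind each thread to its CPU
 */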
177 
178 /*
179  * Limit per-workstream mbuf queue limits to at most net.isr.maxqlimit,
180  * both for initial configuration and later modification using
181  * netisr_setqlimit().
182  */
183 #define	NETISR_DEFAULT_MAXQLIMIT	10240
184 static u_int	netisr_maxqlimit = NETISR_DEFAULT_MAXQLIMIT;
185 SYSCTL_UINT(_net_isr, OID_AUTO, maxqlimit, CTLFLAG_RDTUN,
186     &netisr_maxqlimit, 0,
187     "Maximum netisr per-protocol, per-CPU queue depth.");
188 
189 /*
190  * The default per-workstream mbuf queue limit for protocols that don't
191  * initialize the nh_qlimit field of their struct netisr_handler.  If this is
192  * set above netisr_maxqlimit, we truncate it to the maximum during boot.
193  */
194 #define	NETISR_DEFAULT_DEFAULTQLIMIT	256
195 static u_int	netisr_defaultqlimit = NETISR_DEFAULT_DEFAULTQLIMIT;
196 SYSCTL_UINT(_net_isr, OID_AUTO, defaultqlimit, CTLFLAG_RDTUN,
197     &netisr_defaultqlimit, 0,
198     "Default netisr per-protocol, per-CPU queue limit if not set by protocol");
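
/*
 * Example (illustrative): the queue depth knobs above are likewise boot-time
 * tunables, e.g.
 *
 *	net.isr.maxqlimit="20480"
 *	net.isr.defaultqlimit="512"
 *
 * in loader.conf(5); at runtime, a protocol may adjust its own limit with
 * netisr_setqlimit(), subject to the net.isr.maxqlimit cap.
 */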
199 
200 /*
201  * Store and export the compile-time constant NETISR_MAXPROT limit on the
202  * number of protocols that can register with netisr at a time.  This is
203  * required for crashdump analysis, as it sizes netisr_proto[].
204  */
205 static u_int	netisr_maxprot = NETISR_MAXPROT;
206 SYSCTL_UINT(_net_isr, OID_AUTO, maxprot, CTLFLAG_RD,
207     &netisr_maxprot, 0,
208     "Compile-time limit on the number of protocols supported by netisr.");
209 
210 /*
211  * The netisr_proto array describes all registered protocols, indexed by
212  * protocol number.  See netisr_internal.h for more details.
213  */
214 static struct netisr_proto	netisr_proto[NETISR_MAXPROT];
215 
216 #ifdef VIMAGE
217 /*
218  * The netisr_enable array holds a per-VNET flag for each registered
219  * protocol indicating whether that netisr is active in the VNET.
220  * netisr_register() will automatically enable the netisr for the
221  * default VNET and all currently active instances.
222  * netisr_unregister() will disable all active VNETs, including vnet0.
223  * Individual network stack instances can be enabled/disabled by the
224  * netisr_register_vnet() and netisr_unregister_vnet() functions.
225  * With this we keep one netisr_proto per protocol but add a
226  * mechanism to stop netisr processing during vnet teardown.
227  * Apart from that we expect a VNET to always be enabled.
228  */
229 VNET_DEFINE_STATIC(u_int,	netisr_enable[NETISR_MAXPROT]);
230 #define	V_netisr_enable		VNET(netisr_enable)
231 #endif
232 
233 /*
234  * Per-CPU workstream data.  See netisr_internal.h for more details.
235  */
236 DPCPU_DEFINE(struct netisr_workstream, nws);
237 
238 /*
239  * Map contiguous values between 0 and nws_count into CPU IDs appropriate for
240  * accessing workstreams.  This allows constructions of the form
241  * DPCPU_ID_GET(nws_array[arbitraryvalue % nws_count], nws).
242  */
243 static u_int				 nws_array[MAXCPU];
244 
245 /*
246  * Number of registered workstreams.  Will be at most the number of running
247  * CPUs once fully started.
248  */
249 static u_int				 nws_count;
250 SYSCTL_UINT(_net_isr, OID_AUTO, numthreads, CTLFLAG_RD,
251     &nws_count, 0, "Number of extant netisr threads.");
252 
253 /*
254  * Synchronization for each workstream: a mutex protects all mutable fields
255  * in each stream, including per-protocol state (mbuf queues).  The SWI is
256  * woken up if asynchronous dispatch is required.
257  */
258 #define	NWS_LOCK(s)		mtx_lock(&(s)->nws_mtx)
259 #define	NWS_LOCK_ASSERT(s)	mtx_assert(&(s)->nws_mtx, MA_OWNED)
260 #define	NWS_UNLOCK(s)		mtx_unlock(&(s)->nws_mtx)
261 #define	NWS_SIGNAL(s)		swi_sched((s)->nws_swi_cookie, 0)
262 
263 /*
264  * Utility routines for protocols that implement their own mapping of flows
265  * to CPUs.
266  */
267 u_int
268 netisr_get_cpucount(void)
269 {
270 
271 	return (nws_count);
272 }
273 
274 u_int
275 netisr_get_cpuid(u_int cpunumber)
276 {
277 
278 	return (nws_array[cpunumber % nws_count]);
279 }
280 
281 /*
282  * The default implementation of flow -> CPU ID mapping.
283  *
284  * Non-static so that protocols can use it to map their own work to specific
285  * CPUs in a manner consistent with netisr for affinity purposes.
286  */
287 u_int
288 netisr_default_flow2cpu(u_int flowid)
289 {
290 
291 	return (nws_array[flowid % nws_count]);
292 }
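
/*
 * Example (illustrative sketch): a protocol maintaining its own per-CPU
 * state can size and index it consistently with netisr's notion of
 * workstream CPUs:
 *
 *	u_int ncpus, cpuid;
 *
 *	ncpus = netisr_get_cpucount();
 *	cpuid = netisr_default_flow2cpu(m->m_pkthdr.flowid);
 *
 * cpuid is a real CPU ID suitable for DPCPU_ID_*() accessors, and a given
 * flowid keeps mapping to the same CPU as long as nws_count is unchanged.
 */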
293 
294 /*
295  * Dispatch tunable and sysctl configuration.
296  */
297 struct netisr_dispatch_table_entry {
298 	u_int		 ndte_policy;
299 	const char	*ndte_policy_str;
300 };
301 static const struct netisr_dispatch_table_entry netisr_dispatch_table[] = {
302 	{ NETISR_DISPATCH_DEFAULT, "default" },
303 	{ NETISR_DISPATCH_DEFERRED, "deferred" },
304 	{ NETISR_DISPATCH_HYBRID, "hybrid" },
305 	{ NETISR_DISPATCH_DIRECT, "direct" },
306 };
307 
308 static void
309 netisr_dispatch_policy_to_str(u_int dispatch_policy, char *buffer,
310     u_int buflen)
311 {
312 	const struct netisr_dispatch_table_entry *ndtep;
313 	const char *str;
314 	u_int i;
315 
316 	str = "unknown";
317 	for (i = 0; i < nitems(netisr_dispatch_table); i++) {
318 		ndtep = &netisr_dispatch_table[i];
319 		if (ndtep->ndte_policy == dispatch_policy) {
320 			str = ndtep->ndte_policy_str;
321 			break;
322 		}
323 	}
324 	snprintf(buffer, buflen, "%s", str);
325 }
326 
327 static int
328 netisr_dispatch_policy_from_str(const char *str, u_int *dispatch_policyp)
329 {
330 	const struct netisr_dispatch_table_entry *ndtep;
331 	u_int i;
332 
333 	for (i = 0; i < nitems(netisr_dispatch_table); i++) {
334 		ndtep = &netisr_dispatch_table[i];
335 		if (strcmp(ndtep->ndte_policy_str, str) == 0) {
336 			*dispatch_policyp = ndtep->ndte_policy;
337 			return (0);
338 		}
339 	}
340 	return (EINVAL);
341 }
342 
343 static int
344 sysctl_netisr_dispatch_policy(SYSCTL_HANDLER_ARGS)
345 {
346 	char tmp[NETISR_DISPATCH_POLICY_MAXSTR];
347 	size_t len;
348 	u_int dispatch_policy;
349 	int error;
350 
351 	netisr_dispatch_policy_to_str(netisr_dispatch_policy, tmp,
352 	    sizeof(tmp));
353 	/*
354 	 * netisr is initialized very early during boot, when malloc isn't
355 	 * available yet, so we can't use sysctl_handle_string() to process
356 	 * any non-default value that was potentially set via the loader.
357 	 */
358 	if (req->newptr != NULL) {
359 		len = req->newlen - req->newidx;
360 		if (len >= NETISR_DISPATCH_POLICY_MAXSTR)
361 			return (EINVAL);
362 		error = SYSCTL_IN(req, tmp, len);
363 		if (error == 0) {
364 			tmp[len] = '\0';
365 			error = netisr_dispatch_policy_from_str(tmp,
366 			    &dispatch_policy);
367 			if (error == 0 &&
368 			    dispatch_policy == NETISR_DISPATCH_DEFAULT)
369 				error = EINVAL;
370 			if (error == 0)
371 				netisr_dispatch_policy = dispatch_policy;
372 		}
373 	} else {
374 		error = sysctl_handle_string(oidp, tmp, sizeof(tmp), req);
375 	}
376 	return (error);
377 }
378 
379 /*
380  * Register a new netisr handler, which requires initializing per-protocol
381  * fields for each workstream.  All netisr work is briefly suspended while
382  * the protocol is installed.
383  */
384 void
385 netisr_register(const struct netisr_handler *nhp)
386 {
387 	VNET_ITERATOR_DECL(vnet_iter);
388 	struct netisr_work *npwp;
389 	const char *name;
390 	u_int i, proto;
391 
392 	proto = nhp->nh_proto;
393 	name = nhp->nh_name;
394 
395 	/*
396 	 * Test that the requested registration is valid.
397 	 */
398 	CURVNET_ASSERT_SET();
399 	MPASS(IS_DEFAULT_VNET(curvnet));
400 	KASSERT(nhp->nh_name != NULL,
401 	    ("%s: nh_name NULL for %u", __func__, proto));
402 	KASSERT(nhp->nh_handler != NULL,
403 	    ("%s: nh_handler NULL for %s", __func__, name));
404 	KASSERT(nhp->nh_policy == NETISR_POLICY_SOURCE ||
405 	    nhp->nh_policy == NETISR_POLICY_FLOW ||
406 	    nhp->nh_policy == NETISR_POLICY_CPU,
407 	    ("%s: unsupported nh_policy %u for %s", __func__,
408 	    nhp->nh_policy, name));
409 	KASSERT(nhp->nh_policy == NETISR_POLICY_FLOW ||
410 	    nhp->nh_m2flow == NULL,
411 	    ("%s: nh_policy != FLOW but m2flow defined for %s", __func__,
412 	    name));
413 	KASSERT(nhp->nh_policy == NETISR_POLICY_CPU || nhp->nh_m2cpuid == NULL,
414 	    ("%s: nh_policy != CPU but m2cpuid defined for %s", __func__,
415 	    name));
416 	KASSERT(nhp->nh_policy != NETISR_POLICY_CPU || nhp->nh_m2cpuid != NULL,
417 	    ("%s: nh_policy == CPU but m2cpuid not defined for %s", __func__,
418 	    name));
419 	KASSERT(nhp->nh_dispatch == NETISR_DISPATCH_DEFAULT ||
420 	    nhp->nh_dispatch == NETISR_DISPATCH_DEFERRED ||
421 	    nhp->nh_dispatch == NETISR_DISPATCH_HYBRID ||
422 	    nhp->nh_dispatch == NETISR_DISPATCH_DIRECT,
423 	    ("%s: invalid nh_dispatch (%u)", __func__, nhp->nh_dispatch));
424 
425 	KASSERT(proto < NETISR_MAXPROT,
426 	    ("%s(%u, %s): protocol too big", __func__, proto, name));
427 
428 	/*
429 	 * Verify that no registration already exists for this protocol.
430 	 */
431 	NETISR_WLOCK();
432 	KASSERT(netisr_proto[proto].np_name == NULL,
433 	    ("%s(%u, %s): name present", __func__, proto, name));
434 	KASSERT(netisr_proto[proto].np_handler == NULL,
435 	    ("%s(%u, %s): handler present", __func__, proto, name));
436 
437 	netisr_proto[proto].np_name = name;
438 	netisr_proto[proto].np_handler = nhp->nh_handler;
439 	netisr_proto[proto].np_m2flow = nhp->nh_m2flow;
440 	netisr_proto[proto].np_m2cpuid = nhp->nh_m2cpuid;
441 	netisr_proto[proto].np_drainedcpu = nhp->nh_drainedcpu;
442 	if (nhp->nh_qlimit == 0)
443 		netisr_proto[proto].np_qlimit = netisr_defaultqlimit;
444 	else if (nhp->nh_qlimit > netisr_maxqlimit) {
445 		printf("%s: %s requested queue limit %u capped to "
446 		    "net.isr.maxqlimit %u\n", __func__, name, nhp->nh_qlimit,
447 		    netisr_maxqlimit);
448 		netisr_proto[proto].np_qlimit = netisr_maxqlimit;
449 	} else
450 		netisr_proto[proto].np_qlimit = nhp->nh_qlimit;
451 	netisr_proto[proto].np_policy = nhp->nh_policy;
452 	netisr_proto[proto].np_dispatch = nhp->nh_dispatch;
453 	CPU_FOREACH(i) {
454 		npwp = &(DPCPU_ID_PTR(i, nws))->nws_work[proto];
455 		bzero(npwp, sizeof(*npwp));
456 		npwp->nw_qlimit = netisr_proto[proto].np_qlimit;
457 	}
458 
459 #ifdef VIMAGE
460 	V_netisr_enable[proto] = 1;
461 	VNET_LIST_RLOCK_NOSLEEP();
462 	VNET_FOREACH(vnet_iter) {
463 		if (vnet_iter == curvnet)
464 			continue;
465 		CURVNET_SET(vnet_iter);
466 		V_netisr_enable[proto] = 1;
467 		CURVNET_RESTORE();
468 	}
469 	VNET_LIST_RUNLOCK_NOSLEEP();
470 #endif
471 	NETISR_WUNLOCK();
472 }
473 
474 /*
475  * Clear drop counters across all workstreams for a protocol.
476  */
477 void
478 netisr_clearqdrops(const struct netisr_handler *nhp)
479 {
480 	struct netisr_work *npwp;
481 #ifdef INVARIANTS
482 	const char *name;
483 #endif
484 	u_int i, proto;
485 
486 	proto = nhp->nh_proto;
487 #ifdef INVARIANTS
488 	name = nhp->nh_name;
489 #endif
490 	KASSERT(proto < NETISR_MAXPROT,
491 	    ("%s(%u): protocol too big for %s", __func__, proto, name));
492 
493 	NETISR_WLOCK();
494 	KASSERT(netisr_proto[proto].np_handler != NULL,
495 	    ("%s(%u): protocol not registered for %s", __func__, proto,
496 	    name));
497 
498 	CPU_FOREACH(i) {
499 		npwp = &(DPCPU_ID_PTR(i, nws))->nws_work[proto];
500 		npwp->nw_qdrops = 0;
501 	}
502 	NETISR_WUNLOCK();
503 }
504 
505 /*
506  * Query current drop counters across all workstreams for a protocol.
507  */
508 void
509 netisr_getqdrops(const struct netisr_handler *nhp, u_int64_t *qdropp)
510 {
511 	struct netisr_work *npwp;
512 	struct rm_priotracker tracker;
513 #ifdef INVARIANTS
514 	const char *name;
515 #endif
516 	u_int i, proto;
517 
518 	*qdropp = 0;
519 	proto = nhp->nh_proto;
520 #ifdef INVARIANTS
521 	name = nhp->nh_name;
522 #endif
523 	KASSERT(proto < NETISR_MAXPROT,
524 	    ("%s(%u): protocol too big for %s", __func__, proto, name));
525 
526 	NETISR_RLOCK(&tracker);
527 	KASSERT(netisr_proto[proto].np_handler != NULL,
528 	    ("%s(%u): protocol not registered for %s", __func__, proto,
529 	    name));
530 
531 	CPU_FOREACH(i) {
532 		npwp = &(DPCPU_ID_PTR(i, nws))->nws_work[proto];
533 		*qdropp += npwp->nw_qdrops;
534 	}
535 	NETISR_RUNLOCK(&tracker);
536 }
537 
538 /*
539  * Query current per-workstream queue limit for a protocol.
540  */
541 void
542 netisr_getqlimit(const struct netisr_handler *nhp, u_int *qlimitp)
543 {
544 	struct rm_priotracker tracker;
545 #ifdef INVARIANTS
546 	const char *name;
547 #endif
548 	u_int proto;
549 
550 	proto = nhp->nh_proto;
551 #ifdef INVARIANTS
552 	name = nhp->nh_name;
553 #endif
554 	KASSERT(proto < NETISR_MAXPROT,
555 	    ("%s(%u): protocol too big for %s", __func__, proto, name));
556 
557 	NETISR_RLOCK(&tracker);
558 	KASSERT(netisr_proto[proto].np_handler != NULL,
559 	    ("%s(%u): protocol not registered for %s", __func__, proto,
560 	    name));
561 	*qlimitp = netisr_proto[proto].np_qlimit;
562 	NETISR_RUNLOCK(&tracker);
563 }
564 
565 /*
566  * Update the queue limit across per-workstream queues for a protocol.  We
567  * simply change the limits, and don't drain overflowed packets as they will
568  * (hopefully) take care of themselves shortly.
569  */
570 int
571 netisr_setqlimit(const struct netisr_handler *nhp, u_int qlimit)
572 {
573 	struct netisr_work *npwp;
574 #ifdef INVARIANTS
575 	const char *name;
576 #endif
577 	u_int i, proto;
578 
579 	if (qlimit > netisr_maxqlimit)
580 		return (EINVAL);
581 
582 	proto = nhp->nh_proto;
583 #ifdef INVARIANTS
584 	name = nhp->nh_name;
585 #endif
586 	KASSERT(proto < NETISR_MAXPROT,
587 	    ("%s(%u): protocol too big for %s", __func__, proto, name));
588 
589 	NETISR_WLOCK();
590 	KASSERT(netisr_proto[proto].np_handler != NULL,
591 	    ("%s(%u): protocol not registered for %s", __func__, proto,
592 	    name));
593 
594 	netisr_proto[proto].np_qlimit = qlimit;
595 	CPU_FOREACH(i) {
596 		npwp = &(DPCPU_ID_PTR(i, nws))->nws_work[proto];
597 		npwp->nw_qlimit = qlimit;
598 	}
599 	NETISR_WUNLOCK();
600 	return (0);
601 }
602 
603 /*
604  * Drain all packets currently held in a particular protocol work queue.
605  */
606 static void
607 netisr_drain_proto(struct netisr_work *npwp)
608 {
609 	struct mbuf *m;
610 
611 	/*
612 	 * We would assert the lock on the workstream but it's not passed in.
613 	 */
614 	while ((m = npwp->nw_head) != NULL) {
615 		npwp->nw_head = m->m_nextpkt;
616 		m->m_nextpkt = NULL;
617 		if (npwp->nw_head == NULL)
618 			npwp->nw_tail = NULL;
619 		npwp->nw_len--;
620 		m_freem(m);
621 	}
622 	KASSERT(npwp->nw_tail == NULL, ("%s: tail", __func__));
623 	KASSERT(npwp->nw_len == 0, ("%s: len", __func__));
624 }
625 
626 /*
627  * Remove the registration of a network protocol, which requires clearing
628  * per-protocol fields across all workstreams, including freeing all mbufs in
629  * the queues at time of unregister.  All work in netisr is briefly suspended
630  * while this takes place.
631  */
632 void
633 netisr_unregister(const struct netisr_handler *nhp)
634 {
635 	VNET_ITERATOR_DECL(vnet_iter);
636 	struct netisr_work *npwp;
637 #ifdef INVARIANTS
638 	const char *name;
639 #endif
640 	u_int i, proto;
641 
642 	proto = nhp->nh_proto;
643 #ifdef INVARIANTS
644 	name = nhp->nh_name;
645 #endif
646 	KASSERT(proto < NETISR_MAXPROT,
647 	    ("%s(%u): protocol too big for %s", __func__, proto, name));
648 
649 	NETISR_WLOCK();
650 	KASSERT(netisr_proto[proto].np_handler != NULL,
651 	    ("%s(%u): protocol not registered for %s", __func__, proto,
652 	    name));
653 
654 #ifdef VIMAGE
655 	VNET_LIST_RLOCK_NOSLEEP();
656 	VNET_FOREACH(vnet_iter) {
657 		CURVNET_SET(vnet_iter);
658 		V_netisr_enable[proto] = 0;
659 		CURVNET_RESTORE();
660 	}
661 	VNET_LIST_RUNLOCK_NOSLEEP();
662 #endif
663 
664 	netisr_proto[proto].np_name = NULL;
665 	netisr_proto[proto].np_handler = NULL;
666 	netisr_proto[proto].np_m2flow = NULL;
667 	netisr_proto[proto].np_m2cpuid = NULL;
668 	netisr_proto[proto].np_qlimit = 0;
669 	netisr_proto[proto].np_policy = 0;
670 	CPU_FOREACH(i) {
671 		npwp = &(DPCPU_ID_PTR(i, nws))->nws_work[proto];
672 		netisr_drain_proto(npwp);
673 		bzero(npwp, sizeof(*npwp));
674 	}
675 	NETISR_WUNLOCK();
676 }
677 
678 #ifdef VIMAGE
679 void
680 netisr_register_vnet(const struct netisr_handler *nhp)
681 {
682 	u_int proto;
683 
684 	proto = nhp->nh_proto;
685 
686 	KASSERT(curvnet != NULL, ("%s: curvnet is NULL", __func__));
687 	KASSERT(proto < NETISR_MAXPROT,
688 	    ("%s(%u): protocol too big for %s", __func__, proto, nhp->nh_name));
689 	NETISR_WLOCK();
690 	KASSERT(netisr_proto[proto].np_handler != NULL,
691 	    ("%s(%u): protocol not registered for %s", __func__, proto,
692 	    nhp->nh_name));
693 
694 	V_netisr_enable[proto] = 1;
695 	NETISR_WUNLOCK();
696 }
697 
698 static void
699 netisr_drain_proto_vnet(struct vnet *vnet, u_int proto)
700 {
701 	struct epoch_tracker et;
702 	struct netisr_workstream *nwsp;
703 	struct netisr_work *npwp;
704 	struct mbuf *m, *mp, *n, *ne;
705 	struct ifnet *ifp;
706 	u_int i;
707 
708 	KASSERT(vnet != NULL, ("%s: vnet is NULL", __func__));
709 	NETISR_LOCK_ASSERT();
710 
711 	CPU_FOREACH(i) {
712 		nwsp = DPCPU_ID_PTR(i, nws);
713 		if (nwsp->nws_intr_event == NULL)
714 			continue;
715 		npwp = &nwsp->nws_work[proto];
716 		NWS_LOCK(nwsp);
717 
718 		/*
719 		 * Rather than dissecting and removing mbufs from the middle
720 		 * of the chain, we build a new chain if the packet stays and
721 		 * update the head and tail pointers at the end.  All packets
722 		 * matching the given vnet are freed.
723 		 */
724 		m = npwp->nw_head;
725 		n = ne = NULL;
726 		NET_EPOCH_ENTER(et);
727 		while (m != NULL) {
728 			mp = m;
729 			m = m->m_nextpkt;
730 			mp->m_nextpkt = NULL;
731 			if ((ifp = ifnet_byindexgen(mp->m_pkthdr.rcvidx,
732 			    mp->m_pkthdr.rcvgen)) != NULL &&
733 			    ifp->if_vnet != vnet) {
734 				if (n == NULL) {
735 					n = ne = mp;
736 				} else {
737 					ne->m_nextpkt = mp;
738 					ne = mp;
739 				}
740 				continue;
741 			}
742 			/* This is a packet in the selected vnet, or belongs
743 			   to a destroyed interface. Free it. */
744 			npwp->nw_len--;
745 			m_freem(mp);
746 		}
747 		NET_EPOCH_EXIT(et);
748 		npwp->nw_head = n;
749 		npwp->nw_tail = ne;
750 		NWS_UNLOCK(nwsp);
751 	}
752 }
753 
754 void
755 netisr_unregister_vnet(const struct netisr_handler *nhp)
756 {
757 	u_int proto;
758 
759 	proto = nhp->nh_proto;
760 
761 	KASSERT(curvnet != NULL, ("%s: curvnet is NULL", __func__));
762 	KASSERT(proto < NETISR_MAXPROT,
763 	    ("%s(%u): protocol too big for %s", __func__, proto, nhp->nh_name));
764 	NETISR_WLOCK();
765 	KASSERT(netisr_proto[proto].np_handler != NULL,
766 	    ("%s(%u): protocol not registered for %s", __func__, proto,
767 	    nhp->nh_name));
768 
769 	V_netisr_enable[proto] = 0;
770 
771 	netisr_drain_proto_vnet(curvnet, proto);
772 	NETISR_WUNLOCK();
773 }
774 #endif
775 
776 /*
777  * Compose the global and per-protocol policies on dispatch, and return the
778  * dispatch policy to use.
779  */
780 static u_int
781 netisr_get_dispatch(struct netisr_proto *npp)
782 {
783 
784 	/*
785 	 * Protocol-specific configuration overrides the global default.
786 	 */
787 	if (npp->np_dispatch != NETISR_DISPATCH_DEFAULT)
788 		return (npp->np_dispatch);
789 	return (netisr_dispatch_policy);
790 }
791 
792 /*
793  * Look up the workstream given a packet and source identifier.  Do this by
794  * checking the protocol's policy, and optionally call out to the protocol
795  * for assistance if required.
796  */
797 static struct mbuf *
798 netisr_select_cpuid(struct netisr_proto *npp, u_int dispatch_policy,
799     uintptr_t source, struct mbuf *m, u_int *cpuidp)
800 {
801 	struct ifnet *ifp;
802 	u_int policy;
803 
804 	NETISR_LOCK_ASSERT();
805 
806 	/*
807 	 * In the event we have only one worker, shortcut and deliver to it
808 	 * without further ado.
809 	 */
810 	if (nws_count == 1) {
811 		*cpuidp = nws_array[0];
812 		return (m);
813 	}
814 
815 	/*
816 	 * What happens next depends on the policy selected by the protocol.
817 	 * If we want to support per-interface policies, we should do that
818 	 * here first.
819 	 */
820 	policy = npp->np_policy;
821 	if (policy == NETISR_POLICY_CPU) {
822 		m = npp->np_m2cpuid(m, source, cpuidp);
823 		if (m == NULL)
824 			return (NULL);
825 
826 		/*
827 		 * It's possible for a protocol not to have a good idea about
828 		 * where to process a packet, in which case we fall back on
829 		 * the netisr code to decide.  In the hybrid case, return the
830 		 * current CPU ID, which will force an immediate direct
831 		 * dispatch.  In the queued case, fall back on the SOURCE
832 		 * policy.
833 		 */
834 		if (*cpuidp != NETISR_CPUID_NONE) {
835 			*cpuidp = netisr_get_cpuid(*cpuidp);
836 			return (m);
837 		}
838 		if (dispatch_policy == NETISR_DISPATCH_HYBRID) {
839 			*cpuidp = netisr_get_cpuid(curcpu);
840 			return (m);
841 		}
842 		policy = NETISR_POLICY_SOURCE;
843 	}
844 
845 	if (policy == NETISR_POLICY_FLOW) {
846 		if (M_HASHTYPE_GET(m) == M_HASHTYPE_NONE &&
847 		    npp->np_m2flow != NULL) {
848 			m = npp->np_m2flow(m, source);
849 			if (m == NULL)
850 				return (NULL);
851 		}
852 		if (M_HASHTYPE_GET(m) != M_HASHTYPE_NONE) {
853 			*cpuidp =
854 			    netisr_default_flow2cpu(m->m_pkthdr.flowid);
855 			return (m);
856 		}
857 		policy = NETISR_POLICY_SOURCE;
858 	}
859 
860 	KASSERT(policy == NETISR_POLICY_SOURCE,
861 	    ("%s: invalid policy %u for %s", __func__, npp->np_policy,
862 	    npp->np_name));
863 
864 	MPASS((m->m_pkthdr.csum_flags & CSUM_SND_TAG) == 0);
865 	ifp = m->m_pkthdr.rcvif;
866 	if (ifp != NULL)
867 		*cpuidp = nws_array[(ifp->if_index + source) % nws_count];
868 	else
869 		*cpuidp = nws_array[source % nws_count];
870 	return (m);
871 }
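
/*
 * Example (illustrative sketch of a hypothetical NETISR_POLICY_CPU
 * protocol): an nh_m2cpuid callback either picks a CPU number itself or
 * punts the decision back to netisr by returning NETISR_CPUID_NONE:
 *
 *	static struct mbuf *
 *	foo_m2cpuid(struct mbuf *m, uintptr_t source, u_int *cpuid)
 *	{
 *
 *		if (M_HASHTYPE_GET(m) != M_HASHTYPE_NONE)
 *			*cpuid = m->m_pkthdr.flowid;
 *		else
 *			*cpuid = NETISR_CPUID_NONE;
 *		return (m);
 *	}
 *
 * Any value other than NETISR_CPUID_NONE is remapped onto an actual
 * workstream CPU by netisr_get_cpuid() before it is used.
 */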
872 
873 /*
874  * Process packets associated with a workstream and protocol.  For reasons of
875  * fairness, we process up to one complete netisr queue at a time, moving the
876  * queue to a stack-local queue for processing, but do not loop refreshing
877  * from the global queue.  The caller is responsible for deciding whether to
878  * loop, and for setting the NWS_RUNNING flag.  The passed workstream will be
879  * locked on entry and relocked before return, but will be released while
880  * processing.  The number of packets processed is returned.
881  */
882 static u_int
883 netisr_process_workstream_proto(struct netisr_workstream *nwsp, u_int proto)
884 {
885 	struct netisr_work local_npw, *npwp;
886 	u_int handled;
887 	struct mbuf *m;
888 
889 	NETISR_LOCK_ASSERT();
890 	NWS_LOCK_ASSERT(nwsp);
891 
892 	KASSERT(nwsp->nws_flags & NWS_RUNNING,
893 	    ("%s(%u): not running", __func__, proto));
894 	KASSERT(proto >= 0 && proto < NETISR_MAXPROT,
895 	    ("%s(%u): invalid proto\n", __func__, proto));
896 
897 	npwp = &nwsp->nws_work[proto];
898 	if (npwp->nw_len == 0)
899 		return (0);
900 
901 	/*
902 	 * Move the global work queue to a thread-local work queue.
903 	 *
904 	 * Notice that this means the effective maximum length of the queue
905 	 * is actually twice that of the maximum queue length specified in
906 	 * the protocol registration call.
907 	 */
908 	handled = npwp->nw_len;
909 	local_npw = *npwp;
910 	npwp->nw_head = NULL;
911 	npwp->nw_tail = NULL;
912 	npwp->nw_len = 0;
913 	nwsp->nws_pendingbits &= ~(1 << proto);
914 	NWS_UNLOCK(nwsp);
915 	while ((m = local_npw.nw_head) != NULL) {
916 		local_npw.nw_head = m->m_nextpkt;
917 		m->m_nextpkt = NULL;
918 		if (local_npw.nw_head == NULL)
919 			local_npw.nw_tail = NULL;
920 		local_npw.nw_len--;
921 		if (__predict_false(m_rcvif_restore(m) == NULL)) {
922 			m_freem(m);
923 			continue;
924 		}
925 		CURVNET_SET(m->m_pkthdr.rcvif->if_vnet);
926 		netisr_proto[proto].np_handler(m);
927 		CURVNET_RESTORE();
928 	}
929 	KASSERT(local_npw.nw_len == 0,
930 	    ("%s(%u): len %u", __func__, proto, local_npw.nw_len));
931 	if (netisr_proto[proto].np_drainedcpu)
932 		netisr_proto[proto].np_drainedcpu(nwsp->nws_cpu);
933 	NWS_LOCK(nwsp);
934 	npwp->nw_handled += handled;
935 	return (handled);
936 }
937 
938 /*
939  * SWI handler for netisr -- processes packets in a set of workstreams that
940  * it owns, woken up by calls to NWS_SIGNAL().  If this workstream is already
941  * being direct dispatched, go back to sleep and wait for the dispatching
942  * thread to wake us up again.
943  */
944 static void
945 swi_net(void *arg)
946 {
947 #ifdef NETISR_LOCKING
948 	struct rm_priotracker tracker;
949 #endif
950 	struct netisr_workstream *nwsp;
951 	u_int bits, prot;
952 
953 	nwsp = arg;
954 
955 #ifdef DEVICE_POLLING
956 	KASSERT(nws_count == 1,
957 	    ("%s: device_polling but nws_count != 1", __func__));
958 	netisr_poll();
959 #endif
960 #ifdef NETISR_LOCKING
961 	NETISR_RLOCK(&tracker);
962 #endif
963 	NWS_LOCK(nwsp);
964 	KASSERT(!(nwsp->nws_flags & NWS_RUNNING), ("swi_net: running"));
965 	if (nwsp->nws_flags & NWS_DISPATCHING)
966 		goto out;
967 	nwsp->nws_flags |= NWS_RUNNING;
968 	nwsp->nws_flags &= ~NWS_SCHEDULED;
969 	while ((bits = nwsp->nws_pendingbits) != 0) {
970 		while (bits != 0) {
971 			prot = ffs(bits) - 1;
972 			bits &= ~(1 << prot);
973 			(void)netisr_process_workstream_proto(nwsp, prot);
974 		}
975 	}
976 	nwsp->nws_flags &= ~NWS_RUNNING;
977 out:
978 	NWS_UNLOCK(nwsp);
979 #ifdef NETISR_LOCKING
980 	NETISR_RUNLOCK(&tracker);
981 #endif
982 #ifdef DEVICE_POLLING
983 	netisr_pollmore();
984 #endif
985 }
986 
987 static int
988 netisr_queue_workstream(struct netisr_workstream *nwsp, u_int proto,
989     struct netisr_work *npwp, struct mbuf *m, int *dosignalp)
990 {
991 
992 	NWS_LOCK_ASSERT(nwsp);
993 
994 	*dosignalp = 0;
995 	if (npwp->nw_len < npwp->nw_qlimit) {
996 		m_rcvif_serialize(m);
997 		m->m_nextpkt = NULL;
998 		if (npwp->nw_head == NULL) {
999 			npwp->nw_head = m;
1000 			npwp->nw_tail = m;
1001 		} else {
1002 			npwp->nw_tail->m_nextpkt = m;
1003 			npwp->nw_tail = m;
1004 		}
1005 		npwp->nw_len++;
1006 		if (npwp->nw_len > npwp->nw_watermark)
1007 			npwp->nw_watermark = npwp->nw_len;
1008 
1009 		/*
1010 		 * We must set the bit regardless of NWS_RUNNING, so that
1011 		 * swi_net() keeps calling netisr_process_workstream_proto().
1012 		 */
1013 		nwsp->nws_pendingbits |= (1 << proto);
1014 		if (!(nwsp->nws_flags &
1015 		    (NWS_RUNNING | NWS_DISPATCHING | NWS_SCHEDULED))) {
1016 			nwsp->nws_flags |= NWS_SCHEDULED;
1017 			*dosignalp = 1;	/* Defer until unlocked. */
1018 		}
1019 		npwp->nw_queued++;
1020 		return (0);
1021 	} else {
1022 		m_freem(m);
1023 		npwp->nw_qdrops++;
1024 		return (ENOBUFS);
1025 	}
1026 }
1027 
1028 static int
1029 netisr_queue_internal(u_int proto, struct mbuf *m, u_int cpuid)
1030 {
1031 	struct netisr_workstream *nwsp;
1032 	struct netisr_work *npwp;
1033 	int dosignal, error;
1034 
1035 #ifdef NETISR_LOCKING
1036 	NETISR_LOCK_ASSERT();
1037 #endif
1038 	KASSERT(cpuid <= mp_maxid, ("%s: cpuid too big (%u, %u)", __func__,
1039 	    cpuid, mp_maxid));
1040 	KASSERT(!CPU_ABSENT(cpuid), ("%s: CPU %u absent", __func__, cpuid));
1041 
1042 	dosignal = 0;
1043 	error = 0;
1044 	nwsp = DPCPU_ID_PTR(cpuid, nws);
1045 	npwp = &nwsp->nws_work[proto];
1046 	NWS_LOCK(nwsp);
1047 	error = netisr_queue_workstream(nwsp, proto, npwp, m, &dosignal);
1048 	NWS_UNLOCK(nwsp);
1049 	if (dosignal)
1050 		NWS_SIGNAL(nwsp);
1051 	return (error);
1052 }
1053 
1054 int
1055 netisr_queue_src(u_int proto, uintptr_t source, struct mbuf *m)
1056 {
1057 #ifdef NETISR_LOCKING
1058 	struct rm_priotracker tracker;
1059 #endif
1060 	u_int cpuid;
1061 	int error;
1062 
1063 	KASSERT(proto < NETISR_MAXPROT,
1064 	    ("%s: invalid proto %u", __func__, proto));
1065 
1066 #ifdef NETISR_LOCKING
1067 	NETISR_RLOCK(&tracker);
1068 #endif
1069 	KASSERT(netisr_proto[proto].np_handler != NULL,
1070 	    ("%s: invalid proto %u", __func__, proto));
1071 
1072 #ifdef VIMAGE
1073 	if (V_netisr_enable[proto] == 0) {
1074 		m_freem(m);
1075 		return (ENOPROTOOPT);
1076 	}
1077 #endif
1078 
1079 	m = netisr_select_cpuid(&netisr_proto[proto], NETISR_DISPATCH_DEFERRED,
1080 	    source, m, &cpuid);
1081 	if (m != NULL) {
1082 		KASSERT(!CPU_ABSENT(cpuid), ("%s: CPU %u absent", __func__,
1083 		    cpuid));
1084 		VNET_ASSERT(m->m_pkthdr.rcvif != NULL,
1085 		    ("%s:%d rcvif == NULL: m=%p", __func__, __LINE__, m));
1086 		error = netisr_queue_internal(proto, m, cpuid);
1087 	} else
1088 		error = ENOBUFS;
1089 #ifdef NETISR_LOCKING
1090 	NETISR_RUNLOCK(&tracker);
1091 #endif
1092 	return (error);
1093 }
1094 
1095 int
1096 netisr_queue(u_int proto, struct mbuf *m)
1097 {
1098 
1099 	return (netisr_queue_src(proto, 0, m));
1100 }
1101 
1102 /*
1103  * Dispatch a packet for netisr processing; direct dispatch is permitted by
1104  * calling context.
1105  */
1106 int
1107 netisr_dispatch_src(u_int proto, uintptr_t source, struct mbuf *m)
1108 {
1109 #ifdef NETISR_LOCKING
1110 	struct rm_priotracker tracker;
1111 #endif
1112 	struct netisr_workstream *nwsp;
1113 	struct netisr_proto *npp;
1114 	struct netisr_work *npwp;
1115 	int dosignal, error;
1116 	u_int cpuid, dispatch_policy;
1117 
1118 	NET_EPOCH_ASSERT();
1119 	KASSERT(proto < NETISR_MAXPROT,
1120 	    ("%s: invalid proto %u", __func__, proto));
1121 #ifdef NETISR_LOCKING
1122 	NETISR_RLOCK(&tracker);
1123 #endif
1124 	npp = &netisr_proto[proto];
1125 	KASSERT(npp->np_handler != NULL, ("%s: invalid proto %u", __func__,
1126 	    proto));
1127 
1128 #ifdef VIMAGE
1129 	if (V_netisr_enable[proto] == 0) {
1130 		m_freem(m);
1131 		return (ENOPROTOOPT);
1132 	}
1133 #endif
1134 
1135 	dispatch_policy = netisr_get_dispatch(npp);
1136 	if (dispatch_policy == NETISR_DISPATCH_DEFERRED)
1137 		return (netisr_queue_src(proto, source, m));
1138 
1139 	/*
1140 	 * If direct dispatch is forced, then unconditionally dispatch
1141 	 * without a formal CPU selection.  Borrow the current CPU's stats,
1142 	 * even if there's no worker on it.  In this case we don't update
1143 	 * nws_flags because all netisr processing will be source ordered due
1144 	 * to always being forced to directly dispatch.
1145 	 */
1146 	if (dispatch_policy == NETISR_DISPATCH_DIRECT) {
1147 		nwsp = DPCPU_PTR(nws);
1148 		npwp = &nwsp->nws_work[proto];
1149 		npwp->nw_dispatched++;
1150 		npwp->nw_handled++;
1151 		netisr_proto[proto].np_handler(m);
1152 		error = 0;
1153 		goto out_unlock;
1154 	}
1155 
1156 	KASSERT(dispatch_policy == NETISR_DISPATCH_HYBRID,
1157 	    ("%s: unknown dispatch policy (%u)", __func__, dispatch_policy));
1158 
1159 	/*
1160 	 * Otherwise, we execute in a hybrid mode where we will try to direct
1161 	 * dispatch if we're on the right CPU and the netisr worker isn't
1162 	 * already running.
1163 	 */
1164 	sched_pin();
1165 	m = netisr_select_cpuid(&netisr_proto[proto], NETISR_DISPATCH_HYBRID,
1166 	    source, m, &cpuid);
1167 	if (m == NULL) {
1168 		error = ENOBUFS;
1169 		goto out_unpin;
1170 	}
1171 	KASSERT(!CPU_ABSENT(cpuid), ("%s: CPU %u absent", __func__, cpuid));
1172 	if (cpuid != curcpu)
1173 		goto queue_fallback;
1174 	nwsp = DPCPU_PTR(nws);
1175 	npwp = &nwsp->nws_work[proto];
1176 
1177 	/*-
1178 	 * We are willing to direct dispatch only if three conditions hold:
1179 	 *
1180 	 * (1) The netisr worker isn't already running,
1181 	 * (2) Another thread isn't already directly dispatching, and
1182 	 * (3) The netisr hasn't already been woken up.
1183 	 */
1184 	NWS_LOCK(nwsp);
1185 	if (nwsp->nws_flags & (NWS_RUNNING | NWS_DISPATCHING | NWS_SCHEDULED)) {
1186 		error = netisr_queue_workstream(nwsp, proto, npwp, m,
1187 		    &dosignal);
1188 		NWS_UNLOCK(nwsp);
1189 		if (dosignal)
1190 			NWS_SIGNAL(nwsp);
1191 		goto out_unpin;
1192 	}
1193 
1194 	/*
1195 	 * The current thread is now effectively the netisr worker, so set
1196 	 * the dispatching flag to prevent concurrent processing of the
1197 	 * stream from another thread (even the netisr worker), which could
1198 	 * otherwise lead to effective misordering of the stream.
1199 	 */
1200 	nwsp->nws_flags |= NWS_DISPATCHING;
1201 	NWS_UNLOCK(nwsp);
1202 	netisr_proto[proto].np_handler(m);
1203 	NWS_LOCK(nwsp);
1204 	nwsp->nws_flags &= ~NWS_DISPATCHING;
1205 	npwp->nw_handled++;
1206 	npwp->nw_hybrid_dispatched++;
1207 
1208 	/*
1209 	 * If other work was enqueued by another thread while we were direct
1210 	 * dispatching, we need to signal the netisr worker to do that work.
1211 	 * In the future, we might want to do some of that work in the
1212 	 * current thread, rather than trigger further context switches.  If
1213 	 * so, we'll want to establish a reasonable bound on the work done in
1214 	 * the "borrowed" context.
1215 	 */
1216 	if (nwsp->nws_pendingbits != 0) {
1217 		nwsp->nws_flags |= NWS_SCHEDULED;
1218 		dosignal = 1;
1219 	} else
1220 		dosignal = 0;
1221 	NWS_UNLOCK(nwsp);
1222 	if (dosignal)
1223 		NWS_SIGNAL(nwsp);
1224 	error = 0;
1225 	goto out_unpin;
1226 
1227 queue_fallback:
1228 	error = netisr_queue_internal(proto, m, cpuid);
1229 out_unpin:
1230 	sched_unpin();
1231 out_unlock:
1232 #ifdef NETISR_LOCKING
1233 	NETISR_RUNLOCK(&tracker);
1234 #endif
1235 	return (error);
1236 }
1237 
1238 int
1239 netisr_dispatch(u_int proto, struct mbuf *m)
1240 {
1241 
1242 	return (netisr_dispatch_src(proto, 0, m));
1243 }
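
/*
 * Example (illustrative): an input path typically hands a received packet to
 * netisr with something like
 *
 *	netisr_dispatch(NETISR_IP, m);
 *
 * which direct dispatches or defers according to the composed policy, while
 * callers that must never run the handler inline use netisr_queue() instead.
 */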
1244 
1245 #ifdef DEVICE_POLLING
1246 /*
1247  * Kernel polling borrows a netisr thread to run interface polling in; this
1248  * function allows kernel polling to request that the netisr thread be
1249  * scheduled even if no packets are pending for protocols.
1250  */
1251 void
1252 netisr_sched_poll(void)
1253 {
1254 	struct netisr_workstream *nwsp;
1255 
1256 	nwsp = DPCPU_ID_PTR(nws_array[0], nws);
1257 	NWS_SIGNAL(nwsp);
1258 }
1259 #endif
1260 
1261 static void
1262 netisr_start_swi(u_int cpuid, struct pcpu *pc)
1263 {
1264 	char swiname[12];
1265 	struct netisr_workstream *nwsp;
1266 	int error;
1267 
1268 	KASSERT(!CPU_ABSENT(cpuid), ("%s: CPU %u absent", __func__, cpuid));
1269 
1270 	nwsp = DPCPU_ID_PTR(cpuid, nws);
1271 	mtx_init(&nwsp->nws_mtx, "netisr_mtx", NULL, MTX_DEF);
1272 	nwsp->nws_cpu = cpuid;
1273 	snprintf(swiname, sizeof(swiname), "netisr %u", cpuid);
1274 	error = swi_add(&nwsp->nws_intr_event, swiname, swi_net, nwsp,
1275 	    SWI_NET, INTR_TYPE_NET | INTR_MPSAFE, &nwsp->nws_swi_cookie);
1276 	if (error)
1277 		panic("%s: swi_add %d", __func__, error);
1278 	pc->pc_netisr = nwsp->nws_intr_event;
1279 	if (netisr_bindthreads) {
1280 		error = intr_event_bind(nwsp->nws_intr_event, cpuid);
1281 		if (error != 0)
1282 			printf("%s: cpu %u: intr_event_bind: %d\n", __func__,
1283 			    cpuid, error);
1284 	}
1285 	NETISR_WLOCK();
1286 	nws_array[nws_count] = nwsp->nws_cpu;
1287 	nws_count++;
1288 	NETISR_WUNLOCK();
1289 }
1290 
1291 /*
1292  * Initialize the netisr subsystem.  We rely on BSS and static initialization
1293  * of most fields in global data structures.
1294  *
1295  * Start a worker thread for the boot CPU so that we can support network
1296  * traffic immediately in case the network stack is used before additional
1297  * CPUs are started (for example, diskless boot).
1298  */
1299 static void
1300 netisr_init(void *arg)
1301 {
1302 	struct pcpu *pc;
1303 
1304 	NETISR_LOCK_INIT();
1305 	if (netisr_maxthreads == 0 || netisr_maxthreads < -1)
1306 		netisr_maxthreads = 1;		/* default behavior */
1307 	else if (netisr_maxthreads == -1)
1308 		netisr_maxthreads = mp_ncpus;	/* use max cpus */
1309 	if (netisr_maxthreads > mp_ncpus) {
1310 		printf("netisr_init: forcing maxthreads from %d to %d\n",
1311 		    netisr_maxthreads, mp_ncpus);
1312 		netisr_maxthreads = mp_ncpus;
1313 	}
1314 	if (netisr_defaultqlimit > netisr_maxqlimit) {
1315 		printf("netisr_init: forcing defaultqlimit from %d to %d\n",
1316 		    netisr_defaultqlimit, netisr_maxqlimit);
1317 		netisr_defaultqlimit = netisr_maxqlimit;
1318 	}
1319 #ifdef DEVICE_POLLING
1320 	/*
1321 	 * The device polling code is not yet aware of how to deal with
1322 	 * multiple netisr threads, so for the time being compiling in device
1323 	 * polling disables parallel netisr workers.
1324 	 */
1325 	if (netisr_maxthreads != 1 || netisr_bindthreads != 0) {
1326 		printf("netisr_init: forcing maxthreads to 1 and "
1327 		    "bindthreads to 0 for device polling\n");
1328 		netisr_maxthreads = 1;
1329 		netisr_bindthreads = 0;
1330 	}
1331 #endif
1332 
1333 #ifdef EARLY_AP_STARTUP
1334 	STAILQ_FOREACH(pc, &cpuhead, pc_allcpu) {
1335 		if (nws_count >= netisr_maxthreads)
1336 			break;
1337 		netisr_start_swi(pc->pc_cpuid, pc);
1338 	}
1339 #else
1340 	pc = get_pcpu();
1341 	netisr_start_swi(pc->pc_cpuid, pc);
1342 #endif
1343 }
1344 SYSINIT(netisr_init, SI_SUB_SOFTINTR, SI_ORDER_FIRST, netisr_init, NULL);
1345 
1346 #ifndef EARLY_AP_STARTUP
1347 /*
1348  * Start worker threads for additional CPUs.  No attempt to gracefully handle
1349  * work reassignment, as we don't yet support dynamic reconfiguration.
1350  */
1351 static void
1352 netisr_start(void *arg)
1353 {
1354 	struct pcpu *pc;
1355 
1356 	STAILQ_FOREACH(pc, &cpuhead, pc_allcpu) {
1357 		if (nws_count >= netisr_maxthreads)
1358 			break;
1359 		/* Worker will already be present for boot CPU. */
1360 		if (pc->pc_netisr != NULL)
1361 			continue;
1362 		netisr_start_swi(pc->pc_cpuid, pc);
1363 	}
1364 }
1365 SYSINIT(netisr_start, SI_SUB_SMP, SI_ORDER_MIDDLE, netisr_start, NULL);
1366 #endif
1367 
1368 /*
1369  * Sysctl monitoring for netisr: query a list of registered protocols.
1370  */
1371 static int
1372 sysctl_netisr_proto(SYSCTL_HANDLER_ARGS)
1373 {
1374 	struct rm_priotracker tracker;
1375 	struct sysctl_netisr_proto *snpp, *snp_array;
1376 	struct netisr_proto *npp;
1377 	u_int counter, proto;
1378 	int error;
1379 
1380 	if (req->newptr != NULL)
1381 		return (EINVAL);
1382 	snp_array = malloc(sizeof(*snp_array) * NETISR_MAXPROT, M_TEMP,
1383 	    M_ZERO | M_WAITOK);
1384 	counter = 0;
1385 	NETISR_RLOCK(&tracker);
1386 	for (proto = 0; proto < NETISR_MAXPROT; proto++) {
1387 		npp = &netisr_proto[proto];
1388 		if (npp->np_name == NULL)
1389 			continue;
1390 		snpp = &snp_array[counter];
1391 		snpp->snp_version = sizeof(*snpp);
1392 		strlcpy(snpp->snp_name, npp->np_name, NETISR_NAMEMAXLEN);
1393 		snpp->snp_proto = proto;
1394 		snpp->snp_qlimit = npp->np_qlimit;
1395 		snpp->snp_policy = npp->np_policy;
1396 		snpp->snp_dispatch = npp->np_dispatch;
1397 		if (npp->np_m2flow != NULL)
1398 			snpp->snp_flags |= NETISR_SNP_FLAGS_M2FLOW;
1399 		if (npp->np_m2cpuid != NULL)
1400 			snpp->snp_flags |= NETISR_SNP_FLAGS_M2CPUID;
1401 		if (npp->np_drainedcpu != NULL)
1402 			snpp->snp_flags |= NETISR_SNP_FLAGS_DRAINEDCPU;
1403 		counter++;
1404 	}
1405 	NETISR_RUNLOCK(&tracker);
1406 	KASSERT(counter <= NETISR_MAXPROT,
1407 	    ("sysctl_netisr_proto: counter too big (%d)", counter));
1408 	error = SYSCTL_OUT(req, snp_array, sizeof(*snp_array) * counter);
1409 	free(snp_array, M_TEMP);
1410 	return (error);
1411 }
1412 
1413 SYSCTL_PROC(_net_isr, OID_AUTO, proto,
1414     CTLFLAG_RD|CTLTYPE_STRUCT|CTLFLAG_MPSAFE, 0, 0, sysctl_netisr_proto,
1415     "S,sysctl_netisr_proto",
1416     "Return list of protocols registered with netisr");
1417 
1418 /*
1419  * Sysctl monitoring for netisr: query a list of workstreams.
1420  */
1421 static int
1422 sysctl_netisr_workstream(SYSCTL_HANDLER_ARGS)
1423 {
1424 	struct rm_priotracker tracker;
1425 	struct sysctl_netisr_workstream *snwsp, *snws_array;
1426 	struct netisr_workstream *nwsp;
1427 	u_int counter, cpuid;
1428 	int error;
1429 
1430 	if (req->newptr != NULL)
1431 		return (EINVAL);
1432 	snws_array = malloc(sizeof(*snws_array) * MAXCPU, M_TEMP,
1433 	    M_ZERO | M_WAITOK);
1434 	counter = 0;
1435 	NETISR_RLOCK(&tracker);
1436 	CPU_FOREACH(cpuid) {
1437 		nwsp = DPCPU_ID_PTR(cpuid, nws);
1438 		if (nwsp->nws_intr_event == NULL)
1439 			continue;
1440 		NWS_LOCK(nwsp);
1441 		snwsp = &snws_array[counter];
1442 		snwsp->snws_version = sizeof(*snwsp);
1443 
1444 		/*
1445 		 * For now, we equate workstream IDs and CPU IDs in the
1446 		 * kernel, but expose them independently to userspace in case
1447 		 * that assumption changes in the future.
1448 		 */
1449 		snwsp->snws_wsid = cpuid;
1450 		snwsp->snws_cpu = cpuid;
1451 		if (nwsp->nws_intr_event != NULL)
1452 			snwsp->snws_flags |= NETISR_SNWS_FLAGS_INTR;
1453 		NWS_UNLOCK(nwsp);
1454 		counter++;
1455 	}
1456 	NETISR_RUNLOCK(&tracker);
1457 	KASSERT(counter <= MAXCPU,
1458 	    ("sysctl_netisr_workstream: counter too big (%d)", counter));
1459 	error = SYSCTL_OUT(req, snws_array, sizeof(*snws_array) * counter);
1460 	free(snws_array, M_TEMP);
1461 	return (error);
1462 }
1463 
1464 SYSCTL_PROC(_net_isr, OID_AUTO, workstream,
1465     CTLFLAG_RD|CTLTYPE_STRUCT|CTLFLAG_MPSAFE, 0, 0, sysctl_netisr_workstream,
1466     "S,sysctl_netisr_workstream",
1467     "Return list of workstreams implemented by netisr");
1468 
1469 /*
1470  * Sysctl monitoring for netisr: query per-protocol data across all
1471  * workstreams.
1472  */
1473 static int
1474 sysctl_netisr_work(SYSCTL_HANDLER_ARGS)
1475 {
1476 	struct rm_priotracker tracker;
1477 	struct sysctl_netisr_work *snwp, *snw_array;
1478 	struct netisr_workstream *nwsp;
1479 	struct netisr_proto *npp;
1480 	struct netisr_work *nwp;
1481 	u_int counter, cpuid, proto;
1482 	int error;
1483 
1484 	if (req->newptr != NULL)
1485 		return (EINVAL);
1486 	snw_array = malloc(sizeof(*snw_array) * MAXCPU * NETISR_MAXPROT,
1487 	    M_TEMP, M_ZERO | M_WAITOK);
1488 	counter = 0;
1489 	NETISR_RLOCK(&tracker);
1490 	CPU_FOREACH(cpuid) {
1491 		nwsp = DPCPU_ID_PTR(cpuid, nws);
1492 		if (nwsp->nws_intr_event == NULL)
1493 			continue;
1494 		NWS_LOCK(nwsp);
1495 		for (proto = 0; proto < NETISR_MAXPROT; proto++) {
1496 			npp = &netisr_proto[proto];
1497 			if (npp->np_name == NULL)
1498 				continue;
1499 			nwp = &nwsp->nws_work[proto];
1500 			snwp = &snw_array[counter];
1501 			snwp->snw_version = sizeof(*snwp);
1502 			snwp->snw_wsid = cpuid;		/* See comment above. */
1503 			snwp->snw_proto = proto;
1504 			snwp->snw_len = nwp->nw_len;
1505 			snwp->snw_watermark = nwp->nw_watermark;
1506 			snwp->snw_dispatched = nwp->nw_dispatched;
1507 			snwp->snw_hybrid_dispatched =
1508 			    nwp->nw_hybrid_dispatched;
1509 			snwp->snw_qdrops = nwp->nw_qdrops;
1510 			snwp->snw_queued = nwp->nw_queued;
1511 			snwp->snw_handled = nwp->nw_handled;
1512 			counter++;
1513 		}
1514 		NWS_UNLOCK(nwsp);
1515 	}
1516 	KASSERT(counter <= MAXCPU * NETISR_MAXPROT,
1517 	    ("sysctl_netisr_work: counter too big (%d)", counter));
1518 	NETISR_RUNLOCK(&tracker);
1519 	error = SYSCTL_OUT(req, snw_array, sizeof(*snw_array) * counter);
1520 	free(snw_array, M_TEMP);
1521 	return (error);
1522 }
1523 
1524 SYSCTL_PROC(_net_isr, OID_AUTO, work,
1525     CTLFLAG_RD|CTLTYPE_STRUCT|CTLFLAG_MPSAFE, 0, 0, sysctl_netisr_work,
1526     "S,sysctl_netisr_work",
1527     "Return list of per-workstream, per-protocol work in netisr");
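
/*
 * Example (illustrative): the three monitoring sysctls above are meant for
 * userspace tools; "netstat -Q" is the usual consumer and formats the
 * protocol, workstream and per-protocol work records they export.
 */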
1528 
1529 #ifdef DDB
1530 DB_SHOW_COMMAND(netisr, db_show_netisr)
1531 {
1532 	struct netisr_workstream *nwsp;
1533 	struct netisr_work *nwp;
1534 	int first, proto;
1535 	u_int cpuid;
1536 
1537 	db_printf("%3s %6s %5s %5s %5s %8s %8s %8s %8s\n", "CPU", "Proto",
1538 	    "Len", "WMark", "Max", "Disp", "HDisp", "Drop", "Queue");
1539 	CPU_FOREACH(cpuid) {
1540 		nwsp = DPCPU_ID_PTR(cpuid, nws);
1541 		if (nwsp->nws_intr_event == NULL)
1542 			continue;
1543 		first = 1;
1544 		for (proto = 0; proto < NETISR_MAXPROT; proto++) {
1545 			if (netisr_proto[proto].np_handler == NULL)
1546 				continue;
1547 			nwp = &nwsp->nws_work[proto];
1548 			if (first) {
1549 				db_printf("%3d ", cpuid);
1550 				first = 0;
1551 			} else
1552 				db_printf("%3s ", "");
1553 			db_printf(
1554 			    "%6s %5d %5d %5d %8ju %8ju %8ju %8ju\n",
1555 			    netisr_proto[proto].np_name, nwp->nw_len,
1556 			    nwp->nw_watermark, nwp->nw_qlimit,
1557 			    nwp->nw_dispatched, nwp->nw_hybrid_dispatched,
1558 			    nwp->nw_qdrops, nwp->nw_queued);
1559 		}
1560 	}
1561 }
1562 #endif
1563