xref: /freebsd/sys/net/netisr.c (revision aa79fe245de7616cda41b69a296a5ce209c95c45)
/*-
 * Copyright (c) 2007-2009 Robert N. M. Watson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

/*
 * netisr is a packet dispatch service, allowing synchronous (directly
 * dispatched) and asynchronous (deferred dispatch) processing of packets by
 * registered protocol handlers.  Callers pass a protocol identifier and
 * packet to netisr, along with a direct dispatch hint, and work will either
 * be immediately processed by the registered handler, or passed to a kernel
 * software interrupt (SWI) thread for deferred dispatch.  Callers will
 * generally select one or the other based on:
 *
 * - Whether directly dispatching a netisr handler might lead to code
 *   reentrance or lock recursion, such as entering the socket code from the
 *   socket code.
 * - Whether directly dispatching a netisr handler might lead to recursive
 *   processing, such as when decapsulating several wrapped layers of tunnel
 *   information (IPSEC within IPSEC within ...).
 *
 * Maintaining ordering for protocol streams is a critical design concern.
 * Enforcing ordering limits the opportunity for concurrency, but maintains
 * the strong ordering requirements found in some protocols, such as TCP.  Of
 * related concern is CPU affinity--it is desirable to process all data
 * associated with a particular stream on the same CPU over time in order to
 * avoid acquiring locks associated with the connection on different CPUs,
 * keep connection data in one cache, and generally encourage associated
 * user threads to live on the same CPU as the stream.  It's also desirable
 * to avoid lock migration and contention where locks are associated with
 * more than one flow.
 *
 * netisr supports several policy variations, represented by the
 * NETISR_POLICY_* constants, allowing protocols to play a varying role in
 * identifying flows, assigning work to CPUs, etc.  These are described in
 * detail in netisr.h.
 */
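
/*
 * A minimal usage sketch of the two entry points (NETISR_FOO and the mbuf
 * source are hypothetical, for illustration only; both calls consume the
 * mbuf, even on failure):
 *
 *	netisr_dispatch(NETISR_FOO, m);		direct dispatch if permitted
 *	netisr_queue(NETISR_FOO, m);		always defer to a SWI thread
 */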

#include "opt_ddb.h"
#include "opt_device_polling.h"

#include <sys/param.h>
#include <sys/bus.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/interrupt.h>
#include <sys/lock.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/rmlock.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <sys/systm.h>
#include <sys/vimage.h>

#ifdef DDB
#include <ddb/ddb.h>
#endif

#include <net/if.h>
#include <net/if_var.h>
#include <net/netisr.h>

/*-
 * Synchronize use and modification of the registered netisr data structures;
 * acquire a write lock while modifying the set of registered protocols, and
 * a read lock while running them, to prevent partially registered or
 * unregistered protocols from being run.
 *
 * The following data structures and fields are protected by this lock:
 *
 * - The np array, including all fields of struct netisr_proto.
 * - The nws array, including all fields of struct netisr_worker.
 * - The nws_array array.
 *
 * Note: the NETISR_LOCKING define controls whether read locks are acquired
 * in packet processing paths requiring netisr registration stability.  This
 * is disabled by default as it can lead to a measurable performance
 * degradation even with rmlocks (3%-6% for loopback ping-pong traffic), and
 * because netisr registration and unregistration is extremely rare at
 * runtime.  If it becomes more common, this decision should be revisited.
 *
 * XXXRW: rmlocks don't support assertions.
 */
static struct rmlock	netisr_rmlock;
#define	NETISR_LOCK_INIT()	rm_init_flags(&netisr_rmlock, "netisr", \
				    RM_NOWITNESS)
#define	NETISR_LOCK_ASSERT()
#define	NETISR_RLOCK(tracker)	rm_rlock(&netisr_rmlock, (tracker))
#define	NETISR_RUNLOCK(tracker)	rm_runlock(&netisr_rmlock, (tracker))
#define	NETISR_WLOCK()		rm_wlock(&netisr_rmlock)
#define	NETISR_WUNLOCK()	rm_wunlock(&netisr_rmlock)
/* #define	NETISR_LOCKING */

SYSCTL_NODE(_net, OID_AUTO, isr, CTLFLAG_RW, 0, "netisr");

/*-
 * Three direct dispatch policies are supported:
 *
 * - Always defer: all work is scheduled for a netisr thread, regardless of
 *   context.  (!direct)
 *
 * - Hybrid: if the executing context allows direct dispatch, and we're
 *   running on the CPU the work would be done on, then direct dispatch if it
 *   wouldn't violate ordering constraints on the workstream.
 *   (direct && !direct_force)
 *
 * - Always direct: if the executing context allows direct dispatch, always
 *   direct dispatch.  (direct && direct_force)
 *
 * Notice that changing the global policy could lead to short periods of
 * misordered processing, but this is considered acceptable as compared to
 * the complexity of enforcing ordering during policy changes.
 */
static int	netisr_direct_force = 1;	/* Always direct dispatch. */
TUNABLE_INT("net.isr.direct_force", &netisr_direct_force);
SYSCTL_INT(_net_isr, OID_AUTO, direct_force, CTLFLAG_RW,
    &netisr_direct_force, 0, "Force direct dispatch");

static int	netisr_direct = 1;	/* Enable direct dispatch. */
TUNABLE_INT("net.isr.direct", &netisr_direct);
SYSCTL_INT(_net_isr, OID_AUTO, direct, CTLFLAG_RW,
    &netisr_direct, 0, "Enable direct dispatch");
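
/*
 * For illustration, the three policies above map onto the two knobs just
 * defined as follows (shell commands, shown only as a runtime tuning
 * example):
 *
 *	sysctl net.isr.direct=0					always defer
 *	sysctl net.isr.direct=1 net.isr.direct_force=0		hybrid
 *	sysctl net.isr.direct=1 net.isr.direct_force=1		always direct
 */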

/*
 * Allow the administrator to limit the number of threads (CPUs) to use for
 * netisr.  We don't check netisr_maxthreads before creating the thread for
 * CPU 0, so in practice we ignore values <= 1.  This must be set at boot.
 * We will create at most one thread per CPU.
 */
static int	netisr_maxthreads = 1;		/* Max number of threads. */
TUNABLE_INT("net.isr.maxthreads", &netisr_maxthreads);
SYSCTL_INT(_net_isr, OID_AUTO, maxthreads, CTLFLAG_RD,
    &netisr_maxthreads, 0,
    "Use at most this many CPUs for netisr processing");

static int	netisr_bindthreads = 0;		/* Bind threads to CPUs. */
TUNABLE_INT("net.isr.bindthreads", &netisr_bindthreads);
SYSCTL_INT(_net_isr, OID_AUTO, bindthreads, CTLFLAG_RD,
    &netisr_bindthreads, 0, "Bind netisr threads to CPUs.");

/*
 * Limit per-workstream queues to at most net.isr.maxqlimit, both for initial
 * configuration and later modification using netisr_setqlimit().
 */
#define	NETISR_DEFAULT_MAXQLIMIT	10240
static u_int	netisr_maxqlimit = NETISR_DEFAULT_MAXQLIMIT;
TUNABLE_INT("net.isr.maxqlimit", &netisr_maxqlimit);
SYSCTL_INT(_net_isr, OID_AUTO, maxqlimit, CTLFLAG_RD,
    &netisr_maxqlimit, 0,
    "Maximum netisr per-protocol, per-CPU queue depth.");

/*
 * The default per-workstream queue limit for protocols that don't initialize
 * the nh_qlimit field of their struct netisr_handler.  If this is set above
 * netisr_maxqlimit, we truncate it to the maximum during boot.
 */
#define	NETISR_DEFAULT_DEFAULTQLIMIT	256
static u_int	netisr_defaultqlimit = NETISR_DEFAULT_DEFAULTQLIMIT;
TUNABLE_INT("net.isr.defaultqlimit", &netisr_defaultqlimit);
SYSCTL_INT(_net_isr, OID_AUTO, defaultqlimit, CTLFLAG_RD,
    &netisr_defaultqlimit, 0,
    "Default netisr per-protocol, per-CPU queue limit if not set by protocol");

/*
 * Each protocol is described by a struct netisr_proto, which holds all
 * global per-protocol information.  This data structure is set up by
 * netisr_register(), and derived from the public struct netisr_handler.
 */
struct netisr_proto {
	const char	*np_name;	/* Character string protocol name. */
	netisr_handler_t *np_handler;	/* Protocol handler. */
	netisr_m2flow_t	*np_m2flow;	/* Query flow for untagged packet. */
	netisr_m2cpuid_t *np_m2cpuid;	/* Query CPU to process packet on. */
	netisr_drainedcpu_t *np_drainedcpu; /* Callback when queue drained. */
	u_int		 np_qlimit;	/* Maximum per-CPU queue depth. */
	u_int		 np_policy;	/* Work placement policy. */
};

#define	NETISR_MAXPROT		16		/* Compile-time limit. */

/*
 * The np array describes all registered protocols, indexed by protocol
 * number.
 */
static struct netisr_proto	np[NETISR_MAXPROT];

/*
 * Protocol-specific work for each workstream is described by struct
 * netisr_work.  Each work descriptor consists of an mbuf queue and
 * statistics.
 */
struct netisr_work {
	/*
	 * Packet queue, linked by m_nextpkt.
	 */
	struct mbuf	*nw_head;
	struct mbuf	*nw_tail;
	u_int		 nw_len;
	u_int		 nw_qlimit;
	u_int		 nw_watermark;

	/*
	 * Statistics -- written unlocked, but mostly from curcpu.
	 */
	u_int64_t	 nw_dispatched; /* Number of direct dispatches. */
	u_int64_t	 nw_hybrid_dispatched; /* "" hybrid dispatches. */
	u_int64_t	 nw_qdrops;	/* "" drops. */
	u_int64_t	 nw_queued;	/* "" enqueues. */
	u_int64_t	 nw_handled;	/* "" handled in worker. */
};

/*
 * Workstreams hold a set of ordered work across each protocol, and are
 * described by netisr_workstream.  Each workstream is associated with a
 * worker thread, which in turn is pinned to a CPU.  Work associated with a
 * workstream can be processed in other threads during direct dispatch;
 * concurrent processing is prevented by the NWS_RUNNING flag, which
 * indicates that a thread is already processing the work queue.
 */
struct netisr_workstream {
	struct intr_event *nws_intr_event;	/* Handler for stream. */
	void		*nws_swi_cookie;	/* swi(9) cookie for stream. */
	struct mtx	 nws_mtx;		/* Synchronize work. */
	u_int		 nws_cpu;		/* CPU pinning. */
	u_int		 nws_flags;		/* Wakeup flags. */
	u_int		 nws_pendingbits;	/* Scheduled protocols. */

	/*
	 * Each protocol has per-workstream data.
	 */
	struct netisr_work	nws_work[NETISR_MAXPROT];
} __aligned(CACHE_LINE_SIZE);

/*
 * Per-CPU workstream data, indexed by CPU ID.
 */
static struct netisr_workstream		 nws[MAXCPU];

/*
 * Map contiguous values between 0 and nws_count into CPU IDs appropriate for
 * indexing the nws[] array.  This allows constructions of the form
 * nws[nws_array[arbitraryvalue % nws_count]].
 */
static u_int				 nws_array[MAXCPU];

/*
 * Number of registered workstreams.  Will be at most the number of running
 * CPUs once fully started.
 */
static u_int				 nws_count;
SYSCTL_INT(_net_isr, OID_AUTO, numthreads, CTLFLAG_RD,
    &nws_count, 0, "Number of extant netisr threads.");

/*
 * Per-workstream flags.
 */
#define	NWS_RUNNING	0x00000001	/* Currently running in a thread. */
#define	NWS_DISPATCHING	0x00000002	/* Currently being direct-dispatched. */
#define	NWS_SCHEDULED	0x00000004	/* Signal issued. */

/*
 * Synchronization for each workstream: a mutex protects all mutable fields
 * in each stream, including per-protocol state (mbuf queues).  The SWI is
 * woken up if asynchronous dispatch is required.
 */
#define	NWS_LOCK(s)		mtx_lock(&(s)->nws_mtx)
#define	NWS_LOCK_ASSERT(s)	mtx_assert(&(s)->nws_mtx, MA_OWNED)
#define	NWS_UNLOCK(s)		mtx_unlock(&(s)->nws_mtx)
#define	NWS_SIGNAL(s)		swi_sched((s)->nws_swi_cookie, 0)

/*
 * Utility routines for protocols that implement their own mapping of flows
 * to CPUs.
 */
u_int
netisr_get_cpucount(void)
{

	return (nws_count);
}

u_int
netisr_get_cpuid(u_int cpunumber)
{

	KASSERT(cpunumber < nws_count, ("%s: %u >= %u", __func__, cpunumber,
	    nws_count));

	return (nws_array[cpunumber]);
}

/*
 * The default implementation of flow -> CPU ID mapping.
 *
 * Non-static so that protocols can use it to map their own work to specific
 * CPUs in a manner consistent with netisr for affinity purposes.
 */
u_int
netisr_default_flow2cpu(u_int flowid)
{

	return (nws_array[flowid % nws_count]);
}
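
/*
 * For example (hypothetical, illustration only), a protocol distributing
 * its own per-flow work consistently with netisr might do:
 *
 *	cpuid = netisr_default_flow2cpu(flowid);
 *	(schedule the flow's callout or state on cpuid ...)
 *
 * or walk all CPUs currently backing netisr workstreams:
 *
 *	for (i = 0; i < netisr_get_cpucount(); i++)
 *		(per-CPU setup on netisr_get_cpuid(i) ...)
 */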

/*
 * Register a new netisr handler, which requires initializing per-protocol
 * fields for each workstream.  All netisr work is briefly suspended while
 * the protocol is installed.
 */
void
netisr_register(const struct netisr_handler *nhp)
{
	struct netisr_work *npwp;
	const char *name;
	u_int i, proto;

	proto = nhp->nh_proto;
	name = nhp->nh_name;

	/*
	 * Test that the requested registration is valid.
	 */
	KASSERT(nhp->nh_name != NULL,
	    ("%s: nh_name NULL for %u", __func__, proto));
	KASSERT(nhp->nh_handler != NULL,
	    ("%s: nh_handler NULL for %s", __func__, name));
	KASSERT(nhp->nh_policy == NETISR_POLICY_SOURCE ||
	    nhp->nh_policy == NETISR_POLICY_FLOW ||
	    nhp->nh_policy == NETISR_POLICY_CPU,
	    ("%s: unsupported nh_policy %u for %s", __func__,
	    nhp->nh_policy, name));
	KASSERT(nhp->nh_policy == NETISR_POLICY_FLOW ||
	    nhp->nh_m2flow == NULL,
	    ("%s: nh_policy != FLOW but m2flow defined for %s", __func__,
	    name));
	KASSERT(nhp->nh_policy == NETISR_POLICY_CPU || nhp->nh_m2cpuid == NULL,
	    ("%s: nh_policy != CPU but m2cpuid defined for %s", __func__,
	    name));
	KASSERT(nhp->nh_policy != NETISR_POLICY_CPU || nhp->nh_m2cpuid != NULL,
	    ("%s: nh_policy == CPU but m2cpuid not defined for %s", __func__,
	    name));
	KASSERT(proto < NETISR_MAXPROT,
	    ("%s(%u, %s): protocol too big", __func__, proto, name));

	/*
	 * Check that no registration already exists for this protocol.
	 */
	NETISR_WLOCK();
	KASSERT(np[proto].np_name == NULL,
	    ("%s(%u, %s): name present", __func__, proto, name));
	KASSERT(np[proto].np_handler == NULL,
	    ("%s(%u, %s): handler present", __func__, proto, name));

	np[proto].np_name = name;
	np[proto].np_handler = nhp->nh_handler;
	np[proto].np_m2flow = nhp->nh_m2flow;
	np[proto].np_m2cpuid = nhp->nh_m2cpuid;
	np[proto].np_drainedcpu = nhp->nh_drainedcpu;
	if (nhp->nh_qlimit == 0)
		np[proto].np_qlimit = netisr_defaultqlimit;
	else if (nhp->nh_qlimit > netisr_maxqlimit) {
		printf("%s: %s requested queue limit %u capped to "
		    "net.isr.maxqlimit %u\n", __func__, name, nhp->nh_qlimit,
		    netisr_maxqlimit);
		np[proto].np_qlimit = netisr_maxqlimit;
	} else
		np[proto].np_qlimit = nhp->nh_qlimit;
	np[proto].np_policy = nhp->nh_policy;
	for (i = 0; i < MAXCPU; i++) {
		npwp = &nws[i].nws_work[proto];
		bzero(npwp, sizeof(*npwp));
		npwp->nw_qlimit = np[proto].np_qlimit;
	}
	NETISR_WUNLOCK();
}
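
/*
 * A minimal registration sketch.  NETISR_FOO and foo_input() are
 * hypothetical; real consumers include ip_input() registered under
 * NETISR_IP:
 *
 *	static void
 *	foo_input(struct mbuf *m)
 *	{
 *
 *		m_freem(m);
 *	}
 *
 *	static const struct netisr_handler foo_nh = {
 *		.nh_name = "foo",
 *		.nh_handler = foo_input,
 *		.nh_proto = NETISR_FOO,
 *		.nh_policy = NETISR_POLICY_SOURCE,
 *	};
 *
 *	netisr_register(&foo_nh);
 *
 * With nh_qlimit left at 0, the protocol inherits netisr_defaultqlimit.
 */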

/*
 * Clear drop counters across all workstreams for a protocol.
 */
void
netisr_clearqdrops(const struct netisr_handler *nhp)
{
	struct netisr_work *npwp;
#ifdef INVARIANTS
	const char *name;
#endif
	u_int i, proto;

	proto = nhp->nh_proto;
#ifdef INVARIANTS
	name = nhp->nh_name;
#endif
	KASSERT(proto < NETISR_MAXPROT,
	    ("%s(%u): protocol too big for %s", __func__, proto, name));

	NETISR_WLOCK();
	KASSERT(np[proto].np_handler != NULL,
	    ("%s(%u): protocol not registered for %s", __func__, proto,
	    name));

	for (i = 0; i < MAXCPU; i++) {
		npwp = &nws[i].nws_work[proto];
		npwp->nw_qdrops = 0;
	}
	NETISR_WUNLOCK();
}

/*
 * Query the current drop counters across all workstreams for a protocol.
 */
void
netisr_getqdrops(const struct netisr_handler *nhp, u_int64_t *qdropp)
{
	struct netisr_work *npwp;
	struct rm_priotracker tracker;
#ifdef INVARIANTS
	const char *name;
#endif
	u_int i, proto;

	*qdropp = 0;
	proto = nhp->nh_proto;
#ifdef INVARIANTS
	name = nhp->nh_name;
#endif
	KASSERT(proto < NETISR_MAXPROT,
	    ("%s(%u): protocol too big for %s", __func__, proto, name));

	NETISR_RLOCK(&tracker);
	KASSERT(np[proto].np_handler != NULL,
	    ("%s(%u): protocol not registered for %s", __func__, proto,
	    name));

	for (i = 0; i < MAXCPU; i++) {
		npwp = &nws[i].nws_work[proto];
		*qdropp += npwp->nw_qdrops;
	}
	NETISR_RUNLOCK(&tracker);
}

/*
 * Query the current queue limit for per-workstream queues for a protocol.
 */
void
netisr_getqlimit(const struct netisr_handler *nhp, u_int *qlimitp)
{
	struct rm_priotracker tracker;
#ifdef INVARIANTS
	const char *name;
#endif
	u_int proto;

	proto = nhp->nh_proto;
#ifdef INVARIANTS
	name = nhp->nh_name;
#endif
	KASSERT(proto < NETISR_MAXPROT,
	    ("%s(%u): protocol too big for %s", __func__, proto, name));

	NETISR_RLOCK(&tracker);
	KASSERT(np[proto].np_handler != NULL,
	    ("%s(%u): protocol not registered for %s", __func__, proto,
	    name));
	*qlimitp = np[proto].np_qlimit;
	NETISR_RUNLOCK(&tracker);
}

/*
 * Update the queue limit across per-workstream queues for a protocol.  We
 * simply change the limits, and don't drain overflowed packets as they will
 * (hopefully) take care of themselves shortly.
 */
int
netisr_setqlimit(const struct netisr_handler *nhp, u_int qlimit)
{
	struct netisr_work *npwp;
#ifdef INVARIANTS
	const char *name;
#endif
	u_int i, proto;

	if (qlimit > netisr_maxqlimit)
		return (EINVAL);

	proto = nhp->nh_proto;
#ifdef INVARIANTS
	name = nhp->nh_name;
#endif
	KASSERT(proto < NETISR_MAXPROT,
	    ("%s(%u): protocol too big for %s", __func__, proto, name));

	NETISR_WLOCK();
	KASSERT(np[proto].np_handler != NULL,
	    ("%s(%u): protocol not registered for %s", __func__, proto,
	    name));

	np[proto].np_qlimit = qlimit;
	for (i = 0; i < MAXCPU; i++) {
		npwp = &nws[i].nws_work[proto];
		npwp->nw_qlimit = qlimit;
	}
	NETISR_WUNLOCK();
	return (0);
}
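
/*
 * For example (sketch; foo_nh as in the hypothetical registration above), a
 * protocol reacting to sustained drops might raise its limit at runtime:
 *
 *	if (netisr_setqlimit(&foo_nh, 1024) != 0)
 *		printf("foo: queue limit exceeds net.isr.maxqlimit\n");
 *
 * Values above netisr_maxqlimit are rejected with EINVAL.
 */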

/*
 * Drain all packets currently held in a particular protocol work queue.
 */
static void
netisr_drain_proto(struct netisr_work *npwp)
{
	struct mbuf *m;

	/*
	 * We would assert the lock on the workstream but it's not passed in.
	 */
	while ((m = npwp->nw_head) != NULL) {
		npwp->nw_head = m->m_nextpkt;
		m->m_nextpkt = NULL;
		if (npwp->nw_head == NULL)
			npwp->nw_tail = NULL;
		npwp->nw_len--;
		m_freem(m);
	}
	KASSERT(npwp->nw_tail == NULL, ("%s: tail", __func__));
	KASSERT(npwp->nw_len == 0, ("%s: len", __func__));
}

/*
 * Remove the registration of a network protocol, which requires clearing
 * per-protocol fields across all workstreams, including freeing all mbufs in
 * the queues at time of unregister.  All work in netisr is briefly suspended
 * while this takes place.
 */
void
netisr_unregister(const struct netisr_handler *nhp)
{
	struct netisr_work *npwp;
#ifdef INVARIANTS
	const char *name;
#endif
	u_int i, proto;

	proto = nhp->nh_proto;
#ifdef INVARIANTS
	name = nhp->nh_name;
#endif
	KASSERT(proto < NETISR_MAXPROT,
	    ("%s(%u): protocol too big for %s", __func__, proto, name));

	NETISR_WLOCK();
	KASSERT(np[proto].np_handler != NULL,
	    ("%s(%u): protocol not registered for %s", __func__, proto,
	    name));

	np[proto].np_name = NULL;
	np[proto].np_handler = NULL;
	np[proto].np_m2flow = NULL;
	np[proto].np_m2cpuid = NULL;
	np[proto].np_qlimit = 0;
	np[proto].np_policy = 0;
	for (i = 0; i < MAXCPU; i++) {
		npwp = &nws[i].nws_work[proto];
		netisr_drain_proto(npwp);
		bzero(npwp, sizeof(*npwp));
	}
	NETISR_WUNLOCK();
}

/*
 * Look up the workstream given a packet and source identifier.  Do this by
 * checking the protocol's policy, and optionally call out to the protocol
 * for assistance if required.
 */
static struct mbuf *
netisr_select_cpuid(struct netisr_proto *npp, uintptr_t source,
    struct mbuf *m, u_int *cpuidp)
{
	struct ifnet *ifp;

	NETISR_LOCK_ASSERT();

	/*
	 * In the event we have only one worker, shortcut and deliver to it
	 * without further ado.
	 */
	if (nws_count == 1) {
		*cpuidp = nws_array[0];
		return (m);
	}

	/*
	 * What happens next depends on the policy selected by the protocol.
	 * If we want to support per-interface policies, we should do that
	 * here first.
	 */
	switch (npp->np_policy) {
	case NETISR_POLICY_CPU:
		return (npp->np_m2cpuid(m, source, cpuidp));

	case NETISR_POLICY_FLOW:
		if (!(m->m_flags & M_FLOWID) && npp->np_m2flow != NULL) {
			m = npp->np_m2flow(m, source);
			if (m == NULL)
				return (NULL);
		}
		if (m->m_flags & M_FLOWID) {
			*cpuidp =
			    netisr_default_flow2cpu(m->m_pkthdr.flowid);
			return (m);
		}
		/* FALLTHROUGH */

	case NETISR_POLICY_SOURCE:
		ifp = m->m_pkthdr.rcvif;
		if (ifp != NULL)
			*cpuidp = nws_array[(ifp->if_index + source) %
			    nws_count];
		else
			*cpuidp = nws_array[source % nws_count];
		return (m);

	default:
		panic("%s: invalid policy %u for %s", __func__,
		    npp->np_policy, npp->np_name);
	}
}

/*
 * Process packets associated with a workstream and protocol.  For reasons of
 * fairness, we process up to one complete netisr queue at a time, moving the
 * queue to a stack-local queue for processing, but do not loop refreshing
 * from the global queue.  The caller is responsible for deciding whether to
 * loop, and for setting the NWS_RUNNING flag.  The passed workstream will be
 * locked on entry and relocked before return, but will be released while
 * processing.  The number of packets processed is returned.
 */
static u_int
netisr_process_workstream_proto(struct netisr_workstream *nwsp, u_int proto)
{
	struct netisr_work local_npw, *npwp;
	u_int handled;
	struct mbuf *m;

	NETISR_LOCK_ASSERT();
	NWS_LOCK_ASSERT(nwsp);

	KASSERT(nwsp->nws_flags & NWS_RUNNING,
	    ("%s(%u): not running", __func__, proto));
	KASSERT(proto < NETISR_MAXPROT,
	    ("%s(%u): invalid proto", __func__, proto));

	npwp = &nwsp->nws_work[proto];
	if (npwp->nw_len == 0)
		return (0);

	/*
	 * Move the global work queue to a thread-local work queue.
	 *
	 * Notice that this means the effective maximum length of the queue
	 * is actually twice that of the maximum queue length specified in
	 * the protocol registration call.
	 */
	handled = npwp->nw_len;
	local_npw = *npwp;
	npwp->nw_head = NULL;
	npwp->nw_tail = NULL;
	npwp->nw_len = 0;
	nwsp->nws_pendingbits &= ~(1 << proto);
	NWS_UNLOCK(nwsp);
	while ((m = local_npw.nw_head) != NULL) {
		local_npw.nw_head = m->m_nextpkt;
		m->m_nextpkt = NULL;
		if (local_npw.nw_head == NULL)
			local_npw.nw_tail = NULL;
		local_npw.nw_len--;
		VNET_ASSERT(m->m_pkthdr.rcvif != NULL);
		CURVNET_SET(m->m_pkthdr.rcvif->if_vnet);
		np[proto].np_handler(m);
		CURVNET_RESTORE();
	}
	KASSERT(local_npw.nw_len == 0,
	    ("%s(%u): len %u", __func__, proto, local_npw.nw_len));
	if (np[proto].np_drainedcpu)
		np[proto].np_drainedcpu(nwsp->nws_cpu);
	NWS_LOCK(nwsp);
	npwp->nw_handled += handled;
	return (handled);
}

/*
 * SWI handler for netisr -- processes packets in the workstream that it
 * owns, woken up by calls to NWS_SIGNAL().  If this workstream is already
 * being direct dispatched, go back to sleep and wait for the dispatching
 * thread to wake us up again.
 */
static void
swi_net(void *arg)
{
#ifdef NETISR_LOCKING
	struct rm_priotracker tracker;
#endif
	struct netisr_workstream *nwsp;
	u_int bits, prot;

	nwsp = arg;

#ifdef DEVICE_POLLING
	KASSERT(nws_count == 1,
	    ("%s: device_polling but nws_count != 1", __func__));
	netisr_poll();
#endif
#ifdef NETISR_LOCKING
	NETISR_RLOCK(&tracker);
#endif
	NWS_LOCK(nwsp);
	KASSERT(!(nwsp->nws_flags & NWS_RUNNING), ("swi_net: running"));
	if (nwsp->nws_flags & NWS_DISPATCHING)
		goto out;
	nwsp->nws_flags |= NWS_RUNNING;
	nwsp->nws_flags &= ~NWS_SCHEDULED;
	while ((bits = nwsp->nws_pendingbits) != 0) {
		while ((prot = ffs(bits)) != 0) {
			prot--;
			bits &= ~(1 << prot);
			(void)netisr_process_workstream_proto(nwsp, prot);
		}
	}
	nwsp->nws_flags &= ~NWS_RUNNING;
out:
	NWS_UNLOCK(nwsp);
#ifdef NETISR_LOCKING
	NETISR_RUNLOCK(&tracker);
#endif
#ifdef DEVICE_POLLING
	netisr_pollmore();
#endif
}

static int
netisr_queue_workstream(struct netisr_workstream *nwsp, u_int proto,
    struct netisr_work *npwp, struct mbuf *m, int *dosignalp)
{

	NWS_LOCK_ASSERT(nwsp);

	*dosignalp = 0;
	if (npwp->nw_len < npwp->nw_qlimit) {
		m->m_nextpkt = NULL;
		if (npwp->nw_head == NULL) {
			npwp->nw_head = m;
			npwp->nw_tail = m;
		} else {
			npwp->nw_tail->m_nextpkt = m;
			npwp->nw_tail = m;
		}
		npwp->nw_len++;
		if (npwp->nw_len > npwp->nw_watermark)
			npwp->nw_watermark = npwp->nw_len;
		nwsp->nws_pendingbits |= (1 << proto);
		if (!(nwsp->nws_flags &
		    (NWS_RUNNING | NWS_DISPATCHING | NWS_SCHEDULED))) {
			nwsp->nws_flags |= NWS_SCHEDULED;
			*dosignalp = 1;	/* Defer until unlocked. */
		}
		npwp->nw_queued++;
		return (0);
	} else {
		/* Queue full: netisr consumes the mbuf even on failure. */
		m_freem(m);
		npwp->nw_qdrops++;
		return (ENOBUFS);
	}
}

static int
netisr_queue_internal(u_int proto, struct mbuf *m, u_int cpuid)
{
	struct netisr_workstream *nwsp;
	struct netisr_work *npwp;
	int dosignal, error;

#ifdef NETISR_LOCKING
	NETISR_LOCK_ASSERT();
#endif
	KASSERT(cpuid < MAXCPU, ("%s: cpuid too big (%u, %u)", __func__,
	    cpuid, MAXCPU));

	dosignal = 0;
	error = 0;
	nwsp = &nws[cpuid];
	npwp = &nwsp->nws_work[proto];
	NWS_LOCK(nwsp);
	error = netisr_queue_workstream(nwsp, proto, npwp, m, &dosignal);
	NWS_UNLOCK(nwsp);
	if (dosignal)
		NWS_SIGNAL(nwsp);
	return (error);
}

int
netisr_queue_src(u_int proto, uintptr_t source, struct mbuf *m)
{
#ifdef NETISR_LOCKING
	struct rm_priotracker tracker;
#endif
	u_int cpuid;
	int error;

	KASSERT(proto < NETISR_MAXPROT,
	    ("%s: invalid proto %u", __func__, proto));

#ifdef NETISR_LOCKING
	NETISR_RLOCK(&tracker);
#endif
	KASSERT(np[proto].np_handler != NULL,
	    ("%s: invalid proto %u", __func__, proto));

	m = netisr_select_cpuid(&np[proto], source, m, &cpuid);
	if (m != NULL)
		error = netisr_queue_internal(proto, m, cpuid);
	else
		error = ENOBUFS;
#ifdef NETISR_LOCKING
	NETISR_RUNLOCK(&tracker);
#endif
	return (error);
}

int
netisr_queue(u_int proto, struct mbuf *m)
{

	return (netisr_queue_src(proto, 0, m));
}
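
/*
 * Queueing sketch (hypothetical caller; NETISR_FOO as above).  The mbuf is
 * consumed whether or not the enqueue succeeds, so the caller only needs to
 * note the error:
 *
 *	if (netisr_queue(NETISR_FOO, m) != 0)
 *		ifp->if_iqdrops++;
 */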

/*
 * Dispatch a packet for netisr processing; direct dispatch is permitted by
 * the calling context.
 */
int
netisr_dispatch_src(u_int proto, uintptr_t source, struct mbuf *m)
{
#ifdef NETISR_LOCKING
	struct rm_priotracker tracker;
#endif
	struct netisr_workstream *nwsp;
	struct netisr_work *npwp;
	int dosignal, error;
	u_int cpuid;

	/*
	 * If direct dispatch is entirely disabled, fall back on queueing.
	 */
	if (!netisr_direct)
		return (netisr_queue_src(proto, source, m));

	KASSERT(proto < NETISR_MAXPROT,
	    ("%s: invalid proto %u", __func__, proto));
#ifdef NETISR_LOCKING
	NETISR_RLOCK(&tracker);
#endif
	KASSERT(np[proto].np_handler != NULL,
	    ("%s: invalid proto %u", __func__, proto));

	/*
	 * If direct dispatch is forced, then unconditionally dispatch
	 * without a formal CPU selection.  Borrow the current CPU's stats,
	 * even if there's no worker on it.  In this case we don't update
	 * nws_flags because all netisr processing will be source ordered due
	 * to always being forced to directly dispatch.
	 */
	if (netisr_direct_force) {
		nwsp = &nws[curcpu];
		npwp = &nwsp->nws_work[proto];
		npwp->nw_dispatched++;
		npwp->nw_handled++;
		np[proto].np_handler(m);
		error = 0;
		goto out_unlock;
	}

	/*
	 * Otherwise, we execute in a hybrid mode where we will try to direct
	 * dispatch if we're on the right CPU and the netisr worker isn't
	 * already running.
	 */
	m = netisr_select_cpuid(&np[proto], source, m, &cpuid);
	if (m == NULL) {
		error = ENOBUFS;
		goto out_unlock;
	}
	sched_pin();
	if (cpuid != curcpu)
		goto queue_fallback;
	nwsp = &nws[cpuid];
	npwp = &nwsp->nws_work[proto];

	/*-
	 * We are willing to direct dispatch only if three conditions hold:
	 *
	 * (1) The netisr worker isn't already running,
	 * (2) Another thread isn't already directly dispatching, and
	 * (3) The netisr hasn't already been woken up.
	 */
	NWS_LOCK(nwsp);
	if (nwsp->nws_flags & (NWS_RUNNING | NWS_DISPATCHING | NWS_SCHEDULED)) {
		error = netisr_queue_workstream(nwsp, proto, npwp, m,
		    &dosignal);
		NWS_UNLOCK(nwsp);
		if (dosignal)
			NWS_SIGNAL(nwsp);
		goto out_unpin;
	}

	/*
	 * The current thread is now effectively the netisr worker, so set
	 * the dispatching flag to prevent concurrent processing of the
	 * stream from another thread (even the netisr worker), which could
	 * otherwise lead to effective misordering of the stream.
	 */
	nwsp->nws_flags |= NWS_DISPATCHING;
	NWS_UNLOCK(nwsp);
	np[proto].np_handler(m);
	NWS_LOCK(nwsp);
	nwsp->nws_flags &= ~NWS_DISPATCHING;
	npwp->nw_handled++;
	npwp->nw_hybrid_dispatched++;

	/*
	 * If other work was enqueued by another thread while we were direct
	 * dispatching, we need to signal the netisr worker to do that work.
	 * In the future, we might want to do some of that work in the
	 * current thread, rather than trigger further context switches.  If
	 * so, we'll want to establish a reasonable bound on the work done in
	 * the "borrowed" context.
	 */
	if (nwsp->nws_pendingbits != 0) {
		nwsp->nws_flags |= NWS_SCHEDULED;
		dosignal = 1;
	} else
		dosignal = 0;
	NWS_UNLOCK(nwsp);
	if (dosignal)
		NWS_SIGNAL(nwsp);
	error = 0;
	goto out_unpin;

queue_fallback:
	error = netisr_queue_internal(proto, m, cpuid);
out_unpin:
	sched_unpin();
out_unlock:
#ifdef NETISR_LOCKING
	NETISR_RUNLOCK(&tracker);
#endif
	return (error);
}

int
netisr_dispatch(u_int proto, struct mbuf *m)
{

	return (netisr_dispatch_src(proto, 0, m));
}
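
/*
 * Dispatch sketch (hypothetical driver input path; NETISR_FOO as above),
 * from an ithread or other context where reentering the protocol is safe:
 *
 *	m->m_pkthdr.rcvif = ifp;
 *	netisr_dispatch(NETISR_FOO, m);
 *
 * Depending on net.isr.direct and net.isr.direct_force, this runs the
 * handler inline, hybrid-dispatches, or falls back to queueing.
 */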

#ifdef DEVICE_POLLING
/*
 * Kernel polling borrows a netisr thread to run interface polling in; this
 * function allows kernel polling to request that the netisr thread be
 * scheduled even if no packets are pending for protocols.
 */
void
netisr_sched_poll(void)
{
	struct netisr_workstream *nwsp;

	nwsp = &nws[nws_array[0]];
	NWS_SIGNAL(nwsp);
}
#endif

static void
netisr_start_swi(u_int cpuid, struct pcpu *pc)
{
	char swiname[12];
	struct netisr_workstream *nwsp;
	int error;

	nwsp = &nws[cpuid];
	mtx_init(&nwsp->nws_mtx, "netisr_mtx", NULL, MTX_DEF);
	nwsp->nws_cpu = cpuid;
	snprintf(swiname, sizeof(swiname), "netisr %u", cpuid);
	error = swi_add(&nwsp->nws_intr_event, swiname, swi_net, nwsp,
	    SWI_NET, INTR_MPSAFE, &nwsp->nws_swi_cookie);
	if (error)
		panic("%s: swi_add %d", __func__, error);
	pc->pc_netisr = nwsp->nws_intr_event;
	if (netisr_bindthreads) {
		error = intr_event_bind(nwsp->nws_intr_event, cpuid);
		if (error != 0)
			printf("%s: cpu %u: intr_event_bind: %d\n", __func__,
			    cpuid, error);
	}
	NETISR_WLOCK();
	nws_array[nws_count] = nwsp->nws_cpu;
	nws_count++;
	NETISR_WUNLOCK();
}

/*
 * Initialize the netisr subsystem.  We rely on BSS and static initialization
 * of most fields in global data structures.
 *
 * Start a worker thread for the boot CPU so that we can support network
 * traffic immediately in case the network stack is used before additional
 * CPUs are started (for example, diskless boot).
 */
static void
netisr_init(void *arg)
{

	KASSERT(curcpu == 0, ("%s: not on CPU 0", __func__));

	NETISR_LOCK_INIT();
	if (netisr_maxthreads < 1) {
		printf("netisr: forcing maxthreads to 1\n");
		netisr_maxthreads = 1;
	}
	if (netisr_maxthreads > MAXCPU) {
		printf("netisr: forcing maxthreads to %d\n", MAXCPU);
		netisr_maxthreads = MAXCPU;
	}
	if (netisr_defaultqlimit > netisr_maxqlimit) {
		printf("netisr: forcing defaultqlimit to %u\n",
		    netisr_maxqlimit);
		netisr_defaultqlimit = netisr_maxqlimit;
	}
#ifdef DEVICE_POLLING
	/*
	 * The device polling code is not yet aware of how to deal with
	 * multiple netisr threads, so for the time being compiling in device
	 * polling disables parallel netisr workers.
	 */
	if (netisr_maxthreads != 1 || netisr_bindthreads != 0) {
		printf("netisr: forcing maxthreads to 1 and bindthreads to "
		    "0 for device polling\n");
		netisr_maxthreads = 1;
		netisr_bindthreads = 0;
	}
#endif

	netisr_start_swi(curcpu, pcpu_find(curcpu));
}
SYSINIT(netisr_init, SI_SUB_SOFTINTR, SI_ORDER_FIRST, netisr_init, NULL);

/*
 * Start worker threads for additional CPUs.  No attempt is made to
 * gracefully handle work reassignment, as we don't yet support dynamic
 * reconfiguration.
 */
static void
netisr_start(void *arg)
{
	struct pcpu *pc;

	SLIST_FOREACH(pc, &cpuhead, pc_allcpu) {
		if (nws_count >= netisr_maxthreads)
			break;
		/* XXXRW: Is skipping absent CPUs still required here? */
		if (CPU_ABSENT(pc->pc_cpuid))
			continue;
		/* Worker will already be present for boot CPU. */
		if (pc->pc_netisr != NULL)
			continue;
		netisr_start_swi(pc->pc_cpuid, pc);
	}
}
SYSINIT(netisr_start, SI_SUB_SMP, SI_ORDER_MIDDLE, netisr_start, NULL);

#ifdef DDB
DB_SHOW_COMMAND(netisr, db_show_netisr)
{
	struct netisr_workstream *nwsp;
	struct netisr_work *nwp;
	int first, proto;
	u_int cpu;

	db_printf("%3s %6s %5s %5s %5s %8s %8s %8s %8s\n", "CPU", "Proto",
	    "Len", "WMark", "Max", "Disp", "HDisp", "Drop", "Queue");
	for (cpu = 0; cpu < MAXCPU; cpu++) {
		nwsp = &nws[cpu];
		if (nwsp->nws_intr_event == NULL)
			continue;
		first = 1;
		for (proto = 0; proto < NETISR_MAXPROT; proto++) {
			if (np[proto].np_handler == NULL)
				continue;
			nwp = &nwsp->nws_work[proto];
			if (first) {
				db_printf("%3d ", cpu);
				first = 0;
			} else
				db_printf("%3s ", "");
			db_printf(
			    "%6s %5d %5d %5d %8ju %8ju %8ju %8ju\n",
			    np[proto].np_name, nwp->nw_len,
			    nwp->nw_watermark, nwp->nw_qlimit,
			    nwp->nw_dispatched, nwp->nw_hybrid_dispatched,
			    nwp->nw_qdrops, nwp->nw_queued);
		}
	}
}
#endif