xref: /freebsd/tools/tools/netmap/lb.c (revision 2a63c3be158216222d89a073dcbd6a72ee4aab5a)
1 /*
2  * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *   1. Redistributions of source code must retain the above copyright
8  *      notice, this list of conditions and the following disclaimer.
9  *   2. Redistributions in binary form must reproduce the above copyright
10  *      notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23  * SUCH DAMAGE.
24  */
25 #include <ctype.h>
26 #include <errno.h>
27 #include <inttypes.h>
28 #include <libnetmap.h>
29 #include <netinet/in.h>		/* htonl */
30 #include <pthread.h>
31 #include <signal.h>
32 #include <stdbool.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <syslog.h>
37 #include <sys/ioctl.h>
38 #include <sys/poll.h>
39 #include <unistd.h>
40 
41 #include "pkt_hash.h"
42 #include "ctrs.h"
43 
44 
45 /*
46  * use our version of header structs, rather than bringing in a ton
47  * of platform specific ones
48  */
49 #ifndef ETH_ALEN
50 #define ETH_ALEN 6
51 #endif
52 
53 struct compact_eth_hdr {
54 	unsigned char h_dest[ETH_ALEN];
55 	unsigned char h_source[ETH_ALEN];
56 	u_int16_t h_proto;
57 };
58 
/* Minimal IPv4 header (no options).
 * NOTE(review): the ihl/version nibble order relies on little-endian
 * bitfield layout — confirm before using on big-endian targets. */
struct compact_ip_hdr {
	u_int8_t ihl:4, version:4;
	u_int8_t tos;
	u_int16_t tot_len;
	u_int16_t id;
	u_int16_t frag_off;
	u_int8_t ttl;
	u_int8_t protocol;
	u_int16_t check;
	u_int32_t saddr;
	u_int32_t daddr;
};
71 
/* Minimal IPv6 header.
 * NOTE(review): as above, the priority/version nibble order assumes
 * little-endian bitfield layout. */
struct compact_ipv6_hdr {
	u_int8_t priority:4, version:4;
	u_int8_t flow_lbl[3];
	u_int16_t payload_len;
	u_int8_t nexthdr;
	u_int8_t hop_limit;
	struct in6_addr saddr;
	struct in6_addr daddr;
};
81 
#define MAX_IFNAMELEN 	64	/* max interface name length we accept */
#define MAX_PORTNAMELEN	(MAX_IFNAMELEN + 40)	/* room for pipe id and mem id suffix */
#define DEF_OUT_PIPES 	2	/* default pipes per group (-p with no count) */
#define DEF_EXTRA_BUFS 	0	/* default extra buffers (-B) */
#define DEF_BATCH	2048	/* default RX sync batch (-b) */
#define DEF_WAIT_LINK	2	/* default seconds to wait for link up (-w) */
#define DEF_STATS_INT	600	/* default stats interval (currently unused) */
#define BUF_REVOKE	150	/* buffers reclaimed from the longest overflow queue */
#define STAT_MSG_MAXSIZE 1024	/* max size of one JSON stats record */

/* global configuration, filled in by command-line parsing in main() */
static struct {
	char ifname[MAX_IFNAMELEN + 1];		/* input port, "netmap:" prefixed */
	char base_name[MAX_IFNAMELEN + 1];	/* input port name without prefix */
	int netmap_fd;
	uint16_t output_rings;	/* total number of output pipes, all groups */
	uint16_t num_groups;	/* number of pipe groups (-p occurrences) */
	uint32_t extra_bufs;	/* extra buffers requested with -B */
	uint16_t batch;		/* forward this many packets between RX syncs */
	int stdout_interval;	/* seconds between stdout stats, 0 = off */
	int syslog_interval;	/* seconds between syslog stats, 0 = off */
	int wait_link;		/* seconds to sleep waiting for link up */
	bool busy_wait;		/* -W: poll output rings even when idle */
} glob_arg;
105 
106 /*
107  * the overflow queue is a circular queue of buffers
108  */
109 struct overflow_queue {
110 	char name[MAX_IFNAMELEN + 16];
111 	struct netmap_slot *slots;
112 	uint32_t head;
113 	uint32_t tail;
114 	uint32_t n;
115 	uint32_t size;
116 };
117 
118 static struct overflow_queue *freeq;
119 
120 static inline int
oq_full(struct overflow_queue * q)121 oq_full(struct overflow_queue *q)
122 {
123 	return q->n >= q->size;
124 }
125 
126 static inline int
oq_empty(struct overflow_queue * q)127 oq_empty(struct overflow_queue *q)
128 {
129 	return q->n <= 0;
130 }
131 
132 static inline void
oq_enq(struct overflow_queue * q,const struct netmap_slot * s)133 oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
134 {
135 	if (unlikely(oq_full(q))) {
136 		D("%s: queue full!", q->name);
137 		abort();
138 	}
139 	q->slots[q->tail] = *s;
140 	q->n++;
141 	q->tail++;
142 	if (q->tail >= q->size)
143 		q->tail = 0;
144 }
145 
146 static inline struct netmap_slot
oq_deq(struct overflow_queue * q)147 oq_deq(struct overflow_queue *q)
148 {
149 	struct netmap_slot s = q->slots[q->head];
150 	if (unlikely(oq_empty(q))) {
151 		D("%s: queue empty!", q->name);
152 		abort();
153 	}
154 	q->n--;
155 	q->head++;
156 	if (q->head >= q->size)
157 		q->head = 0;
158 	return s;
159 }
160 
/* set by the SIGINT handler; stops the main loop and the stats thread */
static volatile int do_abort = 0;

/* global counters, updated only by the forwarding loop in main() and
 * snapshotted into counters_buf for the stats thread */
static uint64_t dropped = 0;		/* packets dropped */
static uint64_t forwarded = 0;		/* packets forwarded to a pipe */
static uint64_t received_bytes = 0;	/* bytes received on the input port */
static uint64_t received_pkts = 0;	/* packets received on the input port */
static uint64_t non_ip = 0;		/* packets that hashed to 0 (not IP) */
static uint32_t freeq_n = 0;		/* final free-queue depth, set at exit */
169 
/* per-port state: one entry per output pipe, plus a final entry for the
 * input port (at ports[glob_arg.output_rings]) */
struct port_des {
	char interface[MAX_PORTNAMELEN];	/* netmap port name */
	struct my_ctrs ctr;		/* traffic counters for this port */
	unsigned int last_sync;
	uint32_t last_tail;		/* last seen tail, for group chaining */
	struct overflow_queue *oq;	/* overflow queue, NULL if disabled */
	struct nmport_d *nmd;		/* open netmap port descriptor */
	struct netmap_ring *ring;	/* TX ring (RX ring for the input port) */
	struct group_des *group;	/* group this port belongs to */
};

static struct port_des *ports;
182 
/* each group of pipes receives all the packets */
struct group_des {
	char pipename[MAX_IFNAMELEN];	/* prefix for the pipes in this group */
	struct port_des *ports;		/* slice of the global ports[] array */
	int first_id;			/* first pipe id (offset among groups sharing a name) */
	int nports;			/* number of pipes in this group */
	int last;			/* nonzero for the last group in the chain */
	int custom_port;		/* nonzero if the user gave an explicit prefix */
};

static struct group_des *groups;
194 
/* statistics snapshot, handed from the forwarding loop (producer) to the
 * stats thread (consumer); the status field is the handshake flag and is
 * cache-line aligned to avoid false sharing */
struct counters {
	struct timeval ts;		/* time the snapshot was taken */
	struct my_ctrs *ctrs;		/* one entry per output pipe */
	uint64_t received_pkts;
	uint64_t received_bytes;
	uint64_t non_ip;
	uint32_t freeq_n;		/* free-queue depth at snapshot time */
	int status __attribute__((aligned(64)));
#define COUNTERS_EMPTY	0	/* consumer is done; producer may refill */
#define COUNTERS_FULL	1	/* producer filled the buffer; consumer may read */
};

static struct counters counters_buf;
209 
/*
 * Body of the statistics thread.
 *
 * Once per second it consumes the counters snapshot published by the
 * forwarding loop in counters_buf (handshake via counters_buf.status:
 * this thread writes COUNTERS_EMPTY, the forwarding loop refills the
 * buffer and writes COUNTERS_FULL) and emits per-pipe and aggregate
 * JSON records to stdout and/or syslog at the configured intervals.
 */
static void *
print_stats(void *arg)
{
	int npipes = glob_arg.output_rings;
	int sys_int = 0;
	(void)arg;
	struct my_ctrs cur, prev;
	struct my_ctrs *pipe_prev;	/* previous per-pipe counters, for rates */

	pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
	if (pipe_prev == NULL) {
		D("out of memory");
		exit(1);
	}

	char stat_msg[STAT_MSG_MAXSIZE] = "";

	memset(&prev, 0, sizeof(prev));
	while (!do_abort) {
		int j, dosyslog = 0, dostdout = 0, newdata;
		uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
		struct my_ctrs x;

		/* tell the producer we are done with the previous snapshot */
		counters_buf.status = COUNTERS_EMPTY;
		newdata = 0;
		memset(&cur, 0, sizeof(cur));
		sleep(1);
		if (counters_buf.status == COUNTERS_FULL) {
			__sync_synchronize();
			newdata = 1;
			cur.t = counters_buf.ts;
			/* elapsed microseconds since the previous snapshot */
			if (prev.t.tv_sec || prev.t.tv_usec) {
				usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 +
					cur.t.tv_usec - prev.t.tv_usec;
			}
		}

		/* decide whether this second is a reporting tick */
		++sys_int;
		if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0)
				dostdout = 1;
		if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0)
				dosyslog = 1;

		for (j = 0; j < npipes; ++j) {
			struct my_ctrs *c = &counters_buf.ctrs[j];
			/* accumulate per-pipe counters into the global totals */
			cur.pkts += c->pkts;
			cur.drop += c->drop;
			cur.drop_bytes += c->drop_bytes;
			cur.bytes += c->bytes;

			if (usec) {
				/* per-pipe rates; "+ usec/2" rounds to nearest */
				x.pkts = c->pkts - pipe_prev[j].pkts;
				x.drop = c->drop - pipe_prev[j].drop;
				x.bytes = c->bytes - pipe_prev[j].bytes;
				x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes;
				pps = (x.pkts*1000000 + usec/2) / usec;
				dps = (x.drop*1000000 + usec/2) / usec;
				bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
				dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
			}
			pipe_prev[j] = *c;

			/* one JSON record per pipe */
			if ( (dosyslog || dostdout) && newdata )
				snprintf(stat_msg, STAT_MSG_MAXSIZE,
				       "{"
				       "\"ts\":%.6f,"
				       "\"interface\":\"%s\","
				       "\"output_ring\":%" PRIu16 ","
				       "\"packets_forwarded\":%" PRIu64 ","
				       "\"packets_dropped\":%" PRIu64 ","
				       "\"data_forward_rate_Mbps\":%.4f,"
				       "\"data_drop_rate_Mbps\":%.4f,"
				       "\"packet_forward_rate_kpps\":%.4f,"
				       "\"packet_drop_rate_kpps\":%.4f,"
				       "\"overflow_queue_size\":%" PRIu32
				       "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
				            ports[j].interface,
				            j,
				            c->pkts,
				            c->drop,
				            (double)bps / 1024 / 1024,
				            (double)dbps / 1024 / 1024,
				            (double)pps / 1000,
				            (double)dps / 1000,
				            c->oq_n);

			if (dosyslog && stat_msg[0])
				syslog(LOG_INFO, "%s", stat_msg);
			if (dostdout && stat_msg[0])
				printf("%s\n", stat_msg);
		}
		if (usec) {
			/* aggregate rates across all pipes */
			x.pkts = cur.pkts - prev.pkts;
			x.drop = cur.drop - prev.drop;
			x.bytes = cur.bytes - prev.bytes;
			x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
			pps = (x.pkts*1000000 + usec/2) / usec;
			dps = (x.drop*1000000 + usec/2) / usec;
			bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
			dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
		}

		/* one summary JSON record for the whole input port */
		if ( (dosyslog || dostdout) && newdata )
			snprintf(stat_msg, STAT_MSG_MAXSIZE,
			         "{"
			         "\"ts\":%.6f,"
			         "\"interface\":\"%s\","
			         "\"output_ring\":null,"
			         "\"packets_received\":%" PRIu64 ","
			         "\"packets_forwarded\":%" PRIu64 ","
			         "\"packets_dropped\":%" PRIu64 ","
			         "\"non_ip_packets\":%" PRIu64 ","
			         "\"data_forward_rate_Mbps\":%.4f,"
			         "\"data_drop_rate_Mbps\":%.4f,"
			         "\"packet_forward_rate_kpps\":%.4f,"
			         "\"packet_drop_rate_kpps\":%.4f,"
			         "\"free_buffer_slots\":%" PRIu32
			         "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0),
			              glob_arg.ifname,
			              received_pkts,
			              cur.pkts,
			              cur.drop,
			              counters_buf.non_ip,
			              (double)bps / 1024 / 1024,
			              (double)dbps / 1024 / 1024,
			              (double)pps / 1000,
			              (double)dps / 1000,
			              counters_buf.freeq_n);

		if (dosyslog && stat_msg[0])
			syslog(LOG_INFO, "%s", stat_msg);
		if (dostdout && stat_msg[0])
			printf("%s\n", stat_msg);

		prev = cur;
	}

	free(pipe_prev);

	return NULL;
}
351 
352 static void
free_buffers(void)353 free_buffers(void)
354 {
355 	int i, tot = 0;
356 	struct port_des *rxport = &ports[glob_arg.output_rings];
357 
358 	/* build a netmap free list with the buffers in all the overflow queues */
359 	for (i = 0; i < glob_arg.output_rings + 1; i++) {
360 		struct port_des *cp = &ports[i];
361 		struct overflow_queue *q = cp->oq;
362 
363 		if (!q)
364 			continue;
365 
366 		while (q->n) {
367 			struct netmap_slot s = oq_deq(q);
368 			uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx);
369 
370 			*b = rxport->nmd->nifp->ni_bufs_head;
371 			rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
372 			tot++;
373 		}
374 	}
375 	D("added %d buffers to netmap free list", tot);
376 
377 	for (i = 0; i < glob_arg.output_rings + 1; ++i) {
378 		nmport_close(ports[i].nmd);
379 	}
380 }
381 
382 
sigint_h(int sig)383 static void sigint_h(int sig)
384 {
385 	(void)sig;		/* UNUSED */
386 	do_abort = 1;
387 	signal(SIGINT, SIG_DFL);
388 }
389 
390 static void
usage(void)391 usage(void)
392 {
393 	printf("usage: lb [options]\n");
394 	printf("where options are:\n");
395 	printf("  -h              	view help text\n");
396 	printf("  -i iface        	interface name (required)\n");
397 	printf("  -p [prefix:]npipes	add a new group of output pipes\n");
398 	printf("  -B nbufs        	number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS);
399 	printf("  -b batch        	batch size (default: %d)\n", DEF_BATCH);
400 	printf("  -w seconds        	wait for link up (default: %d)\n", DEF_WAIT_LINK);
401 	printf("  -W                    enable busy waiting. this will run your CPU at 100%%\n");
402 	printf("  -s seconds      	seconds between syslog stats messages (default: 0)\n");
403 	printf("  -o seconds      	seconds between stdout stats messages (default: 0)\n");
404 	exit(0);
405 }
406 
407 static int
parse_pipes(const char * spec)408 parse_pipes(const char *spec)
409 {
410 	const char *end = index(spec, ':');
411 	static int max_groups = 0;
412 	struct group_des *g;
413 
414 	ND("spec %s num_groups %d", spec, glob_arg.num_groups);
415 	if (max_groups < glob_arg.num_groups + 1) {
416 		size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
417 		groups = realloc(groups, size);
418 		if (groups == NULL) {
419 			D("out of memory");
420 			return 1;
421 		}
422 	}
423 	g = &groups[glob_arg.num_groups];
424 	memset(g, 0, sizeof(*g));
425 
426 	if (end != NULL) {
427 		if (end - spec > MAX_IFNAMELEN - 8) {
428 			D("name '%s' too long", spec);
429 			return 1;
430 		}
431 		if (end == spec) {
432 			D("missing prefix before ':' in '%s'", spec);
433 			return 1;
434 		}
435 		strncpy(g->pipename, spec, end - spec);
436 		g->custom_port = 1;
437 		end++;
438 	} else {
439 		/* no prefix, this group will use the
440 		 * name of the input port.
441 		 * This will be set in init_groups(),
442 		 * since here the input port may still
443 		 * be uninitialized
444 		 */
445 		end = spec;
446 	}
447 	if (*end == '\0') {
448 		g->nports = DEF_OUT_PIPES;
449 	} else {
450 		g->nports = atoi(end);
451 		if (g->nports < 1) {
452 			D("invalid number of pipes '%s' (must be at least 1)", end);
453 			return 1;
454 		}
455 	}
456 	glob_arg.output_rings += g->nports;
457 	glob_arg.num_groups++;
458 	return 0;
459 }
460 
461 /* complete the initialization of the groups data structure */
462 static void
init_groups(void)463 init_groups(void)
464 {
465 	int i, j, t = 0;
466 	struct group_des *g = NULL;
467 	for (i = 0; i < glob_arg.num_groups; i++) {
468 		g = &groups[i];
469 		g->ports = &ports[t];
470 		for (j = 0; j < g->nports; j++)
471 			g->ports[j].group = g;
472 		t += g->nports;
473 		if (!g->custom_port)
474 			strcpy(g->pipename, glob_arg.base_name);
475 		for (j = 0; j < i; j++) {
476 			struct group_des *h = &groups[j];
477 			if (!strcmp(h->pipename, g->pipename))
478 				g->first_id += h->nports;
479 		}
480 	}
481 	g->last = 1;
482 }
483 
484 
485 /* To support packets that span multiple slots (NS_MOREFRAG) we
486  * need to make sure of the following:
487  *
488  * - all fragments of the same packet must go to the same output pipe
489  * - when dropping, all fragments of the same packet must be dropped
490  *
491  * For the former point we remember and reuse the last hash computed
492  * in each input ring, and only update it when NS_MOREFRAG was not
493  * set in the last received slot (this marks the start of a new packet).
494  *
495  * For the latter point, we only update the output ring head pointer
496  * when an entire packet has been forwarded. We keep a shadow_head
497  * pointer to know where to put the next partial fragment and,
498  * when the need to drop arises, we roll it back to head.
499  */
500 struct morefrag {
501 	uint16_t last_flag;	/* for input rings */
502 	uint32_t last_hash;	/* for input rings */
503 	uint32_t shadow_head;	/* for output rings */
504 };
505 
506 /* push the packet described by slot rs to the group g.
507  * This may cause other buffers to be pushed down the
508  * chain headed by g.
509  * Return a free buffer.
510  */
511 static uint32_t
forward_packet(struct group_des * g,struct netmap_slot * rs)512 forward_packet(struct group_des *g, struct netmap_slot *rs)
513 {
514 	uint32_t hash = rs->ptr;
515 	uint32_t output_port = hash % g->nports;
516 	struct port_des *port = &g->ports[output_port];
517 	struct netmap_ring *ring = port->ring;
518 	struct overflow_queue *q = port->oq;
519 	struct morefrag *mf = (struct morefrag *)ring->sem;
520 	uint16_t curmf = rs->flags & NS_MOREFRAG;
521 
522 	/* Move the packet to the output pipe, unless there is
523 	 * either no space left on the ring, or there is some
524 	 * packet still in the overflow queue (since those must
525 	 * take precedence over the new one)
526 	*/
527 	if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) {
528 		struct netmap_slot *ts = &ring->slot[mf->shadow_head];
529 		struct netmap_slot old_slot = *ts;
530 
531 		ts->buf_idx = rs->buf_idx;
532 		ts->len = rs->len;
533 		ts->flags = rs->flags | NS_BUF_CHANGED;
534 		ts->ptr = rs->ptr;
535 		mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
536 		if (!curmf) {
537 			ring->head = mf->shadow_head;
538 		}
539 		ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u",
540 				curmf, ts->flags, mf->shadow_head, ring->head, ring->tail);
541 		port->ctr.bytes += rs->len;
542 		port->ctr.pkts++;
543 		forwarded++;
544 		return old_slot.buf_idx;
545 	}
546 
547 	/* use the overflow queue, if available */
548 	if (q == NULL || oq_full(q)) {
549 		uint32_t scan;
550 		/* no space left on the ring and no overflow queue
551 		 * available: we are forced to drop the packet
552 		 */
553 
554 		/* drop previous fragments, if any */
555 		for (scan = ring->head; scan != mf->shadow_head;
556 				scan = nm_ring_next(ring, scan)) {
557 			struct netmap_slot *ts = &ring->slot[scan];
558 			dropped++;
559 			port->ctr.drop_bytes += ts->len;
560 		}
561 		mf->shadow_head = ring->head;
562 
563 		dropped++;
564 		port->ctr.drop++;
565 		port->ctr.drop_bytes += rs->len;
566 		return rs->buf_idx;
567 	}
568 
569 	oq_enq(q, rs);
570 
571 	/*
572 	 * we cannot continue down the chain and we need to
573 	 * return a free buffer now. We take it from the free queue.
574 	 */
575 	if (oq_empty(freeq)) {
576 		/* the free queue is empty. Revoke some buffers
577 		 * from the longest overflow queue
578 		 */
579 		uint32_t j;
580 		struct port_des *lp = &ports[0];
581 		uint32_t max = lp->oq->n;
582 
583 		/* let lp point to the port with the longest queue */
584 		for (j = 1; j < glob_arg.output_rings; j++) {
585 			struct port_des *cp = &ports[j];
586 			if (cp->oq->n > max) {
587 				lp = cp;
588 				max = cp->oq->n;
589 			}
590 		}
591 
592 		/* move the oldest BUF_REVOKE buffers from the
593 		 * lp queue to the free queue
594 		 *
595 		 * We cannot revoke a partially received packet.
596 		 * To make thinks simple we make sure to leave
597 		 * at least NETMAP_MAX_FRAGS slots in the queue.
598 		 */
599 		for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) {
600 			struct netmap_slot tmp = oq_deq(lp->oq);
601 
602 			dropped++;
603 			lp->ctr.drop++;
604 			lp->ctr.drop_bytes += tmp.len;
605 
606 			oq_enq(freeq, &tmp);
607 		}
608 
609 		ND(1, "revoked %d buffers from %s", j, lq->name);
610 	}
611 
612 	return oq_deq(freeq).buf_idx;
613 }
614 
main(int argc,char ** argv)615 int main(int argc, char **argv)
616 {
617 	int ch;
618 	uint32_t i;
619 	int rv;
620 	int poll_timeout = 10; /* default */
621 
622 	glob_arg.ifname[0] = '\0';
623 	glob_arg.output_rings = 0;
624 	glob_arg.batch = DEF_BATCH;
625 	glob_arg.wait_link = DEF_WAIT_LINK;
626 	glob_arg.busy_wait = false;
627 	glob_arg.syslog_interval = 0;
628 	glob_arg.stdout_interval = 0;
629 
630 	while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
631 		switch (ch) {
632 		case 'i':
633 			D("interface is %s", optarg);
634 			if (strlen(optarg) > MAX_IFNAMELEN - 8) {
635 				D("ifname too long %s", optarg);
636 				return 1;
637 			}
638 			if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) {
639 				sprintf(glob_arg.ifname, "netmap:%s", optarg);
640 			} else {
641 				strcpy(glob_arg.ifname, optarg);
642 			}
643 			break;
644 
645 		case 'p':
646 			if (parse_pipes(optarg)) {
647 				usage();
648 				return 1;
649 			}
650 			break;
651 
652 		case 'B':
653 			glob_arg.extra_bufs = atoi(optarg);
654 			D("requested %d extra buffers", glob_arg.extra_bufs);
655 			break;
656 
657 		case 'b':
658 			glob_arg.batch = atoi(optarg);
659 			D("batch is %d", glob_arg.batch);
660 			break;
661 
662 		case 'w':
663 			glob_arg.wait_link = atoi(optarg);
664 			D("link wait for up time is %d", glob_arg.wait_link);
665 			break;
666 
667 		case 'W':
668 			glob_arg.busy_wait = true;
669 			break;
670 
671 		case 'o':
672 			glob_arg.stdout_interval = atoi(optarg);
673 			break;
674 
675 		case 's':
676 			glob_arg.syslog_interval = atoi(optarg);
677 			break;
678 
679 		case 'h':
680 			usage();
681 			return 0;
682 			break;
683 
684 		default:
685 			D("bad option %c %s", ch, optarg);
686 			usage();
687 			return 1;
688 		}
689 	}
690 
691 	if (glob_arg.ifname[0] == '\0') {
692 		D("missing interface name");
693 		usage();
694 		return 1;
695 	}
696 
697 	if (glob_arg.num_groups == 0)
698 		parse_pipes("");
699 
700 	if (glob_arg.syslog_interval) {
701 		setlogmask(LOG_UPTO(LOG_INFO));
702 		openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
703 	}
704 
705 	uint32_t npipes = glob_arg.output_rings;
706 
707 
708 	pthread_t stat_thread;
709 
710 	ports = calloc(npipes + 1, sizeof(struct port_des));
711 	if (!ports) {
712 		D("failed to allocate the stats array");
713 		return 1;
714 	}
715 	struct port_des *rxport = &ports[npipes];
716 
717 	rxport->nmd = nmport_prepare(glob_arg.ifname);
718 	if (rxport->nmd == NULL) {
719 		D("cannot parse %s", glob_arg.ifname);
720 		return (1);
721 	}
722 	/* extract the base name */
723 	strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN);
724 
725 	init_groups();
726 
727 	memset(&counters_buf, 0, sizeof(counters_buf));
728 	counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
729 	if (!counters_buf.ctrs) {
730 		D("failed to allocate the counters snapshot buffer");
731 		return 1;
732 	}
733 
734 	rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs;
735 
736 	if (nmport_open_desc(rxport->nmd) < 0) {
737 		D("cannot open %s", glob_arg.ifname);
738 		return (1);
739 	}
740 	D("successfully opened %s", glob_arg.ifname);
741 
742 	uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs;
743 	struct overflow_queue *oq = NULL;
744 	/* reference ring to access the buffers */
745 	rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
746 
747 	if (!glob_arg.extra_bufs)
748 		goto run;
749 
750 	D("obtained %d extra buffers", extra_bufs);
751 	if (!extra_bufs)
752 		goto run;
753 
754 	/* one overflow queue for each output pipe, plus one for the
755 	 * free extra buffers
756 	 */
757 	oq = calloc(npipes + 1, sizeof(struct overflow_queue));
758 	if (!oq) {
759 		D("failed to allocated overflow queues descriptors");
760 		goto run;
761 	}
762 
763 	freeq = &oq[npipes];
764 	rxport->oq = freeq;
765 
766 	freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
767 	if (!freeq->slots) {
768 		D("failed to allocate the free list");
769 	}
770 	freeq->size = extra_bufs;
771 	snprintf(freeq->name, MAX_IFNAMELEN, "free queue");
772 
773 	/*
774 	 * the list of buffers uses the first uint32_t in each buffer
775 	 * as the index of the next buffer.
776 	 */
777 	uint32_t scan;
778 	for (scan = rxport->nmd->nifp->ni_bufs_head;
779 	     scan;
780 	     scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan))
781 	{
782 		struct netmap_slot s;
783 		s.len = s.flags = 0;
784 		s.ptr = 0;
785 		s.buf_idx = scan;
786 		ND("freeq <- %d", s.buf_idx);
787 		oq_enq(freeq, &s);
788 	}
789 
790 
791 	if (freeq->n != extra_bufs) {
792 		D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d",
793 				extra_bufs, freeq->n);
794 		return 1;
795 	}
796 	rxport->nmd->nifp->ni_bufs_head = 0;
797 
798 run:
799 	atexit(free_buffers);
800 
801 	int j, t = 0;
802 	for (j = 0; j < glob_arg.num_groups; j++) {
803 		struct group_des *g = &groups[j];
804 		int k;
805 		for (k = 0; k < g->nports; ++k) {
806 			struct port_des *p = &g->ports[k];
807 			snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d",
808 					(strncmp(g->pipename, "vale", 4) ? "netmap:" : ""),
809 					g->pipename, g->first_id + k,
810 					rxport->nmd->reg.nr_mem_id);
811 			D("opening pipe named %s", p->interface);
812 
813 			p->nmd = nmport_open(p->interface);
814 
815 			if (p->nmd == NULL) {
816 				D("cannot open %s", p->interface);
817 				return (1);
818 			} else if (p->nmd->mem != rxport->nmd->mem) {
819 				D("failed to open pipe #%d in zero-copy mode, "
820 					"please close any application that uses either pipe %s}%d, "
821 				        "or %s{%d, and retry",
822 					k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k);
823 				return (1);
824 			} else {
825 				struct morefrag *mf;
826 
827 				D("successfully opened pipe #%d %s (tx slots: %d)",
828 				  k + 1, p->interface, p->nmd->reg.nr_tx_slots);
829 				p->ring = NETMAP_TXRING(p->nmd->nifp, 0);
830 				p->last_tail = nm_ring_next(p->ring, p->ring->tail);
831 				mf = (struct morefrag *)p->ring->sem;
832 				mf->last_flag = 0;	/* unused */
833 				mf->last_hash = 0;	/* unused */
834 				mf->shadow_head = p->ring->head;
835 			}
836 			D("zerocopy %s",
837 			  (rxport->nmd->mem == p->nmd->mem) ? "enabled" : "disabled");
838 
839 			if (extra_bufs) {
840 				struct overflow_queue *q = &oq[t + k];
841 				q->slots = calloc(extra_bufs, sizeof(struct netmap_slot));
842 				if (!q->slots) {
843 					D("failed to allocate overflow queue for pipe %d", k);
844 					/* make all overflow queue management fail */
845 					extra_bufs = 0;
846 				}
847 				q->size = extra_bufs;
848 				snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k);
849 				p->oq = q;
850 			}
851 		}
852 		t += g->nports;
853 	}
854 
855 	if (glob_arg.extra_bufs && !extra_bufs) {
856 		if (oq) {
857 			for (i = 0; i < npipes + 1; i++) {
858 				free(oq[i].slots);
859 				oq[i].slots = NULL;
860 			}
861 			free(oq);
862 			oq = NULL;
863 		}
864 		D("*** overflow queues disabled ***");
865 	}
866 
867 	sleep(glob_arg.wait_link);
868 
869 	/* start stats thread after wait_link */
870 	if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) {
871 		D("unable to create the stats thread: %s", strerror(errno));
872 		return 1;
873 	}
874 
875 	struct pollfd pollfd[npipes + 1];
876 	memset(&pollfd, 0, sizeof(pollfd));
877 	signal(SIGINT, sigint_h);
878 
879 	/* make sure we wake up as often as needed, even when there are no
880 	 * packets coming in
881 	 */
882 	if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout)
883 		poll_timeout = glob_arg.syslog_interval;
884 	if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout)
885 		poll_timeout = glob_arg.stdout_interval;
886 
887 	/* initialize the morefrag structures for the input rings */
888 	for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
889 		struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
890 		struct morefrag *mf = (struct morefrag *)rxring->sem;
891 
892 		mf->last_flag = 0;
893 		mf->last_hash = 0;
894 		mf->shadow_head = 0; /* unused */
895 	}
896 
897 	while (!do_abort) {
898 		u_int polli = 0;
899 
900 		for (i = 0; i < npipes; ++i) {
901 			struct netmap_ring *ring = ports[i].ring;
902 			int pending = nm_tx_pending(ring);
903 
904 			/* if there are packets pending, we want to be notified when
905 			 * tail moves, so we let cur=tail
906 			 */
907 			ring->cur = pending ? ring->tail : ring->head;
908 
909 			if (!glob_arg.busy_wait && !pending) {
910 				/* no need to poll, there are no packets pending */
911 				continue;
912 			}
913 			pollfd[polli].fd = ports[i].nmd->fd;
914 			pollfd[polli].events = POLLOUT;
915 			pollfd[polli].revents = 0;
916 			++polli;
917 		}
918 
919 		pollfd[polli].fd = rxport->nmd->fd;
920 		pollfd[polli].events = POLLIN;
921 		pollfd[polli].revents = 0;
922 		++polli;
923 
924 		ND(5, "polling %d file descriptors", polli);
925 		rv = poll(pollfd, polli, poll_timeout);
926 		if (rv <= 0) {
927 			if (rv < 0 && errno != EAGAIN && errno != EINTR)
928 				RD(1, "poll error %s", strerror(errno));
929 			goto send_stats;
930 		}
931 
932 		/* if there are several groups, try pushing released packets from
933 		 * upstream groups to the downstream ones.
934 		 *
935 		 * It is important to do this before returned slots are reused
936 		 * for new transmissions. For the same reason, this must be
937 		 * done starting from the last group going backwards.
938 		 */
939 		for (i = glob_arg.num_groups - 1U; i > 0; i--) {
940 			struct group_des *g = &groups[i - 1];
941 
942 			for (j = 0; j < g->nports; j++) {
943 				struct port_des *p = &g->ports[j];
944 				struct netmap_ring *ring = p->ring;
945 				uint32_t last = p->last_tail,
946 					 stop = nm_ring_next(ring, ring->tail);
947 
948 				/* slight abuse of the API here: we touch the slot
949 				 * pointed to by tail
950 				 */
951 				for ( ; last != stop; last = nm_ring_next(ring, last)) {
952 					struct netmap_slot *rs = &ring->slot[last];
953 					// XXX less aggressive?
954 					rs->buf_idx = forward_packet(g + 1, rs);
955 					rs->flags = NS_BUF_CHANGED;
956 					rs->ptr = 0;
957 				}
958 				p->last_tail = last;
959 			}
960 		}
961 
962 
963 
964 		if (oq) {
965 			/* try to push packets from the overflow queues
966 			 * to the corresponding pipes
967 			 */
968 			for (i = 0; i < npipes; i++) {
969 				struct port_des *p = &ports[i];
970 				struct overflow_queue *q = p->oq;
971 				uint32_t k;
972 				int64_t lim;
973 				struct netmap_ring *ring;
974 				struct netmap_slot *slot;
975 				struct morefrag *mf;
976 
977 				if (oq_empty(q))
978 					continue;
979 				ring = p->ring;
980 				mf = (struct morefrag *)ring->sem;
981 				lim = ring->tail - mf->shadow_head;
982 				if (!lim)
983 					continue;
984 				if (lim < 0)
985 					lim += ring->num_slots;
986 				if (q->n < lim)
987 					lim = q->n;
988 				for (k = 0; k < lim; k++) {
989 					struct netmap_slot s = oq_deq(q), tmp;
990 					tmp.ptr = 0;
991 					slot = &ring->slot[mf->shadow_head];
992 					tmp.buf_idx = slot->buf_idx;
993 					oq_enq(freeq, &tmp);
994 					*slot = s;
995 					slot->flags |= NS_BUF_CHANGED;
996 					mf->shadow_head = nm_ring_next(ring, mf->shadow_head);
997 					if (!(slot->flags & NS_MOREFRAG))
998 						ring->head = mf->shadow_head;
999 				}
1000 			}
1001 		}
1002 
1003 		/* push any new packets from the input port to the first group */
1004 		int batch = 0;
1005 		for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) {
1006 			struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i);
1007 			struct morefrag *mf = (struct morefrag *)rxring->sem;
1008 
1009 			//D("prepare to scan rings");
1010 			int next_head = rxring->head;
1011 			struct netmap_slot *next_slot = &rxring->slot[next_head];
1012 			const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
1013 			while (!nm_ring_empty(rxring)) {
1014 				struct netmap_slot *rs = next_slot;
1015 				struct group_des *g = &groups[0];
1016 				++received_pkts;
1017 				received_bytes += rs->len;
1018 
1019 				// CHOOSE THE CORRECT OUTPUT PIPE
1020 				// If the previous slot had NS_MOREFRAG set, this is another
1021 				// fragment of the last packet and it should go to the same
1022 				// output pipe as before.
1023 				if (!mf->last_flag) {
1024 					// 'B' is just a hashing seed
1025 					mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B');
1026 				}
1027 				mf->last_flag = rs->flags & NS_MOREFRAG;
1028 				rs->ptr = mf->last_hash;
1029 				if (rs->ptr == 0) {
1030 					non_ip++; // XXX ??
1031 				}
1032 				// prefetch the buffer for the next round
1033 				next_head = nm_ring_next(rxring, next_head);
1034 				next_slot = &rxring->slot[next_head];
1035 				next_buf = NETMAP_BUF(rxring, next_slot->buf_idx);
1036 				__builtin_prefetch(next_buf);
1037 				rs->buf_idx = forward_packet(g, rs);
1038 				rs->flags = NS_BUF_CHANGED;
1039 				rxring->head = rxring->cur = next_head;
1040 
1041 				batch++;
1042 				if (unlikely(batch >= glob_arg.batch)) {
1043 					ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL);
1044 					batch = 0;
1045 				}
1046 				ND(1,
1047 				   "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64"   Percent: %.2f",
1048 				   forwarded, dropped,
1049 				   ((float)dropped / (float)forwarded * 100));
1050 			}
1051 
1052 		}
1053 
1054 	send_stats:
1055 		if (counters_buf.status == COUNTERS_FULL)
1056 			continue;
1057 		/* take a new snapshot of the counters */
1058 		gettimeofday(&counters_buf.ts, NULL);
1059 		for (i = 0; i < npipes; i++) {
1060 			struct my_ctrs *c = &counters_buf.ctrs[i];
1061 			*c = ports[i].ctr;
1062 			/*
1063 			 * If there are overflow queues, copy the number of them for each
1064 			 * port to the ctrs.oq_n variable for each port.
1065 			 */
1066 			if (ports[i].oq != NULL)
1067 				c->oq_n = ports[i].oq->n;
1068 		}
1069 		counters_buf.received_pkts = received_pkts;
1070 		counters_buf.received_bytes = received_bytes;
1071 		counters_buf.non_ip = non_ip;
1072 		if (freeq != NULL)
1073 			counters_buf.freeq_n = freeq->n;
1074 		__sync_synchronize();
1075 		counters_buf.status = COUNTERS_FULL;
1076 	}
1077 
1078 	/*
1079 	 * If freeq exists, copy the number to the freeq_n member of the
1080 	 * message struct, otherwise set it to 0.
1081 	 */
1082 	if (freeq != NULL) {
1083 		freeq_n = freeq->n;
1084 	} else {
1085 		freeq_n = 0;
1086 	}
1087 
1088 	pthread_join(stat_thread, NULL);
1089 
1090 	printf("%"PRIu64" packets forwarded.  %"PRIu64" packets dropped. Total %"PRIu64"\n", forwarded,
1091 	       dropped, forwarded + dropped);
1092 	return 0;
1093 }
1094