/*-
 * Copyright (c) 2004 Robert N. M. Watson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
25 * 26 * $FreeBSD$ 27 */ 28 29 #include <sys/types.h> 30 #include <sys/socket.h> 31 #include <sys/time.h> 32 #include <sys/poll.h> 33 34 #include <netinet/in.h> 35 #include <netdb.h> /* getaddrinfo */ 36 37 #include <arpa/inet.h> 38 39 #include <stdio.h> 40 #include <stdlib.h> 41 #include <string.h> 42 #include <unistd.h> /* close */ 43 44 #define MAXSOCK 20 45 46 #include <pthread.h> 47 #include <fcntl.h> 48 #include <time.h> /* clock_getres() */ 49 50 static int round_to(int n, int l) 51 { 52 return ((n + l - 1)/l)*l; 53 } 54 55 /* 56 * Each socket uses multiple threads so the receiver is 57 * more efficient. A collector thread runs the stats. 58 */ 59 struct td_desc { 60 pthread_t td_id; 61 uint64_t count; /* rx counter */ 62 uint64_t byte_count; /* rx byte counter */ 63 int fd; 64 char *buf; 65 int buflen; 66 }; 67 68 static void 69 usage(void) 70 { 71 72 fprintf(stderr, "netreceive port [nthreads]\n"); 73 exit(-1); 74 } 75 76 static __inline void 77 timespec_add(struct timespec *tsa, struct timespec *tsb) 78 { 79 80 tsa->tv_sec += tsb->tv_sec; 81 tsa->tv_nsec += tsb->tv_nsec; 82 if (tsa->tv_nsec >= 1000000000) { 83 tsa->tv_sec++; 84 tsa->tv_nsec -= 1000000000; 85 } 86 } 87 88 static __inline void 89 timespec_sub(struct timespec *tsa, struct timespec *tsb) 90 { 91 92 tsa->tv_sec -= tsb->tv_sec; 93 tsa->tv_nsec -= tsb->tv_nsec; 94 if (tsa->tv_nsec < 0) { 95 tsa->tv_sec--; 96 tsa->tv_nsec += 1000000000; 97 } 98 } 99 100 static void * 101 rx_body(void *data) 102 { 103 struct td_desc *t = data; 104 struct pollfd fds; 105 int y; 106 107 fds.fd = t->fd; 108 fds.events = POLLIN; 109 110 for (;;) { 111 if (poll(&fds, 1, -1) < 0) 112 perror("poll on thread"); 113 if (!(fds.revents & POLLIN)) 114 continue; 115 for (;;) { 116 y = recv(t->fd, t->buf, t->buflen, MSG_DONTWAIT); 117 if (y < 0) 118 break; 119 t->count++; 120 t->byte_count += y; 121 } 122 } 123 return NULL; 124 } 125 126 static struct td_desc ** 127 make_threads(int *s, int nsock, int nthreads) 128 { 129 int 
i, si, nt = nsock * nthreads; 130 int lb = round_to(nt * sizeof (struct td_desc *), 64); 131 int td_len = round_to(sizeof(struct td_desc), 64); // cache align 132 char *m = calloc(1, lb + td_len * nt); 133 struct td_desc **tp; 134 135 printf("td len %d -> %d\n", (int)sizeof(struct td_desc) , td_len); 136 /* pointers plus the structs */ 137 if (m == NULL) { 138 perror("no room for pointers!"); 139 exit(1); 140 } 141 tp = (struct td_desc **)m; 142 m += lb; /* skip the pointers */ 143 for (si = i = 0; i < nt; i++, m += td_len) { 144 tp[i] = (struct td_desc *)m; 145 tp[i]->fd = s[si]; 146 tp[i]->buflen = 65536; 147 tp[i]->buf = calloc(1, tp[i]->buflen); 148 if (++si == nsock) 149 si = 0; 150 if (pthread_create(&tp[i]->td_id, NULL, rx_body, tp[i])) { 151 perror("unable to create thread"); 152 exit(1); 153 } 154 } 155 return tp; 156 } 157 158 static void 159 main_thread(struct td_desc **tp, int nsock, int nthreads) 160 { 161 uint64_t c0, c1, bc0, bc1; 162 struct timespec now, then, delta; 163 /* now the parent collects and prints results */ 164 c0 = c1 = bc0 = bc1 = 0; 165 clock_gettime(CLOCK_REALTIME, &then); 166 fprintf(stderr, "start at %ld.%09ld\n", then.tv_sec, then.tv_nsec); 167 while (1) { 168 int i, nt = nsock * nthreads; 169 int64_t dn; 170 uint64_t pps, bps; 171 172 if (poll(NULL, 0, 500) < 0) 173 perror("poll"); 174 c0 = bc0 = 0; 175 for (i = 0; i < nt; i++) { 176 c0 += tp[i]->count; 177 bc0 += tp[i]->byte_count; 178 } 179 dn = c0 - c1; 180 clock_gettime(CLOCK_REALTIME, &now); 181 delta = now; 182 timespec_sub(&delta, &then); 183 then = now; 184 pps = dn; 185 pps = (pps * 1000000000) / (delta.tv_sec*1000000000 + delta.tv_nsec + 1); 186 bps = ((bc0 - bc1) * 8000000000) / (delta.tv_sec*1000000000 + delta.tv_nsec + 1); 187 fprintf(stderr, " %9ld pps %8.3f Mbps", (long)pps, .000001*bps); 188 fprintf(stderr, " - %d pkts in %ld.%09ld ns\n", 189 (int)dn, delta.tv_sec, delta.tv_nsec); 190 c1 = c0; 191 bc1 = bc0; 192 } 193 } 194 195 int 196 main(int argc, char *argv[]) 
197 { 198 struct addrinfo hints, *res, *res0; 199 char *dummy, *packet; 200 int port; 201 int error, v, nthreads = 1; 202 struct td_desc **tp; 203 const char *cause = NULL; 204 int s[MAXSOCK]; 205 int nsock; 206 207 if (argc < 2) 208 usage(); 209 210 memset(&hints, 0, sizeof(hints)); 211 hints.ai_family = PF_UNSPEC; 212 hints.ai_socktype = SOCK_DGRAM; 213 hints.ai_flags = AI_PASSIVE; 214 215 port = strtoul(argv[1], &dummy, 10); 216 if (port < 1 || port > 65535 || *dummy != '\0') 217 usage(); 218 if (argc > 2) 219 nthreads = strtoul(argv[2], &dummy, 10); 220 if (nthreads < 1 || nthreads > 64) 221 usage(); 222 223 packet = malloc(65536); 224 if (packet == NULL) { 225 perror("malloc"); 226 return (-1); 227 } 228 bzero(packet, 65536); 229 230 error = getaddrinfo(NULL, argv[1], &hints, &res0); 231 if (error) { 232 perror(gai_strerror(error)); 233 return (-1); 234 /*NOTREACHED*/ 235 } 236 237 nsock = 0; 238 for (res = res0; res && nsock < MAXSOCK; res = res->ai_next) { 239 s[nsock] = socket(res->ai_family, res->ai_socktype, 240 res->ai_protocol); 241 if (s[nsock] < 0) { 242 cause = "socket"; 243 continue; 244 } 245 246 v = 128 * 1024; 247 if (setsockopt(s[nsock], SOL_SOCKET, SO_RCVBUF, &v, sizeof(v)) < 0) { 248 cause = "SO_RCVBUF"; 249 close(s[nsock]); 250 continue; 251 } 252 if (bind(s[nsock], res->ai_addr, res->ai_addrlen) < 0) { 253 cause = "bind"; 254 close(s[nsock]); 255 continue; 256 } 257 (void) listen(s[nsock], 5); 258 nsock++; 259 } 260 if (nsock == 0) { 261 perror(cause); 262 return (-1); 263 /*NOTREACHED*/ 264 } 265 266 printf("netreceive %d sockets x %d threads listening on UDP port %d\n", 267 nsock, nthreads, (u_short)port); 268 269 tp = make_threads(s, nsock, nthreads); 270 main_thread(tp, nsock, nthreads); 271 272 /*NOTREACHED*/ 273 freeaddrinfo(res0); 274 } 275