/*-
 * Copyright (c) 2004 Robert N. M. Watson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/poll.h>

#include <netinet/in.h>
#include <netdb.h>		/* getaddrinfo */

#include <arpa/inet.h>

#include <errno.h>
#include <inttypes.h>		/* PRIu64, PRId64 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>		/* close */

#define MAXSOCK 20

#include <pthread.h>
#include <fcntl.h>
#include <time.h>		/* clock_getres() */

enum {
	CACHE_LINE	= 64,		/* descriptor padding/alignment unit */
	MAX_THREADS	= 64,		/* upper bound on threads per socket */
	RX_BUFLEN	= 65536,	/* per-thread receive buffer size */
	RCVBUF_SIZE	= 128 * 1024,	/* requested SO_RCVBUF */
	REPORT_MS	= 500		/* interval between stats lines */
};

/*
 * Round n up to the next multiple of l.
 */
static int round_to(int n, int l)
{
	return ((n + l - 1)/l)*l;
}

/*
 * Each socket uses multiple threads so the receiver is
 * more efficient. A collector thread runs the stats.
 *
 * The counters are written by the owning receive thread and read by the
 * collector without any synchronization; the printed figures are therefore
 * only approximate snapshots.
 */
struct td_desc {
	pthread_t td_id;
	uint64_t count;		/* rx counter */
	uint64_t byte_count;	/* rx byte counter */
	int fd;
	char *buf;
	int buflen;
};

/*
 * Print the command synopsis and terminate the process.
 */
static void
usage(void)
{

	fprintf(stderr, "netreceive port [nthreads]\n");
	exit(-1);
}

/*
 * tsa += tsb.  Both operands are expected to be normalized
 * (0 <= tv_nsec < 1e9).
 */
static inline void
timespec_add(struct timespec *tsa, struct timespec *tsb)
{

	tsa->tv_sec += tsb->tv_sec;
	tsa->tv_nsec += tsb->tv_nsec;
	if (tsa->tv_nsec >= 1000000000) {
		tsa->tv_sec++;
		tsa->tv_nsec -= 1000000000;
	}
}

/*
 * tsa -= tsb.  Both operands are expected to be normalized
 * (0 <= tv_nsec < 1e9).
 */
static inline void
timespec_sub(struct timespec *tsa, struct timespec *tsb)
{

	tsa->tv_sec -= tsb->tv_sec;
	tsa->tv_nsec -= tsb->tv_nsec;
	if (tsa->tv_nsec < 0) {
		tsa->tv_sec--;
		tsa->tv_nsec += 1000000000;
	}
}

/*
 * Receive thread: block in poll() until the socket is readable, then drain
 * it with non-blocking recv() calls, bumping the packet and byte counters
 * of the descriptor passed in 'data'.  Never returns.
 */
static void *
rx_body(void *data)
{
	struct td_desc *t = data;
	struct pollfd fds;
	ssize_t y;

	fds.fd = t->fd;
	fds.events = POLLIN;
	fds.revents = 0;

	for (;;) {
		if (poll(&fds, 1, -1) < 0) {
			/* revents is not meaningful after a failed poll(). */
			perror("poll on thread");
			continue;
		}
		if (!(fds.revents & POLLIN))
			continue;
		for (;;) {
			y = recv(t->fd, t->buf, t->buflen, MSG_DONTWAIT);
			if (y < 0)
				break;	/* drained (or error): back to poll() */
			t->count++;
			t->byte_count += (uint64_t)y;
		}
	}
	/*NOTREACHED*/
	return (NULL);
}

/*
 * Allocate nsock * nthreads descriptors and start one receive thread for
 * each.  Sockets from s[] are handed out round-robin.  A single allocation
 * holds the pointer array followed by the descriptors; every descriptor is
 * padded to, and starts on, a CACHE_LINE boundary so that the counters of
 * different threads do not share a cache line.
 *
 * Returns the array of descriptor pointers.  Exits the process on any
 * allocation or thread-creation failure.  The memory is never freed.
 */
static struct td_desc **
make_threads(int *s, int nsock, int nthreads)
{
	int i, si, rc, nt = nsock * nthreads;
	int lb = round_to(nt * (int)sizeof(struct td_desc *), CACHE_LINE);
	int td_len = round_to((int)sizeof(struct td_desc), CACHE_LINE);
	char *m;
	struct td_desc **tp;

	printf("td len %d -> %d\n", (int)sizeof(struct td_desc), td_len);
	/* pointers plus the structs, plus slack to align the base address */
	m = calloc(1, (size_t)lb + (size_t)td_len * nt + CACHE_LINE - 1);
	if (m == NULL) {
		perror("no room for pointers!");
		exit(1);
	}
	/* calloc() only guarantees fundamental alignment; round up ourselves. */
	m = (char *)(((uintptr_t)m + CACHE_LINE - 1) &
	    ~(uintptr_t)(CACHE_LINE - 1));
	tp = (struct td_desc **)m;
	m += lb;	/* skip the pointers */
	for (si = i = 0; i < nt; i++, m += td_len) {
		tp[i] = (struct td_desc *)m;
		tp[i]->fd = s[si];
		tp[i]->buflen = RX_BUFLEN;
		tp[i]->buf = calloc(1, tp[i]->buflen);
		if (tp[i]->buf == NULL) {
			perror("no room for receive buffer");
			exit(1);
		}
		if (++si == nsock)
			si = 0;
		/* pthread_create() returns the error; it does not set errno. */
		rc = pthread_create(&tp[i]->td_id, NULL, rx_body, tp[i]);
		if (rc != 0) {
			fprintf(stderr, "unable to create thread: %s\n",
			    strerror(rc));
			exit(1);
		}
	}
	return tp;
}

/*
 * Collector loop: every REPORT_MS milliseconds sum the counters of all
 * receive threads and print the packet and bit rates observed since the
 * previous report to stderr.  Never returns.
 */
static void
main_thread(struct td_desc **tp, int nsock, int nthreads)
{
	uint64_t c0, c1, bc0, bc1;
	struct timespec now, then, delta;

	/* now the parent collects and prints results */
	c0 = c1 = bc0 = bc1 = 0;
	clock_gettime(CLOCK_REALTIME, &then);
	fprintf(stderr, "start at %ld.%09ld\n",
	    (long)then.tv_sec, (long)then.tv_nsec);
	while (1) {
		int i, nt = nsock * nthreads;
		int64_t dn;
		uint64_t pps, bps, elapsed_ns;

		/* poll() with no descriptors is used as a sub-second sleep */
		if (poll(NULL, 0, REPORT_MS) < 0)
			perror("poll");
		c0 = bc0 = 0;
		for (i = 0; i < nt; i++) {
			c0 += tp[i]->count;
			bc0 += tp[i]->byte_count;
		}
		dn = (int64_t)(c0 - c1);
		clock_gettime(CLOCK_REALTIME, &now);
		delta = now;
		timespec_sub(&delta, &then);
		then = now;
		/*
		 * Widen before multiplying so a 32-bit time_t cannot
		 * overflow; the +1 keeps the divisor non-zero.
		 */
		elapsed_ns = (uint64_t)delta.tv_sec * 1000000000ULL +
		    (uint64_t)delta.tv_nsec + 1;
		pps = ((uint64_t)dn * 1000000000ULL) / elapsed_ns;
		bps = ((bc0 - bc1) * 8000000000ULL) / elapsed_ns;
		fprintf(stderr, " %9" PRIu64 " pps %8.3f Mbps",
		    pps, .000001*bps);
		fprintf(stderr, " - %" PRId64 " pkts in %ld.%09ld ns\n",
		    dn, (long)delta.tv_sec, (long)delta.tv_nsec);
		c1 = c0;
		bc1 = bc0;
	}
}

/*
 * netreceive port [nthreads]
 *
 * Bind a UDP socket on every local address getaddrinfo() returns for the
 * given port (at most MAXSOCK), start nthreads receive threads per socket
 * and report receive rates forever.
 */
int
main(int argc, char *argv[])
{
	struct addrinfo hints, *res, *res0;
	char *dummy;
	unsigned long val;
	int port;
	int error, v, nthreads = 1;
	int saved_errno = 0;
	struct td_desc **tp;
	const char *cause = NULL;
	int s[MAXSOCK];
	int nsock;

	if (argc < 2)
		usage();

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_flags = AI_PASSIVE;

	val = strtoul(argv[1], &dummy, 10);
	if (*dummy != '\0' || val < 1 || val > 65535)
		usage();
	port = (int)val;

	if (argc > 2) {
		val = strtoul(argv[2], &dummy, 10);
		if (*dummy != '\0' || val < 1 || val > MAX_THREADS)
			usage();
		nthreads = (int)val;
	}

	error = getaddrinfo(NULL, argv[1], &hints, &res0);
	if (error) {
		/* getaddrinfo() errors are not errno values. */
		fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error));
		return (-1);
		/*NOTREACHED*/
	}

	nsock = 0;
	for (res = res0; res && nsock < MAXSOCK; res = res->ai_next) {
		s[nsock] = socket(res->ai_family, res->ai_socktype,
		    res->ai_protocol);
		if (s[nsock] < 0) {
			cause = "socket";
			saved_errno = errno;
			continue;
		}

		v = RCVBUF_SIZE;
		if (setsockopt(s[nsock], SOL_SOCKET, SO_RCVBUF, &v,
		    sizeof(v)) < 0) {
			cause = "SO_RCVBUF";
			saved_errno = errno;	/* close() may clobber errno */
			close(s[nsock]);
			continue;
		}
		if (bind(s[nsock], res->ai_addr, res->ai_addrlen) < 0) {
			cause = "bind";
			saved_errno = errno;	/* close() may clobber errno */
			close(s[nsock]);
			continue;
		}
		nsock++;
	}
	/* The address list is not needed once the sockets are bound. */
	freeaddrinfo(res0);

	if (nsock == 0) {
		errno = saved_errno;
		perror(cause);
		return (-1);
		/*NOTREACHED*/
	}

	printf("netreceive %d sockets x %d threads listening on UDP port %d\n",
	    nsock, nthreads, port);

	tp = make_threads(s, nsock, nthreads);
	main_thread(tp, nsock, nthreads);

	/*NOTREACHED*/
	return (0);
}