xref: /freebsd/contrib/ofed/librdmacm/examples/rstream.c (revision 0e8011faf58b743cc652e3b2ad0f7671227610df)
1 /*
2  * Copyright (c) 2011-2012 Intel Corporation.  All rights reserved.
3  * Copyright (c) 2014-2015 Mellanox Technologies LTD. All rights reserved.
4  *
5  * This software is available to you under the OpenIB.org BSD license
6  * below:
7  *
8  *     Redistribution and use in source and binary forms, with or
9  *     without modification, are permitted provided that the following
10  *     conditions are met:
11  *
12  *      - Redistributions of source code must retain the above
13  *        copyright notice, this list of conditions and the following
14  *        disclaimer.
15  *
16  *      - Redistributions in binary form must reproduce the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer in the documentation and/or other materials
19  *        provided with the distribution.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
23  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
24  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
25  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
26  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
27  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28  * SOFTWARE.
29  */
30 
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <strings.h>
35 #include <errno.h>
36 #include <getopt.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <sys/time.h>
40 #include <sys/wait.h>
41 #include <netdb.h>
42 #include <fcntl.h>
43 #include <unistd.h>
44 #include <netinet/tcp.h>
45 
46 #include <rdma/rdma_cma.h>
47 #include <rdma/rsocket.h>
48 #include <util/compiler.h>
49 #include "common.h"
50 
/*
 * One entry of the built-in test matrix.
 *
 * size   - transfer size in bytes
 * option - entries whose option is greater than size_option are skipped
 *          by run(); option 0 entries therefore always run
 */
struct test_size_param {
	int size;
	int option;
};

/* Powers of two from 64 bytes to 4 MB, each followed by its 1.5x midpoint. */
static struct test_size_param test_size[] = {
	{ .size = 1 <<  6,               .option = 0 },
	{ .size = 1 <<  7,               .option = 1 },
	{ .size = (1 <<  7) + (1 <<  6), .option = 1 },
	{ .size = 1 <<  8,               .option = 1 },
	{ .size = (1 <<  8) + (1 <<  7), .option = 1 },
	{ .size = 1 <<  9,               .option = 1 },
	{ .size = (1 <<  9) + (1 <<  8), .option = 1 },
	{ .size = 1 << 10,               .option = 1 },
	{ .size = (1 << 10) + (1 <<  9), .option = 1 },
	{ .size = 1 << 11,               .option = 1 },
	{ .size = (1 << 11) + (1 << 10), .option = 1 },
	{ .size = 1 << 12,               .option = 0 },
	{ .size = (1 << 12) + (1 << 11), .option = 1 },
	{ .size = 1 << 13,               .option = 1 },
	{ .size = (1 << 13) + (1 << 12), .option = 1 },
	{ .size = 1 << 14,               .option = 1 },
	{ .size = (1 << 14) + (1 << 13), .option = 1 },
	{ .size = 1 << 15,               .option = 1 },
	{ .size = (1 << 15) + (1 << 14), .option = 1 },
	{ .size = 1 << 16,               .option = 0 },
	{ .size = (1 << 16) + (1 << 15), .option = 1 },
	{ .size = 1 << 17,               .option = 1 },
	{ .size = (1 << 17) + (1 << 16), .option = 1 },
	{ .size = 1 << 18,               .option = 1 },
	{ .size = (1 << 18) + (1 << 17), .option = 1 },
	{ .size = 1 << 19,               .option = 1 },
	{ .size = (1 << 19) + (1 << 18), .option = 1 },
	{ .size = 1 << 20,               .option = 0 },
	{ .size = (1 << 20) + (1 << 19), .option = 1 },
	{ .size = 1 << 21,               .option = 1 },
	{ .size = (1 << 21) + (1 << 20), .option = 1 },
	{ .size = 1 << 22,               .option = 1 },
	{ .size = (1 << 22) + (1 << 21), .option = 1 },
};
/* Number of entries in test_size[]. */
#define TEST_CNT (sizeof(test_size) / sizeof(test_size[0]))
76 
77 static int rs, lrs;
78 static int use_async;
79 static int use_rgai;
80 static int verify;
81 static int flags = MSG_DONTWAIT;
82 static int poll_timeout = 0;
83 static int custom;
84 static int use_fork;
85 static pid_t fork_pid;
86 static enum rs_optimization optimization;
87 static int size_option;
88 static int iterations = 1;
89 static int transfer_size = 1000;
90 static int transfer_count = 1000;
91 static int buffer_size, inline_size = 64;
92 static char test_name[10] = "custom";
93 static const char *port = "7471";
94 static int keepalive;
95 static char *dst_addr;
96 static char *src_addr;
97 static struct timeval start, end;
98 static void *buf;
99 static struct rdma_addrinfo rai_hints;
100 static struct addrinfo ai_hints;
101 
102 static void show_perf(void)
103 {
104 	char str[32];
105 	float usec;
106 	long long bytes;
107 
108 	usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
109 	bytes = (long long) iterations * transfer_count * transfer_size * 2;
110 
111 	/* name size transfers iterations bytes seconds Gb/sec usec/xfer */
112 	printf("%-10s", test_name);
113 	size_str(str, sizeof str, transfer_size);
114 	printf("%-8s", str);
115 	cnt_str(str, sizeof str, transfer_count);
116 	printf("%-8s", str);
117 	cnt_str(str, sizeof str, iterations);
118 	printf("%-8s", str);
119 	size_str(str, sizeof str, bytes);
120 	printf("%-8s", str);
121 	printf("%8.2fs%10.2f%11.2f\n",
122 		usec / 1000000., (bytes * 8) / (1000. * usec),
123 		(usec / iterations) / (transfer_count * 2));
124 }
125 
126 static void init_latency_test(int size)
127 {
128 	char sstr[5];
129 
130 	size_str(sstr, sizeof sstr, size);
131 	snprintf(test_name, sizeof test_name, "%s_lat", sstr);
132 	transfer_count = 1;
133 	transfer_size = size;
134 	iterations = size_to_count(transfer_size);
135 }
136 
137 static void init_bandwidth_test(int size)
138 {
139 	char sstr[5];
140 
141 	size_str(sstr, sizeof sstr, size);
142 	snprintf(test_name, sizeof test_name, "%s_bw", sstr);
143 	iterations = 1;
144 	transfer_size = size;
145 	transfer_count = size_to_count(transfer_size);
146 }
147 
148 static int send_xfer(int size)
149 {
150 	struct pollfd fds;
151 	int offset, ret;
152 
153 	if (verify)
154 		format_buf(buf, size);
155 
156 	if (use_async) {
157 		fds.fd = rs;
158 		fds.events = POLLOUT;
159 	}
160 
161 	for (offset = 0; offset < size; ) {
162 		if (use_async) {
163 			ret = do_poll(&fds, poll_timeout);
164 			if (ret)
165 				return ret;
166 		}
167 
168 		ret = rs_send(rs, buf + offset, size - offset, flags);
169 		if (ret > 0) {
170 			offset += ret;
171 		} else if (errno != EWOULDBLOCK && errno != EAGAIN) {
172 			perror("rsend");
173 			return ret;
174 		}
175 	}
176 
177 	return 0;
178 }
179 
180 static int recv_xfer(int size)
181 {
182 	struct pollfd fds;
183 	int offset, ret;
184 
185 	if (use_async) {
186 		fds.fd = rs;
187 		fds.events = POLLIN;
188 	}
189 
190 	for (offset = 0; offset < size; ) {
191 		if (use_async) {
192 			ret = do_poll(&fds, poll_timeout);
193 			if (ret)
194 				return ret;
195 		}
196 
197 		ret = rs_recv(rs, buf + offset, size - offset, flags);
198 		if (ret > 0) {
199 			offset += ret;
200 		} else if (errno != EWOULDBLOCK && errno != EAGAIN) {
201 			perror("rrecv");
202 			return ret;
203 		}
204 	}
205 
206 	if (verify) {
207 		ret = verify_buf(buf, size);
208 		if (ret)
209 			return ret;
210 	}
211 
212 	return 0;
213 }
214 
215 static int sync_test(void)
216 {
217 	int ret;
218 
219 	ret = dst_addr ? send_xfer(16) : recv_xfer(16);
220 	if (ret)
221 		return ret;
222 
223 	return dst_addr ? recv_xfer(16) : send_xfer(16);
224 }
225 
226 static int run_test(void)
227 {
228 	int ret, i, t;
229 
230 	ret = sync_test();
231 	if (ret)
232 		goto out;
233 
234 	gettimeofday(&start, NULL);
235 	for (i = 0; i < iterations; i++) {
236 		for (t = 0; t < transfer_count; t++) {
237 			ret = dst_addr ? send_xfer(transfer_size) :
238 					 recv_xfer(transfer_size);
239 			if (ret)
240 				goto out;
241 		}
242 
243 		for (t = 0; t < transfer_count; t++) {
244 			ret = dst_addr ? recv_xfer(transfer_size) :
245 					 send_xfer(transfer_size);
246 			if (ret)
247 				goto out;
248 		}
249 	}
250 	gettimeofday(&end, NULL);
251 	show_perf();
252 	ret = 0;
253 
254 out:
255 	return ret;
256 }
257 
258 static void set_keepalive(int fd)
259 {
260 	int optval;
261 	socklen_t optlen = sizeof(optlen);
262 
263 	optval = 1;
264 	if (rs_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen)) {
265 		perror("rsetsockopt SO_KEEPALIVE");
266 		return;
267 	}
268 
269 	optval = keepalive;
270 	if (rs_setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, optlen))
271 		perror("rsetsockopt TCP_KEEPIDLE");
272 
273 	if (!(rs_getsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &optval, &optlen)))
274 		printf("Keepalive: %s\n", (optval ? "ON" : "OFF"));
275 
276 	if (!(rs_getsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, &optlen)))
277 		printf("  time: %i\n", optval);
278 }
279 
280 static void set_options(int fd)
281 {
282 	int val;
283 
284 	if (buffer_size) {
285 		rs_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &buffer_size,
286 			      sizeof buffer_size);
287 		rs_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &buffer_size,
288 			      sizeof buffer_size);
289 	} else {
290 		val = 1 << 19;
291 		rs_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &val, sizeof val);
292 		rs_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &val, sizeof val);
293 	}
294 
295 	val = 1;
296 	rs_setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *) &val, sizeof(val));
297 
298 	if (flags & MSG_DONTWAIT)
299 		rs_fcntl(fd, F_SETFL, O_NONBLOCK);
300 
301 	if (use_rs) {
302 		/* Inline size based on experimental data */
303 		if (optimization == opt_latency) {
304 			rs_setsockopt(fd, SOL_RDMA, RDMA_INLINE, &inline_size,
305 				      sizeof inline_size);
306 		} else if (optimization == opt_bandwidth) {
307 			val = 0;
308 			rs_setsockopt(fd, SOL_RDMA, RDMA_INLINE, &val, sizeof val);
309 		}
310 	}
311 
312 	if (keepalive)
313 		set_keepalive(fd);
314 }
315 
/*
 * Create the server's listening socket (lrs).
 *
 * Resolves src_addr/port in passive mode, using rdma_getaddrinfo() when
 * use_rgai is set and getaddrinfo() otherwise, then creates the socket,
 * enables SO_REUSEADDR, binds and listens with a backlog of 1.
 *
 * Returns 0 on success.  On failure the socket is closed again and the
 * failing call's result is returned.  The address list is freed either way.
 */
static int server_listen(void)
{
	struct rdma_addrinfo *rai = NULL;
	struct addrinfo *ai;
	int reuse = 1;
	int ret;

	if (use_rgai) {
		rai_hints.ai_flags |= RAI_PASSIVE;
		ret = rdma_getaddrinfo(src_addr, port, &rai_hints, &rai);
	} else {
		ai_hints.ai_flags |= AI_PASSIVE;
		ret = getaddrinfo(src_addr, port, &ai_hints, &ai);
	}
	if (ret) {
		printf("getaddrinfo: %s\n", gai_strerror(ret));
		return ret;
	}

	if (rai)
		lrs = rs_socket(rai->ai_family, SOCK_STREAM, 0);
	else
		lrs = rs_socket(ai->ai_family, SOCK_STREAM, 0);
	if (lrs < 0) {
		perror("rsocket");
		ret = lrs;
		goto out_free;
	}

	ret = rs_setsockopt(lrs, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse);
	if (ret) {
		perror("rsetsockopt SO_REUSEADDR");
		goto out_close;
	}

	if (rai)
		ret = rs_bind(lrs, rai->ai_src_addr, rai->ai_src_len);
	else
		ret = rs_bind(lrs, ai->ai_addr, ai->ai_addrlen);
	if (ret) {
		perror("rbind");
		goto out_close;
	}

	ret = rs_listen(lrs, 1);
	if (ret)
		perror("rlisten");

out_close:
	/* Reached on success too; only tear the socket down on error. */
	if (ret)
		rs_close(lrs);
out_free:
	if (rai)
		rdma_freeaddrinfo(rai);
	else
		freeaddrinfo(ai);
	return ret;
}
370 
371 static int server_connect(void)
372 {
373 	struct pollfd fds;
374 	int ret = 0;
375 
376 	set_options(lrs);
377 	do {
378 		if (use_async) {
379 			fds.fd = lrs;
380 			fds.events = POLLIN;
381 
382 			ret = do_poll(&fds, poll_timeout);
383 			if (ret) {
384 				perror("rpoll");
385 				return ret;
386 			}
387 		}
388 
389 		rs = rs_accept(lrs, NULL, NULL);
390 	} while (rs < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
391 	if (rs < 0) {
392 		perror("raccept");
393 		return rs;
394 	}
395 
396 	if (use_fork)
397 		fork_pid = fork();
398 	if (!fork_pid)
399 		set_options(rs);
400 	return ret;
401 }
402 
/*
 * Create the data socket (rs) and connect it to dst_addr/port.
 *
 * Address resolution uses rdma_getaddrinfo() when use_rgai is set and
 * getaddrinfo() otherwise.  If src_addr is given it is resolved as well and
 * the socket is bound to it before connecting.  A connect that reports
 * EINPROGRESS is completed by polling for POLLOUT and reading SO_ERROR.
 *
 * Returns 0 on success.  On failure the socket is closed and a non-zero
 * value is returned.  Every address list obtained here is freed on all
 * paths.
 */
static int client_connect(void)
{
	struct rdma_addrinfo *rai = NULL, *rai_src = NULL;
	struct addrinfo *ai = NULL, *ai_src = NULL;
	struct pollfd fds;
	int ret, err;
	socklen_t len;

	ret = use_rgai ? rdma_getaddrinfo(dst_addr, port, &rai_hints, &rai) :
			 getaddrinfo(dst_addr, port, &ai_hints, &ai);

	if (ret) {
		printf("getaddrinfo: %s\n", gai_strerror(ret));
		return ret;
	}

	if (src_addr) {
		/*
		 * Resolve the source address with a private copy of the
		 * hints.  run() calls this function more than once, and
		 * setting the passive flag on the shared hints would carry
		 * it into the destination lookup of the next call.
		 */
		if (use_rgai) {
			struct rdma_addrinfo src_hints = rai_hints;

			src_hints.ai_flags |= RAI_PASSIVE;
			ret = rdma_getaddrinfo(src_addr, port, &src_hints, &rai_src);
		} else {
			struct addrinfo src_hints = ai_hints;

			src_hints.ai_flags |= AI_PASSIVE;
			ret = getaddrinfo(src_addr, port, &src_hints, &ai_src);
		}
		if (ret) {
			printf("getaddrinfo src_addr: %s\n", gai_strerror(ret));
			/* The destination list is already allocated: free it. */
			goto free;
		}
	}

	rs = rai ? rs_socket(rai->ai_family, SOCK_STREAM, 0) :
		   rs_socket(ai->ai_family, SOCK_STREAM, 0);
	if (rs < 0) {
		perror("rsocket");
		ret = rs;
		goto free;
	}

	set_options(rs);

	if (src_addr) {
		ret = rai ? rs_bind(rs, rai_src->ai_src_addr, rai_src->ai_src_len) :
			    rs_bind(rs, ai_src->ai_addr, ai_src->ai_addrlen);
		if (ret) {
			perror("rbind");
			goto close;
		}
	}

	if (rai && rai->ai_route) {
		ret = rs_setsockopt(rs, SOL_RDMA, RDMA_ROUTE, rai->ai_route,
				    rai->ai_route_len);
		if (ret) {
			perror("rsetsockopt RDMA_ROUTE");
			goto close;
		}
	}

	ret = rai ? rs_connect(rs, rai->ai_dst_addr, rai->ai_dst_len) :
		    rs_connect(rs, ai->ai_addr, ai->ai_addrlen);
	if (ret && (errno != EINPROGRESS)) {
		perror("rconnect");
		goto close;
	}

	if (ret && (errno == EINPROGRESS)) {
		/* Nonblocking connect: wait for it and fetch the outcome. */
		fds.fd = rs;
		fds.events = POLLOUT;
		ret = do_poll(&fds, poll_timeout);
		if (ret) {
			perror("rpoll");
			goto close;
		}

		len = sizeof err;
		ret = rs_getsockopt(rs, SOL_SOCKET, SO_ERROR, &err, &len);
		if (ret) {
			perror("rgetsockopt SO_ERROR");
			goto close;
		}
		if (err) {
			ret = -1;
			errno = err;
			perror("async rconnect");
		}
	}

close:
	/* Reached on success too; only tear the socket down on error. */
	if (ret)
		rs_close(rs);
free:
	if (rai_src)
		rdma_freeaddrinfo(rai_src);
	if (ai_src)
		freeaddrinfo(ai_src);
	if (rai)
		rdma_freeaddrinfo(rai);
	else if (ai)
		freeaddrinfo(ai);
	return ret;
}
498 
499 static int run(void)
500 {
501 	int i, ret = 0;
502 
503 	buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size);
504 	if (!buf) {
505 		perror("malloc");
506 		return -1;
507 	}
508 
509 	if (!dst_addr) {
510 		ret = server_listen();
511 		if (ret)
512 			goto free;
513 	}
514 
515 	printf("%-10s%-8s%-8s%-8s%-8s%8s %10s%13s\n",
516 	       "name", "bytes", "xfers", "iters", "total", "time", "Gb/sec", "usec/xfer");
517 	if (!custom) {
518 		optimization = opt_latency;
519 		ret = dst_addr ? client_connect() : server_connect();
520 		if (ret)
521 			goto free;
522 
523 		for (i = 0; i < TEST_CNT && !fork_pid; i++) {
524 			if (test_size[i].option > size_option)
525 				continue;
526 			init_latency_test(test_size[i].size);
527 			run_test();
528 		}
529 		if (fork_pid)
530 			waitpid(fork_pid, NULL, 0);
531 		else
532 			rs_shutdown(rs, SHUT_RDWR);
533 		rs_close(rs);
534 
535 		if (!dst_addr && use_fork && !fork_pid)
536 			goto free;
537 
538 		optimization = opt_bandwidth;
539 		ret = dst_addr ? client_connect() : server_connect();
540 		if (ret)
541 			goto free;
542 		for (i = 0; i < TEST_CNT && !fork_pid; i++) {
543 			if (test_size[i].option > size_option)
544 				continue;
545 			init_bandwidth_test(test_size[i].size);
546 			run_test();
547 		}
548 	} else {
549 		ret = dst_addr ? client_connect() : server_connect();
550 		if (ret)
551 			goto free;
552 
553 		if (!fork_pid)
554 			ret = run_test();
555 	}
556 
557 	if (fork_pid)
558 		waitpid(fork_pid, NULL, 0);
559 	else
560 		rs_shutdown(rs, SHUT_RDWR);
561 	rs_close(rs);
562 free:
563 	free(buf);
564 	return ret;
565 }
566 
567 static int set_test_opt(const char *arg)
568 {
569 	if (strlen(arg) == 1) {
570 		switch (arg[0]) {
571 		case 's':
572 			use_rs = 0;
573 			break;
574 		case 'a':
575 			use_async = 1;
576 			break;
577 		case 'b':
578 			flags = (flags & ~MSG_DONTWAIT) | MSG_WAITALL;
579 			break;
580 		case 'f':
581 			use_fork = 1;
582 			use_rs = 0;
583 			break;
584 		case 'n':
585 			flags |= MSG_DONTWAIT;
586 			break;
587 		case 'r':
588 			use_rgai = 1;
589 			break;
590 		case 'v':
591 			verify = 1;
592 			break;
593 		default:
594 			return -1;
595 		}
596 	} else {
597 		if (!strncasecmp("socket", arg, 6)) {
598 			use_rs = 0;
599 		} else if (!strncasecmp("async", arg, 5)) {
600 			use_async = 1;
601 		} else if (!strncasecmp("block", arg, 5)) {
602 			flags = (flags & ~MSG_DONTWAIT) | MSG_WAITALL;
603 		} else if (!strncasecmp("nonblock", arg, 8)) {
604 			flags |= MSG_DONTWAIT;
605 		} else if (!strncasecmp("resolve", arg, 7)) {
606 			use_rgai = 1;
607 		} else if (!strncasecmp("verify", arg, 6)) {
608 			verify = 1;
609 		} else if (!strncasecmp("fork", arg, 4)) {
610 			use_fork = 1;
611 			use_rs = 0;
612 		} else {
613 			return -1;
614 		}
615 	}
616 	return 0;
617 }
618 
619 int main(int argc, char **argv)
620 {
621 	int op, ret;
622 
623 	ai_hints.ai_socktype = SOCK_STREAM;
624 	rai_hints.ai_port_space = RDMA_PS_TCP;
625 	while ((op = getopt(argc, argv, "s:b:f:B:i:I:C:S:p:k:T:")) != -1) {
626 		switch (op) {
627 		case 's':
628 			dst_addr = optarg;
629 			break;
630 		case 'b':
631 			src_addr = optarg;
632 			break;
633 		case 'f':
634 			if (!strncasecmp("ip", optarg, 2)) {
635 				ai_hints.ai_flags = AI_NUMERICHOST;
636 			} else if (!strncasecmp("gid", optarg, 3)) {
637 				rai_hints.ai_flags = RAI_NUMERICHOST | RAI_FAMILY;
638 				rai_hints.ai_family = AF_IB;
639 				use_rgai = 1;
640 			} else {
641 				fprintf(stderr, "Warning: unknown address format\n");
642 			}
643 			break;
644 		case 'B':
645 			buffer_size = atoi(optarg);
646 			break;
647 		case 'i':
648 			inline_size = atoi(optarg);
649 			break;
650 		case 'I':
651 			custom = 1;
652 			iterations = atoi(optarg);
653 			break;
654 		case 'C':
655 			custom = 1;
656 			transfer_count = atoi(optarg);
657 			break;
658 		case 'S':
659 			if (!strncasecmp("all", optarg, 3)) {
660 				size_option = 1;
661 			} else {
662 				custom = 1;
663 				transfer_size = atoi(optarg);
664 			}
665 			break;
666 		case 'p':
667 			port = optarg;
668 			break;
669 		case 'k':
670 			keepalive = atoi(optarg);
671 			break;
672 		case 'T':
673 			if (!set_test_opt(optarg))
674 				break;
675 			/* invalid option - fall through */
676 			SWITCH_FALLTHROUGH;
677 		default:
678 			printf("usage: %s\n", argv[0]);
679 			printf("\t[-s server_address]\n");
680 			printf("\t[-b bind_address]\n");
681 			printf("\t[-f address_format]\n");
682 			printf("\t    name, ip, ipv6, or gid\n");
683 			printf("\t[-B buffer_size]\n");
684 			printf("\t[-i inline_size]\n");
685 			printf("\t[-I iterations]\n");
686 			printf("\t[-C transfer_count]\n");
687 			printf("\t[-S transfer_size or all]\n");
688 			printf("\t[-p port_number]\n");
689 			printf("\t[-k keepalive_time]\n");
690 			printf("\t[-T test_option]\n");
691 			printf("\t    s|sockets - use standard tcp/ip sockets\n");
692 			printf("\t    a|async - asynchronous operation (use poll)\n");
693 			printf("\t    b|blocking - use blocking calls\n");
694 			printf("\t    f|fork - fork server processing\n");
695 			printf("\t    n|nonblocking - use nonblocking calls\n");
696 			printf("\t    r|resolve - use rdma cm to resolve address\n");
697 			printf("\t    v|verify - verify data\n");
698 			exit(1);
699 		}
700 	}
701 
702 	if (!(flags & MSG_DONTWAIT))
703 		poll_timeout = -1;
704 
705 	ret = run();
706 	return ret;
707 }
708