xref: /freebsd/contrib/ofed/librdmacm/examples/riostream.c (revision a90b9d0159070121c221b966469c3e36d912bf82)
1 /*
2  * Copyright (c) 2011-2012 Intel Corporation.  All rights reserved.
3  * Copyright (c) 2014 Mellanox Technologies LTD. All rights reserved.
4  *
5  * This software is available to you under the OpenIB.org BSD license
6  * below:
7  *
8  *     Redistribution and use in source and binary forms, with or
9  *     without modification, are permitted provided that the following
10  *     conditions are met:
11  *
12  *      - Redistributions of source code must retain the above
13  *        copyright notice, this list of conditions and the following
14  *        disclaimer.
15  *
16  *      - Redistributions in binary form must reproduce the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer in the documentation and/or other materials
19  *        provided with the distribution.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
22  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
23  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
24  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
25  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
26  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
27  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28  * SOFTWARE.
29  */
30 
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <strings.h>
35 #include <errno.h>
36 #include <getopt.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <sys/time.h>
40 #include <sys/wait.h>
41 #include <netdb.h>
42 #include <fcntl.h>
43 #include <unistd.h>
44 #include <netinet/tcp.h>
45 
46 #include <rdma/rdma_cma.h>
47 #include <rdma/rsocket.h>
48 #include <util/compiler.h>
49 #include "common.h"
50 
/*
 * One entry of the built-in size sweep.
 *
 * size   - transfer size in bytes handed to init_latency_test() /
 *          init_bandwidth_test().
 * option - entries whose option is greater than size_option are skipped by
 *          run(); 0 entries therefore always run, 1 entries only run when
 *          "-S all" has set size_option to 1.
 */
struct test_size_param {
	int size;
	int option;
};

/*
 * Sizes exercised when no custom test was requested: each power of two from
 * 64 bytes to 4 MiB, plus the midpoint between consecutive powers of two.
 * The last entry is the largest and is used by run() to size the buffer.
 */
static struct test_size_param test_size[] = {
	{ 1 <<  6, 0 },
	{ 1 <<  7, 1 }, { (1 <<  7) + (1 <<  6), 1},
	{ 1 <<  8, 1 }, { (1 <<  8) + (1 <<  7), 1},
	{ 1 <<  9, 1 }, { (1 <<  9) + (1 <<  8), 1},
	{ 1 << 10, 1 }, { (1 << 10) + (1 <<  9), 1},
	{ 1 << 11, 1 }, { (1 << 11) + (1 << 10), 1},
	{ 1 << 12, 0 }, { (1 << 12) + (1 << 11), 1},
	{ 1 << 13, 1 }, { (1 << 13) + (1 << 12), 1},
	{ 1 << 14, 1 }, { (1 << 14) + (1 << 13), 1},
	{ 1 << 15, 1 }, { (1 << 15) + (1 << 14), 1},
	{ 1 << 16, 0 }, { (1 << 16) + (1 << 15), 1},
	{ 1 << 17, 1 }, { (1 << 17) + (1 << 16), 1},
	{ 1 << 18, 1 }, { (1 << 18) + (1 << 17), 1},
	{ 1 << 19, 1 }, { (1 << 19) + (1 << 18), 1},
	{ 1 << 20, 0 }, { (1 << 20) + (1 << 19), 1},
	{ 1 << 21, 1 }, { (1 << 21) + (1 << 20), 1},
	{ 1 << 22, 1 }, { (1 << 22) + (1 << 21), 1},
};
/* Number of entries in test_size[]; note the result has type size_t. */
#define TEST_CNT (sizeof test_size / sizeof test_size[0])
76 
/* rs: connected data socket; lrs: listening socket (server side only). */
static int rs, lrs;
/* Set by "-T a": call do_poll() before every send/receive/accept. */
static int use_async;
/* Set by "-f gid": resolve with rdma_getaddrinfo() instead of getaddrinfo(). */
static int use_rgai;
/* Set by "-T v": format the buffer before sending and verify it on receipt. */
static int verify;
/* Flags passed to rsend()/rrecv()/riowrite(); non-blocking by default. */
static int flags = MSG_DONTWAIT;
/* Timeout handed to do_poll(); main() sets it to -1 for blocking mode. */
static int poll_timeout = 0;
/* Non-zero once -I, -C or a numeric -S was given: run one custom test. */
static int custom;
/* Selects the RDMA_INLINE setting applied by set_options(). */
static enum rs_optimization optimization;
/* Compared against test_size[].option; "-S all" raises it to 1. */
static int size_option;
/* Parameters of the test currently being run. */
static int iterations = 1;
static int transfer_size = 1000;
static int transfer_count = 1000;
/* -B socket buffer size (0 selects 1 << 19) and -i inline size. */
static int buffer_size, inline_size = 64;
/* Label printed in the first column of each result row. */
static char test_name[10] = "custom";
static const char *port = "7471";
/* -s server address; a non-NULL value makes this process the client. */
static char *dst_addr;
/* -b bind address; used by server_listen(). */
static char *src_addr;
/* Wall-clock timestamps taken around the timed loop in run_test(). */
static struct timeval start, end;
/* Transfer buffer allocated in run(). */
static void *buf;
/* Points at the last byte of the current transfer; polled by recv_xfer(). */
static volatile uint8_t *poll_byte;
static struct rdma_addrinfo rai_hints;
static struct addrinfo ai_hints;
99 
100 static void show_perf(void)
101 {
102 	char str[32];
103 	float usec;
104 	long long bytes;
105 
106 	usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
107 	bytes = (long long) iterations * transfer_count * transfer_size * 2;
108 
109 	/* name size transfers iterations bytes seconds Gb/sec usec/xfer */
110 	printf("%-10s", test_name);
111 	size_str(str, sizeof str, transfer_size);
112 	printf("%-8s", str);
113 	cnt_str(str, sizeof str, transfer_count);
114 	printf("%-8s", str);
115 	cnt_str(str, sizeof str, iterations);
116 	printf("%-8s", str);
117 	size_str(str, sizeof str, bytes);
118 	printf("%-8s", str);
119 	printf("%8.2fs%10.2f%11.2f\n",
120 		usec / 1000000., (bytes * 8) / (1000. * usec),
121 		(usec / iterations) / (transfer_count * 2));
122 }
123 
124 static void init_latency_test(int size)
125 {
126 	char sstr[5];
127 
128 	size_str(sstr, sizeof sstr, size);
129 	snprintf(test_name, sizeof test_name, "%s_lat", sstr);
130 	transfer_count = 1;
131 	transfer_size = size;
132 	iterations = size_to_count(transfer_size);
133 }
134 
135 static void init_bandwidth_test(int size)
136 {
137 	char sstr[5];
138 
139 	size_str(sstr, sizeof sstr, size);
140 	snprintf(test_name, sizeof test_name, "%s_bw", sstr);
141 	iterations = 1;
142 	transfer_size = size;
143 	transfer_count = size_to_count(transfer_size);
144 }
145 
146 static int send_msg(int size)
147 {
148 	struct pollfd fds;
149 	int offset, ret;
150 
151 	if (use_async) {
152 		fds.fd = rs;
153 		fds.events = POLLOUT;
154 	}
155 
156 	for (offset = 0; offset < size; ) {
157 		if (use_async) {
158 			ret = do_poll(&fds, poll_timeout);
159 			if (ret)
160 				return ret;
161 		}
162 
163 		ret = rsend(rs, buf + offset, size - offset, flags);
164 		if (ret > 0) {
165 			offset += ret;
166 		} else if (errno != EWOULDBLOCK && errno != EAGAIN) {
167 			perror("rsend");
168 			return ret;
169 		}
170 	}
171 
172 	return 0;
173 }
174 
175 static int send_xfer(int size)
176 {
177 	struct pollfd fds;
178 	int offset, ret;
179 
180 	if (use_async) {
181 		fds.fd = rs;
182 		fds.events = POLLOUT;
183 	}
184 
185 	for (offset = 0; offset < size; ) {
186 		if (use_async) {
187 			ret = do_poll(&fds, poll_timeout);
188 			if (ret)
189 				return ret;
190 		}
191 
192 		ret = riowrite(rs, buf + offset, size - offset, offset, flags);
193 		if (ret > 0) {
194 			offset += ret;
195 		} else if (errno != EWOULDBLOCK && errno != EAGAIN) {
196 			perror("riowrite");
197 			return ret;
198 		}
199 	}
200 
201 	return 0;
202 }
203 
204 static int recv_msg(int size)
205 {
206 	struct pollfd fds;
207 	int offset, ret;
208 
209 	if (use_async) {
210 		fds.fd = rs;
211 		fds.events = POLLIN;
212 	}
213 
214 	for (offset = 0; offset < size; ) {
215 		if (use_async) {
216 			ret = do_poll(&fds, poll_timeout);
217 			if (ret)
218 				return ret;
219 		}
220 
221 		ret = rrecv(rs, buf + offset, size - offset, flags);
222 		if (ret > 0) {
223 			offset += ret;
224 		} else if (errno != EWOULDBLOCK && errno != EAGAIN) {
225 			perror("rrecv");
226 			return ret;
227 		}
228 	}
229 
230 	return 0;
231 }
232 
233 static int recv_xfer(int size, uint8_t marker)
234 {
235 	int ret;
236 
237 	while (*poll_byte != marker)
238 		;
239 
240 	if (verify) {
241 		ret = verify_buf(buf, size - 1);
242 		if (ret)
243 			return ret;
244 	}
245 
246 	return 0;
247 }
248 
249 static int sync_test(void)
250 {
251 	int ret;
252 
253 	ret = dst_addr ? send_msg(16) : recv_msg(16);
254 	if (ret)
255 		return ret;
256 
257 	return dst_addr ? recv_msg(16) : send_msg(16);
258 }
259 
260 static int run_test(void)
261 {
262 	int ret, i, t;
263 	off_t offset;
264 	uint8_t marker = 0;
265 
266 	poll_byte = buf + transfer_size - 1;
267 	*poll_byte = -1;
268 	offset = riomap(rs, buf, transfer_size, PROT_WRITE, 0, 0);
269 	if (offset ==  -1) {
270 		perror("riomap");
271 		ret = -1;
272 		goto out;
273 	}
274 	ret = sync_test();
275 	if (ret)
276 		goto out;
277 
278 	gettimeofday(&start, NULL);
279 	for (i = 0; i < iterations; i++) {
280 		if (dst_addr) {
281 			for (t = 0; t < transfer_count - 1; t++) {
282 				ret = send_xfer(transfer_size);
283 				if (ret)
284 					goto out;
285 			}
286 			*poll_byte = (uint8_t) marker++;
287 			if (verify)
288 				format_buf(buf, transfer_size - 1);
289 			ret = send_xfer(transfer_size);
290 			if (ret)
291 				goto out;
292 
293 			ret = recv_xfer(transfer_size, marker++);
294 		} else {
295 			ret = recv_xfer(transfer_size, marker++);
296 			if (ret)
297 				goto out;
298 
299 			for (t = 0; t < transfer_count - 1; t++) {
300 				ret = send_xfer(transfer_size);
301 				if (ret)
302 					goto out;
303 			}
304 			*poll_byte = (uint8_t) marker++;
305 			if (verify)
306 				format_buf(buf, transfer_size - 1);
307 			ret = send_xfer(transfer_size);
308 		}
309 		if (ret)
310 			goto out;
311 	}
312 	gettimeofday(&end, NULL);
313 	show_perf();
314 	ret = riounmap(rs, buf, transfer_size);
315 
316 out:
317 	return ret;
318 }
319 
320 static void set_options(int fd)
321 {
322 	int val;
323 
324 	if (buffer_size) {
325 		rsetsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &buffer_size,
326 			    sizeof buffer_size);
327 		rsetsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &buffer_size,
328 			    sizeof buffer_size);
329 	} else {
330 		val = 1 << 19;
331 		rsetsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &val, sizeof val);
332 		rsetsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &val, sizeof val);
333 	}
334 
335 	val = 1;
336 	rsetsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *) &val, sizeof(val));
337 	rsetsockopt(fd, SOL_RDMA, RDMA_IOMAPSIZE, (void *) &val, sizeof val);
338 
339 	if (flags & MSG_DONTWAIT)
340 		rfcntl(fd, F_SETFL, O_NONBLOCK);
341 
342 	/* Inline size based on experimental data */
343 	if (optimization == opt_latency) {
344 		rsetsockopt(fd, SOL_RDMA, RDMA_INLINE, &inline_size,
345 			    sizeof inline_size);
346 	} else if (optimization == opt_bandwidth) {
347 		val = 0;
348 		rsetsockopt(fd, SOL_RDMA, RDMA_INLINE, &val, sizeof val);
349 	}
350 }
351 
/*
 * Create the listening socket lrs: resolve src_addr/port (via
 * rdma_getaddrinfo() when use_rgai is set, getaddrinfo() otherwise), create
 * the rsocket, enable SO_REUSEADDR, bind and listen with a backlog of 1.
 *
 * On any failure after the socket was created, lrs is closed again.  The
 * address list is always released before returning.
 *
 * Returns 0 on success, non-zero on failure.
 */
static int server_listen(void)
{
	struct rdma_addrinfo *rai = NULL;
	struct addrinfo *ai;
	int reuse = 1;
	int rc;

	if (use_rgai) {
		rai_hints.ai_flags |= RAI_PASSIVE;
		rc = rdma_getaddrinfo(src_addr, port, &rai_hints, &rai);
	} else {
		ai_hints.ai_flags |= AI_PASSIVE;
		rc = getaddrinfo(src_addr, port, &ai_hints, &ai);
	}
	if (rc) {
		printf("getaddrinfo: %s\n", gai_strerror(rc));
		return rc;
	}

	if (rai)
		lrs = rsocket(rai->ai_family, SOCK_STREAM, 0);
	else
		lrs = rsocket(ai->ai_family, SOCK_STREAM, 0);
	if (lrs < 0) {
		perror("rsocket");
		rc = lrs;
		goto out_free;
	}

	rc = rsetsockopt(lrs, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse);
	if (rc) {
		perror("rsetsockopt SO_REUSEADDR");
		goto out_close;
	}

	if (rai)
		rc = rbind(lrs, rai->ai_src_addr, rai->ai_src_len);
	else
		rc = rbind(lrs, ai->ai_addr, ai->ai_addrlen);
	if (rc) {
		perror("rbind");
		goto out_close;
	}

	rc = rlisten(lrs, 1);
	if (rc)
		perror("rlisten");

out_close:
	/* Reached on success too; only tear the socket down after a failure. */
	if (rc)
		rclose(lrs);
out_free:
	if (rai)
		rdma_freeaddrinfo(rai);
	else
		freeaddrinfo(ai);
	return rc;
}
406 
407 static int server_connect(void)
408 {
409 	struct pollfd fds;
410 	int ret = 0;
411 
412 	set_options(lrs);
413 	do {
414 		if (use_async) {
415 			fds.fd = lrs;
416 			fds.events = POLLIN;
417 
418 			ret = do_poll(&fds, poll_timeout);
419 			if (ret) {
420 				perror("rpoll");
421 				return ret;
422 			}
423 		}
424 
425 		rs = raccept(lrs, NULL, NULL);
426 	} while (rs < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
427 	if (rs < 0) {
428 		perror("raccept");
429 		return rs;
430 	}
431 
432 	set_options(rs);
433 	return ret;
434 }
435 
/*
 * Connect to dst_addr/port and store the connected socket in rs.
 *
 * The address is resolved with rdma_getaddrinfo() when use_rgai is set and
 * getaddrinfo() otherwise.  If rconnect() reports EINPROGRESS, the socket is
 * polled for POLLOUT and the final outcome is read back through SO_ERROR.
 *
 * On failure after the socket was created, rs is closed again.  The address
 * list is always released before returning.
 *
 * Returns 0 on success, non-zero on failure.
 */
static int client_connect(void)
{
	struct rdma_addrinfo *rai = NULL;
	struct addrinfo *ai;
	struct pollfd pfd;
	socklen_t optlen;
	int so_err;
	int rc;

	if (use_rgai)
		rc = rdma_getaddrinfo(dst_addr, port, &rai_hints, &rai);
	else
		rc = getaddrinfo(dst_addr, port, &ai_hints, &ai);
	if (rc) {
		printf("getaddrinfo: %s\n", gai_strerror(rc));
		return rc;
	}

	if (rai)
		rs = rsocket(rai->ai_family, SOCK_STREAM, 0);
	else
		rs = rsocket(ai->ai_family, SOCK_STREAM, 0);
	if (rs < 0) {
		perror("rsocket");
		rc = rs;
		goto out_free;
	}

	set_options(rs);
	/* TODO: bind client to src_addr */

	if (rai)
		rc = rconnect(rs, rai->ai_dst_addr, rai->ai_dst_len);
	else
		rc = rconnect(rs, ai->ai_addr, ai->ai_addrlen);

	if (rc) {
		if (errno != EINPROGRESS) {
			perror("rconnect");
			goto out_close;
		}

		/* Connection still in flight: wait for it and fetch the result. */
		pfd.fd = rs;
		pfd.events = POLLOUT;
		rc = do_poll(&pfd, poll_timeout);
		if (rc) {
			perror("rpoll");
			goto out_close;
		}

		optlen = sizeof so_err;
		rc = rgetsockopt(rs, SOL_SOCKET, SO_ERROR, &so_err, &optlen);
		if (rc)
			goto out_close;
		if (so_err) {
			rc = -1;
			errno = so_err;
			perror("async rconnect");
		}
	}

out_close:
	/* Reached on success too; only tear the socket down after a failure. */
	if (rc)
		rclose(rs);
out_free:
	if (rai)
		rdma_freeaddrinfo(rai);
	else
		freeaddrinfo(ai);
	return rc;
}
499 
500 static int run(void)
501 {
502 	int i, ret = 0;
503 
504 	buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size);
505 	if (!buf) {
506 		perror("malloc");
507 		return -1;
508 	}
509 
510 	if (!dst_addr) {
511 		ret = server_listen();
512 		if (ret)
513 			goto free;
514 	}
515 
516 	printf("%-10s%-8s%-8s%-8s%-8s%8s %10s%13s\n",
517 	       "name", "bytes", "xfers", "iters", "total", "time", "Gb/sec", "usec/xfer");
518 	if (!custom) {
519 		optimization = opt_latency;
520 		ret = dst_addr ? client_connect() : server_connect();
521 		if (ret)
522 			goto free;
523 
524 		for (i = 0; i < TEST_CNT; i++) {
525 			if (test_size[i].option > size_option)
526 				continue;
527 			init_latency_test(test_size[i].size);
528 			run_test();
529 		}
530 		rshutdown(rs, SHUT_RDWR);
531 		rclose(rs);
532 
533 		optimization = opt_bandwidth;
534 		ret = dst_addr ? client_connect() : server_connect();
535 		if (ret)
536 			goto free;
537 		for (i = 0; i < TEST_CNT; i++) {
538 			if (test_size[i].option > size_option)
539 				continue;
540 			init_bandwidth_test(test_size[i].size);
541 			run_test();
542 		}
543 	} else {
544 		ret = dst_addr ? client_connect() : server_connect();
545 		if (ret)
546 			goto free;
547 
548 		ret = run_test();
549 	}
550 
551 	rshutdown(rs, SHUT_RDWR);
552 	rclose(rs);
553 free:
554 	free(buf);
555 	return ret;
556 }
557 
558 static int set_test_opt(const char *arg)
559 {
560 	if (strlen(arg) == 1) {
561 		switch (arg[0]) {
562 		case 'a':
563 			use_async = 1;
564 			break;
565 		case 'b':
566 			flags = (flags & ~MSG_DONTWAIT) | MSG_WAITALL;
567 			break;
568 		case 'n':
569 			flags |= MSG_DONTWAIT;
570 			break;
571 		case 'v':
572 			verify = 1;
573 			break;
574 		default:
575 			return -1;
576 		}
577 	} else {
578 		if (!strncasecmp("async", arg, 5)) {
579 			use_async = 1;
580 		} else if (!strncasecmp("block", arg, 5)) {
581 			flags = (flags & ~MSG_DONTWAIT) | MSG_WAITALL;
582 		} else if (!strncasecmp("nonblock", arg, 8)) {
583 			flags |= MSG_DONTWAIT;
584 		} else if (!strncasecmp("verify", arg, 6)) {
585 			verify = 1;
586 		} else {
587 			return -1;
588 		}
589 	}
590 	return 0;
591 }
592 
593 int main(int argc, char **argv)
594 {
595 	int op, ret;
596 
597 	ai_hints.ai_socktype = SOCK_STREAM;
598 	rai_hints.ai_port_space = RDMA_PS_TCP;
599 	while ((op = getopt(argc, argv, "s:b:f:B:i:I:C:S:p:T:")) != -1) {
600 		switch (op) {
601 		case 's':
602 			dst_addr = optarg;
603 			break;
604 		case 'b':
605 			src_addr = optarg;
606 			break;
607 		case 'f':
608 			if (!strncasecmp("ip", optarg, 2)) {
609 				ai_hints.ai_flags = AI_NUMERICHOST;
610 			} else if (!strncasecmp("gid", optarg, 3)) {
611 				rai_hints.ai_flags = RAI_NUMERICHOST | RAI_FAMILY;
612 				rai_hints.ai_family = AF_IB;
613 				use_rgai = 1;
614 			} else {
615 				fprintf(stderr, "Warning: unknown address format\n");
616 			}
617 			break;
618 		case 'B':
619 			buffer_size = atoi(optarg);
620 			break;
621 		case 'i':
622 			inline_size = atoi(optarg);
623 			break;
624 		case 'I':
625 			custom = 1;
626 			iterations = atoi(optarg);
627 			break;
628 		case 'C':
629 			custom = 1;
630 			transfer_count = atoi(optarg);
631 			break;
632 		case 'S':
633 			if (!strncasecmp("all", optarg, 3)) {
634 				size_option = 1;
635 			} else {
636 				custom = 1;
637 				transfer_size = atoi(optarg);
638 			}
639 			break;
640 		case 'p':
641 			port = optarg;
642 			break;
643 		case 'T':
644 			if (!set_test_opt(optarg))
645 				break;
646 			/* invalid option - fall through */
647 			SWITCH_FALLTHROUGH;
648 		default:
649 			printf("usage: %s\n", argv[0]);
650 			printf("\t[-s server_address]\n");
651 			printf("\t[-b bind_address]\n");
652 			printf("\t[-f address_format]\n");
653 			printf("\t    name, ip, ipv6, or gid\n");
654 			printf("\t[-B buffer_size]\n");
655 			printf("\t[-i inline_size]\n");
656 			printf("\t[-I iterations]\n");
657 			printf("\t[-C transfer_count]\n");
658 			printf("\t[-S transfer_size or all]\n");
659 			printf("\t[-p port_number]\n");
660 			printf("\t[-T test_option]\n");
661 			printf("\t    a|async - asynchronous operation (use poll)\n");
662 			printf("\t    b|blocking - use blocking calls\n");
663 			printf("\t    n|nonblocking - use nonblocking calls\n");
664 			printf("\t    v|verify - verify data\n");
665 			exit(1);
666 		}
667 	}
668 
669 	if (!(flags & MSG_DONTWAIT))
670 		poll_timeout = -1;
671 
672 	ret = run();
673 	return ret;
674 }
675