xref: /freebsd/contrib/ofed/librdmacm/examples/rping.c (revision a812392203d7c4c3f0db9d8a0f3391374c49c71f)
1 /*
2  * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3  * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * OpenIB.org BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  */
33 
34 #include <getopt.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <stdio.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <netdb.h>
43 #include <semaphore.h>
44 #include <arpa/inet.h>
45 #include <pthread.h>
46 #include <inttypes.h>
47 
48 #include <rdma/rdma_cma.h>
49 #include <infiniband/arch.h>
50 
51 static int debug = 0;
52 #define DEBUG_LOG if (debug) printf
53 
54 /*
55  * rping "ping/pong" loop:
56  * 	client sends source rkey/addr/len
57  *	server receives source rkey/add/len
58  *	server rdma reads "ping" data from source
59  * 	server sends "go ahead" on rdma read completion
60  *	client sends sink rkey/addr/len
61  * 	server receives sink rkey/addr/len
62  * 	server rdma writes "pong" data to sink
63  * 	server sends "go ahead" on rdma write completion
64  * 	<repeat loop>
65  */
66 
67 /*
68  * These states are used to signal events between the completion handler
69  * and the main client or server thread.
70  *
71  * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV,
72  * and RDMA_WRITE_COMPLETE for each ping.
73  */
74 enum test_state {
75 	IDLE = 1,
76 	CONNECT_REQUEST,
77 	ADDR_RESOLVED,
78 	ROUTE_RESOLVED,
79 	CONNECTED,
80 	RDMA_READ_ADV,
81 	RDMA_READ_COMPLETE,
82 	RDMA_WRITE_ADV,
83 	RDMA_WRITE_COMPLETE,
84 	ERROR
85 };
86 
87 struct rping_rdma_info {
88 	uint64_t buf;
89 	uint32_t rkey;
90 	uint32_t size;
91 };
92 
93 /*
94  * Default max buffer size for IO...
95  */
96 #define RPING_BUFSIZE 64*1024
97 #define RPING_SQ_DEPTH 16
98 
99 /* Default string for print data and
100  * minimum buffer size
101  */
102 #define _stringify( _x ) # _x
103 #define stringify( _x ) _stringify( _x )
104 
105 #define RPING_MSG_FMT           "rdma-ping-%d: "
106 #define RPING_MIN_BUFSIZE       sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT)
107 
108 /*
109  * Control block struct.
110  */
111 struct rping_cb {
112 	int server;			/* 0 iff client */
113 	pthread_t cqthread;
114 	pthread_t persistent_server_thread;
115 	struct ibv_comp_channel *channel;
116 	struct ibv_cq *cq;
117 	struct ibv_pd *pd;
118 	struct ibv_qp *qp;
119 
120 	struct ibv_recv_wr rq_wr;	/* recv work request record */
121 	struct ibv_sge recv_sgl;	/* recv single SGE */
122 	struct rping_rdma_info recv_buf;/* malloc'd buffer */
123 	struct ibv_mr *recv_mr;		/* MR associated with this buffer */
124 
125 	struct ibv_send_wr sq_wr;	/* send work request record */
126 	struct ibv_sge send_sgl;
127 	struct rping_rdma_info send_buf;/* single send buf */
128 	struct ibv_mr *send_mr;
129 
130 	struct ibv_send_wr rdma_sq_wr;	/* rdma work request record */
131 	struct ibv_sge rdma_sgl;	/* rdma single SGE */
132 	char *rdma_buf;			/* used as rdma sink */
133 	struct ibv_mr *rdma_mr;
134 
135 	uint32_t remote_rkey;		/* remote guys RKEY */
136 	uint64_t remote_addr;		/* remote guys TO */
137 	uint32_t remote_len;		/* remote guys LEN */
138 
139 	char *start_buf;		/* rdma read src */
140 	struct ibv_mr *start_mr;
141 
142 	enum test_state state;		/* used for cond/signalling */
143 	sem_t sem;
144 
145 	struct sockaddr_storage sin;
146 	uint16_t port;			/* dst port in NBO */
147 	int verbose;			/* verbose logging */
148 	int count;			/* ping count */
149 	int size;			/* ping data size */
150 	int validate;			/* validate ping data */
151 
152 	/* CM stuff */
153 	pthread_t cmthread;
154 	struct rdma_event_channel *cm_channel;
155 	struct rdma_cm_id *cm_id;	/* connection on client side,*/
156 					/* listener on service side. */
157 	struct rdma_cm_id *child_cm_id;	/* connection on server side */
158 };
159 
160 static int rping_cma_event_handler(struct rdma_cm_id *cma_id,
161 				    struct rdma_cm_event *event)
162 {
163 	int ret = 0;
164 	struct rping_cb *cb = cma_id->context;
165 
166 	DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
167 		  rdma_event_str(event->event), cma_id,
168 		  (cma_id == cb->cm_id) ? "parent" : "child");
169 
170 	switch (event->event) {
171 	case RDMA_CM_EVENT_ADDR_RESOLVED:
172 		cb->state = ADDR_RESOLVED;
173 		ret = rdma_resolve_route(cma_id, 2000);
174 		if (ret) {
175 			cb->state = ERROR;
176 			perror("rdma_resolve_route");
177 			sem_post(&cb->sem);
178 		}
179 		break;
180 
181 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
182 		cb->state = ROUTE_RESOLVED;
183 		sem_post(&cb->sem);
184 		break;
185 
186 	case RDMA_CM_EVENT_CONNECT_REQUEST:
187 		cb->state = CONNECT_REQUEST;
188 		cb->child_cm_id = cma_id;
189 		DEBUG_LOG("child cma %p\n", cb->child_cm_id);
190 		sem_post(&cb->sem);
191 		break;
192 
193 	case RDMA_CM_EVENT_ESTABLISHED:
194 		DEBUG_LOG("ESTABLISHED\n");
195 
196 		/*
197 		 * Server will wake up when first RECV completes.
198 		 */
199 		if (!cb->server) {
200 			cb->state = CONNECTED;
201 		}
202 		sem_post(&cb->sem);
203 		break;
204 
205 	case RDMA_CM_EVENT_ADDR_ERROR:
206 	case RDMA_CM_EVENT_ROUTE_ERROR:
207 	case RDMA_CM_EVENT_CONNECT_ERROR:
208 	case RDMA_CM_EVENT_UNREACHABLE:
209 	case RDMA_CM_EVENT_REJECTED:
210 		fprintf(stderr, "cma event %s, error %d\n",
211 			rdma_event_str(event->event), event->status);
212 		sem_post(&cb->sem);
213 		ret = -1;
214 		break;
215 
216 	case RDMA_CM_EVENT_DISCONNECTED:
217 		fprintf(stderr, "%s DISCONNECT EVENT...\n",
218 			cb->server ? "server" : "client");
219 		sem_post(&cb->sem);
220 		break;
221 
222 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
223 		fprintf(stderr, "cma detected device removal!!!!\n");
224 		ret = -1;
225 		break;
226 
227 	default:
228 		fprintf(stderr, "unhandled event: %s, ignoring\n",
229 			rdma_event_str(event->event));
230 		break;
231 	}
232 
233 	return ret;
234 }
235 
236 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
237 {
238 	if (wc->byte_len != sizeof(cb->recv_buf)) {
239 		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
240 		return -1;
241 	}
242 
243 	cb->remote_rkey = ntohl(cb->recv_buf.rkey);
244 	cb->remote_addr = ntohll(cb->recv_buf.buf);
245 	cb->remote_len  = ntohl(cb->recv_buf.size);
246 	DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
247 		  cb->remote_rkey, cb->remote_addr, cb->remote_len);
248 
249 	if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
250 		cb->state = RDMA_READ_ADV;
251 	else
252 		cb->state = RDMA_WRITE_ADV;
253 
254 	return 0;
255 }
256 
257 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
258 {
259 	if (wc->byte_len != sizeof(cb->recv_buf)) {
260 		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
261 		return -1;
262 	}
263 
264 	if (cb->state == RDMA_READ_ADV)
265 		cb->state = RDMA_WRITE_ADV;
266 	else
267 		cb->state = RDMA_WRITE_COMPLETE;
268 
269 	return 0;
270 }
271 
272 static int rping_cq_event_handler(struct rping_cb *cb)
273 {
274 	struct ibv_wc wc;
275 	struct ibv_recv_wr *bad_wr;
276 	int ret;
277 
278 	while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
279 		ret = 0;
280 
281 		if (wc.status) {
282 			if (wc.status != IBV_WC_WR_FLUSH_ERR)
283 				fprintf(stderr, "cq completion failed status %d\n",
284 					wc.status);
285 			ret = -1;
286 			goto error;
287 		}
288 
289 		switch (wc.opcode) {
290 		case IBV_WC_SEND:
291 			DEBUG_LOG("send completion\n");
292 			break;
293 
294 		case IBV_WC_RDMA_WRITE:
295 			DEBUG_LOG("rdma write completion\n");
296 			cb->state = RDMA_WRITE_COMPLETE;
297 			sem_post(&cb->sem);
298 			break;
299 
300 		case IBV_WC_RDMA_READ:
301 			DEBUG_LOG("rdma read completion\n");
302 			cb->state = RDMA_READ_COMPLETE;
303 			sem_post(&cb->sem);
304 			break;
305 
306 		case IBV_WC_RECV:
307 			DEBUG_LOG("recv completion\n");
308 			ret = cb->server ? server_recv(cb, &wc) :
309 					   client_recv(cb, &wc);
310 			if (ret) {
311 				fprintf(stderr, "recv wc error: %d\n", ret);
312 				goto error;
313 			}
314 
315 			ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
316 			if (ret) {
317 				fprintf(stderr, "post recv error: %d\n", ret);
318 				goto error;
319 			}
320 			sem_post(&cb->sem);
321 			break;
322 
323 		default:
324 			DEBUG_LOG("unknown!!!!! completion\n");
325 			ret = -1;
326 			goto error;
327 		}
328 	}
329 	if (ret) {
330 		fprintf(stderr, "poll error %d\n", ret);
331 		goto error;
332 	}
333 	return 0;
334 
335 error:
336 	cb->state = ERROR;
337 	sem_post(&cb->sem);
338 	return ret;
339 }
340 
341 static int rping_accept(struct rping_cb *cb)
342 {
343 	struct rdma_conn_param conn_param;
344 	int ret;
345 
346 	DEBUG_LOG("accepting client connection request\n");
347 
348 	memset(&conn_param, 0, sizeof conn_param);
349 	conn_param.responder_resources = 1;
350 	conn_param.initiator_depth = 1;
351 
352 	ret = rdma_accept(cb->child_cm_id, &conn_param);
353 	if (ret) {
354 		perror("rdma_accept");
355 		return ret;
356 	}
357 
358 	sem_wait(&cb->sem);
359 	if (cb->state == ERROR) {
360 		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
361 		return -1;
362 	}
363 	return 0;
364 }
365 
366 static void rping_setup_wr(struct rping_cb *cb)
367 {
368 	cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
369 	cb->recv_sgl.length = sizeof cb->recv_buf;
370 	cb->recv_sgl.lkey = cb->recv_mr->lkey;
371 	cb->rq_wr.sg_list = &cb->recv_sgl;
372 	cb->rq_wr.num_sge = 1;
373 
374 	cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
375 	cb->send_sgl.length = sizeof cb->send_buf;
376 	cb->send_sgl.lkey = cb->send_mr->lkey;
377 
378 	cb->sq_wr.opcode = IBV_WR_SEND;
379 	cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
380 	cb->sq_wr.sg_list = &cb->send_sgl;
381 	cb->sq_wr.num_sge = 1;
382 
383 	cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
384 	cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
385 	cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
386 	cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
387 	cb->rdma_sq_wr.num_sge = 1;
388 }
389 
390 static int rping_setup_buffers(struct rping_cb *cb)
391 {
392 	int ret;
393 
394 	DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
395 
396 	cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
397 				 IBV_ACCESS_LOCAL_WRITE);
398 	if (!cb->recv_mr) {
399 		fprintf(stderr, "recv_buf reg_mr failed\n");
400 		return errno;
401 	}
402 
403 	cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
404 	if (!cb->send_mr) {
405 		fprintf(stderr, "send_buf reg_mr failed\n");
406 		ret = errno;
407 		goto err1;
408 	}
409 
410 	cb->rdma_buf = malloc(cb->size);
411 	if (!cb->rdma_buf) {
412 		fprintf(stderr, "rdma_buf malloc failed\n");
413 		ret = -ENOMEM;
414 		goto err2;
415 	}
416 
417 	cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
418 				 IBV_ACCESS_LOCAL_WRITE |
419 				 IBV_ACCESS_REMOTE_READ |
420 				 IBV_ACCESS_REMOTE_WRITE);
421 	if (!cb->rdma_mr) {
422 		fprintf(stderr, "rdma_buf reg_mr failed\n");
423 		ret = errno;
424 		goto err3;
425 	}
426 
427 	if (!cb->server) {
428 		cb->start_buf = malloc(cb->size);
429 		if (!cb->start_buf) {
430 			fprintf(stderr, "start_buf malloc failed\n");
431 			ret = -ENOMEM;
432 			goto err4;
433 		}
434 
435 		cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
436 					  IBV_ACCESS_LOCAL_WRITE |
437 					  IBV_ACCESS_REMOTE_READ |
438 					  IBV_ACCESS_REMOTE_WRITE);
439 		if (!cb->start_mr) {
440 			fprintf(stderr, "start_buf reg_mr failed\n");
441 			ret = errno;
442 			goto err5;
443 		}
444 	}
445 
446 	rping_setup_wr(cb);
447 	DEBUG_LOG("allocated & registered buffers...\n");
448 	return 0;
449 
450 err5:
451 	free(cb->start_buf);
452 err4:
453 	ibv_dereg_mr(cb->rdma_mr);
454 err3:
455 	free(cb->rdma_buf);
456 err2:
457 	ibv_dereg_mr(cb->send_mr);
458 err1:
459 	ibv_dereg_mr(cb->recv_mr);
460 	return ret;
461 }
462 
463 static void rping_free_buffers(struct rping_cb *cb)
464 {
465 	DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
466 	ibv_dereg_mr(cb->recv_mr);
467 	ibv_dereg_mr(cb->send_mr);
468 	ibv_dereg_mr(cb->rdma_mr);
469 	free(cb->rdma_buf);
470 	if (!cb->server) {
471 		ibv_dereg_mr(cb->start_mr);
472 		free(cb->start_buf);
473 	}
474 }
475 
476 static int rping_create_qp(struct rping_cb *cb)
477 {
478 	struct ibv_qp_init_attr init_attr;
479 	int ret;
480 
481 	memset(&init_attr, 0, sizeof(init_attr));
482 	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
483 	init_attr.cap.max_recv_wr = 2;
484 	init_attr.cap.max_recv_sge = 1;
485 	init_attr.cap.max_send_sge = 1;
486 	init_attr.qp_type = IBV_QPT_RC;
487 	init_attr.send_cq = cb->cq;
488 	init_attr.recv_cq = cb->cq;
489 
490 	if (cb->server) {
491 		ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
492 		if (!ret)
493 			cb->qp = cb->child_cm_id->qp;
494 	} else {
495 		ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
496 		if (!ret)
497 			cb->qp = cb->cm_id->qp;
498 	}
499 
500 	return ret;
501 }
502 
503 static void rping_free_qp(struct rping_cb *cb)
504 {
505 	ibv_destroy_qp(cb->qp);
506 	ibv_destroy_cq(cb->cq);
507 	ibv_destroy_comp_channel(cb->channel);
508 	ibv_dealloc_pd(cb->pd);
509 }
510 
511 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
512 {
513 	int ret;
514 
515 	cb->pd = ibv_alloc_pd(cm_id->verbs);
516 	if (!cb->pd) {
517 		fprintf(stderr, "ibv_alloc_pd failed\n");
518 		return errno;
519 	}
520 	DEBUG_LOG("created pd %p\n", cb->pd);
521 
522 	cb->channel = ibv_create_comp_channel(cm_id->verbs);
523 	if (!cb->channel) {
524 		fprintf(stderr, "ibv_create_comp_channel failed\n");
525 		ret = errno;
526 		goto err1;
527 	}
528 	DEBUG_LOG("created channel %p\n", cb->channel);
529 
530 	cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
531 				cb->channel, 0);
532 	if (!cb->cq) {
533 		fprintf(stderr, "ibv_create_cq failed\n");
534 		ret = errno;
535 		goto err2;
536 	}
537 	DEBUG_LOG("created cq %p\n", cb->cq);
538 
539 	ret = ibv_req_notify_cq(cb->cq, 0);
540 	if (ret) {
541 		fprintf(stderr, "ibv_create_cq failed\n");
542 		ret = errno;
543 		goto err3;
544 	}
545 
546 	ret = rping_create_qp(cb);
547 	if (ret) {
548 		perror("rdma_create_qp");
549 		goto err3;
550 	}
551 	DEBUG_LOG("created qp %p\n", cb->qp);
552 	return 0;
553 
554 err3:
555 	ibv_destroy_cq(cb->cq);
556 err2:
557 	ibv_destroy_comp_channel(cb->channel);
558 err1:
559 	ibv_dealloc_pd(cb->pd);
560 	return ret;
561 }
562 
563 static void *cm_thread(void *arg)
564 {
565 	struct rping_cb *cb = arg;
566 	struct rdma_cm_event *event;
567 	int ret;
568 
569 	while (1) {
570 		ret = rdma_get_cm_event(cb->cm_channel, &event);
571 		if (ret) {
572 			perror("rdma_get_cm_event");
573 			exit(ret);
574 		}
575 		ret = rping_cma_event_handler(event->id, event);
576 		rdma_ack_cm_event(event);
577 		if (ret)
578 			exit(ret);
579 	}
580 }
581 
582 static void *cq_thread(void *arg)
583 {
584 	struct rping_cb *cb = arg;
585 	struct ibv_cq *ev_cq;
586 	void *ev_ctx;
587 	int ret;
588 
589 	DEBUG_LOG("cq_thread started.\n");
590 
591 	while (1) {
592 		pthread_testcancel();
593 
594 		ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
595 		if (ret) {
596 			fprintf(stderr, "Failed to get cq event!\n");
597 			pthread_exit(NULL);
598 		}
599 		if (ev_cq != cb->cq) {
600 			fprintf(stderr, "Unknown CQ!\n");
601 			pthread_exit(NULL);
602 		}
603 		ret = ibv_req_notify_cq(cb->cq, 0);
604 		if (ret) {
605 			fprintf(stderr, "Failed to set notify!\n");
606 			pthread_exit(NULL);
607 		}
608 		ret = rping_cq_event_handler(cb);
609 		ibv_ack_cq_events(cb->cq, 1);
610 		if (ret)
611 			pthread_exit(NULL);
612 	}
613 }
614 
615 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
616 {
617 	struct rping_rdma_info *info = &cb->send_buf;
618 
619 	info->buf = htonll((uint64_t) (unsigned long) buf);
620 	info->rkey = htonl(mr->rkey);
621 	info->size = htonl(cb->size);
622 
623 	DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
624 		  ntohll(info->buf), ntohl(info->rkey), ntohl(info->size));
625 }
626 
627 static int rping_test_server(struct rping_cb *cb)
628 {
629 	struct ibv_send_wr *bad_wr;
630 	int ret;
631 
632 	while (1) {
633 		/* Wait for client's Start STAG/TO/Len */
634 		sem_wait(&cb->sem);
635 		if (cb->state != RDMA_READ_ADV) {
636 			fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
637 				cb->state);
638 			ret = -1;
639 			break;
640 		}
641 
642 		DEBUG_LOG("server received sink adv\n");
643 
644 		/* Issue RDMA Read. */
645 		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
646 		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
647 		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
648 		cb->rdma_sq_wr.sg_list->length = cb->remote_len;
649 
650 		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
651 		if (ret) {
652 			fprintf(stderr, "post send error %d\n", ret);
653 			break;
654 		}
655 		DEBUG_LOG("server posted rdma read req \n");
656 
657 		/* Wait for read completion */
658 		sem_wait(&cb->sem);
659 		if (cb->state != RDMA_READ_COMPLETE) {
660 			fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
661 				cb->state);
662 			ret = -1;
663 			break;
664 		}
665 		DEBUG_LOG("server received read complete\n");
666 
667 		/* Display data in recv buf */
668 		if (cb->verbose)
669 			printf("server ping data: %s\n", cb->rdma_buf);
670 
671 		/* Tell client to continue */
672 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
673 		if (ret) {
674 			fprintf(stderr, "post send error %d\n", ret);
675 			break;
676 		}
677 		DEBUG_LOG("server posted go ahead\n");
678 
679 		/* Wait for client's RDMA STAG/TO/Len */
680 		sem_wait(&cb->sem);
681 		if (cb->state != RDMA_WRITE_ADV) {
682 			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
683 				cb->state);
684 			ret = -1;
685 			break;
686 		}
687 		DEBUG_LOG("server received sink adv\n");
688 
689 		/* RDMA Write echo data */
690 		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
691 		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
692 		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
693 		cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
694 		DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
695 			  cb->rdma_sq_wr.sg_list->lkey,
696 			  cb->rdma_sq_wr.sg_list->addr,
697 			  cb->rdma_sq_wr.sg_list->length);
698 
699 		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
700 		if (ret) {
701 			fprintf(stderr, "post send error %d\n", ret);
702 			break;
703 		}
704 
705 		/* Wait for completion */
706 		ret = sem_wait(&cb->sem);
707 		if (cb->state != RDMA_WRITE_COMPLETE) {
708 			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
709 				cb->state);
710 			ret = -1;
711 			break;
712 		}
713 		DEBUG_LOG("server rdma write complete \n");
714 
715 		/* Tell client to begin again */
716 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
717 		if (ret) {
718 			fprintf(stderr, "post send error %d\n", ret);
719 			break;
720 		}
721 		DEBUG_LOG("server posted go ahead\n");
722 	}
723 
724 	return ret;
725 }
726 
727 static int rping_bind_server(struct rping_cb *cb)
728 {
729 	int ret;
730 
731 	if (cb->sin.ss_family == AF_INET)
732 		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
733 	else
734 		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
735 
736 	ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
737 	if (ret) {
738 		perror("rdma_bind_addr");
739 		return ret;
740 	}
741 	DEBUG_LOG("rdma_bind_addr successful\n");
742 
743 	DEBUG_LOG("rdma_listen\n");
744 	ret = rdma_listen(cb->cm_id, 3);
745 	if (ret) {
746 		perror("rdma_listen");
747 		return ret;
748 	}
749 
750 	return 0;
751 }
752 
753 static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
754 {
755 	struct rping_cb *cb = malloc(sizeof *cb);
756 	if (!cb)
757 		return NULL;
758 	*cb = *listening_cb;
759 	cb->child_cm_id->context = cb;
760 	return cb;
761 }
762 
763 static void free_cb(struct rping_cb *cb)
764 {
765 	free(cb);
766 }
767 
768 static void *rping_persistent_server_thread(void *arg)
769 {
770 	struct rping_cb *cb = arg;
771 	struct ibv_recv_wr *bad_wr;
772 	int ret;
773 
774 	ret = rping_setup_qp(cb, cb->child_cm_id);
775 	if (ret) {
776 		fprintf(stderr, "setup_qp failed: %d\n", ret);
777 		goto err0;
778 	}
779 
780 	ret = rping_setup_buffers(cb);
781 	if (ret) {
782 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
783 		goto err1;
784 	}
785 
786 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
787 	if (ret) {
788 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
789 		goto err2;
790 	}
791 
792 	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
793 
794 	ret = rping_accept(cb);
795 	if (ret) {
796 		fprintf(stderr, "connect error %d\n", ret);
797 		goto err3;
798 	}
799 
800 	rping_test_server(cb);
801 	rdma_disconnect(cb->child_cm_id);
802 	pthread_join(cb->cqthread, NULL);
803 	rping_free_buffers(cb);
804 	rping_free_qp(cb);
805 	rdma_destroy_id(cb->child_cm_id);
806 	free_cb(cb);
807 	return NULL;
808 err3:
809 	pthread_cancel(cb->cqthread);
810 	pthread_join(cb->cqthread, NULL);
811 err2:
812 	rping_free_buffers(cb);
813 err1:
814 	rping_free_qp(cb);
815 err0:
816 	free_cb(cb);
817 	return NULL;
818 }
819 
820 static int rping_run_persistent_server(struct rping_cb *listening_cb)
821 {
822 	int ret;
823 	struct rping_cb *cb;
824 
825 	ret = rping_bind_server(listening_cb);
826 	if (ret)
827 		return ret;
828 
829 	while (1) {
830 		sem_wait(&listening_cb->sem);
831 		if (listening_cb->state != CONNECT_REQUEST) {
832 			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
833 				listening_cb->state);
834 			return -1;
835 		}
836 
837 		cb = clone_cb(listening_cb);
838 		if (!cb)
839 			return -1;
840 		pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
841 	}
842 	return 0;
843 }
844 
845 static int rping_run_server(struct rping_cb *cb)
846 {
847 	struct ibv_recv_wr *bad_wr;
848 	int ret;
849 
850 	ret = rping_bind_server(cb);
851 	if (ret)
852 		return ret;
853 
854 	sem_wait(&cb->sem);
855 	if (cb->state != CONNECT_REQUEST) {
856 		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
857 			cb->state);
858 		return -1;
859 	}
860 
861 	ret = rping_setup_qp(cb, cb->child_cm_id);
862 	if (ret) {
863 		fprintf(stderr, "setup_qp failed: %d\n", ret);
864 		return ret;
865 	}
866 
867 	ret = rping_setup_buffers(cb);
868 	if (ret) {
869 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
870 		goto err1;
871 	}
872 
873 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
874 	if (ret) {
875 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
876 		goto err2;
877 	}
878 
879 	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
880 
881 	ret = rping_accept(cb);
882 	if (ret) {
883 		fprintf(stderr, "connect error %d\n", ret);
884 		goto err2;
885 	}
886 
887 	rping_test_server(cb);
888 	rdma_disconnect(cb->child_cm_id);
889 	pthread_join(cb->cqthread, NULL);
890 	rdma_destroy_id(cb->child_cm_id);
891 err2:
892 	rping_free_buffers(cb);
893 err1:
894 	rping_free_qp(cb);
895 
896 	return ret;
897 }
898 
899 static int rping_test_client(struct rping_cb *cb)
900 {
901 	int ping, start, cc, i, ret = 0;
902 	struct ibv_send_wr *bad_wr;
903 	unsigned char c;
904 
905 	start = 65;
906 	for (ping = 0; !cb->count || ping < cb->count; ping++) {
907 		cb->state = RDMA_READ_ADV;
908 
909 		/* Put some ascii text in the buffer. */
910 		cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping);
911 		for (i = cc, c = start; i < cb->size; i++) {
912 			cb->start_buf[i] = c;
913 			c++;
914 			if (c > 122)
915 				c = 65;
916 		}
917 		start++;
918 		if (start > 122)
919 			start = 65;
920 		cb->start_buf[cb->size - 1] = 0;
921 
922 		rping_format_send(cb, cb->start_buf, cb->start_mr);
923 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
924 		if (ret) {
925 			fprintf(stderr, "post send error %d\n", ret);
926 			break;
927 		}
928 
929 		/* Wait for server to ACK */
930 		sem_wait(&cb->sem);
931 		if (cb->state != RDMA_WRITE_ADV) {
932 			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
933 				cb->state);
934 			ret = -1;
935 			break;
936 		}
937 
938 		rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
939 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
940 		if (ret) {
941 			fprintf(stderr, "post send error %d\n", ret);
942 			break;
943 		}
944 
945 		/* Wait for the server to say the RDMA Write is complete. */
946 		sem_wait(&cb->sem);
947 		if (cb->state != RDMA_WRITE_COMPLETE) {
948 			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
949 				cb->state);
950 			ret = -1;
951 			break;
952 		}
953 
954 		if (cb->validate)
955 			if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
956 				fprintf(stderr, "data mismatch!\n");
957 				ret = -1;
958 				break;
959 			}
960 
961 		if (cb->verbose)
962 			printf("ping data: %s\n", cb->rdma_buf);
963 	}
964 
965 	return ret;
966 }
967 
968 static int rping_connect_client(struct rping_cb *cb)
969 {
970 	struct rdma_conn_param conn_param;
971 	int ret;
972 
973 	memset(&conn_param, 0, sizeof conn_param);
974 	conn_param.responder_resources = 1;
975 	conn_param.initiator_depth = 1;
976 	conn_param.retry_count = 10;
977 
978 	ret = rdma_connect(cb->cm_id, &conn_param);
979 	if (ret) {
980 		perror("rdma_connect");
981 		return ret;
982 	}
983 
984 	sem_wait(&cb->sem);
985 	if (cb->state != CONNECTED) {
986 		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
987 		return -1;
988 	}
989 
990 	DEBUG_LOG("rmda_connect successful\n");
991 	return 0;
992 }
993 
994 static int rping_bind_client(struct rping_cb *cb)
995 {
996 	int ret;
997 
998 	if (cb->sin.ss_family == AF_INET)
999 		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1000 	else
1001 		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1002 
1003 	ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1004 	if (ret) {
1005 		perror("rdma_resolve_addr");
1006 		return ret;
1007 	}
1008 
1009 	sem_wait(&cb->sem);
1010 	if (cb->state != ROUTE_RESOLVED) {
1011 		fprintf(stderr, "waiting for addr/route resolution state %d\n",
1012 			cb->state);
1013 		return -1;
1014 	}
1015 
1016 	DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1017 	return 0;
1018 }
1019 
1020 static int rping_run_client(struct rping_cb *cb)
1021 {
1022 	struct ibv_recv_wr *bad_wr;
1023 	int ret;
1024 
1025 	ret = rping_bind_client(cb);
1026 	if (ret)
1027 		return ret;
1028 
1029 	ret = rping_setup_qp(cb, cb->cm_id);
1030 	if (ret) {
1031 		fprintf(stderr, "setup_qp failed: %d\n", ret);
1032 		return ret;
1033 	}
1034 
1035 	ret = rping_setup_buffers(cb);
1036 	if (ret) {
1037 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1038 		goto err1;
1039 	}
1040 
1041 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1042 	if (ret) {
1043 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1044 		goto err2;
1045 	}
1046 
1047 	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1048 
1049 	ret = rping_connect_client(cb);
1050 	if (ret) {
1051 		fprintf(stderr, "connect error %d\n", ret);
1052 		goto err3;
1053 	}
1054 
1055 	ret = rping_test_client(cb);
1056 	if (ret) {
1057 		fprintf(stderr, "rping client failed: %d\n", ret);
1058 		goto err4;
1059 	}
1060 	ret = 0;
1061 err4:
1062 	rdma_disconnect(cb->cm_id);
1063 err3:
1064 	pthread_join(cb->cqthread, NULL);
1065 err2:
1066 	rping_free_buffers(cb);
1067 err1:
1068 	rping_free_qp(cb);
1069 
1070 	return ret;
1071 }
1072 
1073 static int get_addr(char *dst, struct sockaddr *addr)
1074 {
1075 	struct addrinfo *res;
1076 	int ret;
1077 
1078 	ret = getaddrinfo(dst, NULL, NULL, &res);
1079 	if (ret) {
1080 		printf("getaddrinfo failed - invalid hostname or IP address\n");
1081 		return ret;
1082 	}
1083 
1084 	if (res->ai_family == PF_INET)
1085 		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in));
1086 	else if (res->ai_family == PF_INET6)
1087 		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in6));
1088 	else
1089 		ret = -1;
1090 
1091 	freeaddrinfo(res);
1092 	return ret;
1093 }
1094 
1095 static void usage(char *name)
1096 {
1097 	printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n",
1098 	       name);
1099 	printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n",
1100 	       name);
1101 	printf("\t-c\t\tclient side\n");
1102 	printf("\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n");
1103 	printf("\t-v\t\tdisplay ping data to stdout\n");
1104 	printf("\t-V\t\tvalidate ping data\n");
1105 	printf("\t-d\t\tdebug printfs\n");
1106 	printf("\t-S size \tping data size\n");
1107 	printf("\t-C count\tping count times\n");
1108 	printf("\t-a addr\t\taddress\n");
1109 	printf("\t-p port\t\tport\n");
1110 	printf("\t-P\t\tpersistent server mode allowing multiple connections\n");
1111 }
1112 
1113 int main(int argc, char *argv[])
1114 {
1115 	struct rping_cb *cb;
1116 	int op;
1117 	int ret = 0;
1118 	int persistent_server = 0;
1119 
1120 	cb = malloc(sizeof(*cb));
1121 	if (!cb)
1122 		return -ENOMEM;
1123 
1124 	memset(cb, 0, sizeof(*cb));
1125 	cb->server = -1;
1126 	cb->state = IDLE;
1127 	cb->size = 64;
1128 	cb->sin.ss_family = PF_INET;
1129 	cb->port = htons(7174);
1130 	sem_init(&cb->sem, 0, 0);
1131 
1132 	opterr = 0;
1133 	while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
1134 		switch (op) {
1135 		case 'a':
1136 			ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1137 			break;
1138 		case 'P':
1139 			persistent_server = 1;
1140 			break;
1141 		case 'p':
1142 			cb->port = htons(atoi(optarg));
1143 			DEBUG_LOG("port %d\n", (int) atoi(optarg));
1144 			break;
1145 		case 's':
1146 			cb->server = 1;
1147 			DEBUG_LOG("server\n");
1148 			break;
1149 		case 'c':
1150 			cb->server = 0;
1151 			DEBUG_LOG("client\n");
1152 			break;
1153 		case 'S':
1154 			cb->size = atoi(optarg);
1155 			if ((cb->size < RPING_MIN_BUFSIZE) ||
1156 			    (cb->size > (RPING_BUFSIZE - 1))) {
1157 				fprintf(stderr, "Invalid size %d "
1158 				       "(valid range is %d to %d)\n",
1159 				       (int)cb->size, (int)(RPING_MIN_BUFSIZE),
1160 				       (int)(RPING_BUFSIZE));
1161 				ret = EINVAL;
1162 			} else
1163 				DEBUG_LOG("size %d\n", (int) atoi(optarg));
1164 			break;
1165 		case 'C':
1166 			cb->count = atoi(optarg);
1167 			if (cb->count < 0) {
1168 				fprintf(stderr, "Invalid count %d\n",
1169 					cb->count);
1170 				ret = EINVAL;
1171 			} else
1172 				DEBUG_LOG("count %d\n", (int) cb->count);
1173 			break;
1174 		case 'v':
1175 			cb->verbose++;
1176 			DEBUG_LOG("verbose\n");
1177 			break;
1178 		case 'V':
1179 			cb->validate++;
1180 			DEBUG_LOG("validate data\n");
1181 			break;
1182 		case 'd':
1183 			debug++;
1184 			break;
1185 		default:
1186 			usage("rping");
1187 			ret = EINVAL;
1188 			goto out;
1189 		}
1190 	}
1191 	if (ret)
1192 		goto out;
1193 
1194 	if (cb->server == -1) {
1195 		usage("rping");
1196 		ret = EINVAL;
1197 		goto out;
1198 	}
1199 
1200 	cb->cm_channel = rdma_create_event_channel();
1201 	if (!cb->cm_channel) {
1202 		perror("rdma_create_event_channel");
1203 		goto out;
1204 	}
1205 
1206 	ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1207 	if (ret) {
1208 		perror("rdma_create_id");
1209 		goto out2;
1210 	}
1211 	DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1212 
1213 	pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1214 
1215 	if (cb->server) {
1216 		if (persistent_server)
1217 			ret = rping_run_persistent_server(cb);
1218 		else
1219 			ret = rping_run_server(cb);
1220 	} else
1221 		ret = rping_run_client(cb);
1222 
1223 	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1224 	rdma_destroy_id(cb->cm_id);
1225 out2:
1226 	rdma_destroy_event_channel(cb->cm_channel);
1227 out:
1228 	free(cb);
1229 	return ret;
1230 }
1231