1 /* 2 * Copyright (c) 2005 Ammasso, Inc. All rights reserved. 3 * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved. 4 * 5 * This software is available to you under a choice of one of two 6 * licenses. You may choose to be licensed under the terms of the GNU 7 * General Public License (GPL) Version 2, available from the file 8 * COPYING in the main directory of this source tree, or the 9 * OpenIB.org BSD license below: 10 * 11 * Redistribution and use in source and binary forms, with or 12 * without modification, are permitted provided that the following 13 * conditions are met: 14 * 15 * - Redistributions of source code must retain the above 16 * copyright notice, this list of conditions and the following 17 * disclaimer. 18 * 19 * - Redistributions in binary form must reproduce the above 20 * copyright notice, this list of conditions and the following 21 * disclaimer in the documentation and/or other materials 22 * provided with the distribution. 23 * 24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 31 * SOFTWARE. 32 */ 33 #define _GNU_SOURCE 34 #include <infiniband/endian.h> 35 #include <getopt.h> 36 #include <stdlib.h> 37 #include <string.h> 38 #include <stdio.h> 39 #include <errno.h> 40 #include <sys/types.h> 41 #include <sys/socket.h> 42 #include <netdb.h> 43 #include <semaphore.h> 44 #include <pthread.h> 45 #include <inttypes.h> 46 #include <libgen.h> 47 #include <rdma/rdma_cma.h> 48 49 static int debug = 0; 50 #define DEBUG_LOG if (debug) printf 51 52 /* 53 * rping "ping/pong" loop: 54 * client sends source rkey/addr/len 55 * server receives source rkey/add/len 56 * server rdma reads "ping" data from source 57 * server sends "go ahead" on rdma read completion 58 * client sends sink rkey/addr/len 59 * server receives sink rkey/addr/len 60 * server rdma writes "pong" data to sink 61 * server sends "go ahead" on rdma write completion 62 * <repeat loop> 63 */ 64 65 /* 66 * These states are used to signal events between the completion handler 67 * and the main client or server thread. 68 * 69 * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV, 70 * and RDMA_WRITE_COMPLETE for each ping. 71 */ 72 enum test_state { 73 IDLE = 1, 74 CONNECT_REQUEST, 75 ADDR_RESOLVED, 76 ROUTE_RESOLVED, 77 CONNECTED, 78 RDMA_READ_ADV, 79 RDMA_READ_COMPLETE, 80 RDMA_WRITE_ADV, 81 RDMA_WRITE_COMPLETE, 82 DISCONNECTED, 83 ERROR 84 }; 85 86 struct rping_rdma_info { 87 __be64 buf; 88 __be32 rkey; 89 __be32 size; 90 }; 91 92 /* 93 * Default max buffer size for IO... 94 */ 95 #define RPING_BUFSIZE 64*1024 96 #define RPING_SQ_DEPTH 16 97 98 /* Default string for print data and 99 * minimum buffer size 100 */ 101 #define _stringify( _x ) # _x 102 #define stringify( _x ) _stringify( _x ) 103 104 #define RPING_MSG_FMT "rdma-ping-%d: " 105 #define RPING_MIN_BUFSIZE sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT) 106 107 /* 108 * Control block struct. 109 */ 110 struct rping_cb { 111 int server; /* 0 iff client */ 112 pthread_t cqthread; 113 pthread_t persistent_server_thread; 114 struct ibv_comp_channel *channel; 115 struct ibv_cq *cq; 116 struct ibv_pd *pd; 117 struct ibv_qp *qp; 118 119 struct ibv_recv_wr rq_wr; /* recv work request record */ 120 struct ibv_sge recv_sgl; /* recv single SGE */ 121 struct rping_rdma_info recv_buf;/* malloc'd buffer */ 122 struct ibv_mr *recv_mr; /* MR associated with this buffer */ 123 124 struct ibv_send_wr sq_wr; /* send work request record */ 125 struct ibv_sge send_sgl; 126 struct rping_rdma_info send_buf;/* single send buf */ 127 struct ibv_mr *send_mr; 128 129 struct ibv_send_wr rdma_sq_wr; /* rdma work request record */ 130 struct ibv_sge rdma_sgl; /* rdma single SGE */ 131 char *rdma_buf; /* used as rdma sink */ 132 struct ibv_mr *rdma_mr; 133 134 uint32_t remote_rkey; /* remote guys RKEY */ 135 uint64_t remote_addr; /* remote guys TO */ 136 uint32_t remote_len; /* remote guys LEN */ 137 138 char *start_buf; /* rdma read src */ 139 struct ibv_mr *start_mr; 140 141 enum test_state state; /* used for cond/signalling */ 142 sem_t sem; 143 144 struct sockaddr_storage sin; 145 struct sockaddr_storage ssource; 146 __be16 port; /* dst port in NBO */ 147 int verbose; /* verbose logging */ 148 int count; /* ping count */ 149 int size; /* ping data size */ 150 int validate; /* validate ping data */ 151 152 /* CM stuff */ 153 pthread_t cmthread; 154 struct rdma_event_channel *cm_channel; 155 struct rdma_cm_id *cm_id; /* connection on client side,*/ 156 /* listener on service side. */ 157 struct rdma_cm_id *child_cm_id; /* connection on server side */ 158 }; 159 160 static int rping_cma_event_handler(struct rdma_cm_id *cma_id, 161 struct rdma_cm_event *event) 162 { 163 int ret = 0; 164 struct rping_cb *cb = cma_id->context; 165 166 DEBUG_LOG("cma_event type %s cma_id %p (%s)\n", 167 rdma_event_str(event->event), cma_id, 168 (cma_id == cb->cm_id) ? "parent" : "child"); 169 170 switch (event->event) { 171 case RDMA_CM_EVENT_ADDR_RESOLVED: 172 cb->state = ADDR_RESOLVED; 173 ret = rdma_resolve_route(cma_id, 2000); 174 if (ret) { 175 cb->state = ERROR; 176 perror("rdma_resolve_route"); 177 sem_post(&cb->sem); 178 } 179 break; 180 181 case RDMA_CM_EVENT_ROUTE_RESOLVED: 182 cb->state = ROUTE_RESOLVED; 183 sem_post(&cb->sem); 184 break; 185 186 case RDMA_CM_EVENT_CONNECT_REQUEST: 187 cb->state = CONNECT_REQUEST; 188 cb->child_cm_id = cma_id; 189 DEBUG_LOG("child cma %p\n", cb->child_cm_id); 190 sem_post(&cb->sem); 191 break; 192 193 case RDMA_CM_EVENT_ESTABLISHED: 194 DEBUG_LOG("ESTABLISHED\n"); 195 196 /* 197 * Server will wake up when first RECV completes. 198 */ 199 if (!cb->server) { 200 cb->state = CONNECTED; 201 } 202 sem_post(&cb->sem); 203 break; 204 205 case RDMA_CM_EVENT_ADDR_ERROR: 206 case RDMA_CM_EVENT_ROUTE_ERROR: 207 case RDMA_CM_EVENT_CONNECT_ERROR: 208 case RDMA_CM_EVENT_UNREACHABLE: 209 case RDMA_CM_EVENT_REJECTED: 210 fprintf(stderr, "cma event %s, error %d\n", 211 rdma_event_str(event->event), event->status); 212 sem_post(&cb->sem); 213 ret = -1; 214 break; 215 216 case RDMA_CM_EVENT_DISCONNECTED: 217 fprintf(stderr, "%s DISCONNECT EVENT...\n", 218 cb->server ? "server" : "client"); 219 cb->state = DISCONNECTED; 220 sem_post(&cb->sem); 221 break; 222 223 case RDMA_CM_EVENT_DEVICE_REMOVAL: 224 fprintf(stderr, "cma detected device removal!!!!\n"); 225 cb->state = ERROR; 226 sem_post(&cb->sem); 227 ret = -1; 228 break; 229 230 default: 231 fprintf(stderr, "unhandled event: %s, ignoring\n", 232 rdma_event_str(event->event)); 233 break; 234 } 235 236 return ret; 237 } 238 239 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc) 240 { 241 if (wc->byte_len != sizeof(cb->recv_buf)) { 242 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len); 243 return -1; 244 } 245 246 cb->remote_rkey = be32toh(cb->recv_buf.rkey); 247 cb->remote_addr = be64toh(cb->recv_buf.buf); 248 cb->remote_len = be32toh(cb->recv_buf.size); 249 DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n", 250 cb->remote_rkey, cb->remote_addr, cb->remote_len); 251 252 if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE) 253 cb->state = RDMA_READ_ADV; 254 else 255 cb->state = RDMA_WRITE_ADV; 256 257 return 0; 258 } 259 260 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc) 261 { 262 if (wc->byte_len != sizeof(cb->recv_buf)) { 263 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len); 264 return -1; 265 } 266 267 if (cb->state == RDMA_READ_ADV) 268 cb->state = RDMA_WRITE_ADV; 269 else 270 cb->state = RDMA_WRITE_COMPLETE; 271 272 return 0; 273 } 274 275 static int rping_cq_event_handler(struct rping_cb *cb) 276 { 277 struct ibv_wc wc; 278 struct ibv_recv_wr *bad_wr; 279 int ret; 280 int flushed = 0; 281 282 while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) { 283 ret = 0; 284 285 if (wc.status) { 286 if (wc.status == IBV_WC_WR_FLUSH_ERR) { 287 flushed = 1; 288 continue; 289 290 } 291 fprintf(stderr, 292 "cq completion failed status %d\n", 293 wc.status); 294 ret = -1; 295 goto error; 296 } 297 298 switch (wc.opcode) { 299 case IBV_WC_SEND: 300 DEBUG_LOG("send completion\n"); 301 break; 302 303 case IBV_WC_RDMA_WRITE: 304 DEBUG_LOG("rdma write completion\n"); 305 cb->state = RDMA_WRITE_COMPLETE; 306 sem_post(&cb->sem); 307 break; 308 309 case IBV_WC_RDMA_READ: 310 DEBUG_LOG("rdma read completion\n"); 311 cb->state = RDMA_READ_COMPLETE; 312 sem_post(&cb->sem); 313 break; 314 315 case IBV_WC_RECV: 316 DEBUG_LOG("recv completion\n"); 317 ret = cb->server ? server_recv(cb, &wc) : 318 client_recv(cb, &wc); 319 if (ret) { 320 fprintf(stderr, "recv wc error: %d\n", ret); 321 goto error; 322 } 323 324 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 325 if (ret) { 326 fprintf(stderr, "post recv error: %d\n", ret); 327 goto error; 328 } 329 sem_post(&cb->sem); 330 break; 331 332 default: 333 DEBUG_LOG("unknown!!!!! completion\n"); 334 ret = -1; 335 goto error; 336 } 337 } 338 if (ret) { 339 fprintf(stderr, "poll error %d\n", ret); 340 goto error; 341 } 342 return flushed; 343 344 error: 345 cb->state = ERROR; 346 sem_post(&cb->sem); 347 return ret; 348 } 349 350 static int rping_accept(struct rping_cb *cb) 351 { 352 int ret; 353 354 DEBUG_LOG("accepting client connection request\n"); 355 356 ret = rdma_accept(cb->child_cm_id, NULL); 357 if (ret) { 358 perror("rdma_accept"); 359 return ret; 360 } 361 362 sem_wait(&cb->sem); 363 if (cb->state == ERROR) { 364 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state); 365 return -1; 366 } 367 return 0; 368 } 369 370 static void rping_setup_wr(struct rping_cb *cb) 371 { 372 cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf; 373 cb->recv_sgl.length = sizeof cb->recv_buf; 374 cb->recv_sgl.lkey = cb->recv_mr->lkey; 375 cb->rq_wr.sg_list = &cb->recv_sgl; 376 cb->rq_wr.num_sge = 1; 377 378 cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf; 379 cb->send_sgl.length = sizeof cb->send_buf; 380 cb->send_sgl.lkey = cb->send_mr->lkey; 381 382 cb->sq_wr.opcode = IBV_WR_SEND; 383 cb->sq_wr.send_flags = IBV_SEND_SIGNALED; 384 cb->sq_wr.sg_list = &cb->send_sgl; 385 cb->sq_wr.num_sge = 1; 386 387 cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf; 388 cb->rdma_sgl.lkey = cb->rdma_mr->lkey; 389 cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED; 390 cb->rdma_sq_wr.sg_list = &cb->rdma_sgl; 391 cb->rdma_sq_wr.num_sge = 1; 392 } 393 394 static int rping_setup_buffers(struct rping_cb *cb) 395 { 396 int ret; 397 398 DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb); 399 400 cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf, 401 IBV_ACCESS_LOCAL_WRITE); 402 if (!cb->recv_mr) { 403 fprintf(stderr, "recv_buf reg_mr failed\n"); 404 return errno; 405 } 406 407 cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0); 408 if (!cb->send_mr) { 409 fprintf(stderr, "send_buf reg_mr failed\n"); 410 ret = errno; 411 goto err1; 412 } 413 414 cb->rdma_buf = malloc(cb->size); 415 if (!cb->rdma_buf) { 416 fprintf(stderr, "rdma_buf malloc failed\n"); 417 ret = -ENOMEM; 418 goto err2; 419 } 420 421 cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size, 422 IBV_ACCESS_LOCAL_WRITE | 423 IBV_ACCESS_REMOTE_READ | 424 IBV_ACCESS_REMOTE_WRITE); 425 if (!cb->rdma_mr) { 426 fprintf(stderr, "rdma_buf reg_mr failed\n"); 427 ret = errno; 428 goto err3; 429 } 430 431 if (!cb->server) { 432 cb->start_buf = malloc(cb->size); 433 if (!cb->start_buf) { 434 fprintf(stderr, "start_buf malloc failed\n"); 435 ret = -ENOMEM; 436 goto err4; 437 } 438 439 cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size, 440 IBV_ACCESS_LOCAL_WRITE | 441 IBV_ACCESS_REMOTE_READ | 442 IBV_ACCESS_REMOTE_WRITE); 443 if (!cb->start_mr) { 444 fprintf(stderr, "start_buf reg_mr failed\n"); 445 ret = errno; 446 goto err5; 447 } 448 } 449 450 rping_setup_wr(cb); 451 DEBUG_LOG("allocated & registered buffers...\n"); 452 return 0; 453 454 err5: 455 free(cb->start_buf); 456 err4: 457 ibv_dereg_mr(cb->rdma_mr); 458 err3: 459 free(cb->rdma_buf); 460 err2: 461 ibv_dereg_mr(cb->send_mr); 462 err1: 463 ibv_dereg_mr(cb->recv_mr); 464 return ret; 465 } 466 467 static void rping_free_buffers(struct rping_cb *cb) 468 { 469 DEBUG_LOG("rping_free_buffers called on cb %p\n", cb); 470 ibv_dereg_mr(cb->recv_mr); 471 ibv_dereg_mr(cb->send_mr); 472 ibv_dereg_mr(cb->rdma_mr); 473 free(cb->rdma_buf); 474 if (!cb->server) { 475 ibv_dereg_mr(cb->start_mr); 476 free(cb->start_buf); 477 } 478 } 479 480 static int rping_create_qp(struct rping_cb *cb) 481 { 482 struct ibv_qp_init_attr init_attr; 483 int ret; 484 485 memset(&init_attr, 0, sizeof(init_attr)); 486 init_attr.cap.max_send_wr = RPING_SQ_DEPTH; 487 init_attr.cap.max_recv_wr = 2; 488 init_attr.cap.max_recv_sge = 1; 489 init_attr.cap.max_send_sge = 1; 490 init_attr.qp_type = IBV_QPT_RC; 491 init_attr.send_cq = cb->cq; 492 init_attr.recv_cq = cb->cq; 493 494 if (cb->server) { 495 ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr); 496 if (!ret) 497 cb->qp = cb->child_cm_id->qp; 498 } else { 499 ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr); 500 if (!ret) 501 cb->qp = cb->cm_id->qp; 502 } 503 504 return ret; 505 } 506 507 static void rping_free_qp(struct rping_cb *cb) 508 { 509 ibv_destroy_qp(cb->qp); 510 ibv_destroy_cq(cb->cq); 511 ibv_destroy_comp_channel(cb->channel); 512 ibv_dealloc_pd(cb->pd); 513 } 514 515 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id) 516 { 517 int ret; 518 519 cb->pd = ibv_alloc_pd(cm_id->verbs); 520 if (!cb->pd) { 521 fprintf(stderr, "ibv_alloc_pd failed\n"); 522 return errno; 523 } 524 DEBUG_LOG("created pd %p\n", cb->pd); 525 526 cb->channel = ibv_create_comp_channel(cm_id->verbs); 527 if (!cb->channel) { 528 fprintf(stderr, "ibv_create_comp_channel failed\n"); 529 ret = errno; 530 goto err1; 531 } 532 DEBUG_LOG("created channel %p\n", cb->channel); 533 534 cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb, 535 cb->channel, 0); 536 if (!cb->cq) { 537 fprintf(stderr, "ibv_create_cq failed\n"); 538 ret = errno; 539 goto err2; 540 } 541 DEBUG_LOG("created cq %p\n", cb->cq); 542 543 ret = ibv_req_notify_cq(cb->cq, 0); 544 if (ret) { 545 fprintf(stderr, "ibv_create_cq failed\n"); 546 ret = errno; 547 goto err3; 548 } 549 550 ret = rping_create_qp(cb); 551 if (ret) { 552 perror("rdma_create_qp"); 553 goto err3; 554 } 555 DEBUG_LOG("created qp %p\n", cb->qp); 556 return 0; 557 558 err3: 559 ibv_destroy_cq(cb->cq); 560 err2: 561 ibv_destroy_comp_channel(cb->channel); 562 err1: 563 ibv_dealloc_pd(cb->pd); 564 return ret; 565 } 566 567 static void *cm_thread(void *arg) 568 { 569 struct rping_cb *cb = arg; 570 struct rdma_cm_event *event; 571 int ret; 572 573 while (1) { 574 ret = rdma_get_cm_event(cb->cm_channel, &event); 575 if (ret) { 576 perror("rdma_get_cm_event"); 577 exit(ret); 578 } 579 ret = rping_cma_event_handler(event->id, event); 580 rdma_ack_cm_event(event); 581 if (ret) 582 exit(ret); 583 } 584 } 585 586 static void *cq_thread(void *arg) 587 { 588 struct rping_cb *cb = arg; 589 struct ibv_cq *ev_cq; 590 void *ev_ctx; 591 int ret; 592 593 DEBUG_LOG("cq_thread started.\n"); 594 595 while (1) { 596 pthread_testcancel(); 597 598 ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx); 599 if (ret) { 600 fprintf(stderr, "Failed to get cq event!\n"); 601 pthread_exit(NULL); 602 } 603 if (ev_cq != cb->cq) { 604 fprintf(stderr, "Unknown CQ!\n"); 605 pthread_exit(NULL); 606 } 607 ret = ibv_req_notify_cq(cb->cq, 0); 608 if (ret) { 609 fprintf(stderr, "Failed to set notify!\n"); 610 pthread_exit(NULL); 611 } 612 ret = rping_cq_event_handler(cb); 613 ibv_ack_cq_events(cb->cq, 1); 614 if (ret) 615 pthread_exit(NULL); 616 } 617 } 618 619 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr) 620 { 621 struct rping_rdma_info *info = &cb->send_buf; 622 623 info->buf = htobe64((uint64_t) (unsigned long) buf); 624 info->rkey = htobe32(mr->rkey); 625 info->size = htobe32(cb->size); 626 627 DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n", 628 be64toh(info->buf), be32toh(info->rkey), be32toh(info->size)); 629 } 630 631 static int rping_test_server(struct rping_cb *cb) 632 { 633 struct ibv_send_wr *bad_wr; 634 int ret; 635 636 while (1) { 637 /* Wait for client's Start STAG/TO/Len */ 638 sem_wait(&cb->sem); 639 if (cb->state != RDMA_READ_ADV) { 640 fprintf(stderr, "wait for RDMA_READ_ADV state %d\n", 641 cb->state); 642 ret = -1; 643 break; 644 } 645 646 DEBUG_LOG("server received sink adv\n"); 647 648 /* Issue RDMA Read. */ 649 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ; 650 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey; 651 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr; 652 cb->rdma_sq_wr.sg_list->length = cb->remote_len; 653 654 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr); 655 if (ret) { 656 fprintf(stderr, "post send error %d\n", ret); 657 break; 658 } 659 DEBUG_LOG("server posted rdma read req \n"); 660 661 /* Wait for read completion */ 662 sem_wait(&cb->sem); 663 if (cb->state != RDMA_READ_COMPLETE) { 664 fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n", 665 cb->state); 666 ret = -1; 667 break; 668 } 669 DEBUG_LOG("server received read complete\n"); 670 671 /* Display data in recv buf */ 672 if (cb->verbose) 673 printf("server ping data: %s\n", cb->rdma_buf); 674 675 /* Tell client to continue */ 676 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 677 if (ret) { 678 fprintf(stderr, "post send error %d\n", ret); 679 break; 680 } 681 DEBUG_LOG("server posted go ahead\n"); 682 683 /* Wait for client's RDMA STAG/TO/Len */ 684 sem_wait(&cb->sem); 685 if (cb->state != RDMA_WRITE_ADV) { 686 fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n", 687 cb->state); 688 ret = -1; 689 break; 690 } 691 DEBUG_LOG("server received sink adv\n"); 692 693 /* RDMA Write echo data */ 694 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE; 695 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey; 696 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr; 697 cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1; 698 DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n", 699 cb->rdma_sq_wr.sg_list->lkey, 700 cb->rdma_sq_wr.sg_list->addr, 701 cb->rdma_sq_wr.sg_list->length); 702 703 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr); 704 if (ret) { 705 fprintf(stderr, "post send error %d\n", ret); 706 break; 707 } 708 709 /* Wait for completion */ 710 ret = sem_wait(&cb->sem); 711 if (cb->state != RDMA_WRITE_COMPLETE) { 712 fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n", 713 cb->state); 714 ret = -1; 715 break; 716 } 717 DEBUG_LOG("server rdma write complete \n"); 718 719 /* Tell client to begin again */ 720 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 721 if (ret) { 722 fprintf(stderr, "post send error %d\n", ret); 723 break; 724 } 725 DEBUG_LOG("server posted go ahead\n"); 726 } 727 728 return (cb->state == DISCONNECTED) ? 0 : ret; 729 } 730 731 static int rping_bind_server(struct rping_cb *cb) 732 { 733 int ret; 734 735 if (cb->sin.ss_family == AF_INET) 736 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port; 737 else 738 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port; 739 740 ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin); 741 if (ret) { 742 perror("rdma_bind_addr"); 743 return ret; 744 } 745 DEBUG_LOG("rdma_bind_addr successful\n"); 746 747 DEBUG_LOG("rdma_listen\n"); 748 ret = rdma_listen(cb->cm_id, 3); 749 if (ret) { 750 perror("rdma_listen"); 751 return ret; 752 } 753 754 return 0; 755 } 756 757 static struct rping_cb *clone_cb(struct rping_cb *listening_cb) 758 { 759 struct rping_cb *cb = malloc(sizeof *cb); 760 if (!cb) 761 return NULL; 762 memset(cb, 0, sizeof *cb); 763 *cb = *listening_cb; 764 cb->child_cm_id->context = cb; 765 return cb; 766 } 767 768 static void free_cb(struct rping_cb *cb) 769 { 770 free(cb); 771 } 772 773 static void *rping_persistent_server_thread(void *arg) 774 { 775 struct rping_cb *cb = arg; 776 struct ibv_recv_wr *bad_wr; 777 int ret; 778 779 ret = rping_setup_qp(cb, cb->child_cm_id); 780 if (ret) { 781 fprintf(stderr, "setup_qp failed: %d\n", ret); 782 goto err0; 783 } 784 785 ret = rping_setup_buffers(cb); 786 if (ret) { 787 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 788 goto err1; 789 } 790 791 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 792 if (ret) { 793 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 794 goto err2; 795 } 796 797 ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb); 798 if (ret) { 799 perror("pthread_create"); 800 goto err2; 801 } 802 803 ret = rping_accept(cb); 804 if (ret) { 805 fprintf(stderr, "connect error %d\n", ret); 806 goto err3; 807 } 808 809 rping_test_server(cb); 810 rdma_disconnect(cb->child_cm_id); 811 pthread_join(cb->cqthread, NULL); 812 rping_free_buffers(cb); 813 rping_free_qp(cb); 814 rdma_destroy_id(cb->child_cm_id); 815 free_cb(cb); 816 return NULL; 817 err3: 818 pthread_cancel(cb->cqthread); 819 pthread_join(cb->cqthread, NULL); 820 err2: 821 rping_free_buffers(cb); 822 err1: 823 rping_free_qp(cb); 824 err0: 825 free_cb(cb); 826 return NULL; 827 } 828 829 static int rping_run_persistent_server(struct rping_cb *listening_cb) 830 { 831 int ret; 832 struct rping_cb *cb; 833 pthread_attr_t attr; 834 835 ret = rping_bind_server(listening_cb); 836 if (ret) 837 return ret; 838 839 /* 840 * Set persistent server threads to DEATCHED state so 841 * they release all their resources when they exit. 842 */ 843 ret = pthread_attr_init(&attr); 844 if (ret) { 845 perror("pthread_attr_init"); 846 return ret; 847 } 848 ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 849 if (ret) { 850 perror("pthread_attr_setdetachstate"); 851 return ret; 852 } 853 854 while (1) { 855 sem_wait(&listening_cb->sem); 856 if (listening_cb->state != CONNECT_REQUEST) { 857 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n", 858 listening_cb->state); 859 return -1; 860 } 861 862 cb = clone_cb(listening_cb); 863 if (!cb) 864 return -1; 865 866 ret = pthread_create(&cb->persistent_server_thread, &attr, rping_persistent_server_thread, cb); 867 if (ret) { 868 perror("pthread_create"); 869 return ret; 870 } 871 } 872 return 0; 873 } 874 875 static int rping_run_server(struct rping_cb *cb) 876 { 877 struct ibv_recv_wr *bad_wr; 878 int ret; 879 880 ret = rping_bind_server(cb); 881 if (ret) 882 return ret; 883 884 sem_wait(&cb->sem); 885 if (cb->state != CONNECT_REQUEST) { 886 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n", 887 cb->state); 888 return -1; 889 } 890 891 ret = rping_setup_qp(cb, cb->child_cm_id); 892 if (ret) { 893 fprintf(stderr, "setup_qp failed: %d\n", ret); 894 return ret; 895 } 896 897 ret = rping_setup_buffers(cb); 898 if (ret) { 899 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 900 goto err1; 901 } 902 903 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 904 if (ret) { 905 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 906 goto err2; 907 } 908 909 ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb); 910 if (ret) { 911 perror("pthread_create"); 912 goto err2; 913 } 914 915 ret = rping_accept(cb); 916 if (ret) { 917 fprintf(stderr, "connect error %d\n", ret); 918 goto err2; 919 } 920 921 ret = rping_test_server(cb); 922 if (ret) { 923 fprintf(stderr, "rping server failed: %d\n", ret); 924 goto err3; 925 } 926 927 ret = 0; 928 err3: 929 rdma_disconnect(cb->child_cm_id); 930 pthread_join(cb->cqthread, NULL); 931 rdma_destroy_id(cb->child_cm_id); 932 err2: 933 rping_free_buffers(cb); 934 err1: 935 rping_free_qp(cb); 936 937 return ret; 938 } 939 940 static int rping_test_client(struct rping_cb *cb) 941 { 942 int ping, start, cc, i, ret = 0; 943 struct ibv_send_wr *bad_wr; 944 unsigned char c; 945 946 start = 65; 947 for (ping = 0; !cb->count || ping < cb->count; ping++) { 948 cb->state = RDMA_READ_ADV; 949 950 /* Put some ascii text in the buffer. */ 951 cc = snprintf(cb->start_buf, cb->size, RPING_MSG_FMT, ping); 952 for (i = cc, c = start; i < cb->size; i++) { 953 cb->start_buf[i] = c; 954 c++; 955 if (c > 122) 956 c = 65; 957 } 958 start++; 959 if (start > 122) 960 start = 65; 961 cb->start_buf[cb->size - 1] = 0; 962 963 rping_format_send(cb, cb->start_buf, cb->start_mr); 964 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 965 if (ret) { 966 fprintf(stderr, "post send error %d\n", ret); 967 break; 968 } 969 970 /* Wait for server to ACK */ 971 sem_wait(&cb->sem); 972 if (cb->state != RDMA_WRITE_ADV) { 973 fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n", 974 cb->state); 975 ret = -1; 976 break; 977 } 978 979 rping_format_send(cb, cb->rdma_buf, cb->rdma_mr); 980 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 981 if (ret) { 982 fprintf(stderr, "post send error %d\n", ret); 983 break; 984 } 985 986 /* Wait for the server to say the RDMA Write is complete. */ 987 sem_wait(&cb->sem); 988 if (cb->state != RDMA_WRITE_COMPLETE) { 989 fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n", 990 cb->state); 991 ret = -1; 992 break; 993 } 994 995 if (cb->validate) 996 if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) { 997 fprintf(stderr, "data mismatch!\n"); 998 ret = -1; 999 break; 1000 } 1001 1002 if (cb->verbose) 1003 printf("ping data: %s\n", cb->rdma_buf); 1004 } 1005 1006 return (cb->state == DISCONNECTED) ? 0 : ret; 1007 } 1008 1009 static int rping_connect_client(struct rping_cb *cb) 1010 { 1011 struct rdma_conn_param conn_param; 1012 int ret; 1013 1014 memset(&conn_param, 0, sizeof conn_param); 1015 conn_param.responder_resources = 1; 1016 conn_param.initiator_depth = 1; 1017 conn_param.retry_count = 7; 1018 1019 ret = rdma_connect(cb->cm_id, &conn_param); 1020 if (ret) { 1021 perror("rdma_connect"); 1022 return ret; 1023 } 1024 1025 sem_wait(&cb->sem); 1026 if (cb->state != CONNECTED) { 1027 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state); 1028 return -1; 1029 } 1030 1031 DEBUG_LOG("rmda_connect successful\n"); 1032 return 0; 1033 } 1034 1035 static int rping_bind_client(struct rping_cb *cb) 1036 { 1037 int ret; 1038 1039 if (cb->sin.ss_family == AF_INET) 1040 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port; 1041 else 1042 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port; 1043 1044 if (cb->ssource.ss_family) 1045 ret = rdma_resolve_addr(cb->cm_id, (struct sockaddr *) &cb->ssource, 1046 (struct sockaddr *) &cb->sin, 2000); 1047 else 1048 ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000); 1049 1050 if (ret) { 1051 perror("rdma_resolve_addr"); 1052 return ret; 1053 } 1054 1055 sem_wait(&cb->sem); 1056 if (cb->state != ROUTE_RESOLVED) { 1057 fprintf(stderr, "waiting for addr/route resolution state %d\n", 1058 cb->state); 1059 return -1; 1060 } 1061 1062 DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n"); 1063 return 0; 1064 } 1065 1066 static int rping_run_client(struct rping_cb *cb) 1067 { 1068 struct ibv_recv_wr *bad_wr; 1069 int ret; 1070 1071 ret = rping_bind_client(cb); 1072 if (ret) 1073 return ret; 1074 1075 ret = rping_setup_qp(cb, cb->cm_id); 1076 if (ret) { 1077 fprintf(stderr, "setup_qp failed: %d\n", ret); 1078 return ret; 1079 } 1080 1081 ret = rping_setup_buffers(cb); 1082 if (ret) { 1083 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 1084 goto err1; 1085 } 1086 1087 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 1088 if (ret) { 1089 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 1090 goto err2; 1091 } 1092 1093 ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb); 1094 if (ret) { 1095 perror("pthread_create"); 1096 goto err2; 1097 } 1098 1099 ret = rping_connect_client(cb); 1100 if (ret) { 1101 fprintf(stderr, "connect error %d\n", ret); 1102 goto err3; 1103 } 1104 1105 ret = rping_test_client(cb); 1106 if (ret) { 1107 fprintf(stderr, "rping client failed: %d\n", ret); 1108 goto err4; 1109 } 1110 1111 ret = 0; 1112 err4: 1113 rdma_disconnect(cb->cm_id); 1114 err3: 1115 pthread_join(cb->cqthread, NULL); 1116 err2: 1117 rping_free_buffers(cb); 1118 err1: 1119 rping_free_qp(cb); 1120 1121 return ret; 1122 } 1123 1124 static int get_addr(char *dst, struct sockaddr *addr) 1125 { 1126 struct addrinfo *res; 1127 int ret; 1128 1129 ret = getaddrinfo(dst, NULL, NULL, &res); 1130 if (ret) { 1131 printf("getaddrinfo failed (%s) - invalid hostname or IP address\n", gai_strerror(ret)); 1132 return ret; 1133 } 1134 1135 if (res->ai_family == PF_INET) 1136 memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in)); 1137 else if (res->ai_family == PF_INET6) 1138 memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in6)); 1139 else 1140 ret = -1; 1141 1142 freeaddrinfo(res); 1143 return ret; 1144 } 1145 1146 static void usage(const char *name) 1147 { 1148 printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n", 1149 basename(name)); 1150 printf("%s -c [-vVd] [-S size] [-C count] [-I addr] -a addr [-p port]\n", 1151 basename(name)); 1152 printf("\t-c\t\tclient side\n"); 1153 printf("\t-I\t\tSource address to bind to for client.\n"); 1154 printf("\t-s\t\tserver side. To bind to any address with IPv6 use -a ::0\n"); 1155 printf("\t-v\t\tdisplay ping data to stdout\n"); 1156 printf("\t-V\t\tvalidate ping data\n"); 1157 printf("\t-d\t\tdebug printfs\n"); 1158 printf("\t-S size \tping data size\n"); 1159 printf("\t-C count\tping count times\n"); 1160 printf("\t-a addr\t\taddress\n"); 1161 printf("\t-p port\t\tport\n"); 1162 printf("\t-P\t\tpersistent server mode allowing multiple connections\n"); 1163 } 1164 1165 int main(int argc, char *argv[]) 1166 { 1167 struct rping_cb *cb; 1168 int op; 1169 int ret = 0; 1170 int persistent_server = 0; 1171 1172 cb = malloc(sizeof(*cb)); 1173 if (!cb) 1174 return -ENOMEM; 1175 1176 memset(cb, 0, sizeof(*cb)); 1177 cb->server = -1; 1178 cb->state = IDLE; 1179 cb->size = 64; 1180 cb->sin.ss_family = PF_INET; 1181 cb->port = htobe16(7174); 1182 sem_init(&cb->sem, 0, 0); 1183 1184 opterr = 0; 1185 while ((op=getopt(argc, argv, "a:I:Pp:C:S:t:scvVd")) != -1) { 1186 switch (op) { 1187 case 'a': 1188 ret = get_addr(optarg, (struct sockaddr *) &cb->sin); 1189 break; 1190 case 'I': 1191 ret = get_addr(optarg, (struct sockaddr *) &cb->ssource); 1192 break; 1193 case 'P': 1194 persistent_server = 1; 1195 break; 1196 case 'p': 1197 cb->port = htobe16(atoi(optarg)); 1198 DEBUG_LOG("port %d\n", (int) atoi(optarg)); 1199 break; 1200 case 's': 1201 cb->server = 1; 1202 DEBUG_LOG("server\n"); 1203 break; 1204 case 'c': 1205 cb->server = 0; 1206 DEBUG_LOG("client\n"); 1207 break; 1208 case 'S': 1209 cb->size = atoi(optarg); 1210 if ((cb->size < RPING_MIN_BUFSIZE) || 1211 (cb->size > (RPING_BUFSIZE - 1))) { 1212 fprintf(stderr, "Invalid size %d " 1213 "(valid range is %zd to %d)\n", 1214 cb->size, RPING_MIN_BUFSIZE, RPING_BUFSIZE); 1215 ret = EINVAL; 1216 } else 1217 DEBUG_LOG("size %d\n", (int) atoi(optarg)); 1218 break; 1219 case 'C': 1220 cb->count = atoi(optarg); 1221 if (cb->count < 0) { 1222 fprintf(stderr, "Invalid count %d\n", 1223 cb->count); 1224 ret = EINVAL; 1225 } else 1226 DEBUG_LOG("count %d\n", (int) cb->count); 1227 break; 1228 case 'v': 1229 cb->verbose++; 1230 DEBUG_LOG("verbose\n"); 1231 break; 1232 case 'V': 1233 cb->validate++; 1234 DEBUG_LOG("validate data\n"); 1235 break; 1236 case 'd': 1237 debug++; 1238 break; 1239 default: 1240 usage("rping"); 1241 ret = EINVAL; 1242 goto out; 1243 } 1244 } 1245 if (ret) 1246 goto out; 1247 1248 if (cb->server == -1) { 1249 usage("rping"); 1250 ret = EINVAL; 1251 goto out; 1252 } 1253 1254 cb->cm_channel = rdma_create_event_channel(); 1255 if (!cb->cm_channel) { 1256 perror("rdma_create_event_channel"); 1257 ret = errno; 1258 goto out; 1259 } 1260 1261 ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP); 1262 if (ret) { 1263 perror("rdma_create_id"); 1264 goto out2; 1265 } 1266 DEBUG_LOG("created cm_id %p\n", cb->cm_id); 1267 1268 ret = pthread_create(&cb->cmthread, NULL, cm_thread, cb); 1269 if (ret) { 1270 perror("pthread_create"); 1271 goto out2; 1272 } 1273 1274 if (cb->server) { 1275 if (persistent_server) 1276 ret = rping_run_persistent_server(cb); 1277 else 1278 ret = rping_run_server(cb); 1279 } else { 1280 ret = rping_run_client(cb); 1281 } 1282 1283 DEBUG_LOG("destroy cm_id %p\n", cb->cm_id); 1284 rdma_destroy_id(cb->cm_id); 1285 out2: 1286 rdma_destroy_event_channel(cb->cm_channel); 1287 out: 1288 free(cb); 1289 return ret; 1290 } 1291