1 /* 2 * Copyright (c) 2005 Ammasso, Inc. All rights reserved. 3 * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved. 4 * 5 * This software is available to you under a choice of one of two 6 * licenses. You may choose to be licensed under the terms of the GNU 7 * General Public License (GPL) Version 2, available from the file 8 * COPYING in the main directory of this source tree, or the 9 * OpenIB.org BSD license below: 10 * 11 * Redistribution and use in source and binary forms, with or 12 * without modification, are permitted provided that the following 13 * conditions are met: 14 * 15 * - Redistributions of source code must retain the above 16 * copyright notice, this list of conditions and the following 17 * disclaimer. 18 * 19 * - Redistributions in binary form must reproduce the above 20 * copyright notice, this list of conditions and the following 21 * disclaimer in the documentation and/or other materials 22 * provided with the distribution. 23 * 24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 31 * SOFTWARE. 32 */ 33 34 #include <getopt.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <stdio.h> 38 #include <errno.h> 39 #include <sys/types.h> 40 #include <netinet/in.h> 41 #include <sys/socket.h> 42 #include <netdb.h> 43 #include <byteswap.h> 44 #include <semaphore.h> 45 #include <arpa/inet.h> 46 #include <pthread.h> 47 #include <inttypes.h> 48 49 #include <rdma/rdma_cma.h> 50 #include <infiniband/arch.h> 51 52 static int debug = 0; 53 #define DEBUG_LOG if (debug) printf 54 55 /* 56 * rping "ping/pong" loop: 57 * client sends source rkey/addr/len 58 * server receives source rkey/add/len 59 * server rdma reads "ping" data from source 60 * server sends "go ahead" on rdma read completion 61 * client sends sink rkey/addr/len 62 * server receives sink rkey/addr/len 63 * server rdma writes "pong" data to sink 64 * server sends "go ahead" on rdma write completion 65 * <repeat loop> 66 */ 67 68 /* 69 * These states are used to signal events between the completion handler 70 * and the main client or server thread. 71 * 72 * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV, 73 * and RDMA_WRITE_COMPLETE for each ping. 74 */ 75 enum test_state { 76 IDLE = 1, 77 CONNECT_REQUEST, 78 ADDR_RESOLVED, 79 ROUTE_RESOLVED, 80 CONNECTED, 81 RDMA_READ_ADV, 82 RDMA_READ_COMPLETE, 83 RDMA_WRITE_ADV, 84 RDMA_WRITE_COMPLETE, 85 ERROR 86 }; 87 88 struct rping_rdma_info { 89 uint64_t buf; 90 uint32_t rkey; 91 uint32_t size; 92 }; 93 94 /* 95 * Default max buffer size for IO... 96 */ 97 #define RPING_BUFSIZE 64*1024 98 #define RPING_SQ_DEPTH 16 99 100 /* Default string for print data and 101 * minimum buffer size 102 */ 103 #define _stringify( _x ) # _x 104 #define stringify( _x ) _stringify( _x ) 105 106 #define RPING_MSG_FMT "rdma-ping-%d: " 107 #define RPING_MIN_BUFSIZE sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT) 108 109 /* 110 * Control block struct. 111 */ 112 struct rping_cb { 113 int server; /* 0 iff client */ 114 pthread_t cqthread; 115 pthread_t persistent_server_thread; 116 struct ibv_comp_channel *channel; 117 struct ibv_cq *cq; 118 struct ibv_pd *pd; 119 struct ibv_qp *qp; 120 121 struct ibv_recv_wr rq_wr; /* recv work request record */ 122 struct ibv_sge recv_sgl; /* recv single SGE */ 123 struct rping_rdma_info recv_buf;/* malloc'd buffer */ 124 struct ibv_mr *recv_mr; /* MR associated with this buffer */ 125 126 struct ibv_send_wr sq_wr; /* send work request record */ 127 struct ibv_sge send_sgl; 128 struct rping_rdma_info send_buf;/* single send buf */ 129 struct ibv_mr *send_mr; 130 131 struct ibv_send_wr rdma_sq_wr; /* rdma work request record */ 132 struct ibv_sge rdma_sgl; /* rdma single SGE */ 133 char *rdma_buf; /* used as rdma sink */ 134 struct ibv_mr *rdma_mr; 135 136 uint32_t remote_rkey; /* remote guys RKEY */ 137 uint64_t remote_addr; /* remote guys TO */ 138 uint32_t remote_len; /* remote guys LEN */ 139 140 char *start_buf; /* rdma read src */ 141 struct ibv_mr *start_mr; 142 143 enum test_state state; /* used for cond/signalling */ 144 sem_t sem; 145 146 struct sockaddr_storage sin; 147 uint16_t port; /* dst port in NBO */ 148 int verbose; /* verbose logging */ 149 int count; /* ping count */ 150 int size; /* ping data size */ 151 int validate; /* validate ping data */ 152 153 /* CM stuff */ 154 pthread_t cmthread; 155 struct rdma_event_channel *cm_channel; 156 struct rdma_cm_id *cm_id; /* connection on client side,*/ 157 /* listener on service side. */ 158 struct rdma_cm_id *child_cm_id; /* connection on server side */ 159 }; 160 161 static int rping_cma_event_handler(struct rdma_cm_id *cma_id, 162 struct rdma_cm_event *event) 163 { 164 int ret = 0; 165 struct rping_cb *cb = cma_id->context; 166 167 DEBUG_LOG("cma_event type %s cma_id %p (%s)\n", 168 rdma_event_str(event->event), cma_id, 169 (cma_id == cb->cm_id) ? "parent" : "child"); 170 171 switch (event->event) { 172 case RDMA_CM_EVENT_ADDR_RESOLVED: 173 cb->state = ADDR_RESOLVED; 174 ret = rdma_resolve_route(cma_id, 2000); 175 if (ret) { 176 cb->state = ERROR; 177 perror("rdma_resolve_route"); 178 sem_post(&cb->sem); 179 } 180 break; 181 182 case RDMA_CM_EVENT_ROUTE_RESOLVED: 183 cb->state = ROUTE_RESOLVED; 184 sem_post(&cb->sem); 185 break; 186 187 case RDMA_CM_EVENT_CONNECT_REQUEST: 188 cb->state = CONNECT_REQUEST; 189 cb->child_cm_id = cma_id; 190 DEBUG_LOG("child cma %p\n", cb->child_cm_id); 191 sem_post(&cb->sem); 192 break; 193 194 case RDMA_CM_EVENT_ESTABLISHED: 195 DEBUG_LOG("ESTABLISHED\n"); 196 197 /* 198 * Server will wake up when first RECV completes. 199 */ 200 if (!cb->server) { 201 cb->state = CONNECTED; 202 } 203 sem_post(&cb->sem); 204 break; 205 206 case RDMA_CM_EVENT_ADDR_ERROR: 207 case RDMA_CM_EVENT_ROUTE_ERROR: 208 case RDMA_CM_EVENT_CONNECT_ERROR: 209 case RDMA_CM_EVENT_UNREACHABLE: 210 case RDMA_CM_EVENT_REJECTED: 211 fprintf(stderr, "cma event %s, error %d\n", 212 rdma_event_str(event->event), event->status); 213 sem_post(&cb->sem); 214 ret = -1; 215 break; 216 217 case RDMA_CM_EVENT_DISCONNECTED: 218 fprintf(stderr, "%s DISCONNECT EVENT...\n", 219 cb->server ? "server" : "client"); 220 sem_post(&cb->sem); 221 break; 222 223 case RDMA_CM_EVENT_DEVICE_REMOVAL: 224 fprintf(stderr, "cma detected device removal!!!!\n"); 225 ret = -1; 226 break; 227 228 default: 229 fprintf(stderr, "unhandled event: %s, ignoring\n", 230 rdma_event_str(event->event)); 231 break; 232 } 233 234 return ret; 235 } 236 237 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc) 238 { 239 if (wc->byte_len != sizeof(cb->recv_buf)) { 240 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len); 241 return -1; 242 } 243 244 cb->remote_rkey = ntohl(cb->recv_buf.rkey); 245 cb->remote_addr = ntohll(cb->recv_buf.buf); 246 cb->remote_len = ntohl(cb->recv_buf.size); 247 DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n", 248 cb->remote_rkey, cb->remote_addr, cb->remote_len); 249 250 if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE) 251 cb->state = RDMA_READ_ADV; 252 else 253 cb->state = RDMA_WRITE_ADV; 254 255 return 0; 256 } 257 258 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc) 259 { 260 if (wc->byte_len != sizeof(cb->recv_buf)) { 261 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len); 262 return -1; 263 } 264 265 if (cb->state == RDMA_READ_ADV) 266 cb->state = RDMA_WRITE_ADV; 267 else 268 cb->state = RDMA_WRITE_COMPLETE; 269 270 return 0; 271 } 272 273 static int rping_cq_event_handler(struct rping_cb *cb) 274 { 275 struct ibv_wc wc; 276 struct ibv_recv_wr *bad_wr; 277 int ret; 278 279 while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) { 280 ret = 0; 281 282 if (wc.status) { 283 fprintf(stderr, "cq completion failed status %d\n", 284 wc.status); 285 if (wc.status != IBV_WC_WR_FLUSH_ERR) 286 ret = -1; 287 goto error; 288 } 289 290 switch (wc.opcode) { 291 case IBV_WC_SEND: 292 DEBUG_LOG("send completion\n"); 293 break; 294 295 case IBV_WC_RDMA_WRITE: 296 DEBUG_LOG("rdma write completion\n"); 297 cb->state = RDMA_WRITE_COMPLETE; 298 sem_post(&cb->sem); 299 break; 300 301 case IBV_WC_RDMA_READ: 302 DEBUG_LOG("rdma read completion\n"); 303 cb->state = RDMA_READ_COMPLETE; 304 sem_post(&cb->sem); 305 break; 306 307 case IBV_WC_RECV: 308 DEBUG_LOG("recv completion\n"); 309 ret = cb->server ? server_recv(cb, &wc) : 310 client_recv(cb, &wc); 311 if (ret) { 312 fprintf(stderr, "recv wc error: %d\n", ret); 313 goto error; 314 } 315 316 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 317 if (ret) { 318 fprintf(stderr, "post recv error: %d\n", ret); 319 goto error; 320 } 321 sem_post(&cb->sem); 322 break; 323 324 default: 325 DEBUG_LOG("unknown!!!!! completion\n"); 326 ret = -1; 327 goto error; 328 } 329 } 330 if (ret) { 331 fprintf(stderr, "poll error %d\n", ret); 332 goto error; 333 } 334 return 0; 335 336 error: 337 cb->state = ERROR; 338 sem_post(&cb->sem); 339 return ret; 340 } 341 342 static int rping_accept(struct rping_cb *cb) 343 { 344 struct rdma_conn_param conn_param; 345 int ret; 346 347 DEBUG_LOG("accepting client connection request\n"); 348 349 memset(&conn_param, 0, sizeof conn_param); 350 conn_param.responder_resources = 1; 351 conn_param.initiator_depth = 1; 352 353 ret = rdma_accept(cb->child_cm_id, &conn_param); 354 if (ret) { 355 perror("rdma_accept"); 356 return ret; 357 } 358 359 sem_wait(&cb->sem); 360 if (cb->state == ERROR) { 361 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state); 362 return -1; 363 } 364 return 0; 365 } 366 367 static void rping_setup_wr(struct rping_cb *cb) 368 { 369 cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf; 370 cb->recv_sgl.length = sizeof cb->recv_buf; 371 cb->recv_sgl.lkey = cb->recv_mr->lkey; 372 cb->rq_wr.sg_list = &cb->recv_sgl; 373 cb->rq_wr.num_sge = 1; 374 375 cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf; 376 cb->send_sgl.length = sizeof cb->send_buf; 377 cb->send_sgl.lkey = cb->send_mr->lkey; 378 379 cb->sq_wr.opcode = IBV_WR_SEND; 380 cb->sq_wr.send_flags = IBV_SEND_SIGNALED; 381 cb->sq_wr.sg_list = &cb->send_sgl; 382 cb->sq_wr.num_sge = 1; 383 384 cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf; 385 cb->rdma_sgl.lkey = cb->rdma_mr->lkey; 386 cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED; 387 cb->rdma_sq_wr.sg_list = &cb->rdma_sgl; 388 cb->rdma_sq_wr.num_sge = 1; 389 } 390 391 static int rping_setup_buffers(struct rping_cb *cb) 392 { 393 int ret; 394 395 DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb); 396 397 cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf, 398 IBV_ACCESS_LOCAL_WRITE); 399 if (!cb->recv_mr) { 400 fprintf(stderr, "recv_buf reg_mr failed\n"); 401 return errno; 402 } 403 404 cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0); 405 if (!cb->send_mr) { 406 fprintf(stderr, "send_buf reg_mr failed\n"); 407 ret = errno; 408 goto err1; 409 } 410 411 cb->rdma_buf = malloc(cb->size); 412 if (!cb->rdma_buf) { 413 fprintf(stderr, "rdma_buf malloc failed\n"); 414 ret = -ENOMEM; 415 goto err2; 416 } 417 418 cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size, 419 IBV_ACCESS_LOCAL_WRITE | 420 IBV_ACCESS_REMOTE_READ | 421 IBV_ACCESS_REMOTE_WRITE); 422 if (!cb->rdma_mr) { 423 fprintf(stderr, "rdma_buf reg_mr failed\n"); 424 ret = errno; 425 goto err3; 426 } 427 428 if (!cb->server) { 429 cb->start_buf = malloc(cb->size); 430 if (!cb->start_buf) { 431 fprintf(stderr, "start_buf malloc failed\n"); 432 ret = -ENOMEM; 433 goto err4; 434 } 435 436 cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size, 437 IBV_ACCESS_LOCAL_WRITE | 438 IBV_ACCESS_REMOTE_READ | 439 IBV_ACCESS_REMOTE_WRITE); 440 if (!cb->start_mr) { 441 fprintf(stderr, "start_buf reg_mr failed\n"); 442 ret = errno; 443 goto err5; 444 } 445 } 446 447 rping_setup_wr(cb); 448 DEBUG_LOG("allocated & registered buffers...\n"); 449 return 0; 450 451 err5: 452 free(cb->start_buf); 453 err4: 454 ibv_dereg_mr(cb->rdma_mr); 455 err3: 456 free(cb->rdma_buf); 457 err2: 458 ibv_dereg_mr(cb->send_mr); 459 err1: 460 ibv_dereg_mr(cb->recv_mr); 461 return ret; 462 } 463 464 static void rping_free_buffers(struct rping_cb *cb) 465 { 466 DEBUG_LOG("rping_free_buffers called on cb %p\n", cb); 467 ibv_dereg_mr(cb->recv_mr); 468 ibv_dereg_mr(cb->send_mr); 469 ibv_dereg_mr(cb->rdma_mr); 470 free(cb->rdma_buf); 471 if (!cb->server) { 472 ibv_dereg_mr(cb->start_mr); 473 free(cb->start_buf); 474 } 475 } 476 477 static int rping_create_qp(struct rping_cb *cb) 478 { 479 struct ibv_qp_init_attr init_attr; 480 int ret; 481 482 memset(&init_attr, 0, sizeof(init_attr)); 483 init_attr.cap.max_send_wr = RPING_SQ_DEPTH; 484 init_attr.cap.max_recv_wr = 2; 485 init_attr.cap.max_recv_sge = 1; 486 init_attr.cap.max_send_sge = 1; 487 init_attr.qp_type = IBV_QPT_RC; 488 init_attr.send_cq = cb->cq; 489 init_attr.recv_cq = cb->cq; 490 491 if (cb->server) { 492 ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr); 493 if (!ret) 494 cb->qp = cb->child_cm_id->qp; 495 } else { 496 ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr); 497 if (!ret) 498 cb->qp = cb->cm_id->qp; 499 } 500 501 return ret; 502 } 503 504 static void rping_free_qp(struct rping_cb *cb) 505 { 506 ibv_destroy_qp(cb->qp); 507 ibv_destroy_cq(cb->cq); 508 ibv_destroy_comp_channel(cb->channel); 509 ibv_dealloc_pd(cb->pd); 510 } 511 512 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id) 513 { 514 int ret; 515 516 cb->pd = ibv_alloc_pd(cm_id->verbs); 517 if (!cb->pd) { 518 fprintf(stderr, "ibv_alloc_pd failed\n"); 519 return errno; 520 } 521 DEBUG_LOG("created pd %p\n", cb->pd); 522 523 cb->channel = ibv_create_comp_channel(cm_id->verbs); 524 if (!cb->channel) { 525 fprintf(stderr, "ibv_create_comp_channel failed\n"); 526 ret = errno; 527 goto err1; 528 } 529 DEBUG_LOG("created channel %p\n", cb->channel); 530 531 cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb, 532 cb->channel, 0); 533 if (!cb->cq) { 534 fprintf(stderr, "ibv_create_cq failed\n"); 535 ret = errno; 536 goto err2; 537 } 538 DEBUG_LOG("created cq %p\n", cb->cq); 539 540 ret = ibv_req_notify_cq(cb->cq, 0); 541 if (ret) { 542 fprintf(stderr, "ibv_create_cq failed\n"); 543 ret = errno; 544 goto err3; 545 } 546 547 ret = rping_create_qp(cb); 548 if (ret) { 549 perror("rdma_create_qp"); 550 goto err3; 551 } 552 DEBUG_LOG("created qp %p\n", cb->qp); 553 return 0; 554 555 err3: 556 ibv_destroy_cq(cb->cq); 557 err2: 558 ibv_destroy_comp_channel(cb->channel); 559 err1: 560 ibv_dealloc_pd(cb->pd); 561 return ret; 562 } 563 564 static void *cm_thread(void *arg) 565 { 566 struct rping_cb *cb = arg; 567 struct rdma_cm_event *event; 568 int ret; 569 570 while (1) { 571 ret = rdma_get_cm_event(cb->cm_channel, &event); 572 if (ret) { 573 perror("rdma_get_cm_event"); 574 exit(ret); 575 } 576 ret = rping_cma_event_handler(event->id, event); 577 rdma_ack_cm_event(event); 578 if (ret) 579 exit(ret); 580 } 581 } 582 583 static void *cq_thread(void *arg) 584 { 585 struct rping_cb *cb = arg; 586 struct ibv_cq *ev_cq; 587 void *ev_ctx; 588 int ret; 589 590 DEBUG_LOG("cq_thread started.\n"); 591 592 while (1) { 593 pthread_testcancel(); 594 595 ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx); 596 if (ret) { 597 fprintf(stderr, "Failed to get cq event!\n"); 598 pthread_exit(NULL); 599 } 600 if (ev_cq != cb->cq) { 601 fprintf(stderr, "Unknown CQ!\n"); 602 pthread_exit(NULL); 603 } 604 ret = ibv_req_notify_cq(cb->cq, 0); 605 if (ret) { 606 fprintf(stderr, "Failed to set notify!\n"); 607 pthread_exit(NULL); 608 } 609 ret = rping_cq_event_handler(cb); 610 ibv_ack_cq_events(cb->cq, 1); 611 if (ret) 612 pthread_exit(NULL); 613 } 614 } 615 616 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr) 617 { 618 struct rping_rdma_info *info = &cb->send_buf; 619 620 info->buf = htonll((uint64_t) (unsigned long) buf); 621 info->rkey = htonl(mr->rkey); 622 info->size = htonl(cb->size); 623 624 DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n", 625 ntohll(info->buf), ntohl(info->rkey), ntohl(info->size)); 626 } 627 628 static int rping_test_server(struct rping_cb *cb) 629 { 630 struct ibv_send_wr *bad_wr; 631 int ret; 632 633 while (1) { 634 /* Wait for client's Start STAG/TO/Len */ 635 sem_wait(&cb->sem); 636 if (cb->state != RDMA_READ_ADV) { 637 fprintf(stderr, "wait for RDMA_READ_ADV state %d\n", 638 cb->state); 639 ret = -1; 640 break; 641 } 642 643 DEBUG_LOG("server received sink adv\n"); 644 645 /* Issue RDMA Read. */ 646 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ; 647 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey; 648 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr; 649 cb->rdma_sq_wr.sg_list->length = cb->remote_len; 650 651 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr); 652 if (ret) { 653 fprintf(stderr, "post send error %d\n", ret); 654 break; 655 } 656 DEBUG_LOG("server posted rdma read req \n"); 657 658 /* Wait for read completion */ 659 sem_wait(&cb->sem); 660 if (cb->state != RDMA_READ_COMPLETE) { 661 fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n", 662 cb->state); 663 ret = -1; 664 break; 665 } 666 DEBUG_LOG("server received read complete\n"); 667 668 /* Display data in recv buf */ 669 if (cb->verbose) 670 printf("server ping data: %s\n", cb->rdma_buf); 671 672 /* Tell client to continue */ 673 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 674 if (ret) { 675 fprintf(stderr, "post send error %d\n", ret); 676 break; 677 } 678 DEBUG_LOG("server posted go ahead\n"); 679 680 /* Wait for client's RDMA STAG/TO/Len */ 681 sem_wait(&cb->sem); 682 if (cb->state != RDMA_WRITE_ADV) { 683 fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n", 684 cb->state); 685 ret = -1; 686 break; 687 } 688 DEBUG_LOG("server received sink adv\n"); 689 690 /* RDMA Write echo data */ 691 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE; 692 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey; 693 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr; 694 cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1; 695 DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n", 696 cb->rdma_sq_wr.sg_list->lkey, 697 cb->rdma_sq_wr.sg_list->addr, 698 cb->rdma_sq_wr.sg_list->length); 699 700 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr); 701 if (ret) { 702 fprintf(stderr, "post send error %d\n", ret); 703 break; 704 } 705 706 /* Wait for completion */ 707 ret = sem_wait(&cb->sem); 708 if (cb->state != RDMA_WRITE_COMPLETE) { 709 fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n", 710 cb->state); 711 ret = -1; 712 break; 713 } 714 DEBUG_LOG("server rdma write complete \n"); 715 716 /* Tell client to begin again */ 717 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 718 if (ret) { 719 fprintf(stderr, "post send error %d\n", ret); 720 break; 721 } 722 DEBUG_LOG("server posted go ahead\n"); 723 } 724 725 return ret; 726 } 727 728 static int rping_bind_server(struct rping_cb *cb) 729 { 730 int ret; 731 732 if (cb->sin.ss_family == AF_INET) 733 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port; 734 else 735 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port; 736 737 ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin); 738 if (ret) { 739 perror("rdma_bind_addr"); 740 return ret; 741 } 742 DEBUG_LOG("rdma_bind_addr successful\n"); 743 744 DEBUG_LOG("rdma_listen\n"); 745 ret = rdma_listen(cb->cm_id, 3); 746 if (ret) { 747 perror("rdma_listen"); 748 return ret; 749 } 750 751 return 0; 752 } 753 754 static struct rping_cb *clone_cb(struct rping_cb *listening_cb) 755 { 756 struct rping_cb *cb = malloc(sizeof *cb); 757 if (!cb) 758 return NULL; 759 *cb = *listening_cb; 760 cb->child_cm_id->context = cb; 761 return cb; 762 } 763 764 static void free_cb(struct rping_cb *cb) 765 { 766 free(cb); 767 } 768 769 static void *rping_persistent_server_thread(void *arg) 770 { 771 struct rping_cb *cb = arg; 772 struct ibv_recv_wr *bad_wr; 773 int ret; 774 775 ret = rping_setup_qp(cb, cb->child_cm_id); 776 if (ret) { 777 fprintf(stderr, "setup_qp failed: %d\n", ret); 778 goto err0; 779 } 780 781 ret = rping_setup_buffers(cb); 782 if (ret) { 783 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 784 goto err1; 785 } 786 787 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 788 if (ret) { 789 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 790 goto err2; 791 } 792 793 pthread_create(&cb->cqthread, NULL, cq_thread, cb); 794 795 ret = rping_accept(cb); 796 if (ret) { 797 fprintf(stderr, "connect error %d\n", ret); 798 goto err3; 799 } 800 801 rping_test_server(cb); 802 rdma_disconnect(cb->child_cm_id); 803 rping_free_buffers(cb); 804 rping_free_qp(cb); 805 pthread_cancel(cb->cqthread); 806 pthread_join(cb->cqthread, NULL); 807 rdma_destroy_id(cb->child_cm_id); 808 free_cb(cb); 809 return NULL; 810 err3: 811 pthread_cancel(cb->cqthread); 812 pthread_join(cb->cqthread, NULL); 813 err2: 814 rping_free_buffers(cb); 815 err1: 816 rping_free_qp(cb); 817 err0: 818 free_cb(cb); 819 return NULL; 820 } 821 822 static int rping_run_persistent_server(struct rping_cb *listening_cb) 823 { 824 int ret; 825 struct rping_cb *cb; 826 827 ret = rping_bind_server(listening_cb); 828 if (ret) 829 return ret; 830 831 while (1) { 832 sem_wait(&listening_cb->sem); 833 if (listening_cb->state != CONNECT_REQUEST) { 834 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n", 835 listening_cb->state); 836 return -1; 837 } 838 839 cb = clone_cb(listening_cb); 840 if (!cb) 841 return -1; 842 pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb); 843 } 844 return 0; 845 } 846 847 static int rping_run_server(struct rping_cb *cb) 848 { 849 struct ibv_recv_wr *bad_wr; 850 int ret; 851 852 ret = rping_bind_server(cb); 853 if (ret) 854 return ret; 855 856 sem_wait(&cb->sem); 857 if (cb->state != CONNECT_REQUEST) { 858 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n", 859 cb->state); 860 return -1; 861 } 862 863 ret = rping_setup_qp(cb, cb->child_cm_id); 864 if (ret) { 865 fprintf(stderr, "setup_qp failed: %d\n", ret); 866 return ret; 867 } 868 869 ret = rping_setup_buffers(cb); 870 if (ret) { 871 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 872 goto err1; 873 } 874 875 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 876 if (ret) { 877 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 878 goto err2; 879 } 880 881 pthread_create(&cb->cqthread, NULL, cq_thread, cb); 882 883 ret = rping_accept(cb); 884 if (ret) { 885 fprintf(stderr, "connect error %d\n", ret); 886 goto err2; 887 } 888 889 rping_test_server(cb); 890 rdma_disconnect(cb->child_cm_id); 891 rdma_destroy_id(cb->child_cm_id); 892 err2: 893 rping_free_buffers(cb); 894 err1: 895 rping_free_qp(cb); 896 897 return ret; 898 } 899 900 static int rping_test_client(struct rping_cb *cb) 901 { 902 int ping, start, cc, i, ret = 0; 903 struct ibv_send_wr *bad_wr; 904 unsigned char c; 905 906 start = 65; 907 for (ping = 0; !cb->count || ping < cb->count; ping++) { 908 cb->state = RDMA_READ_ADV; 909 910 /* Put some ascii text in the buffer. */ 911 cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping); 912 for (i = cc, c = start; i < cb->size; i++) { 913 cb->start_buf[i] = c; 914 c++; 915 if (c > 122) 916 c = 65; 917 } 918 start++; 919 if (start > 122) 920 start = 65; 921 cb->start_buf[cb->size - 1] = 0; 922 923 rping_format_send(cb, cb->start_buf, cb->start_mr); 924 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 925 if (ret) { 926 fprintf(stderr, "post send error %d\n", ret); 927 break; 928 } 929 930 /* Wait for server to ACK */ 931 sem_wait(&cb->sem); 932 if (cb->state != RDMA_WRITE_ADV) { 933 fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n", 934 cb->state); 935 ret = -1; 936 break; 937 } 938 939 rping_format_send(cb, cb->rdma_buf, cb->rdma_mr); 940 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 941 if (ret) { 942 fprintf(stderr, "post send error %d\n", ret); 943 break; 944 } 945 946 /* Wait for the server to say the RDMA Write is complete. */ 947 sem_wait(&cb->sem); 948 if (cb->state != RDMA_WRITE_COMPLETE) { 949 fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n", 950 cb->state); 951 ret = -1; 952 break; 953 } 954 955 if (cb->validate) 956 if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) { 957 fprintf(stderr, "data mismatch!\n"); 958 ret = -1; 959 break; 960 } 961 962 if (cb->verbose) 963 printf("ping data: %s\n", cb->rdma_buf); 964 } 965 966 return ret; 967 } 968 969 static int rping_connect_client(struct rping_cb *cb) 970 { 971 struct rdma_conn_param conn_param; 972 int ret; 973 974 memset(&conn_param, 0, sizeof conn_param); 975 conn_param.responder_resources = 1; 976 conn_param.initiator_depth = 1; 977 conn_param.retry_count = 10; 978 979 ret = rdma_connect(cb->cm_id, &conn_param); 980 if (ret) { 981 perror("rdma_connect"); 982 return ret; 983 } 984 985 sem_wait(&cb->sem); 986 if (cb->state != CONNECTED) { 987 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state); 988 return -1; 989 } 990 991 DEBUG_LOG("rmda_connect successful\n"); 992 return 0; 993 } 994 995 static int rping_bind_client(struct rping_cb *cb) 996 { 997 int ret; 998 999 if (cb->sin.ss_family == AF_INET) 1000 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port; 1001 else 1002 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port; 1003 1004 ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000); 1005 if (ret) { 1006 perror("rdma_resolve_addr"); 1007 return ret; 1008 } 1009 1010 sem_wait(&cb->sem); 1011 if (cb->state != ROUTE_RESOLVED) { 1012 fprintf(stderr, "waiting for addr/route resolution state %d\n", 1013 cb->state); 1014 return -1; 1015 } 1016 1017 DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n"); 1018 return 0; 1019 } 1020 1021 static int rping_run_client(struct rping_cb *cb) 1022 { 1023 struct ibv_recv_wr *bad_wr; 1024 int ret; 1025 1026 ret = rping_bind_client(cb); 1027 if (ret) 1028 return ret; 1029 1030 ret = rping_setup_qp(cb, cb->cm_id); 1031 if (ret) { 1032 fprintf(stderr, "setup_qp failed: %d\n", ret); 1033 return ret; 1034 } 1035 1036 ret = rping_setup_buffers(cb); 1037 if (ret) { 1038 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 1039 goto err1; 1040 } 1041 1042 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 1043 if (ret) { 1044 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 1045 goto err2; 1046 } 1047 1048 pthread_create(&cb->cqthread, NULL, cq_thread, cb); 1049 1050 ret = rping_connect_client(cb); 1051 if (ret) { 1052 fprintf(stderr, "connect error %d\n", ret); 1053 goto err2; 1054 } 1055 1056 rping_test_client(cb); 1057 rdma_disconnect(cb->cm_id); 1058 err2: 1059 rping_free_buffers(cb); 1060 err1: 1061 rping_free_qp(cb); 1062 1063 return ret; 1064 } 1065 1066 static int get_addr(char *dst, struct sockaddr *addr) 1067 { 1068 struct addrinfo *res; 1069 int ret; 1070 1071 ret = getaddrinfo(dst, NULL, NULL, &res); 1072 if (ret) { 1073 printf("getaddrinfo failed - invalid hostname or IP address\n"); 1074 return ret; 1075 } 1076 1077 if (res->ai_family == PF_INET) 1078 memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in)); 1079 else if (res->ai_family == PF_INET6) 1080 memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in6)); 1081 else 1082 ret = -1; 1083 1084 freeaddrinfo(res); 1085 return ret; 1086 } 1087 1088 static void usage(char *name) 1089 { 1090 printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n", 1091 name); 1092 printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n", 1093 name); 1094 printf("\t-c\t\tclient side\n"); 1095 printf("\t-s\t\tserver side. To bind to any address with IPv6 use -a ::0\n"); 1096 printf("\t-v\t\tdisplay ping data to stdout\n"); 1097 printf("\t-V\t\tvalidate ping data\n"); 1098 printf("\t-d\t\tdebug printfs\n"); 1099 printf("\t-S size \tping data size\n"); 1100 printf("\t-C count\tping count times\n"); 1101 printf("\t-a addr\t\taddress\n"); 1102 printf("\t-p port\t\tport\n"); 1103 printf("\t-P\t\tpersistent server mode allowing multiple connections\n"); 1104 } 1105 1106 int main(int argc, char *argv[]) 1107 { 1108 struct rping_cb *cb; 1109 int op; 1110 int ret = 0; 1111 int persistent_server = 0; 1112 1113 cb = malloc(sizeof(*cb)); 1114 if (!cb) 1115 return -ENOMEM; 1116 1117 memset(cb, 0, sizeof(*cb)); 1118 cb->server = -1; 1119 cb->state = IDLE; 1120 cb->size = 64; 1121 cb->sin.ss_family = PF_INET; 1122 cb->port = htons(7174); 1123 sem_init(&cb->sem, 0, 0); 1124 1125 opterr = 0; 1126 while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) { 1127 switch (op) { 1128 case 'a': 1129 ret = get_addr(optarg, (struct sockaddr *) &cb->sin); 1130 break; 1131 case 'P': 1132 persistent_server = 1; 1133 break; 1134 case 'p': 1135 cb->port = htons(atoi(optarg)); 1136 DEBUG_LOG("port %d\n", (int) atoi(optarg)); 1137 break; 1138 case 's': 1139 cb->server = 1; 1140 DEBUG_LOG("server\n"); 1141 break; 1142 case 'c': 1143 cb->server = 0; 1144 DEBUG_LOG("client\n"); 1145 break; 1146 case 'S': 1147 cb->size = atoi(optarg); 1148 if ((cb->size < RPING_MIN_BUFSIZE) || 1149 (cb->size > (RPING_BUFSIZE - 1))) { 1150 fprintf(stderr, "Invalid size %d " 1151 "(valid range is %d to %d)\n", 1152 (int)cb->size, (int)(RPING_MIN_BUFSIZE), 1153 (int)(RPING_BUFSIZE)); 1154 ret = EINVAL; 1155 } else 1156 DEBUG_LOG("size %d\n", (int) atoi(optarg)); 1157 break; 1158 case 'C': 1159 cb->count = atoi(optarg); 1160 if (cb->count < 0) { 1161 fprintf(stderr, "Invalid count %d\n", 1162 cb->count); 1163 ret = EINVAL; 1164 } else 1165 DEBUG_LOG("count %d\n", (int) cb->count); 1166 break; 1167 case 'v': 1168 cb->verbose++; 1169 DEBUG_LOG("verbose\n"); 1170 break; 1171 case 'V': 1172 cb->validate++; 1173 DEBUG_LOG("validate data\n"); 1174 break; 1175 case 'd': 1176 debug++; 1177 break; 1178 default: 1179 usage("rping"); 1180 ret = EINVAL; 1181 goto out; 1182 } 1183 } 1184 if (ret) 1185 goto out; 1186 1187 if (cb->server == -1) { 1188 usage("rping"); 1189 ret = EINVAL; 1190 goto out; 1191 } 1192 1193 cb->cm_channel = rdma_create_event_channel(); 1194 if (!cb->cm_channel) { 1195 perror("rdma_create_event_channel"); 1196 goto out; 1197 } 1198 1199 ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP); 1200 if (ret) { 1201 perror("rdma_create_id"); 1202 goto out2; 1203 } 1204 DEBUG_LOG("created cm_id %p\n", cb->cm_id); 1205 1206 pthread_create(&cb->cmthread, NULL, cm_thread, cb); 1207 1208 if (cb->server) { 1209 if (persistent_server) 1210 ret = rping_run_persistent_server(cb); 1211 else 1212 ret = rping_run_server(cb); 1213 } else 1214 ret = rping_run_client(cb); 1215 1216 DEBUG_LOG("destroy cm_id %p\n", cb->cm_id); 1217 rdma_destroy_id(cb->cm_id); 1218 out2: 1219 rdma_destroy_event_channel(cb->cm_channel); 1220 out: 1221 free(cb); 1222 return ret; 1223 } 1224