1 /* 2 * Copyright (c) 2005 Ammasso, Inc. All rights reserved. 3 * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved. 4 * 5 * This software is available to you under a choice of one of two 6 * licenses. You may choose to be licensed under the terms of the GNU 7 * General Public License (GPL) Version 2, available from the file 8 * COPYING in the main directory of this source tree, or the 9 * OpenIB.org BSD license below: 10 * 11 * Redistribution and use in source and binary forms, with or 12 * without modification, are permitted provided that the following 13 * conditions are met: 14 * 15 * - Redistributions of source code must retain the above 16 * copyright notice, this list of conditions and the following 17 * disclaimer. 18 * 19 * - Redistributions in binary form must reproduce the above 20 * copyright notice, this list of conditions and the following 21 * disclaimer in the documentation and/or other materials 22 * provided with the distribution. 23 * 24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 31 * SOFTWARE. 32 */ 33 34 #include <getopt.h> 35 #include <stdlib.h> 36 #include <string.h> 37 #include <stdio.h> 38 #include <errno.h> 39 #include <sys/types.h> 40 #include <netinet/in.h> 41 #include <sys/socket.h> 42 #include <netdb.h> 43 #include <semaphore.h> 44 #include <arpa/inet.h> 45 #include <pthread.h> 46 #include <inttypes.h> 47 48 #include <rdma/rdma_cma.h> 49 #include <infiniband/arch.h> 50 51 static int debug = 0; 52 #define DEBUG_LOG if (debug) printf 53 54 /* 55 * rping "ping/pong" loop: 56 * client sends source rkey/addr/len 57 * server receives source rkey/add/len 58 * server rdma reads "ping" data from source 59 * server sends "go ahead" on rdma read completion 60 * client sends sink rkey/addr/len 61 * server receives sink rkey/addr/len 62 * server rdma writes "pong" data to sink 63 * server sends "go ahead" on rdma write completion 64 * <repeat loop> 65 */ 66 67 /* 68 * These states are used to signal events between the completion handler 69 * and the main client or server thread. 70 * 71 * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV, 72 * and RDMA_WRITE_COMPLETE for each ping. 73 */ 74 enum test_state { 75 IDLE = 1, 76 CONNECT_REQUEST, 77 ADDR_RESOLVED, 78 ROUTE_RESOLVED, 79 CONNECTED, 80 RDMA_READ_ADV, 81 RDMA_READ_COMPLETE, 82 RDMA_WRITE_ADV, 83 RDMA_WRITE_COMPLETE, 84 ERROR 85 }; 86 87 struct rping_rdma_info { 88 uint64_t buf; 89 uint32_t rkey; 90 uint32_t size; 91 }; 92 93 /* 94 * Default max buffer size for IO... 95 */ 96 #define RPING_BUFSIZE 64*1024 97 #define RPING_SQ_DEPTH 16 98 99 /* Default string for print data and 100 * minimum buffer size 101 */ 102 #define _stringify( _x ) # _x 103 #define stringify( _x ) _stringify( _x ) 104 105 #define RPING_MSG_FMT "rdma-ping-%d: " 106 #define RPING_MIN_BUFSIZE sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT) 107 108 /* 109 * Control block struct. 110 */ 111 struct rping_cb { 112 int server; /* 0 iff client */ 113 pthread_t cqthread; 114 pthread_t persistent_server_thread; 115 struct ibv_comp_channel *channel; 116 struct ibv_cq *cq; 117 struct ibv_pd *pd; 118 struct ibv_qp *qp; 119 120 struct ibv_recv_wr rq_wr; /* recv work request record */ 121 struct ibv_sge recv_sgl; /* recv single SGE */ 122 struct rping_rdma_info recv_buf;/* malloc'd buffer */ 123 struct ibv_mr *recv_mr; /* MR associated with this buffer */ 124 125 struct ibv_send_wr sq_wr; /* send work request record */ 126 struct ibv_sge send_sgl; 127 struct rping_rdma_info send_buf;/* single send buf */ 128 struct ibv_mr *send_mr; 129 130 struct ibv_send_wr rdma_sq_wr; /* rdma work request record */ 131 struct ibv_sge rdma_sgl; /* rdma single SGE */ 132 char *rdma_buf; /* used as rdma sink */ 133 struct ibv_mr *rdma_mr; 134 135 uint32_t remote_rkey; /* remote guys RKEY */ 136 uint64_t remote_addr; /* remote guys TO */ 137 uint32_t remote_len; /* remote guys LEN */ 138 139 char *start_buf; /* rdma read src */ 140 struct ibv_mr *start_mr; 141 142 enum test_state state; /* used for cond/signalling */ 143 sem_t sem; 144 145 struct sockaddr_storage sin; 146 uint16_t port; /* dst port in NBO */ 147 int verbose; /* verbose logging */ 148 int count; /* ping count */ 149 int size; /* ping data size */ 150 int validate; /* validate ping data */ 151 152 /* CM stuff */ 153 pthread_t cmthread; 154 struct rdma_event_channel *cm_channel; 155 struct rdma_cm_id *cm_id; /* connection on client side,*/ 156 /* listener on service side. */ 157 struct rdma_cm_id *child_cm_id; /* connection on server side */ 158 }; 159 160 static int rping_cma_event_handler(struct rdma_cm_id *cma_id, 161 struct rdma_cm_event *event) 162 { 163 int ret = 0; 164 struct rping_cb *cb = cma_id->context; 165 166 DEBUG_LOG("cma_event type %s cma_id %p (%s)\n", 167 rdma_event_str(event->event), cma_id, 168 (cma_id == cb->cm_id) ? "parent" : "child"); 169 170 switch (event->event) { 171 case RDMA_CM_EVENT_ADDR_RESOLVED: 172 cb->state = ADDR_RESOLVED; 173 ret = rdma_resolve_route(cma_id, 2000); 174 if (ret) { 175 cb->state = ERROR; 176 perror("rdma_resolve_route"); 177 sem_post(&cb->sem); 178 } 179 break; 180 181 case RDMA_CM_EVENT_ROUTE_RESOLVED: 182 cb->state = ROUTE_RESOLVED; 183 sem_post(&cb->sem); 184 break; 185 186 case RDMA_CM_EVENT_CONNECT_REQUEST: 187 cb->state = CONNECT_REQUEST; 188 cb->child_cm_id = cma_id; 189 DEBUG_LOG("child cma %p\n", cb->child_cm_id); 190 sem_post(&cb->sem); 191 break; 192 193 case RDMA_CM_EVENT_ESTABLISHED: 194 DEBUG_LOG("ESTABLISHED\n"); 195 196 /* 197 * Server will wake up when first RECV completes. 198 */ 199 if (!cb->server) { 200 cb->state = CONNECTED; 201 } 202 sem_post(&cb->sem); 203 break; 204 205 case RDMA_CM_EVENT_ADDR_ERROR: 206 case RDMA_CM_EVENT_ROUTE_ERROR: 207 case RDMA_CM_EVENT_CONNECT_ERROR: 208 case RDMA_CM_EVENT_UNREACHABLE: 209 case RDMA_CM_EVENT_REJECTED: 210 fprintf(stderr, "cma event %s, error %d\n", 211 rdma_event_str(event->event), event->status); 212 sem_post(&cb->sem); 213 ret = -1; 214 break; 215 216 case RDMA_CM_EVENT_DISCONNECTED: 217 fprintf(stderr, "%s DISCONNECT EVENT...\n", 218 cb->server ? "server" : "client"); 219 sem_post(&cb->sem); 220 break; 221 222 case RDMA_CM_EVENT_DEVICE_REMOVAL: 223 fprintf(stderr, "cma detected device removal!!!!\n"); 224 ret = -1; 225 break; 226 227 default: 228 fprintf(stderr, "unhandled event: %s, ignoring\n", 229 rdma_event_str(event->event)); 230 break; 231 } 232 233 return ret; 234 } 235 236 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc) 237 { 238 if (wc->byte_len != sizeof(cb->recv_buf)) { 239 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len); 240 return -1; 241 } 242 243 cb->remote_rkey = ntohl(cb->recv_buf.rkey); 244 cb->remote_addr = ntohll(cb->recv_buf.buf); 245 cb->remote_len = ntohl(cb->recv_buf.size); 246 DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n", 247 cb->remote_rkey, cb->remote_addr, cb->remote_len); 248 249 if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE) 250 cb->state = RDMA_READ_ADV; 251 else 252 cb->state = RDMA_WRITE_ADV; 253 254 return 0; 255 } 256 257 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc) 258 { 259 if (wc->byte_len != sizeof(cb->recv_buf)) { 260 fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len); 261 return -1; 262 } 263 264 if (cb->state == RDMA_READ_ADV) 265 cb->state = RDMA_WRITE_ADV; 266 else 267 cb->state = RDMA_WRITE_COMPLETE; 268 269 return 0; 270 } 271 272 static int rping_cq_event_handler(struct rping_cb *cb) 273 { 274 struct ibv_wc wc; 275 struct ibv_recv_wr *bad_wr; 276 int ret; 277 278 while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) { 279 ret = 0; 280 281 if (wc.status) { 282 if (wc.status != IBV_WC_WR_FLUSH_ERR) 283 fprintf(stderr, "cq completion failed status %d\n", 284 wc.status); 285 ret = -1; 286 goto error; 287 } 288 289 switch (wc.opcode) { 290 case IBV_WC_SEND: 291 DEBUG_LOG("send completion\n"); 292 break; 293 294 case IBV_WC_RDMA_WRITE: 295 DEBUG_LOG("rdma write completion\n"); 296 cb->state = RDMA_WRITE_COMPLETE; 297 sem_post(&cb->sem); 298 break; 299 300 case IBV_WC_RDMA_READ: 301 DEBUG_LOG("rdma read completion\n"); 302 cb->state = RDMA_READ_COMPLETE; 303 sem_post(&cb->sem); 304 break; 305 306 case IBV_WC_RECV: 307 DEBUG_LOG("recv completion\n"); 308 ret = cb->server ? server_recv(cb, &wc) : 309 client_recv(cb, &wc); 310 if (ret) { 311 fprintf(stderr, "recv wc error: %d\n", ret); 312 goto error; 313 } 314 315 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 316 if (ret) { 317 fprintf(stderr, "post recv error: %d\n", ret); 318 goto error; 319 } 320 sem_post(&cb->sem); 321 break; 322 323 default: 324 DEBUG_LOG("unknown!!!!! completion\n"); 325 ret = -1; 326 goto error; 327 } 328 } 329 if (ret) { 330 fprintf(stderr, "poll error %d\n", ret); 331 goto error; 332 } 333 return 0; 334 335 error: 336 cb->state = ERROR; 337 sem_post(&cb->sem); 338 return ret; 339 } 340 341 static int rping_accept(struct rping_cb *cb) 342 { 343 struct rdma_conn_param conn_param; 344 int ret; 345 346 DEBUG_LOG("accepting client connection request\n"); 347 348 memset(&conn_param, 0, sizeof conn_param); 349 conn_param.responder_resources = 1; 350 conn_param.initiator_depth = 1; 351 352 ret = rdma_accept(cb->child_cm_id, &conn_param); 353 if (ret) { 354 perror("rdma_accept"); 355 return ret; 356 } 357 358 sem_wait(&cb->sem); 359 if (cb->state == ERROR) { 360 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state); 361 return -1; 362 } 363 return 0; 364 } 365 366 static void rping_setup_wr(struct rping_cb *cb) 367 { 368 cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf; 369 cb->recv_sgl.length = sizeof cb->recv_buf; 370 cb->recv_sgl.lkey = cb->recv_mr->lkey; 371 cb->rq_wr.sg_list = &cb->recv_sgl; 372 cb->rq_wr.num_sge = 1; 373 374 cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf; 375 cb->send_sgl.length = sizeof cb->send_buf; 376 cb->send_sgl.lkey = cb->send_mr->lkey; 377 378 cb->sq_wr.opcode = IBV_WR_SEND; 379 cb->sq_wr.send_flags = IBV_SEND_SIGNALED; 380 cb->sq_wr.sg_list = &cb->send_sgl; 381 cb->sq_wr.num_sge = 1; 382 383 cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf; 384 cb->rdma_sgl.lkey = cb->rdma_mr->lkey; 385 cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED; 386 cb->rdma_sq_wr.sg_list = &cb->rdma_sgl; 387 cb->rdma_sq_wr.num_sge = 1; 388 } 389 390 static int rping_setup_buffers(struct rping_cb *cb) 391 { 392 int ret; 393 394 DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb); 395 396 cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf, 397 IBV_ACCESS_LOCAL_WRITE); 398 if (!cb->recv_mr) { 399 fprintf(stderr, "recv_buf reg_mr failed\n"); 400 return errno; 401 } 402 403 cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0); 404 if (!cb->send_mr) { 405 fprintf(stderr, "send_buf reg_mr failed\n"); 406 ret = errno; 407 goto err1; 408 } 409 410 cb->rdma_buf = malloc(cb->size); 411 if (!cb->rdma_buf) { 412 fprintf(stderr, "rdma_buf malloc failed\n"); 413 ret = -ENOMEM; 414 goto err2; 415 } 416 417 cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size, 418 IBV_ACCESS_LOCAL_WRITE | 419 IBV_ACCESS_REMOTE_READ | 420 IBV_ACCESS_REMOTE_WRITE); 421 if (!cb->rdma_mr) { 422 fprintf(stderr, "rdma_buf reg_mr failed\n"); 423 ret = errno; 424 goto err3; 425 } 426 427 if (!cb->server) { 428 cb->start_buf = malloc(cb->size); 429 if (!cb->start_buf) { 430 fprintf(stderr, "start_buf malloc failed\n"); 431 ret = -ENOMEM; 432 goto err4; 433 } 434 435 cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size, 436 IBV_ACCESS_LOCAL_WRITE | 437 IBV_ACCESS_REMOTE_READ | 438 IBV_ACCESS_REMOTE_WRITE); 439 if (!cb->start_mr) { 440 fprintf(stderr, "start_buf reg_mr failed\n"); 441 ret = errno; 442 goto err5; 443 } 444 } 445 446 rping_setup_wr(cb); 447 DEBUG_LOG("allocated & registered buffers...\n"); 448 return 0; 449 450 err5: 451 free(cb->start_buf); 452 err4: 453 ibv_dereg_mr(cb->rdma_mr); 454 err3: 455 free(cb->rdma_buf); 456 err2: 457 ibv_dereg_mr(cb->send_mr); 458 err1: 459 ibv_dereg_mr(cb->recv_mr); 460 return ret; 461 } 462 463 static void rping_free_buffers(struct rping_cb *cb) 464 { 465 DEBUG_LOG("rping_free_buffers called on cb %p\n", cb); 466 ibv_dereg_mr(cb->recv_mr); 467 ibv_dereg_mr(cb->send_mr); 468 ibv_dereg_mr(cb->rdma_mr); 469 free(cb->rdma_buf); 470 if (!cb->server) { 471 ibv_dereg_mr(cb->start_mr); 472 free(cb->start_buf); 473 } 474 } 475 476 static int rping_create_qp(struct rping_cb *cb) 477 { 478 struct ibv_qp_init_attr init_attr; 479 int ret; 480 481 memset(&init_attr, 0, sizeof(init_attr)); 482 init_attr.cap.max_send_wr = RPING_SQ_DEPTH; 483 init_attr.cap.max_recv_wr = 2; 484 init_attr.cap.max_recv_sge = 1; 485 init_attr.cap.max_send_sge = 1; 486 init_attr.qp_type = IBV_QPT_RC; 487 init_attr.send_cq = cb->cq; 488 init_attr.recv_cq = cb->cq; 489 490 if (cb->server) { 491 ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr); 492 if (!ret) 493 cb->qp = cb->child_cm_id->qp; 494 } else { 495 ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr); 496 if (!ret) 497 cb->qp = cb->cm_id->qp; 498 } 499 500 return ret; 501 } 502 503 static void rping_free_qp(struct rping_cb *cb) 504 { 505 ibv_destroy_qp(cb->qp); 506 ibv_destroy_cq(cb->cq); 507 ibv_destroy_comp_channel(cb->channel); 508 ibv_dealloc_pd(cb->pd); 509 } 510 511 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id) 512 { 513 int ret; 514 515 cb->pd = ibv_alloc_pd(cm_id->verbs); 516 if (!cb->pd) { 517 fprintf(stderr, "ibv_alloc_pd failed\n"); 518 return errno; 519 } 520 DEBUG_LOG("created pd %p\n", cb->pd); 521 522 cb->channel = ibv_create_comp_channel(cm_id->verbs); 523 if (!cb->channel) { 524 fprintf(stderr, "ibv_create_comp_channel failed\n"); 525 ret = errno; 526 goto err1; 527 } 528 DEBUG_LOG("created channel %p\n", cb->channel); 529 530 cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb, 531 cb->channel, 0); 532 if (!cb->cq) { 533 fprintf(stderr, "ibv_create_cq failed\n"); 534 ret = errno; 535 goto err2; 536 } 537 DEBUG_LOG("created cq %p\n", cb->cq); 538 539 ret = ibv_req_notify_cq(cb->cq, 0); 540 if (ret) { 541 fprintf(stderr, "ibv_create_cq failed\n"); 542 ret = errno; 543 goto err3; 544 } 545 546 ret = rping_create_qp(cb); 547 if (ret) { 548 perror("rdma_create_qp"); 549 goto err3; 550 } 551 DEBUG_LOG("created qp %p\n", cb->qp); 552 return 0; 553 554 err3: 555 ibv_destroy_cq(cb->cq); 556 err2: 557 ibv_destroy_comp_channel(cb->channel); 558 err1: 559 ibv_dealloc_pd(cb->pd); 560 return ret; 561 } 562 563 static void *cm_thread(void *arg) 564 { 565 struct rping_cb *cb = arg; 566 struct rdma_cm_event *event; 567 int ret; 568 569 while (1) { 570 ret = rdma_get_cm_event(cb->cm_channel, &event); 571 if (ret) { 572 perror("rdma_get_cm_event"); 573 exit(ret); 574 } 575 ret = rping_cma_event_handler(event->id, event); 576 rdma_ack_cm_event(event); 577 if (ret) 578 exit(ret); 579 } 580 } 581 582 static void *cq_thread(void *arg) 583 { 584 struct rping_cb *cb = arg; 585 struct ibv_cq *ev_cq; 586 void *ev_ctx; 587 int ret; 588 589 DEBUG_LOG("cq_thread started.\n"); 590 591 while (1) { 592 pthread_testcancel(); 593 594 ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx); 595 if (ret) { 596 fprintf(stderr, "Failed to get cq event!\n"); 597 pthread_exit(NULL); 598 } 599 if (ev_cq != cb->cq) { 600 fprintf(stderr, "Unknown CQ!\n"); 601 pthread_exit(NULL); 602 } 603 ret = ibv_req_notify_cq(cb->cq, 0); 604 if (ret) { 605 fprintf(stderr, "Failed to set notify!\n"); 606 pthread_exit(NULL); 607 } 608 ret = rping_cq_event_handler(cb); 609 ibv_ack_cq_events(cb->cq, 1); 610 if (ret) 611 pthread_exit(NULL); 612 } 613 } 614 615 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr) 616 { 617 struct rping_rdma_info *info = &cb->send_buf; 618 619 info->buf = htonll((uint64_t) (unsigned long) buf); 620 info->rkey = htonl(mr->rkey); 621 info->size = htonl(cb->size); 622 623 DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n", 624 ntohll(info->buf), ntohl(info->rkey), ntohl(info->size)); 625 } 626 627 static int rping_test_server(struct rping_cb *cb) 628 { 629 struct ibv_send_wr *bad_wr; 630 int ret; 631 632 while (1) { 633 /* Wait for client's Start STAG/TO/Len */ 634 sem_wait(&cb->sem); 635 if (cb->state != RDMA_READ_ADV) { 636 fprintf(stderr, "wait for RDMA_READ_ADV state %d\n", 637 cb->state); 638 ret = -1; 639 break; 640 } 641 642 DEBUG_LOG("server received sink adv\n"); 643 644 /* Issue RDMA Read. */ 645 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ; 646 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey; 647 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr; 648 cb->rdma_sq_wr.sg_list->length = cb->remote_len; 649 650 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr); 651 if (ret) { 652 fprintf(stderr, "post send error %d\n", ret); 653 break; 654 } 655 DEBUG_LOG("server posted rdma read req \n"); 656 657 /* Wait for read completion */ 658 sem_wait(&cb->sem); 659 if (cb->state != RDMA_READ_COMPLETE) { 660 fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n", 661 cb->state); 662 ret = -1; 663 break; 664 } 665 DEBUG_LOG("server received read complete\n"); 666 667 /* Display data in recv buf */ 668 if (cb->verbose) 669 printf("server ping data: %s\n", cb->rdma_buf); 670 671 /* Tell client to continue */ 672 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 673 if (ret) { 674 fprintf(stderr, "post send error %d\n", ret); 675 break; 676 } 677 DEBUG_LOG("server posted go ahead\n"); 678 679 /* Wait for client's RDMA STAG/TO/Len */ 680 sem_wait(&cb->sem); 681 if (cb->state != RDMA_WRITE_ADV) { 682 fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n", 683 cb->state); 684 ret = -1; 685 break; 686 } 687 DEBUG_LOG("server received sink adv\n"); 688 689 /* RDMA Write echo data */ 690 cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE; 691 cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey; 692 cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr; 693 cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1; 694 DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n", 695 cb->rdma_sq_wr.sg_list->lkey, 696 cb->rdma_sq_wr.sg_list->addr, 697 cb->rdma_sq_wr.sg_list->length); 698 699 ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr); 700 if (ret) { 701 fprintf(stderr, "post send error %d\n", ret); 702 break; 703 } 704 705 /* Wait for completion */ 706 ret = sem_wait(&cb->sem); 707 if (cb->state != RDMA_WRITE_COMPLETE) { 708 fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n", 709 cb->state); 710 ret = -1; 711 break; 712 } 713 DEBUG_LOG("server rdma write complete \n"); 714 715 /* Tell client to begin again */ 716 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 717 if (ret) { 718 fprintf(stderr, "post send error %d\n", ret); 719 break; 720 } 721 DEBUG_LOG("server posted go ahead\n"); 722 } 723 724 return ret; 725 } 726 727 static int rping_bind_server(struct rping_cb *cb) 728 { 729 int ret; 730 731 if (cb->sin.ss_family == AF_INET) 732 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port; 733 else 734 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port; 735 736 ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin); 737 if (ret) { 738 perror("rdma_bind_addr"); 739 return ret; 740 } 741 DEBUG_LOG("rdma_bind_addr successful\n"); 742 743 DEBUG_LOG("rdma_listen\n"); 744 ret = rdma_listen(cb->cm_id, 3); 745 if (ret) { 746 perror("rdma_listen"); 747 return ret; 748 } 749 750 return 0; 751 } 752 753 static struct rping_cb *clone_cb(struct rping_cb *listening_cb) 754 { 755 struct rping_cb *cb = malloc(sizeof *cb); 756 if (!cb) 757 return NULL; 758 *cb = *listening_cb; 759 cb->child_cm_id->context = cb; 760 return cb; 761 } 762 763 static void free_cb(struct rping_cb *cb) 764 { 765 free(cb); 766 } 767 768 static void *rping_persistent_server_thread(void *arg) 769 { 770 struct rping_cb *cb = arg; 771 struct ibv_recv_wr *bad_wr; 772 int ret; 773 774 ret = rping_setup_qp(cb, cb->child_cm_id); 775 if (ret) { 776 fprintf(stderr, "setup_qp failed: %d\n", ret); 777 goto err0; 778 } 779 780 ret = rping_setup_buffers(cb); 781 if (ret) { 782 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 783 goto err1; 784 } 785 786 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 787 if (ret) { 788 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 789 goto err2; 790 } 791 792 pthread_create(&cb->cqthread, NULL, cq_thread, cb); 793 794 ret = rping_accept(cb); 795 if (ret) { 796 fprintf(stderr, "connect error %d\n", ret); 797 goto err3; 798 } 799 800 rping_test_server(cb); 801 rdma_disconnect(cb->child_cm_id); 802 pthread_join(cb->cqthread, NULL); 803 rping_free_buffers(cb); 804 rping_free_qp(cb); 805 rdma_destroy_id(cb->child_cm_id); 806 free_cb(cb); 807 return NULL; 808 err3: 809 pthread_cancel(cb->cqthread); 810 pthread_join(cb->cqthread, NULL); 811 err2: 812 rping_free_buffers(cb); 813 err1: 814 rping_free_qp(cb); 815 err0: 816 free_cb(cb); 817 return NULL; 818 } 819 820 static int rping_run_persistent_server(struct rping_cb *listening_cb) 821 { 822 int ret; 823 struct rping_cb *cb; 824 825 ret = rping_bind_server(listening_cb); 826 if (ret) 827 return ret; 828 829 while (1) { 830 sem_wait(&listening_cb->sem); 831 if (listening_cb->state != CONNECT_REQUEST) { 832 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n", 833 listening_cb->state); 834 return -1; 835 } 836 837 cb = clone_cb(listening_cb); 838 if (!cb) 839 return -1; 840 pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb); 841 } 842 return 0; 843 } 844 845 static int rping_run_server(struct rping_cb *cb) 846 { 847 struct ibv_recv_wr *bad_wr; 848 int ret; 849 850 ret = rping_bind_server(cb); 851 if (ret) 852 return ret; 853 854 sem_wait(&cb->sem); 855 if (cb->state != CONNECT_REQUEST) { 856 fprintf(stderr, "wait for CONNECT_REQUEST state %d\n", 857 cb->state); 858 return -1; 859 } 860 861 ret = rping_setup_qp(cb, cb->child_cm_id); 862 if (ret) { 863 fprintf(stderr, "setup_qp failed: %d\n", ret); 864 return ret; 865 } 866 867 ret = rping_setup_buffers(cb); 868 if (ret) { 869 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 870 goto err1; 871 } 872 873 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 874 if (ret) { 875 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 876 goto err2; 877 } 878 879 pthread_create(&cb->cqthread, NULL, cq_thread, cb); 880 881 ret = rping_accept(cb); 882 if (ret) { 883 fprintf(stderr, "connect error %d\n", ret); 884 goto err2; 885 } 886 887 rping_test_server(cb); 888 rdma_disconnect(cb->child_cm_id); 889 pthread_join(cb->cqthread, NULL); 890 rdma_destroy_id(cb->child_cm_id); 891 err2: 892 rping_free_buffers(cb); 893 err1: 894 rping_free_qp(cb); 895 896 return ret; 897 } 898 899 static int rping_test_client(struct rping_cb *cb) 900 { 901 int ping, start, cc, i, ret = 0; 902 struct ibv_send_wr *bad_wr; 903 unsigned char c; 904 905 start = 65; 906 for (ping = 0; !cb->count || ping < cb->count; ping++) { 907 cb->state = RDMA_READ_ADV; 908 909 /* Put some ascii text in the buffer. */ 910 cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping); 911 for (i = cc, c = start; i < cb->size; i++) { 912 cb->start_buf[i] = c; 913 c++; 914 if (c > 122) 915 c = 65; 916 } 917 start++; 918 if (start > 122) 919 start = 65; 920 cb->start_buf[cb->size - 1] = 0; 921 922 rping_format_send(cb, cb->start_buf, cb->start_mr); 923 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 924 if (ret) { 925 fprintf(stderr, "post send error %d\n", ret); 926 break; 927 } 928 929 /* Wait for server to ACK */ 930 sem_wait(&cb->sem); 931 if (cb->state != RDMA_WRITE_ADV) { 932 fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n", 933 cb->state); 934 ret = -1; 935 break; 936 } 937 938 rping_format_send(cb, cb->rdma_buf, cb->rdma_mr); 939 ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr); 940 if (ret) { 941 fprintf(stderr, "post send error %d\n", ret); 942 break; 943 } 944 945 /* Wait for the server to say the RDMA Write is complete. */ 946 sem_wait(&cb->sem); 947 if (cb->state != RDMA_WRITE_COMPLETE) { 948 fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n", 949 cb->state); 950 ret = -1; 951 break; 952 } 953 954 if (cb->validate) 955 if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) { 956 fprintf(stderr, "data mismatch!\n"); 957 ret = -1; 958 break; 959 } 960 961 if (cb->verbose) 962 printf("ping data: %s\n", cb->rdma_buf); 963 } 964 965 return ret; 966 } 967 968 static int rping_connect_client(struct rping_cb *cb) 969 { 970 struct rdma_conn_param conn_param; 971 int ret; 972 973 memset(&conn_param, 0, sizeof conn_param); 974 conn_param.responder_resources = 1; 975 conn_param.initiator_depth = 1; 976 conn_param.retry_count = 10; 977 978 ret = rdma_connect(cb->cm_id, &conn_param); 979 if (ret) { 980 perror("rdma_connect"); 981 return ret; 982 } 983 984 sem_wait(&cb->sem); 985 if (cb->state != CONNECTED) { 986 fprintf(stderr, "wait for CONNECTED state %d\n", cb->state); 987 return -1; 988 } 989 990 DEBUG_LOG("rmda_connect successful\n"); 991 return 0; 992 } 993 994 static int rping_bind_client(struct rping_cb *cb) 995 { 996 int ret; 997 998 if (cb->sin.ss_family == AF_INET) 999 ((struct sockaddr_in *) &cb->sin)->sin_port = cb->port; 1000 else 1001 ((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port; 1002 1003 ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000); 1004 if (ret) { 1005 perror("rdma_resolve_addr"); 1006 return ret; 1007 } 1008 1009 sem_wait(&cb->sem); 1010 if (cb->state != ROUTE_RESOLVED) { 1011 fprintf(stderr, "waiting for addr/route resolution state %d\n", 1012 cb->state); 1013 return -1; 1014 } 1015 1016 DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n"); 1017 return 0; 1018 } 1019 1020 static int rping_run_client(struct rping_cb *cb) 1021 { 1022 struct ibv_recv_wr *bad_wr; 1023 int ret; 1024 1025 ret = rping_bind_client(cb); 1026 if (ret) 1027 return ret; 1028 1029 ret = rping_setup_qp(cb, cb->cm_id); 1030 if (ret) { 1031 fprintf(stderr, "setup_qp failed: %d\n", ret); 1032 return ret; 1033 } 1034 1035 ret = rping_setup_buffers(cb); 1036 if (ret) { 1037 fprintf(stderr, "rping_setup_buffers failed: %d\n", ret); 1038 goto err1; 1039 } 1040 1041 ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr); 1042 if (ret) { 1043 fprintf(stderr, "ibv_post_recv failed: %d\n", ret); 1044 goto err2; 1045 } 1046 1047 pthread_create(&cb->cqthread, NULL, cq_thread, cb); 1048 1049 ret = rping_connect_client(cb); 1050 if (ret) { 1051 fprintf(stderr, "connect error %d\n", ret); 1052 goto err3; 1053 } 1054 1055 ret = rping_test_client(cb); 1056 if (ret) { 1057 fprintf(stderr, "rping client failed: %d\n", ret); 1058 goto err4; 1059 } 1060 ret = 0; 1061 err4: 1062 rdma_disconnect(cb->cm_id); 1063 err3: 1064 pthread_join(cb->cqthread, NULL); 1065 err2: 1066 rping_free_buffers(cb); 1067 err1: 1068 rping_free_qp(cb); 1069 1070 return ret; 1071 } 1072 1073 static int get_addr(char *dst, struct sockaddr *addr) 1074 { 1075 struct addrinfo *res; 1076 int ret; 1077 1078 ret = getaddrinfo(dst, NULL, NULL, &res); 1079 if (ret) { 1080 printf("getaddrinfo failed - invalid hostname or IP address\n"); 1081 return ret; 1082 } 1083 1084 if (res->ai_family == PF_INET) 1085 memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in)); 1086 else if (res->ai_family == PF_INET6) 1087 memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in6)); 1088 else 1089 ret = -1; 1090 1091 freeaddrinfo(res); 1092 return ret; 1093 } 1094 1095 static void usage(char *name) 1096 { 1097 printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n", 1098 name); 1099 printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n", 1100 name); 1101 printf("\t-c\t\tclient side\n"); 1102 printf("\t-s\t\tserver side. To bind to any address with IPv6 use -a ::0\n"); 1103 printf("\t-v\t\tdisplay ping data to stdout\n"); 1104 printf("\t-V\t\tvalidate ping data\n"); 1105 printf("\t-d\t\tdebug printfs\n"); 1106 printf("\t-S size \tping data size\n"); 1107 printf("\t-C count\tping count times\n"); 1108 printf("\t-a addr\t\taddress\n"); 1109 printf("\t-p port\t\tport\n"); 1110 printf("\t-P\t\tpersistent server mode allowing multiple connections\n"); 1111 } 1112 1113 int main(int argc, char *argv[]) 1114 { 1115 struct rping_cb *cb; 1116 int op; 1117 int ret = 0; 1118 int persistent_server = 0; 1119 1120 cb = malloc(sizeof(*cb)); 1121 if (!cb) 1122 return -ENOMEM; 1123 1124 memset(cb, 0, sizeof(*cb)); 1125 cb->server = -1; 1126 cb->state = IDLE; 1127 cb->size = 64; 1128 cb->sin.ss_family = PF_INET; 1129 cb->port = htons(7174); 1130 sem_init(&cb->sem, 0, 0); 1131 1132 opterr = 0; 1133 while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) { 1134 switch (op) { 1135 case 'a': 1136 ret = get_addr(optarg, (struct sockaddr *) &cb->sin); 1137 break; 1138 case 'P': 1139 persistent_server = 1; 1140 break; 1141 case 'p': 1142 cb->port = htons(atoi(optarg)); 1143 DEBUG_LOG("port %d\n", (int) atoi(optarg)); 1144 break; 1145 case 's': 1146 cb->server = 1; 1147 DEBUG_LOG("server\n"); 1148 break; 1149 case 'c': 1150 cb->server = 0; 1151 DEBUG_LOG("client\n"); 1152 break; 1153 case 'S': 1154 cb->size = atoi(optarg); 1155 if ((cb->size < RPING_MIN_BUFSIZE) || 1156 (cb->size > (RPING_BUFSIZE - 1))) { 1157 fprintf(stderr, "Invalid size %d " 1158 "(valid range is %d to %d)\n", 1159 (int)cb->size, (int)(RPING_MIN_BUFSIZE), 1160 (int)(RPING_BUFSIZE)); 1161 ret = EINVAL; 1162 } else 1163 DEBUG_LOG("size %d\n", (int) atoi(optarg)); 1164 break; 1165 case 'C': 1166 cb->count = atoi(optarg); 1167 if (cb->count < 0) { 1168 fprintf(stderr, "Invalid count %d\n", 1169 cb->count); 1170 ret = EINVAL; 1171 } else 1172 DEBUG_LOG("count %d\n", (int) cb->count); 1173 break; 1174 case 'v': 1175 cb->verbose++; 1176 DEBUG_LOG("verbose\n"); 1177 break; 1178 case 'V': 1179 cb->validate++; 1180 DEBUG_LOG("validate data\n"); 1181 break; 1182 case 'd': 1183 debug++; 1184 break; 1185 default: 1186 usage("rping"); 1187 ret = EINVAL; 1188 goto out; 1189 } 1190 } 1191 if (ret) 1192 goto out; 1193 1194 if (cb->server == -1) { 1195 usage("rping"); 1196 ret = EINVAL; 1197 goto out; 1198 } 1199 1200 cb->cm_channel = rdma_create_event_channel(); 1201 if (!cb->cm_channel) { 1202 perror("rdma_create_event_channel"); 1203 goto out; 1204 } 1205 1206 ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP); 1207 if (ret) { 1208 perror("rdma_create_id"); 1209 goto out2; 1210 } 1211 DEBUG_LOG("created cm_id %p\n", cb->cm_id); 1212 1213 pthread_create(&cb->cmthread, NULL, cm_thread, cb); 1214 1215 if (cb->server) { 1216 if (persistent_server) 1217 ret = rping_run_persistent_server(cb); 1218 else 1219 ret = rping_run_server(cb); 1220 } else 1221 ret = rping_run_client(cb); 1222 1223 DEBUG_LOG("destroy cm_id %p\n", cb->cm_id); 1224 rdma_destroy_id(cb->cm_id); 1225 out2: 1226 rdma_destroy_event_channel(cb->cm_channel); 1227 out: 1228 free(cb); 1229 return ret; 1230 } 1231