1 /* 2 * Copyright (c) 2011 Intel Corporation. All rights reserved. 3 * 4 * This software is available to you under the OpenIB.org BSD license 5 * below: 6 * 7 * Redistribution and use in source and binary forms, with or 8 * without modification, are permitted provided that the following 9 * conditions are met: 10 * 11 * - Redistributions of source code must retain the above 12 * copyright notice, this list of conditions and the following 13 * disclaimer. 14 * 15 * - Redistributions in binary form must reproduce the above 16 * copyright notice, this list of conditions and the following 17 * disclaimer in the documentation and/or other materials 18 * provided with the distribution. 19 * 20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV 23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 24 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 25 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 26 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 27 * SOFTWARE. 28 */ 29 30 #include <stdio.h> 31 #include <stdlib.h> 32 #include <string.h> 33 #include <strings.h> 34 #include <errno.h> 35 #include <getopt.h> 36 #include <arpa/inet.h> 37 #include <sys/mman.h> 38 #include <sys/types.h> 39 #include <sys/socket.h> 40 #include <sys/time.h> 41 #include <sys/stat.h> 42 #include <fcntl.h> 43 #include <netdb.h> 44 #include <unistd.h> 45 46 #include <rdma/rsocket.h> 47 48 union rsocket_address { 49 struct sockaddr sa; 50 struct sockaddr_in sin; 51 struct sockaddr_in6 sin6; 52 struct sockaddr_storage storage; 53 }; 54 55 static const char *port = "7427"; 56 static char *dst_addr; 57 static char *dst_file; 58 static char *src_file; 59 static struct timeval start, end; 60 //static void buf[1024 * 1024]; 61 static uint64_t bytes; 62 static int fd; 63 static void *file_addr; 64 65 enum { 66 CMD_NOOP, 67 CMD_OPEN, 68 CMD_CLOSE, 69 CMD_WRITE, 70 CMD_RESP = 0x80, 71 }; 72 73 /* TODO: handle byte swapping */ 74 struct msg_hdr { 75 uint8_t version; 76 uint8_t command; 77 uint16_t len; 78 uint32_t data; 79 uint64_t id; 80 }; 81 82 struct msg_open { 83 struct msg_hdr hdr; 84 char path[0]; 85 }; 86 87 struct msg_write { 88 struct msg_hdr hdr; 89 uint64_t size; 90 }; 91 92 static void show_perf(void) 93 { 94 float usec; 95 96 usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); 97 98 printf("%lld bytes in %.2f seconds = %.2f Gb/sec\n", 99 (long long) bytes, usec / 1000000., (bytes * 8) / (1000. * usec)); 100 } 101 102 static char *_ntop(union rsocket_address *rsa) 103 { 104 static char addr[32]; 105 106 switch (rsa->sa.sa_family) { 107 case AF_INET: 108 inet_ntop(AF_INET, &rsa->sin.sin_addr, addr, sizeof addr); 109 break; 110 case AF_INET6: 111 inet_ntop(AF_INET6, &rsa->sin6.sin6_addr, addr, sizeof addr); 112 break; 113 default: 114 addr[0] = '\0'; 115 break; 116 } 117 118 return addr; 119 } 120 121 static size_t _recv(int rs, char *msg, size_t len) 122 { 123 size_t ret, offset; 124 125 for (offset = 0; offset < len; offset += ret) { 126 ret = rrecv(rs, msg + offset, len - offset, 0); 127 if (ret <= 0) 128 return ret; 129 } 130 131 return len; 132 } 133 134 static int msg_recv_hdr(int rs, struct msg_hdr *hdr) 135 { 136 int ret; 137 138 ret = _recv(rs, (char *) hdr, sizeof *hdr); 139 if (ret != sizeof *hdr) 140 return -1; 141 142 if (hdr->version || hdr->len < sizeof *hdr) { 143 printf("invalid version %d or length %d\n", 144 hdr->version, hdr->len); 145 return -1; 146 } 147 148 return sizeof *hdr; 149 } 150 151 static int msg_get_resp(int rs, struct msg_hdr *msg, uint8_t cmd) 152 { 153 int ret; 154 155 ret = msg_recv_hdr(rs, msg); 156 if (ret != sizeof *msg) 157 return ret; 158 159 if ((msg->len != sizeof *msg) || (msg->command != (cmd | CMD_RESP))) { 160 printf("invalid length %d or bad command response %x:%x\n", 161 msg->len, msg->command, cmd | CMD_RESP); 162 return -1; 163 } 164 165 return msg->data; 166 } 167 168 static void msg_send_resp(int rs, struct msg_hdr *msg, uint32_t status) 169 { 170 struct msg_hdr resp; 171 172 resp.version = 0; 173 resp.command = msg->command | CMD_RESP; 174 resp.len = sizeof resp; 175 resp.data = status; 176 resp.id = msg->id; 177 rsend(rs, (char *) &resp, sizeof resp, 0); 178 } 179 180 static int server_listen(void) 181 { 182 struct addrinfo hints, *res; 183 int ret, rs; 184 185 memset(&hints, 0, sizeof hints); 186 hints.ai_flags = RAI_PASSIVE; 187 ret = getaddrinfo(NULL, port, &hints, &res); 188 if (ret) { 189 printf("getaddrinfo failed: %s\n", gai_strerror(ret)); 190 return ret; 191 } 192 193 rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol); 194 if (rs < 0) { 195 perror("rsocket failed\n"); 196 ret = rs; 197 goto free; 198 } 199 200 ret = 1; 201 ret = rsetsockopt(rs, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof ret); 202 if (ret) { 203 perror("rsetsockopt failed"); 204 goto close; 205 } 206 207 ret = rbind(rs, res->ai_addr, res->ai_addrlen); 208 if (ret) { 209 perror("rbind failed"); 210 goto close; 211 } 212 213 ret = rlisten(rs, 1); 214 if (ret) { 215 perror("rlisten failed"); 216 goto close; 217 } 218 219 ret = rs; 220 goto free; 221 222 close: 223 rclose(rs); 224 free: 225 freeaddrinfo(res); 226 return ret; 227 } 228 229 static int server_open(int rs, struct msg_hdr *msg) 230 { 231 char *path = NULL; 232 int ret, len; 233 234 printf("opening: "); 235 fflush(NULL); 236 if (file_addr || fd > 0) { 237 printf("cannot open another file\n"); 238 ret = EBUSY; 239 goto out; 240 } 241 242 len = msg->len - sizeof *msg; 243 path = malloc(len); 244 if (!path) { 245 printf("cannot allocate path name\n"); 246 ret = ENOMEM; 247 goto out; 248 } 249 250 ret = _recv(rs, path, len); 251 if (ret != len) { 252 printf("error receiving path\n"); 253 goto out; 254 } 255 256 printf("%s, ", path); 257 fflush(NULL); 258 fd = open(path, O_RDWR | O_CREAT | O_TRUNC, msg->data); 259 if (fd < 0) { 260 printf("unable to open destination file\n"); 261 ret = errno; 262 } 263 264 ret = 0; 265 out: 266 if (path) 267 free(path); 268 269 msg_send_resp(rs, msg, ret); 270 return ret; 271 } 272 273 static void server_close(int rs, struct msg_hdr *msg) 274 { 275 printf("closing..."); 276 fflush(NULL); 277 msg_send_resp(rs, msg, 0); 278 279 if (file_addr) { 280 munmap(file_addr, bytes); 281 file_addr = NULL; 282 } 283 284 if (fd > 0) { 285 close(fd); 286 fd = 0; 287 } 288 printf("done\n"); 289 } 290 291 static int server_write(int rs, struct msg_hdr *msg) 292 { 293 size_t len; 294 int ret; 295 296 printf("transferring"); 297 fflush(NULL); 298 if (fd <= 0) { 299 printf("...file not opened\n"); 300 ret = EINVAL; 301 goto out; 302 } 303 304 if (msg->len != sizeof(struct msg_write)) { 305 printf("...invalid message length %d\n", msg->len); 306 ret = EINVAL; 307 goto out; 308 } 309 310 ret = _recv(rs, (char *) &bytes, sizeof bytes); 311 if (ret != sizeof bytes) 312 goto out; 313 314 ret = ftruncate(fd, bytes); 315 if (ret) 316 goto out; 317 318 file_addr = mmap(NULL, bytes, PROT_WRITE, MAP_SHARED, fd, 0); 319 if (file_addr == (void *) -1) { 320 printf("...error mapping file\n"); 321 ret = errno; 322 goto out; 323 } 324 325 printf("...%lld bytes...", (long long) bytes); 326 fflush(NULL); 327 len = _recv(rs, file_addr, bytes); 328 if (len != bytes) { 329 printf("...error receiving data\n"); 330 ret = (int) len; 331 } 332 out: 333 msg_send_resp(rs, msg, ret); 334 return ret; 335 } 336 337 static void server_process(int rs) 338 { 339 struct msg_hdr msg; 340 int ret; 341 342 do { 343 ret = msg_recv_hdr(rs, &msg); 344 if (ret != sizeof msg) 345 break; 346 347 switch (msg.command) { 348 case CMD_OPEN: 349 ret = server_open(rs, &msg); 350 break; 351 case CMD_CLOSE: 352 server_close(rs, &msg); 353 ret = 0; 354 break; 355 case CMD_WRITE: 356 ret = server_write(rs, &msg); 357 break; 358 default: 359 msg_send_resp(rs, &msg, EINVAL); 360 ret = -1; 361 break; 362 } 363 364 } while (!ret); 365 } 366 367 static int server_run(void) 368 { 369 int lrs, rs; 370 union rsocket_address rsa; 371 socklen_t len; 372 373 lrs = server_listen(); 374 if (lrs < 0) 375 return lrs; 376 377 while (1) { 378 len = sizeof rsa; 379 printf("waiting for connection..."); 380 fflush(NULL); 381 rs = raccept(lrs, &rsa.sa, &len); 382 383 printf("client: %s\n", _ntop(&rsa)); 384 server_process(rs); 385 386 rshutdown(rs, SHUT_RDWR); 387 rclose(rs); 388 } 389 return 0; 390 } 391 392 static int client_connect(void) 393 { 394 struct addrinfo *res; 395 int ret, rs; 396 397 ret = getaddrinfo(dst_addr, port, NULL, &res); 398 if (ret) { 399 printf("getaddrinfo failed: %s\n", gai_strerror(ret)); 400 return ret; 401 } 402 403 rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol); 404 if (rs < 0) { 405 perror("rsocket failed\n"); 406 goto free; 407 } 408 409 ret = rconnect(rs, res->ai_addr, res->ai_addrlen); 410 if (ret) { 411 perror("rconnect failed\n"); 412 rclose(rs); 413 rs = ret; 414 } 415 416 free: 417 freeaddrinfo(res); 418 return rs; 419 } 420 421 static int client_open(int rs) 422 { 423 struct msg_open *msg; 424 struct stat stats; 425 uint32_t len; 426 int ret; 427 428 printf("opening..."); 429 fflush(NULL); 430 fd = open(src_file, O_RDONLY); 431 if (fd < 0) 432 return fd; 433 434 ret = fstat(fd, &stats); 435 if (ret < 0) 436 goto err1; 437 438 bytes = (uint64_t) stats.st_size; 439 file_addr = mmap(NULL, bytes, PROT_READ, MAP_SHARED, fd, 0); 440 if (file_addr == (void *) -1) { 441 ret = errno; 442 goto err1; 443 } 444 445 len = (((uint32_t) strlen(dst_file)) + 8) & 0xFFFFFFF8; 446 msg = calloc(1, sizeof(*msg) + len); 447 if (!msg) { 448 ret = -1; 449 goto err2; 450 } 451 452 msg->hdr.command = CMD_OPEN; 453 msg->hdr.len = sizeof(*msg) + len; 454 msg->hdr.data = (uint32_t) stats.st_mode; 455 strcpy(msg->path, dst_file); 456 ret = rsend(rs, msg, msg->hdr.len, 0); 457 if (ret != msg->hdr.len) 458 goto err3; 459 460 ret = msg_get_resp(rs, &msg->hdr, CMD_OPEN); 461 if (ret) 462 goto err3; 463 464 return 0; 465 466 err3: 467 free(msg); 468 err2: 469 munmap(file_addr, bytes); 470 err1: 471 close(fd); 472 return ret; 473 } 474 475 static int client_start_write(int rs) 476 { 477 struct msg_write msg; 478 int ret; 479 480 printf("transferring"); 481 fflush(NULL); 482 memset(&msg, 0, sizeof msg); 483 msg.hdr.command = CMD_WRITE; 484 msg.hdr.len = sizeof(msg); 485 msg.size = bytes; 486 487 ret = rsend(rs, &msg, sizeof msg, 0); 488 if (ret != msg.hdr.len) 489 return ret; 490 491 return 0; 492 } 493 494 static int client_close(int rs) 495 { 496 struct msg_hdr msg; 497 int ret; 498 499 printf("closing..."); 500 fflush(NULL); 501 memset(&msg, 0, sizeof msg); 502 msg.command = CMD_CLOSE; 503 msg.len = sizeof msg; 504 ret = rsend(rs, (char *) &msg, msg.len, 0); 505 if (ret != msg.len) 506 goto out; 507 508 ret = msg_get_resp(rs, &msg, CMD_CLOSE); 509 if (ret) 510 goto out; 511 512 printf("done\n"); 513 out: 514 munmap(file_addr, bytes); 515 close(fd); 516 return ret; 517 } 518 519 static int client_run(void) 520 { 521 struct msg_hdr ack; 522 int ret, rs; 523 size_t len; 524 525 rs = client_connect(); 526 if (rs < 0) 527 return rs; 528 529 ret = client_open(rs); 530 if (ret) 531 goto shutdown; 532 533 ret = client_start_write(rs); 534 if (ret) 535 goto close; 536 537 printf("..."); 538 fflush(NULL); 539 gettimeofday(&start, NULL); 540 len = rsend(rs, file_addr, bytes, 0); 541 if (len == bytes) 542 ret = msg_get_resp(rs, &ack, CMD_WRITE); 543 else 544 ret = (int) len; 545 546 gettimeofday(&end, NULL); 547 548 close: 549 client_close(rs); 550 shutdown: 551 rshutdown(rs, SHUT_RDWR); 552 rclose(rs); 553 if (!ret) 554 show_perf(); 555 return ret; 556 } 557 558 static void show_usage(char *program) 559 { 560 printf("usage 1: %s [options]\n", program); 561 printf("\t starts the server application\n"); 562 printf("\t[-p port_number]\n"); 563 printf("usage 2: %s source server[:destination] [options]\n", program); 564 printf("\t source - file name and path\n"); 565 printf("\t server - name or address\n"); 566 printf("\t destination - file name and path\n"); 567 printf("\t[-p port_number]\n"); 568 exit(1); 569 } 570 571 static void server_opts(int argc, char **argv) 572 { 573 int op; 574 575 while ((op = getopt(argc, argv, "p:")) != -1) { 576 switch (op) { 577 case 'p': 578 port = optarg; 579 break; 580 default: 581 show_usage(argv[0]); 582 } 583 } 584 } 585 586 static void client_opts(int argc, char **argv) 587 { 588 int op; 589 590 if (argc < 3) 591 show_usage(argv[0]); 592 593 src_file = argv[1]; 594 dst_addr = argv[2]; 595 dst_file = strchr(dst_addr, ':'); 596 if (dst_file) { 597 *dst_file = '\0'; 598 dst_file++; 599 } 600 if (!dst_file) 601 dst_file = src_file; 602 603 while ((op = getopt(argc, argv, "p:")) != -1) { 604 switch (op) { 605 case 'p': 606 port = optarg; 607 break; 608 default: 609 show_usage(argv[0]); 610 } 611 } 612 613 } 614 615 int main(int argc, char **argv) 616 { 617 int ret; 618 619 if (argc == 1 || argv[1][0] == '-') { 620 server_opts(argc, argv); 621 ret = server_run(); 622 } else { 623 client_opts(argc, argv); 624 ret = client_run(); 625 } 626 627 return ret; 628 } 629