1 /*
2 * Copyright (c) 2011 Intel Corporation. All rights reserved.
3 *
4 * This software is available to you under the OpenIB.org BSD license
5 * below:
6 *
7 * Redistribution and use in source and binary forms, with or
8 * without modification, are permitted provided that the following
9 * conditions are met:
10 *
11 * - Redistributions of source code must retain the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer.
14 *
15 * - Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer in the documentation and/or other materials
18 * provided with the distribution.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
24 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
26 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <strings.h>
34 #include <errno.h>
35 #include <getopt.h>
36 #include <arpa/inet.h>
37 #include <sys/mman.h>
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/time.h>
41 #include <sys/stat.h>
42 #include <fcntl.h>
43 #include <netdb.h>
44 #include <unistd.h>
45
46 #include <rdma/rsocket.h>
47
48 union rsocket_address {
49 struct sockaddr sa;
50 struct sockaddr_in sin;
51 struct sockaddr_in6 sin6;
52 struct sockaddr_storage storage;
53 };
54
55 static const char *port = "7427";
56 static char *dst_addr;
57 static char *dst_file;
58 static char *src_file;
59 static struct timeval start, end;
60 //static void buf[1024 * 1024];
61 static uint64_t bytes;
62 static int fd;
63 static void *file_addr;
64
65 enum {
66 CMD_NOOP,
67 CMD_OPEN,
68 CMD_CLOSE,
69 CMD_WRITE,
70 CMD_RESP = 0x80,
71 };
72
73 /* TODO: handle byte swapping */
74 struct msg_hdr {
75 uint8_t version;
76 uint8_t command;
77 uint16_t len;
78 uint32_t data;
79 uint64_t id;
80 };
81
82 struct msg_open {
83 struct msg_hdr hdr;
84 char path[0];
85 };
86
87 struct msg_write {
88 struct msg_hdr hdr;
89 uint64_t size;
90 };
91
show_perf(void)92 static void show_perf(void)
93 {
94 float usec;
95
96 usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
97
98 printf("%lld bytes in %.2f seconds = %.2f Gb/sec\n",
99 (long long) bytes, usec / 1000000., (bytes * 8) / (1000. * usec));
100 }
101
_ntop(union rsocket_address * rsa)102 static char *_ntop(union rsocket_address *rsa)
103 {
104 static char addr[32];
105
106 switch (rsa->sa.sa_family) {
107 case AF_INET:
108 inet_ntop(AF_INET, &rsa->sin.sin_addr, addr, sizeof addr);
109 break;
110 case AF_INET6:
111 inet_ntop(AF_INET6, &rsa->sin6.sin6_addr, addr, sizeof addr);
112 break;
113 default:
114 addr[0] = '\0';
115 break;
116 }
117
118 return addr;
119 }
120
_recv(int rs,char * msg,size_t len)121 static size_t _recv(int rs, char *msg, size_t len)
122 {
123 size_t ret, offset;
124
125 for (offset = 0; offset < len; offset += ret) {
126 ret = rrecv(rs, msg + offset, len - offset, 0);
127 if (ret <= 0)
128 return ret;
129 }
130
131 return len;
132 }
133
msg_recv_hdr(int rs,struct msg_hdr * hdr)134 static int msg_recv_hdr(int rs, struct msg_hdr *hdr)
135 {
136 int ret;
137
138 ret = _recv(rs, (char *) hdr, sizeof *hdr);
139 if (ret != sizeof *hdr)
140 return -1;
141
142 if (hdr->version || hdr->len < sizeof *hdr) {
143 printf("invalid version %d or length %d\n",
144 hdr->version, hdr->len);
145 return -1;
146 }
147
148 return sizeof *hdr;
149 }
150
msg_get_resp(int rs,struct msg_hdr * msg,uint8_t cmd)151 static int msg_get_resp(int rs, struct msg_hdr *msg, uint8_t cmd)
152 {
153 int ret;
154
155 ret = msg_recv_hdr(rs, msg);
156 if (ret != sizeof *msg)
157 return ret;
158
159 if ((msg->len != sizeof *msg) || (msg->command != (cmd | CMD_RESP))) {
160 printf("invalid length %d or bad command response %x:%x\n",
161 msg->len, msg->command, cmd | CMD_RESP);
162 return -1;
163 }
164
165 return msg->data;
166 }
167
msg_send_resp(int rs,struct msg_hdr * msg,uint32_t status)168 static void msg_send_resp(int rs, struct msg_hdr *msg, uint32_t status)
169 {
170 struct msg_hdr resp;
171
172 resp.version = 0;
173 resp.command = msg->command | CMD_RESP;
174 resp.len = sizeof resp;
175 resp.data = status;
176 resp.id = msg->id;
177 rsend(rs, (char *) &resp, sizeof resp, 0);
178 }
179
server_listen(void)180 static int server_listen(void)
181 {
182 struct addrinfo hints, *res;
183 int ret, rs;
184
185 memset(&hints, 0, sizeof hints);
186 hints.ai_flags = RAI_PASSIVE;
187 ret = getaddrinfo(NULL, port, &hints, &res);
188 if (ret) {
189 printf("getaddrinfo failed: %s\n", gai_strerror(ret));
190 return ret;
191 }
192
193 rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
194 if (rs < 0) {
195 perror("rsocket failed\n");
196 ret = rs;
197 goto free;
198 }
199
200 ret = 1;
201 ret = rsetsockopt(rs, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof ret);
202 if (ret) {
203 perror("rsetsockopt failed");
204 goto close;
205 }
206
207 ret = rbind(rs, res->ai_addr, res->ai_addrlen);
208 if (ret) {
209 perror("rbind failed");
210 goto close;
211 }
212
213 ret = rlisten(rs, 1);
214 if (ret) {
215 perror("rlisten failed");
216 goto close;
217 }
218
219 ret = rs;
220 goto free;
221
222 close:
223 rclose(rs);
224 free:
225 freeaddrinfo(res);
226 return ret;
227 }
228
server_open(int rs,struct msg_hdr * msg)229 static int server_open(int rs, struct msg_hdr *msg)
230 {
231 char *path = NULL;
232 int ret, len;
233
234 printf("opening: ");
235 fflush(NULL);
236 if (file_addr || fd > 0) {
237 printf("cannot open another file\n");
238 ret = EBUSY;
239 goto out;
240 }
241
242 len = msg->len - sizeof *msg;
243 path = malloc(len);
244 if (!path) {
245 printf("cannot allocate path name\n");
246 ret = ENOMEM;
247 goto out;
248 }
249
250 ret = _recv(rs, path, len);
251 if (ret != len) {
252 printf("error receiving path\n");
253 goto out;
254 }
255
256 printf("%s, ", path);
257 fflush(NULL);
258 fd = open(path, O_RDWR | O_CREAT | O_TRUNC, msg->data);
259 if (fd < 0) {
260 printf("unable to open destination file\n");
261 ret = errno;
262 }
263
264 ret = 0;
265 out:
266 if (path)
267 free(path);
268
269 msg_send_resp(rs, msg, ret);
270 return ret;
271 }
272
server_close(int rs,struct msg_hdr * msg)273 static void server_close(int rs, struct msg_hdr *msg)
274 {
275 printf("closing...");
276 fflush(NULL);
277 msg_send_resp(rs, msg, 0);
278
279 if (file_addr) {
280 munmap(file_addr, bytes);
281 file_addr = NULL;
282 }
283
284 if (fd > 0) {
285 close(fd);
286 fd = 0;
287 }
288 printf("done\n");
289 }
290
server_write(int rs,struct msg_hdr * msg)291 static int server_write(int rs, struct msg_hdr *msg)
292 {
293 size_t len;
294 int ret;
295
296 printf("transferring");
297 fflush(NULL);
298 if (fd <= 0) {
299 printf("...file not opened\n");
300 ret = EINVAL;
301 goto out;
302 }
303
304 if (msg->len != sizeof(struct msg_write)) {
305 printf("...invalid message length %d\n", msg->len);
306 ret = EINVAL;
307 goto out;
308 }
309
310 ret = _recv(rs, (char *) &bytes, sizeof bytes);
311 if (ret != sizeof bytes)
312 goto out;
313
314 ret = ftruncate(fd, bytes);
315 if (ret)
316 goto out;
317
318 file_addr = mmap(NULL, bytes, PROT_WRITE, MAP_SHARED, fd, 0);
319 if (file_addr == (void *) -1) {
320 printf("...error mapping file\n");
321 ret = errno;
322 goto out;
323 }
324
325 printf("...%lld bytes...", (long long) bytes);
326 fflush(NULL);
327 len = _recv(rs, file_addr, bytes);
328 if (len != bytes) {
329 printf("...error receiving data\n");
330 ret = (int) len;
331 }
332 out:
333 msg_send_resp(rs, msg, ret);
334 return ret;
335 }
336
server_process(int rs)337 static void server_process(int rs)
338 {
339 struct msg_hdr msg;
340 int ret;
341
342 do {
343 ret = msg_recv_hdr(rs, &msg);
344 if (ret != sizeof msg)
345 break;
346
347 switch (msg.command) {
348 case CMD_OPEN:
349 ret = server_open(rs, &msg);
350 break;
351 case CMD_CLOSE:
352 server_close(rs, &msg);
353 ret = 0;
354 break;
355 case CMD_WRITE:
356 ret = server_write(rs, &msg);
357 break;
358 default:
359 msg_send_resp(rs, &msg, EINVAL);
360 ret = -1;
361 break;
362 }
363
364 } while (!ret);
365 }
366
server_run(void)367 static int server_run(void)
368 {
369 int lrs, rs;
370 union rsocket_address rsa;
371 socklen_t len;
372
373 lrs = server_listen();
374 if (lrs < 0)
375 return lrs;
376
377 while (1) {
378 len = sizeof rsa;
379 printf("waiting for connection...");
380 fflush(NULL);
381 rs = raccept(lrs, &rsa.sa, &len);
382
383 printf("client: %s\n", _ntop(&rsa));
384 server_process(rs);
385
386 rshutdown(rs, SHUT_RDWR);
387 rclose(rs);
388 }
389 return 0;
390 }
391
client_connect(void)392 static int client_connect(void)
393 {
394 struct addrinfo *res;
395 int ret, rs;
396
397 ret = getaddrinfo(dst_addr, port, NULL, &res);
398 if (ret) {
399 printf("getaddrinfo failed: %s\n", gai_strerror(ret));
400 return ret;
401 }
402
403 rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
404 if (rs < 0) {
405 perror("rsocket failed\n");
406 goto free;
407 }
408
409 ret = rconnect(rs, res->ai_addr, res->ai_addrlen);
410 if (ret) {
411 perror("rconnect failed\n");
412 rclose(rs);
413 rs = ret;
414 }
415
416 free:
417 freeaddrinfo(res);
418 return rs;
419 }
420
client_open(int rs)421 static int client_open(int rs)
422 {
423 struct msg_open *msg;
424 struct stat stats;
425 uint32_t len;
426 int ret;
427
428 printf("opening...");
429 fflush(NULL);
430 fd = open(src_file, O_RDONLY);
431 if (fd < 0)
432 return fd;
433
434 ret = fstat(fd, &stats);
435 if (ret < 0)
436 goto err1;
437
438 bytes = (uint64_t) stats.st_size;
439 file_addr = mmap(NULL, bytes, PROT_READ, MAP_SHARED, fd, 0);
440 if (file_addr == (void *) -1) {
441 ret = errno;
442 goto err1;
443 }
444
445 len = (((uint32_t) strlen(dst_file)) + 8) & 0xFFFFFFF8;
446 msg = calloc(1, sizeof(*msg) + len);
447 if (!msg) {
448 ret = -1;
449 goto err2;
450 }
451
452 msg->hdr.command = CMD_OPEN;
453 msg->hdr.len = sizeof(*msg) + len;
454 msg->hdr.data = (uint32_t) stats.st_mode;
455 strcpy(msg->path, dst_file);
456 ret = rsend(rs, msg, msg->hdr.len, 0);
457 if (ret != msg->hdr.len)
458 goto err3;
459
460 ret = msg_get_resp(rs, &msg->hdr, CMD_OPEN);
461 if (ret)
462 goto err3;
463
464 return 0;
465
466 err3:
467 free(msg);
468 err2:
469 munmap(file_addr, bytes);
470 err1:
471 close(fd);
472 return ret;
473 }
474
client_start_write(int rs)475 static int client_start_write(int rs)
476 {
477 struct msg_write msg;
478 int ret;
479
480 printf("transferring");
481 fflush(NULL);
482 memset(&msg, 0, sizeof msg);
483 msg.hdr.command = CMD_WRITE;
484 msg.hdr.len = sizeof(msg);
485 msg.size = bytes;
486
487 ret = rsend(rs, &msg, sizeof msg, 0);
488 if (ret != msg.hdr.len)
489 return ret;
490
491 return 0;
492 }
493
client_close(int rs)494 static int client_close(int rs)
495 {
496 struct msg_hdr msg;
497 int ret;
498
499 printf("closing...");
500 fflush(NULL);
501 memset(&msg, 0, sizeof msg);
502 msg.command = CMD_CLOSE;
503 msg.len = sizeof msg;
504 ret = rsend(rs, (char *) &msg, msg.len, 0);
505 if (ret != msg.len)
506 goto out;
507
508 ret = msg_get_resp(rs, &msg, CMD_CLOSE);
509 if (ret)
510 goto out;
511
512 printf("done\n");
513 out:
514 munmap(file_addr, bytes);
515 close(fd);
516 return ret;
517 }
518
client_run(void)519 static int client_run(void)
520 {
521 struct msg_hdr ack;
522 int ret, rs;
523 size_t len;
524
525 rs = client_connect();
526 if (rs < 0)
527 return rs;
528
529 ret = client_open(rs);
530 if (ret)
531 goto shutdown;
532
533 ret = client_start_write(rs);
534 if (ret)
535 goto close;
536
537 printf("...");
538 fflush(NULL);
539 gettimeofday(&start, NULL);
540 len = rsend(rs, file_addr, bytes, 0);
541 if (len == bytes)
542 ret = msg_get_resp(rs, &ack, CMD_WRITE);
543 else
544 ret = (int) len;
545
546 gettimeofday(&end, NULL);
547
548 close:
549 client_close(rs);
550 shutdown:
551 rshutdown(rs, SHUT_RDWR);
552 rclose(rs);
553 if (!ret)
554 show_perf();
555 return ret;
556 }
557
show_usage(char * program)558 static void show_usage(char *program)
559 {
560 printf("usage 1: %s [options]\n", program);
561 printf("\t starts the server application\n");
562 printf("\t[-p port_number]\n");
563 printf("usage 2: %s source server[:destination] [options]\n", program);
564 printf("\t source - file name and path\n");
565 printf("\t server - name or address\n");
566 printf("\t destination - file name and path\n");
567 printf("\t[-p port_number]\n");
568 exit(1);
569 }
570
server_opts(int argc,char ** argv)571 static void server_opts(int argc, char **argv)
572 {
573 int op;
574
575 while ((op = getopt(argc, argv, "p:")) != -1) {
576 switch (op) {
577 case 'p':
578 port = optarg;
579 break;
580 default:
581 show_usage(argv[0]);
582 }
583 }
584 }
585
client_opts(int argc,char ** argv)586 static void client_opts(int argc, char **argv)
587 {
588 int op;
589
590 if (argc < 3)
591 show_usage(argv[0]);
592
593 src_file = argv[1];
594 dst_addr = argv[2];
595 dst_file = strchr(dst_addr, ':');
596 if (dst_file) {
597 *dst_file = '\0';
598 dst_file++;
599 }
600 if (!dst_file)
601 dst_file = src_file;
602
603 while ((op = getopt(argc, argv, "p:")) != -1) {
604 switch (op) {
605 case 'p':
606 port = optarg;
607 break;
608 default:
609 show_usage(argv[0]);
610 }
611 }
612
613 }
614
main(int argc,char ** argv)615 int main(int argc, char **argv)
616 {
617 int ret;
618
619 if (argc == 1 || argv[1][0] == '-') {
620 server_opts(argc, argv);
621 ret = server_run();
622 } else {
623 client_opts(argc, argv);
624 ret = client_run();
625 }
626
627 return ret;
628 }
629