1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD 3 * 4 * Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 * 28 * $FreeBSD$ 29 */ 30 31 #include <sys/param.h> 32 #include <sys/bio.h> 33 #include <sys/disk.h> 34 #include <sys/endian.h> 35 #include <sys/ioctl.h> 36 #include <sys/queue.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 #include <sys/time.h> 40 #include <arpa/inet.h> 41 #include <netinet/in.h> 42 #include <netinet/tcp.h> 43 #include <assert.h> 44 #include <err.h> 45 #include <errno.h> 46 #include <fcntl.h> 47 #include <libgen.h> 48 #include <libutil.h> 49 #include <paths.h> 50 #include <pthread.h> 51 #include <signal.h> 52 #include <stdarg.h> 53 #include <stdio.h> 54 #include <stdlib.h> 55 #include <stdint.h> 56 #include <string.h> 57 #include <syslog.h> 58 #include <unistd.h> 59 60 #include "ggate.h" 61 62 63 #define GGATED_EXPORT_FILE "/etc/gg.exports" 64 65 struct ggd_connection { 66 off_t c_mediasize; 67 unsigned c_sectorsize; 68 unsigned c_flags; /* flags (RO/RW) */ 69 int c_diskfd; 70 int c_sendfd; 71 int c_recvfd; 72 time_t c_birthtime; 73 char *c_path; 74 uint64_t c_token; 75 in_addr_t c_srcip; 76 LIST_ENTRY(ggd_connection) c_next; 77 }; 78 79 struct ggd_request { 80 struct g_gate_hdr r_hdr; 81 char *r_data; 82 TAILQ_ENTRY(ggd_request) r_next; 83 }; 84 #define r_cmd r_hdr.gh_cmd 85 #define r_offset r_hdr.gh_offset 86 #define r_length r_hdr.gh_length 87 #define r_error r_hdr.gh_error 88 89 struct ggd_export { 90 char *e_path; /* path to device/file */ 91 in_addr_t e_ip; /* remote IP address */ 92 in_addr_t e_mask; /* IP mask */ 93 unsigned e_flags; /* flags (RO/RW) */ 94 SLIST_ENTRY(ggd_export) e_next; 95 }; 96 97 static const char *exports_file = GGATED_EXPORT_FILE; 98 static int got_sighup = 0; 99 static in_addr_t bindaddr; 100 101 static TAILQ_HEAD(, ggd_request) inqueue = TAILQ_HEAD_INITIALIZER(inqueue); 102 static TAILQ_HEAD(, ggd_request) outqueue = TAILQ_HEAD_INITIALIZER(outqueue); 103 static pthread_mutex_t inqueue_mtx, outqueue_mtx; 104 static pthread_cond_t inqueue_cond, outqueue_cond; 105 106 static SLIST_HEAD(, ggd_export) exports = SLIST_HEAD_INITIALIZER(exports); 107 static LIST_HEAD(, ggd_connection) connections = LIST_HEAD_INITIALIZER(connections); 108 109 static void *recv_thread(void *arg); 110 static void *disk_thread(void *arg); 111 static void *send_thread(void *arg); 112 113 static void 114 usage(void) 115 { 116 117 fprintf(stderr, "usage: %s [-nv] [-a address] [-F pidfile] [-p port] " 118 "[-R rcvbuf] [-S sndbuf] [exports file]\n", getprogname()); 119 exit(EXIT_FAILURE); 120 } 121 122 static char * 123 ip2str(in_addr_t ip) 124 { 125 static char sip[16]; 126 127 snprintf(sip, sizeof(sip), "%u.%u.%u.%u", 128 ((ip >> 24) & 0xff), 129 ((ip >> 16) & 0xff), 130 ((ip >> 8) & 0xff), 131 (ip & 0xff)); 132 return (sip); 133 } 134 135 static in_addr_t 136 countmask(unsigned m) 137 { 138 in_addr_t mask; 139 140 if (m == 0) { 141 mask = 0x0; 142 } else { 143 mask = 1 << (32 - m); 144 mask--; 145 mask = ~mask; 146 } 147 return (mask); 148 } 149 150 static void 151 line_parse(char *line, unsigned lineno) 152 { 153 struct ggd_export *ex; 154 char *word, *path, *sflags; 155 unsigned flags, i, vmask; 156 in_addr_t ip, mask; 157 158 ip = mask = flags = vmask = 0; 159 path = NULL; 160 sflags = NULL; 161 162 for (i = 0, word = strtok(line, " \t"); word != NULL; 163 i++, word = strtok(NULL, " \t")) { 164 switch (i) { 165 case 0: /* IP address or host name */ 166 ip = g_gate_str2ip(strsep(&word, "/")); 167 if (ip == INADDR_NONE) { 168 g_gate_xlog("Invalid IP/host name at line %u.", 169 lineno); 170 } 171 ip = ntohl(ip); 172 if (word == NULL) 173 vmask = 32; 174 else { 175 errno = 0; 176 vmask = strtoul(word, NULL, 10); 177 if (vmask == 0 && errno != 0) { 178 g_gate_xlog("Invalid IP mask value at " 179 "line %u.", lineno); 180 } 181 if ((unsigned)vmask > 32) { 182 g_gate_xlog("Invalid IP mask value at line %u.", 183 lineno); 184 } 185 } 186 mask = countmask(vmask); 187 break; 188 case 1: /* flags */ 189 if (strcasecmp("rd", word) == 0 || 190 strcasecmp("ro", word) == 0) { 191 flags = O_RDONLY; 192 } else if (strcasecmp("wo", word) == 0) { 193 flags = O_WRONLY; 194 } else if (strcasecmp("rw", word) == 0) { 195 flags = O_RDWR; 196 } else { 197 g_gate_xlog("Invalid value in flags field at " 198 "line %u.", lineno); 199 } 200 sflags = word; 201 break; 202 case 2: /* path */ 203 if (strlen(word) >= MAXPATHLEN) { 204 g_gate_xlog("Path too long at line %u. ", 205 lineno); 206 } 207 path = word; 208 break; 209 default: 210 g_gate_xlog("Too many arguments at line %u. ", lineno); 211 } 212 } 213 if (i != 3) 214 g_gate_xlog("Too few arguments at line %u.", lineno); 215 216 ex = malloc(sizeof(*ex)); 217 if (ex == NULL) 218 g_gate_xlog("Not enough memory."); 219 ex->e_path = strdup(path); 220 if (ex->e_path == NULL) 221 g_gate_xlog("Not enough memory."); 222 223 /* Made 'and' here. */ 224 ex->e_ip = (ip & mask); 225 ex->e_mask = mask; 226 ex->e_flags = flags; 227 228 SLIST_INSERT_HEAD(&exports, ex, e_next); 229 230 g_gate_log(LOG_DEBUG, "Added %s/%u %s %s to exports list.", 231 ip2str(ex->e_ip), vmask, path, sflags); 232 } 233 234 static void 235 exports_clear(void) 236 { 237 struct ggd_export *ex; 238 239 while (!SLIST_EMPTY(&exports)) { 240 ex = SLIST_FIRST(&exports); 241 SLIST_REMOVE_HEAD(&exports, e_next); 242 free(ex); 243 } 244 } 245 246 #define EXPORTS_LINE_SIZE 2048 247 static void 248 exports_get(void) 249 { 250 char buf[EXPORTS_LINE_SIZE], *line; 251 unsigned lineno = 0, objs = 0, len; 252 FILE *fd; 253 254 exports_clear(); 255 256 fd = fopen(exports_file, "r"); 257 if (fd == NULL) { 258 g_gate_xlog("Cannot open exports file (%s): %s.", exports_file, 259 strerror(errno)); 260 } 261 262 g_gate_log(LOG_INFO, "Reading exports file (%s).", exports_file); 263 264 for (;;) { 265 if (fgets(buf, sizeof(buf), fd) == NULL) { 266 if (feof(fd)) 267 break; 268 269 g_gate_xlog("Error while reading exports file: %s.", 270 strerror(errno)); 271 } 272 273 /* Increase line count. */ 274 lineno++; 275 276 /* Skip spaces and tabs. */ 277 for (line = buf; *line == ' ' || *line == '\t'; ++line) 278 ; 279 280 /* Empty line, comment or empty line at the end of file. */ 281 if (*line == '\n' || *line == '#' || *line == '\0') 282 continue; 283 284 len = strlen(line); 285 if (line[len - 1] == '\n') { 286 /* Remove new line char. */ 287 line[len - 1] = '\0'; 288 } else { 289 if (!feof(fd)) 290 g_gate_xlog("Line %u too long.", lineno); 291 } 292 293 line_parse(line, lineno); 294 objs++; 295 } 296 297 fclose(fd); 298 299 if (objs == 0) 300 g_gate_xlog("There are no objects to export."); 301 302 g_gate_log(LOG_INFO, "Exporting %u object(s).", objs); 303 } 304 305 static int 306 exports_check(struct ggd_export *ex, struct g_gate_cinit *cinit, 307 struct ggd_connection *conn) 308 { 309 char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ 310 int error = 0, flags; 311 312 strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); 313 strlcat(ipmask, "/", sizeof(ipmask)); 314 strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); 315 if ((cinit->gc_flags & GGATE_FLAG_RDONLY) != 0) { 316 if (ex->e_flags == O_WRONLY) { 317 g_gate_log(LOG_WARNING, "Read-only access requested, " 318 "but %s (%s) is exported write-only.", ex->e_path, 319 ipmask); 320 return (EPERM); 321 } else { 322 conn->c_flags |= GGATE_FLAG_RDONLY; 323 } 324 } else if ((cinit->gc_flags & GGATE_FLAG_WRONLY) != 0) { 325 if (ex->e_flags == O_RDONLY) { 326 g_gate_log(LOG_WARNING, "Write-only access requested, " 327 "but %s (%s) is exported read-only.", ex->e_path, 328 ipmask); 329 return (EPERM); 330 } else { 331 conn->c_flags |= GGATE_FLAG_WRONLY; 332 } 333 } else { 334 if (ex->e_flags == O_RDONLY) { 335 g_gate_log(LOG_WARNING, "Read-write access requested, " 336 "but %s (%s) is exported read-only.", ex->e_path, 337 ipmask); 338 return (EPERM); 339 } else if (ex->e_flags == O_WRONLY) { 340 g_gate_log(LOG_WARNING, "Read-write access requested, " 341 "but %s (%s) is exported write-only.", ex->e_path, 342 ipmask); 343 return (EPERM); 344 } 345 } 346 if ((conn->c_flags & GGATE_FLAG_RDONLY) != 0) 347 flags = O_RDONLY; 348 else if ((conn->c_flags & GGATE_FLAG_WRONLY) != 0) 349 flags = O_WRONLY; 350 else 351 flags = O_RDWR; 352 conn->c_diskfd = open(ex->e_path, flags); 353 if (conn->c_diskfd == -1) { 354 error = errno; 355 g_gate_log(LOG_ERR, "Cannot open %s: %s.", ex->e_path, 356 strerror(error)); 357 return (error); 358 } 359 return (0); 360 } 361 362 static struct ggd_export * 363 exports_find(struct sockaddr *s, struct g_gate_cinit *cinit, 364 struct ggd_connection *conn) 365 { 366 struct ggd_export *ex; 367 in_addr_t ip; 368 int error; 369 370 ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); 371 SLIST_FOREACH(ex, &exports, e_next) { 372 if ((ip & ex->e_mask) != ex->e_ip) { 373 g_gate_log(LOG_DEBUG, "exports[%s]: IP mismatch.", 374 ex->e_path); 375 continue; 376 } 377 if (strcmp(cinit->gc_path, ex->e_path) != 0) { 378 g_gate_log(LOG_DEBUG, "exports[%s]: Path mismatch.", 379 ex->e_path); 380 continue; 381 } 382 error = exports_check(ex, cinit, conn); 383 if (error == 0) 384 return (ex); 385 else { 386 errno = error; 387 return (NULL); 388 } 389 } 390 g_gate_log(LOG_WARNING, "Unauthorized connection from: %s.", 391 ip2str(ip)); 392 errno = EPERM; 393 return (NULL); 394 } 395 396 /* 397 * Remove timed out connections. 398 */ 399 static void 400 connection_cleanups(void) 401 { 402 struct ggd_connection *conn, *tconn; 403 time_t now; 404 405 time(&now); 406 LIST_FOREACH_SAFE(conn, &connections, c_next, tconn) { 407 if (now - conn->c_birthtime > 10) { 408 LIST_REMOVE(conn, c_next); 409 g_gate_log(LOG_NOTICE, 410 "Connection from %s [%s] removed.", 411 ip2str(conn->c_srcip), conn->c_path); 412 close(conn->c_diskfd); 413 close(conn->c_sendfd); 414 close(conn->c_recvfd); 415 free(conn->c_path); 416 free(conn); 417 } 418 } 419 } 420 421 static struct ggd_connection * 422 connection_find(struct g_gate_cinit *cinit) 423 { 424 struct ggd_connection *conn; 425 426 LIST_FOREACH(conn, &connections, c_next) { 427 if (conn->c_token == cinit->gc_token) 428 break; 429 } 430 return (conn); 431 } 432 433 static struct ggd_connection * 434 connection_new(struct g_gate_cinit *cinit, struct sockaddr *s, int sfd) 435 { 436 struct ggd_connection *conn; 437 in_addr_t ip; 438 439 /* 440 * First, look for old connections. 441 * We probably should do it every X seconds, but what for? 442 * It is only dangerous if an attacker wants to overload connections 443 * queue, so here is a good place to do the cleanups. 444 */ 445 connection_cleanups(); 446 447 conn = malloc(sizeof(*conn)); 448 if (conn == NULL) 449 return (NULL); 450 conn->c_path = strdup(cinit->gc_path); 451 if (conn->c_path == NULL) { 452 free(conn); 453 return (NULL); 454 } 455 conn->c_token = cinit->gc_token; 456 ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); 457 conn->c_srcip = ip; 458 conn->c_sendfd = conn->c_recvfd = -1; 459 if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) 460 conn->c_sendfd = sfd; 461 else 462 conn->c_recvfd = sfd; 463 conn->c_mediasize = 0; 464 conn->c_sectorsize = 0; 465 time(&conn->c_birthtime); 466 conn->c_flags = cinit->gc_flags; 467 LIST_INSERT_HEAD(&connections, conn, c_next); 468 g_gate_log(LOG_DEBUG, "Connection created [%s, %s].", ip2str(ip), 469 conn->c_path); 470 return (conn); 471 } 472 473 static int 474 connection_add(struct ggd_connection *conn, struct g_gate_cinit *cinit, 475 struct sockaddr *s, int sfd) 476 { 477 in_addr_t ip; 478 479 ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); 480 if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) { 481 if (conn->c_sendfd != -1) { 482 g_gate_log(LOG_WARNING, 483 "Send socket already exists [%s, %s].", ip2str(ip), 484 conn->c_path); 485 return (EEXIST); 486 } 487 conn->c_sendfd = sfd; 488 } else { 489 if (conn->c_recvfd != -1) { 490 g_gate_log(LOG_WARNING, 491 "Receive socket already exists [%s, %s].", 492 ip2str(ip), conn->c_path); 493 return (EEXIST); 494 } 495 conn->c_recvfd = sfd; 496 } 497 g_gate_log(LOG_DEBUG, "Connection added [%s, %s].", ip2str(ip), 498 conn->c_path); 499 return (0); 500 } 501 502 /* 503 * Remove one socket from the given connection or the whole 504 * connection if sfd == -1. 505 */ 506 static void 507 connection_remove(struct ggd_connection *conn) 508 { 509 510 LIST_REMOVE(conn, c_next); 511 g_gate_log(LOG_DEBUG, "Connection removed [%s %s].", 512 ip2str(conn->c_srcip), conn->c_path); 513 if (conn->c_sendfd != -1) 514 close(conn->c_sendfd); 515 if (conn->c_recvfd != -1) 516 close(conn->c_recvfd); 517 free(conn->c_path); 518 free(conn); 519 } 520 521 static int 522 connection_ready(struct ggd_connection *conn) 523 { 524 525 return (conn->c_sendfd != -1 && conn->c_recvfd != -1); 526 } 527 528 static void 529 connection_launch(struct ggd_connection *conn) 530 { 531 pthread_t td; 532 int error, pid; 533 534 pid = fork(); 535 if (pid > 0) 536 return; 537 else if (pid == -1) { 538 g_gate_log(LOG_ERR, "Cannot fork: %s.", strerror(errno)); 539 return; 540 } 541 g_gate_log(LOG_DEBUG, "Process created [%s].", conn->c_path); 542 543 /* 544 * Create condition variables and mutexes for in-queue and out-queue 545 * synchronization. 546 */ 547 error = pthread_mutex_init(&inqueue_mtx, NULL); 548 if (error != 0) { 549 g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", 550 strerror(error)); 551 } 552 error = pthread_cond_init(&inqueue_cond, NULL); 553 if (error != 0) { 554 g_gate_xlog("pthread_cond_init(inqueue_cond): %s.", 555 strerror(error)); 556 } 557 error = pthread_mutex_init(&outqueue_mtx, NULL); 558 if (error != 0) { 559 g_gate_xlog("pthread_mutex_init(outqueue_mtx): %s.", 560 strerror(error)); 561 } 562 error = pthread_cond_init(&outqueue_cond, NULL); 563 if (error != 0) { 564 g_gate_xlog("pthread_cond_init(outqueue_cond): %s.", 565 strerror(error)); 566 } 567 568 /* 569 * Create threads: 570 * recvtd - thread for receiving I/O request 571 * diskio - thread for doing I/O request 572 * sendtd - thread for sending I/O requests back 573 */ 574 error = pthread_create(&td, NULL, send_thread, conn); 575 if (error != 0) { 576 g_gate_xlog("pthread_create(send_thread): %s.", 577 strerror(error)); 578 } 579 error = pthread_create(&td, NULL, recv_thread, conn); 580 if (error != 0) { 581 g_gate_xlog("pthread_create(recv_thread): %s.", 582 strerror(error)); 583 } 584 disk_thread(conn); 585 } 586 587 static void 588 sendfail(int sfd, int error, const char *fmt, ...) 589 { 590 struct g_gate_sinit sinit; 591 va_list ap; 592 ssize_t data; 593 594 sinit.gs_error = error; 595 g_gate_swap2n_sinit(&sinit); 596 data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); 597 g_gate_swap2h_sinit(&sinit); 598 if (data != sizeof(sinit)) { 599 g_gate_log(LOG_WARNING, "Cannot send initial packet: %s.", 600 strerror(errno)); 601 return; 602 } 603 if (fmt != NULL) { 604 va_start(ap, fmt); 605 g_gate_vlog(LOG_WARNING, fmt, ap); 606 va_end(ap); 607 } 608 } 609 610 static void * 611 malloc_waitok(size_t size) 612 { 613 void *p; 614 615 while ((p = malloc(size)) == NULL) { 616 g_gate_log(LOG_DEBUG, "Cannot allocate %zu bytes.", size); 617 sleep(1); 618 } 619 return (p); 620 } 621 622 static void * 623 recv_thread(void *arg) 624 { 625 struct ggd_connection *conn; 626 struct ggd_request *req; 627 ssize_t data; 628 int error, fd; 629 630 conn = arg; 631 g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); 632 fd = conn->c_recvfd; 633 for (;;) { 634 /* 635 * Get header packet. 636 */ 637 req = malloc_waitok(sizeof(*req)); 638 data = g_gate_recv(fd, &req->r_hdr, sizeof(req->r_hdr), 639 MSG_WAITALL); 640 if (data == 0) { 641 g_gate_log(LOG_DEBUG, "Process %u exiting.", getpid()); 642 exit(EXIT_SUCCESS); 643 } else if (data == -1) { 644 g_gate_xlog("Error while receiving hdr packet: %s.", 645 strerror(errno)); 646 } else if (data != sizeof(req->r_hdr)) { 647 g_gate_xlog("Malformed hdr packet received."); 648 } 649 g_gate_log(LOG_DEBUG, "Received hdr packet."); 650 g_gate_swap2h_hdr(&req->r_hdr); 651 652 g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, 653 (intmax_t)req->r_offset, (unsigned)req->r_length); 654 655 /* 656 * Allocate memory for data. 657 */ 658 req->r_data = malloc_waitok(req->r_length); 659 660 /* 661 * Receive data to write for WRITE request. 662 */ 663 if (req->r_cmd == GGATE_CMD_WRITE) { 664 g_gate_log(LOG_DEBUG, "Waiting for %u bytes of data...", 665 req->r_length); 666 data = g_gate_recv(fd, req->r_data, req->r_length, 667 MSG_WAITALL); 668 if (data == -1) { 669 g_gate_xlog("Error while receiving data: %s.", 670 strerror(errno)); 671 } 672 } 673 674 /* 675 * Put the request onto the incoming queue. 676 */ 677 error = pthread_mutex_lock(&inqueue_mtx); 678 assert(error == 0); 679 TAILQ_INSERT_TAIL(&inqueue, req, r_next); 680 error = pthread_cond_signal(&inqueue_cond); 681 assert(error == 0); 682 error = pthread_mutex_unlock(&inqueue_mtx); 683 assert(error == 0); 684 } 685 } 686 687 static void * 688 disk_thread(void *arg) 689 { 690 struct ggd_connection *conn; 691 struct ggd_request *req; 692 ssize_t data; 693 int error, fd; 694 695 conn = arg; 696 g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); 697 fd = conn->c_diskfd; 698 for (;;) { 699 /* 700 * Get a request from the incoming queue. 701 */ 702 error = pthread_mutex_lock(&inqueue_mtx); 703 assert(error == 0); 704 while ((req = TAILQ_FIRST(&inqueue)) == NULL) { 705 error = pthread_cond_wait(&inqueue_cond, &inqueue_mtx); 706 assert(error == 0); 707 } 708 TAILQ_REMOVE(&inqueue, req, r_next); 709 error = pthread_mutex_unlock(&inqueue_mtx); 710 assert(error == 0); 711 712 /* 713 * Check the request. 714 */ 715 assert(req->r_cmd == GGATE_CMD_READ || req->r_cmd == GGATE_CMD_WRITE); 716 assert(req->r_offset + req->r_length <= (uintmax_t)conn->c_mediasize); 717 assert((req->r_offset % conn->c_sectorsize) == 0); 718 assert((req->r_length % conn->c_sectorsize) == 0); 719 720 g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, 721 (intmax_t)req->r_offset, (unsigned)req->r_length); 722 723 /* 724 * Do the request. 725 */ 726 data = 0; 727 switch (req->r_cmd) { 728 case GGATE_CMD_READ: 729 data = pread(fd, req->r_data, req->r_length, 730 req->r_offset); 731 break; 732 case GGATE_CMD_WRITE: 733 data = pwrite(fd, req->r_data, req->r_length, 734 req->r_offset); 735 /* Free data memory here - better sooner. */ 736 free(req->r_data); 737 req->r_data = NULL; 738 break; 739 } 740 if (data != (ssize_t)req->r_length) { 741 /* Report short reads/writes as I/O errors. */ 742 if (errno == 0) 743 errno = EIO; 744 g_gate_log(LOG_ERR, "Disk error: %s", strerror(errno)); 745 req->r_error = errno; 746 if (req->r_data != NULL) { 747 free(req->r_data); 748 req->r_data = NULL; 749 } 750 } 751 752 /* 753 * Put the request onto the outgoing queue. 754 */ 755 error = pthread_mutex_lock(&outqueue_mtx); 756 assert(error == 0); 757 TAILQ_INSERT_TAIL(&outqueue, req, r_next); 758 error = pthread_cond_signal(&outqueue_cond); 759 assert(error == 0); 760 error = pthread_mutex_unlock(&outqueue_mtx); 761 assert(error == 0); 762 } 763 764 /* NOTREACHED */ 765 return (NULL); 766 } 767 768 static void * 769 send_thread(void *arg) 770 { 771 struct ggd_connection *conn; 772 struct ggd_request *req; 773 ssize_t data; 774 int error, fd; 775 776 conn = arg; 777 g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); 778 fd = conn->c_sendfd; 779 for (;;) { 780 /* 781 * Get a request from the outgoing queue. 782 */ 783 error = pthread_mutex_lock(&outqueue_mtx); 784 assert(error == 0); 785 while ((req = TAILQ_FIRST(&outqueue)) == NULL) { 786 error = pthread_cond_wait(&outqueue_cond, 787 &outqueue_mtx); 788 assert(error == 0); 789 } 790 TAILQ_REMOVE(&outqueue, req, r_next); 791 error = pthread_mutex_unlock(&outqueue_mtx); 792 assert(error == 0); 793 794 g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, 795 (intmax_t)req->r_offset, (unsigned)req->r_length); 796 797 /* 798 * Send the request. 799 */ 800 g_gate_swap2n_hdr(&req->r_hdr); 801 if (g_gate_send(fd, &req->r_hdr, sizeof(req->r_hdr), 0) == -1) { 802 g_gate_xlog("Error while sending hdr packet: %s.", 803 strerror(errno)); 804 } 805 g_gate_log(LOG_DEBUG, "Sent hdr packet."); 806 g_gate_swap2h_hdr(&req->r_hdr); 807 if (req->r_data != NULL) { 808 data = g_gate_send(fd, req->r_data, req->r_length, 0); 809 if (data != (ssize_t)req->r_length) { 810 g_gate_xlog("Error while sending data: %s.", 811 strerror(errno)); 812 } 813 g_gate_log(LOG_DEBUG, 814 "Sent %zd bytes (offset=%ju, size=%zu).", data, 815 (uintmax_t)req->r_offset, (size_t)req->r_length); 816 free(req->r_data); 817 } 818 free(req); 819 } 820 821 /* NOTREACHED */ 822 return (NULL); 823 } 824 825 static void 826 log_connection(struct sockaddr *from) 827 { 828 in_addr_t ip; 829 830 ip = htonl(((struct sockaddr_in *)(void *)from)->sin_addr.s_addr); 831 g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); 832 } 833 834 static int 835 handshake(struct sockaddr *from, int sfd) 836 { 837 struct g_gate_version ver; 838 struct g_gate_cinit cinit; 839 struct g_gate_sinit sinit; 840 struct ggd_connection *conn; 841 struct ggd_export *ex; 842 ssize_t data; 843 844 log_connection(from); 845 /* 846 * Phase 1: Version verification. 847 */ 848 g_gate_log(LOG_DEBUG, "Receiving version packet."); 849 data = g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL); 850 g_gate_swap2h_version(&ver); 851 if (data != sizeof(ver)) { 852 g_gate_log(LOG_WARNING, "Malformed version packet."); 853 return (0); 854 } 855 g_gate_log(LOG_DEBUG, "Version packet received."); 856 if (memcmp(ver.gv_magic, GGATE_MAGIC, strlen(GGATE_MAGIC)) != 0) { 857 g_gate_log(LOG_WARNING, "Invalid magic field."); 858 return (0); 859 } 860 if (ver.gv_version != GGATE_VERSION) { 861 g_gate_log(LOG_WARNING, "Version %u is not supported.", 862 ver.gv_version); 863 return (0); 864 } 865 ver.gv_error = 0; 866 g_gate_swap2n_version(&ver); 867 data = g_gate_send(sfd, &ver, sizeof(ver), 0); 868 g_gate_swap2h_version(&ver); 869 if (data == -1) { 870 sendfail(sfd, errno, "Error while sending version packet: %s.", 871 strerror(errno)); 872 return (0); 873 } 874 875 /* 876 * Phase 2: Request verification. 877 */ 878 g_gate_log(LOG_DEBUG, "Receiving initial packet."); 879 data = g_gate_recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); 880 g_gate_swap2h_cinit(&cinit); 881 if (data != sizeof(cinit)) { 882 g_gate_log(LOG_WARNING, "Malformed initial packet."); 883 return (0); 884 } 885 g_gate_log(LOG_DEBUG, "Initial packet received."); 886 conn = connection_find(&cinit); 887 if (conn != NULL) { 888 /* 889 * Connection should already exists. 890 */ 891 g_gate_log(LOG_DEBUG, "Found existing connection (token=%lu).", 892 (unsigned long)conn->c_token); 893 if (connection_add(conn, &cinit, from, sfd) == -1) { 894 connection_remove(conn); 895 return (0); 896 } 897 } else { 898 /* 899 * New connection, allocate space. 900 */ 901 conn = connection_new(&cinit, from, sfd); 902 if (conn == NULL) { 903 sendfail(sfd, ENOMEM, 904 "Cannot allocate new connection."); 905 return (0); 906 } 907 g_gate_log(LOG_DEBUG, "New connection created (token=%lu).", 908 (unsigned long)conn->c_token); 909 } 910 911 ex = exports_find(from, &cinit, conn); 912 if (ex == NULL) { 913 sendfail(sfd, errno, NULL); 914 connection_remove(conn); 915 return (0); 916 } 917 if (conn->c_mediasize == 0) { 918 conn->c_mediasize = g_gate_mediasize(conn->c_diskfd); 919 conn->c_sectorsize = g_gate_sectorsize(conn->c_diskfd); 920 } 921 sinit.gs_mediasize = conn->c_mediasize; 922 sinit.gs_sectorsize = conn->c_sectorsize; 923 sinit.gs_error = 0; 924 925 g_gate_log(LOG_DEBUG, "Sending initial packet."); 926 927 g_gate_swap2n_sinit(&sinit); 928 data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); 929 g_gate_swap2h_sinit(&sinit); 930 if (data == -1) { 931 sendfail(sfd, errno, "Error while sending initial packet: %s.", 932 strerror(errno)); 933 return (0); 934 } 935 936 if (connection_ready(conn)) { 937 connection_launch(conn); 938 connection_remove(conn); 939 } 940 return (1); 941 } 942 943 static void 944 huphandler(int sig __unused) 945 { 946 947 got_sighup = 1; 948 } 949 950 int 951 main(int argc, char *argv[]) 952 { 953 const char *ggated_pidfile = _PATH_VARRUN "/ggated.pid"; 954 struct pidfh *pfh; 955 struct sockaddr_in serv; 956 struct sockaddr from; 957 socklen_t fromlen; 958 pid_t otherpid; 959 int ch, sfd, tmpsfd; 960 unsigned port; 961 962 bindaddr = htonl(INADDR_ANY); 963 port = G_GATE_PORT; 964 while ((ch = getopt(argc, argv, "a:hnp:F:R:S:v")) != -1) { 965 switch (ch) { 966 case 'a': 967 bindaddr = g_gate_str2ip(optarg); 968 if (bindaddr == INADDR_NONE) { 969 errx(EXIT_FAILURE, 970 "Invalid IP/host name to bind to."); 971 } 972 break; 973 case 'F': 974 ggated_pidfile = optarg; 975 break; 976 case 'n': 977 nagle = 0; 978 break; 979 case 'p': 980 errno = 0; 981 port = strtoul(optarg, NULL, 10); 982 if (port == 0 && errno != 0) 983 errx(EXIT_FAILURE, "Invalid port."); 984 break; 985 case 'R': 986 errno = 0; 987 rcvbuf = strtoul(optarg, NULL, 10); 988 if (rcvbuf == 0 && errno != 0) 989 errx(EXIT_FAILURE, "Invalid rcvbuf."); 990 break; 991 case 'S': 992 errno = 0; 993 sndbuf = strtoul(optarg, NULL, 10); 994 if (sndbuf == 0 && errno != 0) 995 errx(EXIT_FAILURE, "Invalid sndbuf."); 996 break; 997 case 'v': 998 g_gate_verbose++; 999 break; 1000 case 'h': 1001 default: 1002 usage(); 1003 } 1004 } 1005 argc -= optind; 1006 argv += optind; 1007 1008 if (argv[0] != NULL) 1009 exports_file = argv[0]; 1010 exports_get(); 1011 1012 pfh = pidfile_open(ggated_pidfile, 0600, &otherpid); 1013 if (pfh == NULL) { 1014 if (errno == EEXIST) { 1015 errx(EXIT_FAILURE, "Daemon already running, pid: %jd.", 1016 (intmax_t)otherpid); 1017 } 1018 err(EXIT_FAILURE, "Cannot open/create pidfile"); 1019 } 1020 1021 if (!g_gate_verbose) { 1022 /* Run in daemon mode. */ 1023 if (daemon(0, 0) == -1) 1024 g_gate_xlog("Cannot daemonize: %s", strerror(errno)); 1025 } 1026 1027 pidfile_write(pfh); 1028 1029 signal(SIGCHLD, SIG_IGN); 1030 1031 sfd = socket(AF_INET, SOCK_STREAM, 0); 1032 if (sfd == -1) 1033 g_gate_xlog("Cannot open stream socket: %s.", strerror(errno)); 1034 bzero(&serv, sizeof(serv)); 1035 serv.sin_family = AF_INET; 1036 serv.sin_addr.s_addr = bindaddr; 1037 serv.sin_port = htons(port); 1038 1039 g_gate_socket_settings(sfd); 1040 1041 if (bind(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) 1042 g_gate_xlog("bind(): %s.", strerror(errno)); 1043 if (listen(sfd, 5) == -1) 1044 g_gate_xlog("listen(): %s.", strerror(errno)); 1045 1046 g_gate_log(LOG_INFO, "Listen on port: %d.", port); 1047 1048 signal(SIGHUP, huphandler); 1049 1050 for (;;) { 1051 fromlen = sizeof(from); 1052 tmpsfd = accept(sfd, &from, &fromlen); 1053 if (tmpsfd == -1) 1054 g_gate_xlog("accept(): %s.", strerror(errno)); 1055 1056 if (got_sighup) { 1057 got_sighup = 0; 1058 exports_get(); 1059 } 1060 1061 if (!handshake(&from, tmpsfd)) 1062 close(tmpsfd); 1063 } 1064 close(sfd); 1065 pidfile_remove(pfh); 1066 exit(EXIT_SUCCESS); 1067 } 1068