1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD 3 * 4 * Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 * 28 * $FreeBSD$ 29 */ 30 31 #include <sys/param.h> 32 #include <sys/bio.h> 33 #include <sys/disk.h> 34 #include <sys/endian.h> 35 #include <sys/ioctl.h> 36 #include <sys/queue.h> 37 #include <sys/socket.h> 38 #include <sys/stat.h> 39 #include <sys/time.h> 40 #include <arpa/inet.h> 41 #include <netinet/in.h> 42 #include <netinet/tcp.h> 43 #include <assert.h> 44 #include <err.h> 45 #include <errno.h> 46 #include <fcntl.h> 47 #include <libgen.h> 48 #include <libutil.h> 49 #include <paths.h> 50 #include <pthread.h> 51 #include <signal.h> 52 #include <stdarg.h> 53 #include <stdio.h> 54 #include <stdlib.h> 55 #include <stdint.h> 56 #include <string.h> 57 #include <syslog.h> 58 #include <unistd.h> 59 60 #include "ggate.h" 61 62 63 #define GGATED_EXPORT_FILE "/etc/gg.exports" 64 65 struct ggd_connection { 66 off_t c_mediasize; 67 unsigned c_sectorsize; 68 unsigned c_flags; /* flags (RO/RW) */ 69 int c_diskfd; 70 int c_sendfd; 71 int c_recvfd; 72 time_t c_birthtime; 73 char *c_path; 74 uint64_t c_token; 75 in_addr_t c_srcip; 76 LIST_ENTRY(ggd_connection) c_next; 77 }; 78 79 struct ggd_request { 80 struct g_gate_hdr r_hdr; 81 char *r_data; 82 TAILQ_ENTRY(ggd_request) r_next; 83 }; 84 #define r_cmd r_hdr.gh_cmd 85 #define r_offset r_hdr.gh_offset 86 #define r_length r_hdr.gh_length 87 #define r_error r_hdr.gh_error 88 89 struct ggd_export { 90 char *e_path; /* path to device/file */ 91 in_addr_t e_ip; /* remote IP address */ 92 in_addr_t e_mask; /* IP mask */ 93 unsigned e_flags; /* flags (RO/RW) */ 94 SLIST_ENTRY(ggd_export) e_next; 95 }; 96 97 static const char *exports_file = GGATED_EXPORT_FILE; 98 static int got_sighup = 0; 99 static in_addr_t bindaddr; 100 101 static TAILQ_HEAD(, ggd_request) inqueue = TAILQ_HEAD_INITIALIZER(inqueue); 102 static TAILQ_HEAD(, ggd_request) outqueue = TAILQ_HEAD_INITIALIZER(outqueue); 103 static pthread_mutex_t inqueue_mtx, outqueue_mtx; 104 static pthread_cond_t inqueue_cond, outqueue_cond; 105 106 static SLIST_HEAD(, ggd_export) exports = SLIST_HEAD_INITIALIZER(exports); 107 static LIST_HEAD(, ggd_connection) connections = LIST_HEAD_INITIALIZER(connections); 108 109 static void *recv_thread(void *arg); 110 static void *disk_thread(void *arg); 111 static void *send_thread(void *arg); 112 113 static void 114 usage(void) 115 { 116 117 fprintf(stderr, "usage: %s [-nv] [-a address] [-F pidfile] [-p port] " 118 "[-R rcvbuf] [-S sndbuf] [exports file]\n", getprogname()); 119 exit(EXIT_FAILURE); 120 } 121 122 static char * 123 ip2str(in_addr_t ip) 124 { 125 static char sip[16]; 126 127 snprintf(sip, sizeof(sip), "%u.%u.%u.%u", 128 ((ip >> 24) & 0xff), 129 ((ip >> 16) & 0xff), 130 ((ip >> 8) & 0xff), 131 (ip & 0xff)); 132 return (sip); 133 } 134 135 static in_addr_t 136 countmask(unsigned m) 137 { 138 in_addr_t mask; 139 140 if (m == 0) { 141 mask = 0x0; 142 } else { 143 mask = 1 << (32 - m); 144 mask--; 145 mask = ~mask; 146 } 147 return (mask); 148 } 149 150 static void 151 line_parse(char *line, unsigned lineno) 152 { 153 struct ggd_export *ex; 154 char *word, *path, *sflags; 155 unsigned flags, i, vmask; 156 in_addr_t ip, mask; 157 158 ip = mask = flags = vmask = 0; 159 path = NULL; 160 sflags = NULL; 161 162 for (i = 0, word = strtok(line, " \t"); word != NULL; 163 i++, word = strtok(NULL, " \t")) { 164 switch (i) { 165 case 0: /* IP address or host name */ 166 ip = g_gate_str2ip(strsep(&word, "/")); 167 if (ip == INADDR_NONE) { 168 g_gate_xlog("Invalid IP/host name at line %u.", 169 lineno); 170 } 171 ip = ntohl(ip); 172 if (word == NULL) 173 vmask = 32; 174 else { 175 errno = 0; 176 vmask = strtoul(word, NULL, 10); 177 if (vmask == 0 && errno != 0) { 178 g_gate_xlog("Invalid IP mask value at " 179 "line %u.", lineno); 180 } 181 if ((unsigned)vmask > 32) { 182 g_gate_xlog("Invalid IP mask value at line %u.", 183 lineno); 184 } 185 } 186 mask = countmask(vmask); 187 break; 188 case 1: /* flags */ 189 if (strcasecmp("rd", word) == 0 || 190 strcasecmp("ro", word) == 0) { 191 flags = O_RDONLY; 192 } else if (strcasecmp("wo", word) == 0) { 193 flags = O_WRONLY; 194 } else if (strcasecmp("rw", word) == 0) { 195 flags = O_RDWR; 196 } else { 197 g_gate_xlog("Invalid value in flags field at " 198 "line %u.", lineno); 199 } 200 sflags = word; 201 break; 202 case 2: /* path */ 203 if (strlen(word) >= MAXPATHLEN) { 204 g_gate_xlog("Path too long at line %u. ", 205 lineno); 206 } 207 path = word; 208 break; 209 default: 210 g_gate_xlog("Too many arguments at line %u. ", lineno); 211 } 212 } 213 if (i != 3) 214 g_gate_xlog("Too few arguments at line %u.", lineno); 215 216 ex = malloc(sizeof(*ex)); 217 if (ex == NULL) 218 g_gate_xlog("Not enough memory."); 219 ex->e_path = strdup(path); 220 if (ex->e_path == NULL) 221 g_gate_xlog("Not enough memory."); 222 223 /* Made 'and' here. */ 224 ex->e_ip = (ip & mask); 225 ex->e_mask = mask; 226 ex->e_flags = flags; 227 228 SLIST_INSERT_HEAD(&exports, ex, e_next); 229 230 g_gate_log(LOG_DEBUG, "Added %s/%u %s %s to exports list.", 231 ip2str(ex->e_ip), vmask, path, sflags); 232 } 233 234 static void 235 exports_clear(void) 236 { 237 struct ggd_export *ex; 238 239 while (!SLIST_EMPTY(&exports)) { 240 ex = SLIST_FIRST(&exports); 241 SLIST_REMOVE_HEAD(&exports, e_next); 242 free(ex); 243 } 244 } 245 246 #define EXPORTS_LINE_SIZE 2048 247 static void 248 exports_get(void) 249 { 250 char buf[EXPORTS_LINE_SIZE], *line; 251 unsigned lineno = 0, objs = 0, len; 252 FILE *fd; 253 254 exports_clear(); 255 256 fd = fopen(exports_file, "r"); 257 if (fd == NULL) { 258 g_gate_xlog("Cannot open exports file (%s): %s.", exports_file, 259 strerror(errno)); 260 } 261 262 g_gate_log(LOG_INFO, "Reading exports file (%s).", exports_file); 263 264 for (;;) { 265 if (fgets(buf, sizeof(buf), fd) == NULL) { 266 if (feof(fd)) 267 break; 268 269 g_gate_xlog("Error while reading exports file: %s.", 270 strerror(errno)); 271 } 272 273 /* Increase line count. */ 274 lineno++; 275 276 /* Skip spaces and tabs. */ 277 for (line = buf; *line == ' ' || *line == '\t'; ++line) 278 ; 279 280 /* Empty line, comment or empty line at the end of file. */ 281 if (*line == '\n' || *line == '#' || *line == '\0') 282 continue; 283 284 len = strlen(line); 285 if (line[len - 1] == '\n') { 286 /* Remove new line char. */ 287 line[len - 1] = '\0'; 288 } else { 289 if (!feof(fd)) 290 g_gate_xlog("Line %u too long.", lineno); 291 } 292 293 line_parse(line, lineno); 294 objs++; 295 } 296 297 fclose(fd); 298 299 if (objs == 0) 300 g_gate_xlog("There are no objects to export."); 301 302 g_gate_log(LOG_INFO, "Exporting %u object(s).", objs); 303 } 304 305 static int 306 exports_check(struct ggd_export *ex, struct g_gate_cinit *cinit, 307 struct ggd_connection *conn) 308 { 309 char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ 310 int error = 0, flags; 311 312 strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); 313 strlcat(ipmask, "/", sizeof(ipmask)); 314 strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); 315 if ((cinit->gc_flags & GGATE_FLAG_RDONLY) != 0) { 316 if (ex->e_flags == O_WRONLY) { 317 g_gate_log(LOG_WARNING, "Read-only access requested, " 318 "but %s (%s) is exported write-only.", ex->e_path, 319 ipmask); 320 return (EPERM); 321 } else { 322 conn->c_flags |= GGATE_FLAG_RDONLY; 323 } 324 } else if ((cinit->gc_flags & GGATE_FLAG_WRONLY) != 0) { 325 if (ex->e_flags == O_RDONLY) { 326 g_gate_log(LOG_WARNING, "Write-only access requested, " 327 "but %s (%s) is exported read-only.", ex->e_path, 328 ipmask); 329 return (EPERM); 330 } else { 331 conn->c_flags |= GGATE_FLAG_WRONLY; 332 } 333 } else { 334 if (ex->e_flags == O_RDONLY) { 335 g_gate_log(LOG_WARNING, "Read-write access requested, " 336 "but %s (%s) is exported read-only.", ex->e_path, 337 ipmask); 338 return (EPERM); 339 } else if (ex->e_flags == O_WRONLY) { 340 g_gate_log(LOG_WARNING, "Read-write access requested, " 341 "but %s (%s) is exported write-only.", ex->e_path, 342 ipmask); 343 return (EPERM); 344 } 345 } 346 if ((conn->c_flags & GGATE_FLAG_RDONLY) != 0) 347 flags = O_RDONLY; 348 else if ((conn->c_flags & GGATE_FLAG_WRONLY) != 0) 349 flags = O_WRONLY; 350 else 351 flags = O_RDWR; 352 conn->c_diskfd = open(ex->e_path, flags); 353 if (conn->c_diskfd == -1) { 354 error = errno; 355 g_gate_log(LOG_ERR, "Cannot open %s: %s.", ex->e_path, 356 strerror(error)); 357 return (error); 358 } 359 return (0); 360 } 361 362 static struct ggd_export * 363 exports_find(struct sockaddr *s, struct g_gate_cinit *cinit, 364 struct ggd_connection *conn) 365 { 366 struct ggd_export *ex; 367 in_addr_t ip; 368 int error; 369 370 ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); 371 SLIST_FOREACH(ex, &exports, e_next) { 372 if ((ip & ex->e_mask) != ex->e_ip) { 373 g_gate_log(LOG_DEBUG, "exports[%s]: IP mismatch.", 374 ex->e_path); 375 continue; 376 } 377 if (strcmp(cinit->gc_path, ex->e_path) != 0) { 378 g_gate_log(LOG_DEBUG, "exports[%s]: Path mismatch.", 379 ex->e_path); 380 continue; 381 } 382 error = exports_check(ex, cinit, conn); 383 if (error == 0) 384 return (ex); 385 else { 386 errno = error; 387 return (NULL); 388 } 389 } 390 g_gate_log(LOG_WARNING, "Unauthorized connection from: %s.", 391 ip2str(ip)); 392 errno = EPERM; 393 return (NULL); 394 } 395 396 /* 397 * Remove timed out connections. 398 */ 399 static void 400 connection_cleanups(void) 401 { 402 struct ggd_connection *conn, *tconn; 403 time_t now; 404 405 time(&now); 406 LIST_FOREACH_SAFE(conn, &connections, c_next, tconn) { 407 if (now - conn->c_birthtime > 10) { 408 LIST_REMOVE(conn, c_next); 409 g_gate_log(LOG_NOTICE, 410 "Connection from %s [%s] removed.", 411 ip2str(conn->c_srcip), conn->c_path); 412 close(conn->c_diskfd); 413 close(conn->c_sendfd); 414 close(conn->c_recvfd); 415 free(conn->c_path); 416 free(conn); 417 } 418 } 419 } 420 421 static struct ggd_connection * 422 connection_find(struct g_gate_cinit *cinit) 423 { 424 struct ggd_connection *conn; 425 426 LIST_FOREACH(conn, &connections, c_next) { 427 if (conn->c_token == cinit->gc_token) 428 break; 429 } 430 return (conn); 431 } 432 433 static struct ggd_connection * 434 connection_new(struct g_gate_cinit *cinit, struct sockaddr *s, int sfd) 435 { 436 struct ggd_connection *conn; 437 in_addr_t ip; 438 439 /* 440 * First, look for old connections. 441 * We probably should do it every X seconds, but what for? 442 * It is only dangerous if an attacker wants to overload connections 443 * queue, so here is a good place to do the cleanups. 444 */ 445 connection_cleanups(); 446 447 conn = malloc(sizeof(*conn)); 448 if (conn == NULL) 449 return (NULL); 450 conn->c_path = strdup(cinit->gc_path); 451 if (conn->c_path == NULL) { 452 free(conn); 453 return (NULL); 454 } 455 conn->c_token = cinit->gc_token; 456 ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); 457 conn->c_srcip = ip; 458 conn->c_sendfd = conn->c_recvfd = -1; 459 if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) 460 conn->c_sendfd = sfd; 461 else 462 conn->c_recvfd = sfd; 463 conn->c_mediasize = 0; 464 conn->c_sectorsize = 0; 465 time(&conn->c_birthtime); 466 conn->c_flags = cinit->gc_flags; 467 LIST_INSERT_HEAD(&connections, conn, c_next); 468 g_gate_log(LOG_DEBUG, "Connection created [%s, %s].", ip2str(ip), 469 conn->c_path); 470 return (conn); 471 } 472 473 static int 474 connection_add(struct ggd_connection *conn, struct g_gate_cinit *cinit, 475 struct sockaddr *s, int sfd) 476 { 477 in_addr_t ip; 478 479 ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); 480 if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) { 481 if (conn->c_sendfd != -1) { 482 g_gate_log(LOG_WARNING, 483 "Send socket already exists [%s, %s].", ip2str(ip), 484 conn->c_path); 485 return (EEXIST); 486 } 487 conn->c_sendfd = sfd; 488 } else { 489 if (conn->c_recvfd != -1) { 490 g_gate_log(LOG_WARNING, 491 "Receive socket already exists [%s, %s].", 492 ip2str(ip), conn->c_path); 493 return (EEXIST); 494 } 495 conn->c_recvfd = sfd; 496 } 497 g_gate_log(LOG_DEBUG, "Connection added [%s, %s].", ip2str(ip), 498 conn->c_path); 499 return (0); 500 } 501 502 /* 503 * Remove one socket from the given connection or the whole 504 * connection if sfd == -1. 505 */ 506 static void 507 connection_remove(struct ggd_connection *conn) 508 { 509 510 LIST_REMOVE(conn, c_next); 511 g_gate_log(LOG_DEBUG, "Connection removed [%s %s].", 512 ip2str(conn->c_srcip), conn->c_path); 513 if (conn->c_sendfd != -1) 514 close(conn->c_sendfd); 515 if (conn->c_recvfd != -1) 516 close(conn->c_recvfd); 517 free(conn->c_path); 518 free(conn); 519 } 520 521 static int 522 connection_ready(struct ggd_connection *conn) 523 { 524 525 return (conn->c_sendfd != -1 && conn->c_recvfd != -1); 526 } 527 528 static void 529 connection_launch(struct ggd_connection *conn) 530 { 531 pthread_t td; 532 int error, pid; 533 534 pid = fork(); 535 if (pid > 0) 536 return; 537 else if (pid == -1) { 538 g_gate_log(LOG_ERR, "Cannot fork: %s.", strerror(errno)); 539 return; 540 } 541 g_gate_log(LOG_DEBUG, "Process created [%s].", conn->c_path); 542 543 /* 544 * Create condition variables and mutexes for in-queue and out-queue 545 * synchronization. 546 */ 547 error = pthread_mutex_init(&inqueue_mtx, NULL); 548 if (error != 0) { 549 g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", 550 strerror(error)); 551 } 552 error = pthread_cond_init(&inqueue_cond, NULL); 553 if (error != 0) { 554 g_gate_xlog("pthread_cond_init(inqueue_cond): %s.", 555 strerror(error)); 556 } 557 error = pthread_mutex_init(&outqueue_mtx, NULL); 558 if (error != 0) { 559 g_gate_xlog("pthread_mutex_init(outqueue_mtx): %s.", 560 strerror(error)); 561 } 562 error = pthread_cond_init(&outqueue_cond, NULL); 563 if (error != 0) { 564 g_gate_xlog("pthread_cond_init(outqueue_cond): %s.", 565 strerror(error)); 566 } 567 568 /* 569 * Create threads: 570 * recvtd - thread for receiving I/O request 571 * diskio - thread for doing I/O request 572 * sendtd - thread for sending I/O requests back 573 */ 574 error = pthread_create(&td, NULL, send_thread, conn); 575 if (error != 0) { 576 g_gate_xlog("pthread_create(send_thread): %s.", 577 strerror(error)); 578 } 579 error = pthread_create(&td, NULL, recv_thread, conn); 580 if (error != 0) { 581 g_gate_xlog("pthread_create(recv_thread): %s.", 582 strerror(error)); 583 } 584 disk_thread(conn); 585 } 586 587 static void 588 sendfail(int sfd, int error, const char *fmt, ...) 589 { 590 struct g_gate_sinit sinit; 591 va_list ap; 592 ssize_t data; 593 594 memset(&sinit, 0, sizeof(sinit)); 595 sinit.gs_error = error; 596 g_gate_swap2n_sinit(&sinit); 597 data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); 598 g_gate_swap2h_sinit(&sinit); 599 if (data != sizeof(sinit)) { 600 g_gate_log(LOG_WARNING, "Cannot send initial packet: %s.", 601 strerror(errno)); 602 return; 603 } 604 if (fmt != NULL) { 605 va_start(ap, fmt); 606 g_gate_vlog(LOG_WARNING, fmt, ap); 607 va_end(ap); 608 } 609 } 610 611 static void * 612 malloc_waitok(size_t size) 613 { 614 void *p; 615 616 while ((p = malloc(size)) == NULL) { 617 g_gate_log(LOG_DEBUG, "Cannot allocate %zu bytes.", size); 618 sleep(1); 619 } 620 return (p); 621 } 622 623 static void * 624 recv_thread(void *arg) 625 { 626 struct ggd_connection *conn; 627 struct ggd_request *req; 628 ssize_t data; 629 int error, fd; 630 631 conn = arg; 632 g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); 633 fd = conn->c_recvfd; 634 for (;;) { 635 /* 636 * Get header packet. 637 */ 638 req = malloc_waitok(sizeof(*req)); 639 data = g_gate_recv(fd, &req->r_hdr, sizeof(req->r_hdr), 640 MSG_WAITALL); 641 if (data == 0) { 642 g_gate_log(LOG_DEBUG, "Process %u exiting.", getpid()); 643 exit(EXIT_SUCCESS); 644 } else if (data == -1) { 645 g_gate_xlog("Error while receiving hdr packet: %s.", 646 strerror(errno)); 647 } else if (data != sizeof(req->r_hdr)) { 648 g_gate_xlog("Malformed hdr packet received."); 649 } 650 g_gate_log(LOG_DEBUG, "Received hdr packet."); 651 g_gate_swap2h_hdr(&req->r_hdr); 652 653 g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, 654 (intmax_t)req->r_offset, (unsigned)req->r_length); 655 656 /* 657 * Allocate memory for data. 658 */ 659 req->r_data = malloc_waitok(req->r_length); 660 661 /* 662 * Receive data to write for WRITE request. 663 */ 664 if (req->r_cmd == GGATE_CMD_WRITE) { 665 g_gate_log(LOG_DEBUG, "Waiting for %u bytes of data...", 666 req->r_length); 667 data = g_gate_recv(fd, req->r_data, req->r_length, 668 MSG_WAITALL); 669 if (data == -1) { 670 g_gate_xlog("Error while receiving data: %s.", 671 strerror(errno)); 672 } 673 } 674 675 /* 676 * Put the request onto the incoming queue. 677 */ 678 error = pthread_mutex_lock(&inqueue_mtx); 679 assert(error == 0); 680 TAILQ_INSERT_TAIL(&inqueue, req, r_next); 681 error = pthread_cond_signal(&inqueue_cond); 682 assert(error == 0); 683 error = pthread_mutex_unlock(&inqueue_mtx); 684 assert(error == 0); 685 } 686 } 687 688 static void * 689 disk_thread(void *arg) 690 { 691 struct ggd_connection *conn; 692 struct ggd_request *req; 693 ssize_t data; 694 int error, fd; 695 696 conn = arg; 697 g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); 698 fd = conn->c_diskfd; 699 for (;;) { 700 /* 701 * Get a request from the incoming queue. 702 */ 703 error = pthread_mutex_lock(&inqueue_mtx); 704 assert(error == 0); 705 while ((req = TAILQ_FIRST(&inqueue)) == NULL) { 706 error = pthread_cond_wait(&inqueue_cond, &inqueue_mtx); 707 assert(error == 0); 708 } 709 TAILQ_REMOVE(&inqueue, req, r_next); 710 error = pthread_mutex_unlock(&inqueue_mtx); 711 assert(error == 0); 712 713 /* 714 * Check the request. 715 */ 716 assert(req->r_cmd == GGATE_CMD_READ || req->r_cmd == GGATE_CMD_WRITE); 717 assert(req->r_offset + req->r_length <= (uintmax_t)conn->c_mediasize); 718 assert((req->r_offset % conn->c_sectorsize) == 0); 719 assert((req->r_length % conn->c_sectorsize) == 0); 720 721 g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, 722 (intmax_t)req->r_offset, (unsigned)req->r_length); 723 724 /* 725 * Do the request. 726 */ 727 data = 0; 728 switch (req->r_cmd) { 729 case GGATE_CMD_READ: 730 data = pread(fd, req->r_data, req->r_length, 731 req->r_offset); 732 break; 733 case GGATE_CMD_WRITE: 734 data = pwrite(fd, req->r_data, req->r_length, 735 req->r_offset); 736 /* Free data memory here - better sooner. */ 737 free(req->r_data); 738 req->r_data = NULL; 739 break; 740 } 741 if (data != (ssize_t)req->r_length) { 742 /* Report short reads/writes as I/O errors. */ 743 if (errno == 0) 744 errno = EIO; 745 g_gate_log(LOG_ERR, "Disk error: %s", strerror(errno)); 746 req->r_error = errno; 747 if (req->r_data != NULL) { 748 free(req->r_data); 749 req->r_data = NULL; 750 } 751 } 752 753 /* 754 * Put the request onto the outgoing queue. 755 */ 756 error = pthread_mutex_lock(&outqueue_mtx); 757 assert(error == 0); 758 TAILQ_INSERT_TAIL(&outqueue, req, r_next); 759 error = pthread_cond_signal(&outqueue_cond); 760 assert(error == 0); 761 error = pthread_mutex_unlock(&outqueue_mtx); 762 assert(error == 0); 763 } 764 765 /* NOTREACHED */ 766 return (NULL); 767 } 768 769 static void * 770 send_thread(void *arg) 771 { 772 struct ggd_connection *conn; 773 struct ggd_request *req; 774 ssize_t data; 775 int error, fd; 776 777 conn = arg; 778 g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); 779 fd = conn->c_sendfd; 780 for (;;) { 781 /* 782 * Get a request from the outgoing queue. 783 */ 784 error = pthread_mutex_lock(&outqueue_mtx); 785 assert(error == 0); 786 while ((req = TAILQ_FIRST(&outqueue)) == NULL) { 787 error = pthread_cond_wait(&outqueue_cond, 788 &outqueue_mtx); 789 assert(error == 0); 790 } 791 TAILQ_REMOVE(&outqueue, req, r_next); 792 error = pthread_mutex_unlock(&outqueue_mtx); 793 assert(error == 0); 794 795 g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, 796 (intmax_t)req->r_offset, (unsigned)req->r_length); 797 798 /* 799 * Send the request. 800 */ 801 g_gate_swap2n_hdr(&req->r_hdr); 802 if (g_gate_send(fd, &req->r_hdr, sizeof(req->r_hdr), 0) == -1) { 803 g_gate_xlog("Error while sending hdr packet: %s.", 804 strerror(errno)); 805 } 806 g_gate_log(LOG_DEBUG, "Sent hdr packet."); 807 g_gate_swap2h_hdr(&req->r_hdr); 808 if (req->r_data != NULL) { 809 data = g_gate_send(fd, req->r_data, req->r_length, 0); 810 if (data != (ssize_t)req->r_length) { 811 g_gate_xlog("Error while sending data: %s.", 812 strerror(errno)); 813 } 814 g_gate_log(LOG_DEBUG, 815 "Sent %zd bytes (offset=%ju, size=%zu).", data, 816 (uintmax_t)req->r_offset, (size_t)req->r_length); 817 free(req->r_data); 818 } 819 free(req); 820 } 821 822 /* NOTREACHED */ 823 return (NULL); 824 } 825 826 static void 827 log_connection(struct sockaddr *from) 828 { 829 in_addr_t ip; 830 831 ip = htonl(((struct sockaddr_in *)(void *)from)->sin_addr.s_addr); 832 g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); 833 } 834 835 static int 836 handshake(struct sockaddr *from, int sfd) 837 { 838 struct g_gate_version ver; 839 struct g_gate_cinit cinit; 840 struct g_gate_sinit sinit; 841 struct ggd_connection *conn; 842 struct ggd_export *ex; 843 ssize_t data; 844 845 log_connection(from); 846 /* 847 * Phase 1: Version verification. 848 */ 849 g_gate_log(LOG_DEBUG, "Receiving version packet."); 850 data = g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL); 851 g_gate_swap2h_version(&ver); 852 if (data != sizeof(ver)) { 853 g_gate_log(LOG_WARNING, "Malformed version packet."); 854 return (0); 855 } 856 g_gate_log(LOG_DEBUG, "Version packet received."); 857 if (memcmp(ver.gv_magic, GGATE_MAGIC, strlen(GGATE_MAGIC)) != 0) { 858 g_gate_log(LOG_WARNING, "Invalid magic field."); 859 return (0); 860 } 861 if (ver.gv_version != GGATE_VERSION) { 862 g_gate_log(LOG_WARNING, "Version %u is not supported.", 863 ver.gv_version); 864 return (0); 865 } 866 ver.gv_error = 0; 867 g_gate_swap2n_version(&ver); 868 data = g_gate_send(sfd, &ver, sizeof(ver), 0); 869 g_gate_swap2h_version(&ver); 870 if (data == -1) { 871 sendfail(sfd, errno, "Error while sending version packet: %s.", 872 strerror(errno)); 873 return (0); 874 } 875 876 /* 877 * Phase 2: Request verification. 878 */ 879 g_gate_log(LOG_DEBUG, "Receiving initial packet."); 880 data = g_gate_recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); 881 g_gate_swap2h_cinit(&cinit); 882 if (data != sizeof(cinit)) { 883 g_gate_log(LOG_WARNING, "Malformed initial packet."); 884 return (0); 885 } 886 g_gate_log(LOG_DEBUG, "Initial packet received."); 887 conn = connection_find(&cinit); 888 if (conn != NULL) { 889 /* 890 * Connection should already exists. 891 */ 892 g_gate_log(LOG_DEBUG, "Found existing connection (token=%lu).", 893 (unsigned long)conn->c_token); 894 if (connection_add(conn, &cinit, from, sfd) == -1) { 895 connection_remove(conn); 896 return (0); 897 } 898 } else { 899 /* 900 * New connection, allocate space. 901 */ 902 conn = connection_new(&cinit, from, sfd); 903 if (conn == NULL) { 904 sendfail(sfd, ENOMEM, 905 "Cannot allocate new connection."); 906 return (0); 907 } 908 g_gate_log(LOG_DEBUG, "New connection created (token=%lu).", 909 (unsigned long)conn->c_token); 910 } 911 912 ex = exports_find(from, &cinit, conn); 913 if (ex == NULL) { 914 sendfail(sfd, errno, NULL); 915 connection_remove(conn); 916 return (0); 917 } 918 if (conn->c_mediasize == 0) { 919 conn->c_mediasize = g_gate_mediasize(conn->c_diskfd); 920 conn->c_sectorsize = g_gate_sectorsize(conn->c_diskfd); 921 } 922 sinit.gs_mediasize = conn->c_mediasize; 923 sinit.gs_sectorsize = conn->c_sectorsize; 924 sinit.gs_error = 0; 925 926 g_gate_log(LOG_DEBUG, "Sending initial packet."); 927 928 g_gate_swap2n_sinit(&sinit); 929 data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); 930 g_gate_swap2h_sinit(&sinit); 931 if (data == -1) { 932 sendfail(sfd, errno, "Error while sending initial packet: %s.", 933 strerror(errno)); 934 return (0); 935 } 936 937 if (connection_ready(conn)) { 938 connection_launch(conn); 939 connection_remove(conn); 940 } 941 return (1); 942 } 943 944 static void 945 huphandler(int sig __unused) 946 { 947 948 got_sighup = 1; 949 } 950 951 int 952 main(int argc, char *argv[]) 953 { 954 const char *ggated_pidfile = _PATH_VARRUN "/ggated.pid"; 955 struct pidfh *pfh; 956 struct sockaddr_in serv; 957 struct sockaddr from; 958 socklen_t fromlen; 959 pid_t otherpid; 960 int ch, sfd, tmpsfd; 961 unsigned port; 962 963 bindaddr = htonl(INADDR_ANY); 964 port = G_GATE_PORT; 965 while ((ch = getopt(argc, argv, "a:hnp:F:R:S:v")) != -1) { 966 switch (ch) { 967 case 'a': 968 bindaddr = g_gate_str2ip(optarg); 969 if (bindaddr == INADDR_NONE) { 970 errx(EXIT_FAILURE, 971 "Invalid IP/host name to bind to."); 972 } 973 break; 974 case 'F': 975 ggated_pidfile = optarg; 976 break; 977 case 'n': 978 nagle = 0; 979 break; 980 case 'p': 981 errno = 0; 982 port = strtoul(optarg, NULL, 10); 983 if (port == 0 && errno != 0) 984 errx(EXIT_FAILURE, "Invalid port."); 985 break; 986 case 'R': 987 errno = 0; 988 rcvbuf = strtoul(optarg, NULL, 10); 989 if (rcvbuf == 0 && errno != 0) 990 errx(EXIT_FAILURE, "Invalid rcvbuf."); 991 break; 992 case 'S': 993 errno = 0; 994 sndbuf = strtoul(optarg, NULL, 10); 995 if (sndbuf == 0 && errno != 0) 996 errx(EXIT_FAILURE, "Invalid sndbuf."); 997 break; 998 case 'v': 999 g_gate_verbose++; 1000 break; 1001 case 'h': 1002 default: 1003 usage(); 1004 } 1005 } 1006 argc -= optind; 1007 argv += optind; 1008 1009 if (argv[0] != NULL) 1010 exports_file = argv[0]; 1011 exports_get(); 1012 1013 pfh = pidfile_open(ggated_pidfile, 0600, &otherpid); 1014 if (pfh == NULL) { 1015 if (errno == EEXIST) { 1016 errx(EXIT_FAILURE, "Daemon already running, pid: %jd.", 1017 (intmax_t)otherpid); 1018 } 1019 err(EXIT_FAILURE, "Cannot open/create pidfile"); 1020 } 1021 1022 if (!g_gate_verbose) { 1023 /* Run in daemon mode. */ 1024 if (daemon(0, 0) == -1) 1025 g_gate_xlog("Cannot daemonize: %s", strerror(errno)); 1026 } 1027 1028 pidfile_write(pfh); 1029 1030 signal(SIGCHLD, SIG_IGN); 1031 1032 sfd = socket(AF_INET, SOCK_STREAM, 0); 1033 if (sfd == -1) 1034 g_gate_xlog("Cannot open stream socket: %s.", strerror(errno)); 1035 bzero(&serv, sizeof(serv)); 1036 serv.sin_family = AF_INET; 1037 serv.sin_addr.s_addr = bindaddr; 1038 serv.sin_port = htons(port); 1039 1040 g_gate_socket_settings(sfd); 1041 1042 if (bind(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) 1043 g_gate_xlog("bind(): %s.", strerror(errno)); 1044 if (listen(sfd, 5) == -1) 1045 g_gate_xlog("listen(): %s.", strerror(errno)); 1046 1047 g_gate_log(LOG_INFO, "Listen on port: %d.", port); 1048 1049 signal(SIGHUP, huphandler); 1050 1051 for (;;) { 1052 fromlen = sizeof(from); 1053 tmpsfd = accept(sfd, &from, &fromlen); 1054 if (tmpsfd == -1) 1055 g_gate_xlog("accept(): %s.", strerror(errno)); 1056 1057 if (got_sighup) { 1058 got_sighup = 0; 1059 exports_get(); 1060 } 1061 1062 if (!handshake(&from, tmpsfd)) 1063 close(tmpsfd); 1064 } 1065 close(sfd); 1066 pidfile_remove(pfh); 1067 exit(EXIT_SUCCESS); 1068 } 1069