1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
*/

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now: component number 1. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * A single I/O request.  Requests are preallocated into a fixed-size pool
 * (see init_environment()) and recycled through the free list.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * Request was already confirmed to GEOM Gate.
	 */
	bool			 hio_done;
	/*
	 * Remember replication from the time the request was initiated,
	 * so we won't get confused when replication changes on reload.
	 */
	int			 hio_replication;
	/*
	 * List linkage: an array with one entry per component, so a request
	 * can sit on every component's send/recv list at the same time.
	 */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free list and the done list both link requests through hio_next[0],
 * which is also the slot used by component 0's per-component lists.
 * NOTE(review): presumably safe because a request is never on more than one
 * of the lists sharing this slot at the same time -- confirm.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * The structures below are for interaction with the sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below synchronizes access to remote connections (one per
 * component).
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
160 */ 161 #define HAST_NCOMPONENTS 2 162 163 #define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 176 } while (0) 177 #define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_signal(&hio_##name##_list_cond); \ 186 } while (0) 187 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203 } while (0) 204 #define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212 } while (0) 213 214 #define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217 } while (0) 218 #define ISSYNCREQ(hio) 
((hio)->hio_ggio.gctl_unit == -1) 219 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221 222 static struct hast_resource *gres; 223 224 static pthread_mutex_t range_lock; 225 static struct rangelocks *range_regular; 226 static bool range_regular_wait; 227 static pthread_cond_t range_regular_cond; 228 static struct rangelocks *range_sync; 229 static bool range_sync_wait; 230 static pthread_cond_t range_sync_cond; 231 static bool fullystarted; 232 233 static void *ggate_recv_thread(void *arg); 234 static void *local_send_thread(void *arg); 235 static void *remote_send_thread(void *arg); 236 static void *remote_recv_thread(void *arg); 237 static void *ggate_send_thread(void *arg); 238 static void *sync_thread(void *arg); 239 static void *guard_thread(void *arg); 240 241 static void 242 cleanup(struct hast_resource *res) 243 { 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. */ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267 } 268 269 static __dead2 void 270 primary_exit(int exitcode, const char *fmt, ...) 271 { 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280 } 281 282 static __dead2 void 283 primary_exitx(int exitcode, const char *fmt, ...) 284 { 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292 } 293 294 static int 295 hast_activemap_flush(struct hast_resource *res) 296 { 297 const unsigned char *buf; 298 size_t size; 299 300 buf = activemap_bitmap(res->hr_amp, &size); 301 PJDLOG_ASSERT(buf != NULL); 302 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 303 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 304 (ssize_t)size) { 305 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 306 return (-1); 307 } 308 if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 309 if (errno == EOPNOTSUPP) { 310 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 311 res->hr_localpath); 312 res->hr_metaflush = 0; 313 } else { 314 pjdlog_errno(LOG_ERR, 315 "Unable to flush disk cache on activemap update"); 316 return (-1); 317 } 318 } 319 return (0); 320 } 321 322 static bool 323 real_remote(const struct hast_resource *res) 324 { 325 326 return (strcmp(res->hr_remoteaddr, "none") != 0); 327 } 328 329 static void 330 init_environment(struct hast_resource *res __unused) 331 { 332 struct hio *hio; 333 unsigned int ii, ncomps; 334 335 /* 336 * In the future it might be per-resource value. 337 */ 338 ncomps = HAST_NCOMPONENTS; 339 340 /* 341 * Allocate memory needed by lists. 
342 */ 343 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 344 if (hio_send_list == NULL) { 345 primary_exitx(EX_TEMPFAIL, 346 "Unable to allocate %zu bytes of memory for send lists.", 347 sizeof(hio_send_list[0]) * ncomps); 348 } 349 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 350 if (hio_send_list_lock == NULL) { 351 primary_exitx(EX_TEMPFAIL, 352 "Unable to allocate %zu bytes of memory for send list locks.", 353 sizeof(hio_send_list_lock[0]) * ncomps); 354 } 355 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 356 if (hio_send_list_cond == NULL) { 357 primary_exitx(EX_TEMPFAIL, 358 "Unable to allocate %zu bytes of memory for send list condition variables.", 359 sizeof(hio_send_list_cond[0]) * ncomps); 360 } 361 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 362 if (hio_recv_list == NULL) { 363 primary_exitx(EX_TEMPFAIL, 364 "Unable to allocate %zu bytes of memory for recv lists.", 365 sizeof(hio_recv_list[0]) * ncomps); 366 } 367 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 368 if (hio_recv_list_lock == NULL) { 369 primary_exitx(EX_TEMPFAIL, 370 "Unable to allocate %zu bytes of memory for recv list locks.", 371 sizeof(hio_recv_list_lock[0]) * ncomps); 372 } 373 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 374 if (hio_recv_list_cond == NULL) { 375 primary_exitx(EX_TEMPFAIL, 376 "Unable to allocate %zu bytes of memory for recv list condition variables.", 377 sizeof(hio_recv_list_cond[0]) * ncomps); 378 } 379 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 380 if (hio_remote_lock == NULL) { 381 primary_exitx(EX_TEMPFAIL, 382 "Unable to allocate %zu bytes of memory for remote connections locks.", 383 sizeof(hio_remote_lock[0]) * ncomps); 384 } 385 386 /* 387 * Initialize lists, their locks and theirs condition variables. 
388 */ 389 TAILQ_INIT(&hio_free_list); 390 mtx_init(&hio_free_list_lock); 391 cv_init(&hio_free_list_cond); 392 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 393 TAILQ_INIT(&hio_send_list[ii]); 394 mtx_init(&hio_send_list_lock[ii]); 395 cv_init(&hio_send_list_cond[ii]); 396 TAILQ_INIT(&hio_recv_list[ii]); 397 mtx_init(&hio_recv_list_lock[ii]); 398 cv_init(&hio_recv_list_cond[ii]); 399 rw_init(&hio_remote_lock[ii]); 400 } 401 TAILQ_INIT(&hio_done_list); 402 mtx_init(&hio_done_list_lock); 403 cv_init(&hio_done_list_cond); 404 mtx_init(&metadata_lock); 405 406 /* 407 * Allocate requests pool and initialize requests. 408 */ 409 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 410 hio = malloc(sizeof(*hio)); 411 if (hio == NULL) { 412 primary_exitx(EX_TEMPFAIL, 413 "Unable to allocate %zu bytes of memory for hio request.", 414 sizeof(*hio)); 415 } 416 hio->hio_countdown = 0; 417 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 418 if (hio->hio_errors == NULL) { 419 primary_exitx(EX_TEMPFAIL, 420 "Unable allocate %zu bytes of memory for hio errors.", 421 sizeof(hio->hio_errors[0]) * ncomps); 422 } 423 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 424 if (hio->hio_next == NULL) { 425 primary_exitx(EX_TEMPFAIL, 426 "Unable allocate %zu bytes of memory for hio_next field.", 427 sizeof(hio->hio_next[0]) * ncomps); 428 } 429 hio->hio_ggio.gctl_version = G_GATE_VERSION; 430 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 431 if (hio->hio_ggio.gctl_data == NULL) { 432 primary_exitx(EX_TEMPFAIL, 433 "Unable to allocate %zu bytes of memory for gctl_data.", 434 MAXPHYS); 435 } 436 hio->hio_ggio.gctl_length = MAXPHYS; 437 hio->hio_ggio.gctl_error = 0; 438 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 439 } 440 } 441 442 static bool 443 init_resuid(struct hast_resource *res) 444 { 445 446 mtx_lock(&metadata_lock); 447 if (res->hr_resuid != 0) { 448 mtx_unlock(&metadata_lock); 449 return (false); 450 } else { 451 /* Initialize unique resource identifier. 
*/ 452 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 453 mtx_unlock(&metadata_lock); 454 if (metadata_write(res) == -1) 455 exit(EX_NOINPUT); 456 return (true); 457 } 458 } 459 460 static void 461 init_local(struct hast_resource *res) 462 { 463 unsigned char *buf; 464 size_t mapsize; 465 466 if (metadata_read(res, true) == -1) 467 exit(EX_NOINPUT); 468 mtx_init(&res->hr_amp_lock); 469 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 470 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 471 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 472 } 473 mtx_init(&range_lock); 474 cv_init(&range_regular_cond); 475 if (rangelock_init(&range_regular) == -1) 476 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 477 cv_init(&range_sync_cond); 478 if (rangelock_init(&range_sync) == -1) 479 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 480 mapsize = activemap_ondisk_size(res->hr_amp); 481 buf = calloc(1, mapsize); 482 if (buf == NULL) { 483 primary_exitx(EX_TEMPFAIL, 484 "Unable to allocate buffer for activemap."); 485 } 486 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 487 (ssize_t)mapsize) { 488 primary_exit(EX_NOINPUT, "Unable to read activemap"); 489 } 490 activemap_copyin(res->hr_amp, buf, mapsize); 491 free(buf); 492 if (res->hr_resuid != 0) 493 return; 494 /* 495 * We're using provider for the first time. Initialize local and remote 496 * counters. We don't initialize resuid here, as we want to do it just 497 * in time. The reason for this is that we want to inform secondary 498 * that there were no writes yet, so there is no need to synchronize 499 * anything. 
500 */ 501 res->hr_primary_localcnt = 0; 502 res->hr_primary_remotecnt = 0; 503 if (metadata_write(res) == -1) 504 exit(EX_NOINPUT); 505 } 506 507 static int 508 primary_connect(struct hast_resource *res, struct proto_conn **connp) 509 { 510 struct proto_conn *conn; 511 int16_t val; 512 513 val = 1; 514 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 515 primary_exit(EX_TEMPFAIL, 516 "Unable to send connection request to parent"); 517 } 518 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 519 primary_exit(EX_TEMPFAIL, 520 "Unable to receive reply to connection request from parent"); 521 } 522 if (val != 0) { 523 errno = val; 524 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 525 res->hr_remoteaddr); 526 return (-1); 527 } 528 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 529 primary_exit(EX_TEMPFAIL, 530 "Unable to receive connection from parent"); 531 } 532 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 533 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534 res->hr_remoteaddr); 535 proto_close(conn); 536 return (-1); 537 } 538 /* Error in setting timeout is not critical, but why should it fail? 
*/ 539 if (proto_timeout(conn, res->hr_timeout) == -1) 540 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 541 542 *connp = conn; 543 544 return (0); 545 } 546 547 static int 548 init_remote(struct hast_resource *res, struct proto_conn **inp, 549 struct proto_conn **outp) 550 { 551 struct proto_conn *in, *out; 552 struct nv *nvout, *nvin; 553 const unsigned char *token; 554 unsigned char *map; 555 const char *errmsg; 556 int32_t extentsize; 557 int64_t datasize; 558 uint32_t mapsize; 559 size_t size; 560 int error; 561 562 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 563 PJDLOG_ASSERT(real_remote(res)); 564 565 in = out = NULL; 566 errmsg = NULL; 567 568 if (primary_connect(res, &out) == -1) 569 return (ECONNREFUSED); 570 571 error = ECONNABORTED; 572 573 /* 574 * First handshake step. 575 * Setup outgoing connection with remote node. 576 */ 577 nvout = nv_alloc(); 578 nv_add_string(nvout, res->hr_name, "resource"); 579 if (nv_error(nvout) != 0) { 580 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 581 "Unable to allocate header for connection with %s", 582 res->hr_remoteaddr); 583 nv_free(nvout); 584 goto close; 585 } 586 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 587 pjdlog_errno(LOG_WARNING, 588 "Unable to send handshake header to %s", 589 res->hr_remoteaddr); 590 nv_free(nvout); 591 goto close; 592 } 593 nv_free(nvout); 594 if (hast_proto_recv_hdr(out, &nvin) == -1) { 595 pjdlog_errno(LOG_WARNING, 596 "Unable to receive handshake header from %s", 597 res->hr_remoteaddr); 598 goto close; 599 } 600 errmsg = nv_get_string(nvin, "errmsg"); 601 if (errmsg != NULL) { 602 pjdlog_warning("%s", errmsg); 603 if (nv_exists(nvin, "wait")) 604 error = EBUSY; 605 nv_free(nvin); 606 goto close; 607 } 608 token = nv_get_uint8_array(nvin, &size, "token"); 609 if (token == NULL) { 610 pjdlog_warning("Handshake header from %s has no 'token' field.", 611 res->hr_remoteaddr); 612 nv_free(nvin); 613 goto close; 614 } 615 if 
(size != sizeof(res->hr_token)) { 616 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 617 res->hr_remoteaddr, size, sizeof(res->hr_token)); 618 nv_free(nvin); 619 goto close; 620 } 621 bcopy(token, res->hr_token, sizeof(res->hr_token)); 622 nv_free(nvin); 623 624 /* 625 * Second handshake step. 626 * Setup incoming connection with remote node. 627 */ 628 if (primary_connect(res, &in) == -1) 629 goto close; 630 631 nvout = nv_alloc(); 632 nv_add_string(nvout, res->hr_name, "resource"); 633 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 634 "token"); 635 if (res->hr_resuid == 0) { 636 /* 637 * The resuid field was not yet initialized. 638 * Because we do synchronization inside init_resuid(), it is 639 * possible that someone already initialized it, the function 640 * will return false then, but if we successfully initialized 641 * it, we will get true. True means that there were no writes 642 * to this resource yet and we want to inform secondary that 643 * synchronization is not needed by sending "virgin" argument. 
644 */ 645 if (init_resuid(res)) 646 nv_add_int8(nvout, 1, "virgin"); 647 } 648 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 649 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 650 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 651 if (nv_error(nvout) != 0) { 652 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 653 "Unable to allocate header for connection with %s", 654 res->hr_remoteaddr); 655 nv_free(nvout); 656 goto close; 657 } 658 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 659 pjdlog_errno(LOG_WARNING, 660 "Unable to send handshake header to %s", 661 res->hr_remoteaddr); 662 nv_free(nvout); 663 goto close; 664 } 665 nv_free(nvout); 666 if (hast_proto_recv_hdr(out, &nvin) == -1) { 667 pjdlog_errno(LOG_WARNING, 668 "Unable to receive handshake header from %s", 669 res->hr_remoteaddr); 670 goto close; 671 } 672 errmsg = nv_get_string(nvin, "errmsg"); 673 if (errmsg != NULL) { 674 pjdlog_warning("%s", errmsg); 675 nv_free(nvin); 676 goto close; 677 } 678 datasize = nv_get_int64(nvin, "datasize"); 679 if (datasize != res->hr_datasize) { 680 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 681 (intmax_t)res->hr_datasize, (intmax_t)datasize); 682 nv_free(nvin); 683 goto close; 684 } 685 extentsize = nv_get_int32(nvin, "extentsize"); 686 if (extentsize != res->hr_extentsize) { 687 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 688 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 689 nv_free(nvin); 690 goto close; 691 } 692 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 693 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 694 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 695 if (nv_exists(nvin, "virgin")) { 696 /* 697 * Secondary was reinitialized, bump localcnt if it is 0 as 698 * only we have the data. 
699 */ 700 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 701 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 702 703 if (res->hr_primary_localcnt == 0) { 704 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 705 706 mtx_lock(&metadata_lock); 707 res->hr_primary_localcnt++; 708 pjdlog_debug(1, "Increasing localcnt to %ju.", 709 (uintmax_t)res->hr_primary_localcnt); 710 (void)metadata_write(res); 711 mtx_unlock(&metadata_lock); 712 } 713 } 714 map = NULL; 715 mapsize = nv_get_uint32(nvin, "mapsize"); 716 if (mapsize > 0) { 717 map = malloc(mapsize); 718 if (map == NULL) { 719 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 720 (uintmax_t)mapsize); 721 nv_free(nvin); 722 goto close; 723 } 724 /* 725 * Remote node have some dirty extents on its own, lets 726 * download its activemap. 727 */ 728 if (hast_proto_recv_data(res, out, nvin, map, 729 mapsize) == -1) { 730 pjdlog_errno(LOG_ERR, 731 "Unable to receive remote activemap"); 732 nv_free(nvin); 733 free(map); 734 goto close; 735 } 736 /* 737 * Merge local and remote bitmaps. 738 */ 739 activemap_merge(res->hr_amp, map, mapsize); 740 free(map); 741 /* 742 * Now that we merged bitmaps from both nodes, flush it to the 743 * disk before we start to synchronize. 744 */ 745 (void)hast_activemap_flush(res); 746 } 747 nv_free(nvin); 748 #ifdef notyet 749 /* Setup directions. 
*/ 750 if (proto_send(out, NULL, 0) == -1) 751 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 752 if (proto_recv(in, NULL, 0) == -1) 753 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 754 #endif 755 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 756 if (inp != NULL && outp != NULL) { 757 *inp = in; 758 *outp = out; 759 } else { 760 res->hr_remotein = in; 761 res->hr_remoteout = out; 762 } 763 event_send(res, EVENT_CONNECT); 764 return (0); 765 close: 766 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 767 event_send(res, EVENT_SPLITBRAIN); 768 proto_close(out); 769 if (in != NULL) 770 proto_close(in); 771 return (error); 772 } 773 774 static void 775 sync_start(void) 776 { 777 778 mtx_lock(&sync_lock); 779 sync_inprogress = true; 780 mtx_unlock(&sync_lock); 781 cv_signal(&sync_cond); 782 } 783 784 static void 785 sync_stop(void) 786 { 787 788 mtx_lock(&sync_lock); 789 if (sync_inprogress) 790 sync_inprogress = false; 791 mtx_unlock(&sync_lock); 792 } 793 794 static void 795 init_ggate(struct hast_resource *res) 796 { 797 struct g_gate_ctl_create ggiocreate; 798 struct g_gate_ctl_cancel ggiocancel; 799 800 /* 801 * We communicate with ggate via /dev/ggctl. Open it. 802 */ 803 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 804 if (res->hr_ggatefd == -1) 805 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 806 /* 807 * Create provider before trying to connect, as connection failure 808 * is not critical, but may take some time. 
809 */ 810 bzero(&ggiocreate, sizeof(ggiocreate)); 811 ggiocreate.gctl_version = G_GATE_VERSION; 812 ggiocreate.gctl_mediasize = res->hr_datasize; 813 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 814 ggiocreate.gctl_flags = 0; 815 ggiocreate.gctl_maxcount = 0; 816 ggiocreate.gctl_timeout = 0; 817 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 818 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 819 res->hr_provname); 820 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 821 pjdlog_info("Device hast/%s created.", res->hr_provname); 822 res->hr_ggateunit = ggiocreate.gctl_unit; 823 return; 824 } 825 if (errno != EEXIST) { 826 primary_exit(EX_OSERR, "Unable to create hast/%s device", 827 res->hr_provname); 828 } 829 pjdlog_debug(1, 830 "Device hast/%s already exists, we will try to take it over.", 831 res->hr_provname); 832 /* 833 * If we received EEXIST, we assume that the process who created the 834 * provider died and didn't clean up. In that case we will start from 835 * where he left of. 836 */ 837 bzero(&ggiocancel, sizeof(ggiocancel)); 838 ggiocancel.gctl_version = G_GATE_VERSION; 839 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 840 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 841 res->hr_provname); 842 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 843 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 844 res->hr_ggateunit = ggiocancel.gctl_unit; 845 return; 846 } 847 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 848 res->hr_provname); 849 } 850 851 void 852 hastd_primary(struct hast_resource *res) 853 { 854 pthread_t td; 855 pid_t pid; 856 int error, mode, debuglevel; 857 858 /* 859 * Create communication channel for sending control commands from 860 * parent to child. 861 */ 862 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 863 /* TODO: There's no need for this to be fatal error. 
*/ 864 KEEP_ERRNO((void)pidfile_remove(pfh)); 865 pjdlog_exit(EX_OSERR, 866 "Unable to create control sockets between parent and child"); 867 } 868 /* 869 * Create communication channel for sending events from child to parent. 870 */ 871 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 872 /* TODO: There's no need for this to be fatal error. */ 873 KEEP_ERRNO((void)pidfile_remove(pfh)); 874 pjdlog_exit(EX_OSERR, 875 "Unable to create event sockets between child and parent"); 876 } 877 /* 878 * Create communication channel for sending connection requests from 879 * child to parent. 880 */ 881 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 882 /* TODO: There's no need for this to be fatal error. */ 883 KEEP_ERRNO((void)pidfile_remove(pfh)); 884 pjdlog_exit(EX_OSERR, 885 "Unable to create connection sockets between child and parent"); 886 } 887 888 pid = fork(); 889 if (pid == -1) { 890 /* TODO: There's no need for this to be fatal error. */ 891 KEEP_ERRNO((void)pidfile_remove(pfh)); 892 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 893 } 894 895 if (pid > 0) { 896 /* This is parent. */ 897 /* Declare that we are receiver. */ 898 proto_recv(res->hr_event, NULL, 0); 899 proto_recv(res->hr_conn, NULL, 0); 900 /* Declare that we are sender. */ 901 proto_send(res->hr_ctrl, NULL, 0); 902 res->hr_workerpid = pid; 903 return; 904 } 905 906 gres = res; 907 mode = pjdlog_mode_get(); 908 debuglevel = pjdlog_debug_get(); 909 910 /* Declare that we are sender. */ 911 proto_send(res->hr_event, NULL, 0); 912 proto_send(res->hr_conn, NULL, 0); 913 /* Declare that we are receiver. 
*/ 914 proto_recv(res->hr_ctrl, NULL, 0); 915 descriptors_cleanup(res); 916 917 descriptors_assert(res, mode); 918 919 pjdlog_init(mode); 920 pjdlog_debug_set(debuglevel); 921 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 922 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 923 924 init_local(res); 925 init_ggate(res); 926 init_environment(res); 927 928 if (drop_privs(res) != 0) { 929 cleanup(res); 930 exit(EX_CONFIG); 931 } 932 pjdlog_info("Privileges successfully dropped."); 933 934 /* 935 * Create the guard thread first, so we can handle signals from the 936 * very beginning. 937 */ 938 error = pthread_create(&td, NULL, guard_thread, res); 939 PJDLOG_ASSERT(error == 0); 940 /* 941 * Create the control thread before sending any event to the parent, 942 * as we can deadlock when parent sends control request to worker, 943 * but worker has no control thread started yet, so parent waits. 944 * In the meantime worker sends an event to the parent, but parent 945 * is unable to handle the event, because it waits for control 946 * request response. 
947 */ 948 error = pthread_create(&td, NULL, ctrl_thread, res); 949 PJDLOG_ASSERT(error == 0); 950 if (real_remote(res)) { 951 error = init_remote(res, NULL, NULL); 952 if (error == 0) { 953 sync_start(); 954 } else if (error == EBUSY) { 955 time_t start = time(NULL); 956 957 pjdlog_warning("Waiting for remote node to become %s for %ds.", 958 role2str(HAST_ROLE_SECONDARY), 959 res->hr_timeout); 960 for (;;) { 961 sleep(1); 962 error = init_remote(res, NULL, NULL); 963 if (error != EBUSY) 964 break; 965 if (time(NULL) > start + res->hr_timeout) 966 break; 967 } 968 if (error == EBUSY) { 969 pjdlog_warning("Remote node is still %s, starting anyway.", 970 role2str(HAST_ROLE_PRIMARY)); 971 } 972 } 973 } 974 error = pthread_create(&td, NULL, ggate_recv_thread, res); 975 PJDLOG_ASSERT(error == 0); 976 error = pthread_create(&td, NULL, local_send_thread, res); 977 PJDLOG_ASSERT(error == 0); 978 error = pthread_create(&td, NULL, remote_send_thread, res); 979 PJDLOG_ASSERT(error == 0); 980 error = pthread_create(&td, NULL, remote_recv_thread, res); 981 PJDLOG_ASSERT(error == 0); 982 error = pthread_create(&td, NULL, ggate_send_thread, res); 983 PJDLOG_ASSERT(error == 0); 984 fullystarted = true; 985 (void)sync_thread(res); 986 } 987 988 static void 989 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
990 { 991 char msg[1024]; 992 va_list ap; 993 int len; 994 995 va_start(ap, fmt); 996 len = vsnprintf(msg, sizeof(msg), fmt, ap); 997 va_end(ap); 998 if ((size_t)len < sizeof(msg)) { 999 switch (ggio->gctl_cmd) { 1000 case BIO_READ: 1001 (void)snprintf(msg + len, sizeof(msg) - len, 1002 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 1003 (uintmax_t)ggio->gctl_length); 1004 break; 1005 case BIO_DELETE: 1006 (void)snprintf(msg + len, sizeof(msg) - len, 1007 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 1008 (uintmax_t)ggio->gctl_length); 1009 break; 1010 case BIO_FLUSH: 1011 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 1012 break; 1013 case BIO_WRITE: 1014 (void)snprintf(msg + len, sizeof(msg) - len, 1015 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 1016 (uintmax_t)ggio->gctl_length); 1017 break; 1018 default: 1019 (void)snprintf(msg + len, sizeof(msg) - len, 1020 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 1021 break; 1022 } 1023 } 1024 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1025 } 1026 1027 static void 1028 remote_close(struct hast_resource *res, int ncomp) 1029 { 1030 1031 rw_wlock(&hio_remote_lock[ncomp]); 1032 /* 1033 * Check for a race between dropping rlock and acquiring wlock - 1034 * another thread can close connection in-between. 
1035 */ 1036 if (!ISCONNECTED(res, ncomp)) { 1037 PJDLOG_ASSERT(res->hr_remotein == NULL); 1038 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1039 rw_unlock(&hio_remote_lock[ncomp]); 1040 return; 1041 } 1042 1043 PJDLOG_ASSERT(res->hr_remotein != NULL); 1044 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1045 1046 pjdlog_debug(2, "Closing incoming connection to %s.", 1047 res->hr_remoteaddr); 1048 proto_close(res->hr_remotein); 1049 res->hr_remotein = NULL; 1050 pjdlog_debug(2, "Closing outgoing connection to %s.", 1051 res->hr_remoteaddr); 1052 proto_close(res->hr_remoteout); 1053 res->hr_remoteout = NULL; 1054 1055 rw_unlock(&hio_remote_lock[ncomp]); 1056 1057 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1058 1059 /* 1060 * Stop synchronization if in-progress. 1061 */ 1062 sync_stop(); 1063 1064 event_send(res, EVENT_DISCONNECT); 1065 } 1066 1067 /* 1068 * Acknowledge write completion to the kernel, but don't update activemap yet. 1069 */ 1070 static void 1071 write_complete(struct hast_resource *res, struct hio *hio) 1072 { 1073 struct g_gate_ctl_io *ggio; 1074 unsigned int ncomp; 1075 1076 PJDLOG_ASSERT(!hio->hio_done); 1077 1078 ggio = &hio->hio_ggio; 1079 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1080 1081 /* 1082 * Bump local count if this is first write after 1083 * connection failure with remote node. 
1084 */ 1085 ncomp = 1; 1086 rw_rlock(&hio_remote_lock[ncomp]); 1087 if (!ISCONNECTED(res, ncomp)) { 1088 mtx_lock(&metadata_lock); 1089 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1090 res->hr_primary_localcnt++; 1091 pjdlog_debug(1, "Increasing localcnt to %ju.", 1092 (uintmax_t)res->hr_primary_localcnt); 1093 (void)metadata_write(res); 1094 } 1095 mtx_unlock(&metadata_lock); 1096 } 1097 rw_unlock(&hio_remote_lock[ncomp]); 1098 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1099 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1100 hio->hio_done = true; 1101 } 1102 1103 /* 1104 * Thread receives ggate I/O requests from the kernel and passes them to 1105 * appropriate threads: 1106 * WRITE - always goes to both local_send and remote_send threads 1107 * READ (when the block is up-to-date on local component) - 1108 * only local_send thread 1109 * READ (when the block isn't up-to-date on local component) - 1110 * only remote_send thread 1111 * DELETE - always goes to both local_send and remote_send threads 1112 * FLUSH - always goes to both local_send and remote_send threads 1113 */ 1114 static void * 1115 ggate_recv_thread(void *arg) 1116 { 1117 struct hast_resource *res = arg; 1118 struct g_gate_ctl_io *ggio; 1119 struct hio *hio; 1120 unsigned int ii, ncomp, ncomps; 1121 int error; 1122 1123 for (;;) { 1124 pjdlog_debug(2, "ggate_recv: Taking free request."); 1125 QUEUE_TAKE2(hio, free); 1126 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1127 ggio = &hio->hio_ggio; 1128 ggio->gctl_unit = res->hr_ggateunit; 1129 ggio->gctl_length = MAXPHYS; 1130 ggio->gctl_error = 0; 1131 hio->hio_done = false; 1132 hio->hio_replication = res->hr_replication; 1133 pjdlog_debug(2, 1134 "ggate_recv: (%p) Waiting for request from the kernel.", 1135 hio); 1136 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1137 if (sigexit_received) 1138 pthread_exit(NULL); 1139 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1140 } 1141 error 
= ggio->gctl_error; 1142 switch (error) { 1143 case 0: 1144 break; 1145 case ECANCELED: 1146 /* Exit gracefully. */ 1147 if (!sigexit_received) { 1148 pjdlog_debug(2, 1149 "ggate_recv: (%p) Received cancel from the kernel.", 1150 hio); 1151 pjdlog_info("Received cancel from the kernel, exiting."); 1152 } 1153 pthread_exit(NULL); 1154 case ENOMEM: 1155 /* 1156 * Buffer too small? Impossible, we allocate MAXPHYS 1157 * bytes - request can't be bigger than that. 1158 */ 1159 /* FALLTHROUGH */ 1160 case ENXIO: 1161 default: 1162 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1163 strerror(error)); 1164 } 1165 1166 ncomp = 0; 1167 ncomps = HAST_NCOMPONENTS; 1168 1169 for (ii = 0; ii < ncomps; ii++) 1170 hio->hio_errors[ii] = EINVAL; 1171 reqlog(LOG_DEBUG, 2, ggio, 1172 "ggate_recv: (%p) Request received from the kernel: ", 1173 hio); 1174 1175 /* 1176 * Inform all components about new write request. 1177 * For read request prefer local component unless the given 1178 * range is out-of-date, then use remote component. 1179 */ 1180 switch (ggio->gctl_cmd) { 1181 case BIO_READ: 1182 res->hr_stat_read++; 1183 ncomps = 1; 1184 mtx_lock(&metadata_lock); 1185 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1186 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1187 /* 1188 * This range is up-to-date on local component, 1189 * so handle request locally. 1190 */ 1191 /* Local component is 0 for now. */ 1192 ncomp = 0; 1193 } else /* if (res->hr_syncsrc == 1194 HAST_SYNCSRC_SECONDARY) */ { 1195 PJDLOG_ASSERT(res->hr_syncsrc == 1196 HAST_SYNCSRC_SECONDARY); 1197 /* 1198 * This range is out-of-date on local component, 1199 * so send request to the remote node. 1200 */ 1201 /* Remote component is 1 for now. */ 1202 ncomp = 1; 1203 } 1204 mtx_unlock(&metadata_lock); 1205 break; 1206 case BIO_WRITE: 1207 res->hr_stat_write++; 1208 if (res->hr_resuid == 0 && 1209 res->hr_primary_localcnt == 0) { 1210 /* This is first write. 
*/ 1211 res->hr_primary_localcnt = 1; 1212 } 1213 for (;;) { 1214 mtx_lock(&range_lock); 1215 if (rangelock_islocked(range_sync, 1216 ggio->gctl_offset, ggio->gctl_length)) { 1217 pjdlog_debug(2, 1218 "regular: Range offset=%jd length=%zu locked.", 1219 (intmax_t)ggio->gctl_offset, 1220 (size_t)ggio->gctl_length); 1221 range_regular_wait = true; 1222 cv_wait(&range_regular_cond, &range_lock); 1223 range_regular_wait = false; 1224 mtx_unlock(&range_lock); 1225 continue; 1226 } 1227 if (rangelock_add(range_regular, 1228 ggio->gctl_offset, ggio->gctl_length) == -1) { 1229 mtx_unlock(&range_lock); 1230 pjdlog_debug(2, 1231 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1232 (intmax_t)ggio->gctl_offset, 1233 (size_t)ggio->gctl_length); 1234 sleep(1); 1235 continue; 1236 } 1237 mtx_unlock(&range_lock); 1238 break; 1239 } 1240 mtx_lock(&res->hr_amp_lock); 1241 if (activemap_write_start(res->hr_amp, 1242 ggio->gctl_offset, ggio->gctl_length)) { 1243 res->hr_stat_activemap_update++; 1244 (void)hast_activemap_flush(res); 1245 } 1246 mtx_unlock(&res->hr_amp_lock); 1247 break; 1248 case BIO_DELETE: 1249 res->hr_stat_delete++; 1250 break; 1251 case BIO_FLUSH: 1252 res->hr_stat_flush++; 1253 break; 1254 } 1255 pjdlog_debug(2, 1256 "ggate_recv: (%p) Moving request to the send queues.", hio); 1257 refcount_init(&hio->hio_countdown, ncomps); 1258 for (ii = ncomp; ii < ncomp + ncomps; ii++) 1259 QUEUE_INSERT1(hio, send, ii); 1260 } 1261 /* NOTREACHED */ 1262 return (NULL); 1263 } 1264 1265 /* 1266 * Thread reads from or writes to local component. 1267 * If local read fails, it redirects it to remote_send thread. 1268 */ 1269 static void * 1270 local_send_thread(void *arg) 1271 { 1272 struct hast_resource *res = arg; 1273 struct g_gate_ctl_io *ggio; 1274 struct hio *hio; 1275 unsigned int ncomp, rncomp; 1276 ssize_t ret; 1277 1278 /* Local component is 0 for now. */ 1279 ncomp = 0; 1280 /* Remote component is 1 for now. 
*/ 1281 rncomp = 1; 1282 1283 for (;;) { 1284 pjdlog_debug(2, "local_send: Taking request."); 1285 QUEUE_TAKE1(hio, send, ncomp, 0); 1286 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1287 ggio = &hio->hio_ggio; 1288 switch (ggio->gctl_cmd) { 1289 case BIO_READ: 1290 ret = pread(res->hr_localfd, ggio->gctl_data, 1291 ggio->gctl_length, 1292 ggio->gctl_offset + res->hr_localoff); 1293 if (ret == ggio->gctl_length) 1294 hio->hio_errors[ncomp] = 0; 1295 else if (!ISSYNCREQ(hio)) { 1296 /* 1297 * If READ failed, try to read from remote node. 1298 */ 1299 if (ret == -1) { 1300 reqlog(LOG_WARNING, 0, ggio, 1301 "Local request failed (%s), trying remote node. ", 1302 strerror(errno)); 1303 } else if (ret != ggio->gctl_length) { 1304 reqlog(LOG_WARNING, 0, ggio, 1305 "Local request failed (%zd != %jd), trying remote node. ", 1306 ret, (intmax_t)ggio->gctl_length); 1307 } 1308 QUEUE_INSERT1(hio, send, rncomp); 1309 continue; 1310 } 1311 break; 1312 case BIO_WRITE: 1313 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1314 ggio->gctl_length, 1315 ggio->gctl_offset + res->hr_localoff); 1316 if (ret == -1) { 1317 hio->hio_errors[ncomp] = errno; 1318 reqlog(LOG_WARNING, 0, ggio, 1319 "Local request failed (%s): ", 1320 strerror(errno)); 1321 } else if (ret != ggio->gctl_length) { 1322 hio->hio_errors[ncomp] = EIO; 1323 reqlog(LOG_WARNING, 0, ggio, 1324 "Local request failed (%zd != %jd): ", 1325 ret, (intmax_t)ggio->gctl_length); 1326 } else { 1327 hio->hio_errors[ncomp] = 0; 1328 if (hio->hio_replication == 1329 HAST_REPLICATION_ASYNC && 1330 !ISSYNCREQ(hio)) { 1331 ggio->gctl_error = 0; 1332 write_complete(res, hio); 1333 } 1334 } 1335 break; 1336 case BIO_DELETE: 1337 ret = g_delete(res->hr_localfd, 1338 ggio->gctl_offset + res->hr_localoff, 1339 ggio->gctl_length); 1340 if (ret == -1) { 1341 hio->hio_errors[ncomp] = errno; 1342 reqlog(LOG_WARNING, 0, ggio, 1343 "Local request failed (%s): ", 1344 strerror(errno)); 1345 } else { 1346 hio->hio_errors[ncomp] = 0; 1347 } 
1348 break; 1349 case BIO_FLUSH: 1350 if (!res->hr_localflush) { 1351 ret = -1; 1352 errno = EOPNOTSUPP; 1353 break; 1354 } 1355 ret = g_flush(res->hr_localfd); 1356 if (ret == -1) { 1357 if (errno == EOPNOTSUPP) 1358 res->hr_localflush = false; 1359 hio->hio_errors[ncomp] = errno; 1360 reqlog(LOG_WARNING, 0, ggio, 1361 "Local request failed (%s): ", 1362 strerror(errno)); 1363 } else { 1364 hio->hio_errors[ncomp] = 0; 1365 } 1366 break; 1367 } 1368 if (!refcount_release(&hio->hio_countdown)) 1369 continue; 1370 if (ISSYNCREQ(hio)) { 1371 mtx_lock(&sync_lock); 1372 SYNCREQDONE(hio); 1373 mtx_unlock(&sync_lock); 1374 cv_signal(&sync_cond); 1375 } else { 1376 pjdlog_debug(2, 1377 "local_send: (%p) Moving request to the done queue.", 1378 hio); 1379 QUEUE_INSERT2(hio, done); 1380 } 1381 } 1382 /* NOTREACHED */ 1383 return (NULL); 1384 } 1385 1386 static void 1387 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1388 { 1389 struct nv *nv; 1390 1391 rw_rlock(&hio_remote_lock[ncomp]); 1392 1393 if (!ISCONNECTED(res, ncomp)) { 1394 rw_unlock(&hio_remote_lock[ncomp]); 1395 return; 1396 } 1397 1398 PJDLOG_ASSERT(res->hr_remotein != NULL); 1399 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1400 1401 nv = nv_alloc(); 1402 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1403 if (nv_error(nv) != 0) { 1404 rw_unlock(&hio_remote_lock[ncomp]); 1405 nv_free(nv); 1406 pjdlog_debug(1, 1407 "keepalive_send: Unable to prepare header to send."); 1408 return; 1409 } 1410 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1411 rw_unlock(&hio_remote_lock[ncomp]); 1412 pjdlog_common(LOG_DEBUG, 1, errno, 1413 "keepalive_send: Unable to send request"); 1414 nv_free(nv); 1415 remote_close(res, ncomp); 1416 return; 1417 } 1418 1419 rw_unlock(&hio_remote_lock[ncomp]); 1420 nv_free(nv); 1421 pjdlog_debug(2, "keepalive_send: Request sent."); 1422 } 1423 1424 /* 1425 * Thread sends request to secondary node. 
1426 */ 1427 static void * 1428 remote_send_thread(void *arg) 1429 { 1430 struct hast_resource *res = arg; 1431 struct g_gate_ctl_io *ggio; 1432 time_t lastcheck, now; 1433 struct hio *hio; 1434 struct nv *nv; 1435 unsigned int ncomp; 1436 bool wakeup; 1437 uint64_t offset, length; 1438 uint8_t cmd; 1439 void *data; 1440 1441 /* Remote component is 1 for now. */ 1442 ncomp = 1; 1443 lastcheck = time(NULL); 1444 1445 for (;;) { 1446 pjdlog_debug(2, "remote_send: Taking request."); 1447 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1448 if (hio == NULL) { 1449 now = time(NULL); 1450 if (lastcheck + HAST_KEEPALIVE <= now) { 1451 keepalive_send(res, ncomp); 1452 lastcheck = now; 1453 } 1454 continue; 1455 } 1456 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1457 ggio = &hio->hio_ggio; 1458 switch (ggio->gctl_cmd) { 1459 case BIO_READ: 1460 cmd = HIO_READ; 1461 data = NULL; 1462 offset = ggio->gctl_offset; 1463 length = ggio->gctl_length; 1464 break; 1465 case BIO_WRITE: 1466 cmd = HIO_WRITE; 1467 data = ggio->gctl_data; 1468 offset = ggio->gctl_offset; 1469 length = ggio->gctl_length; 1470 break; 1471 case BIO_DELETE: 1472 cmd = HIO_DELETE; 1473 data = NULL; 1474 offset = ggio->gctl_offset; 1475 length = ggio->gctl_length; 1476 break; 1477 case BIO_FLUSH: 1478 cmd = HIO_FLUSH; 1479 data = NULL; 1480 offset = 0; 1481 length = 0; 1482 break; 1483 default: 1484 PJDLOG_ABORT("invalid condition"); 1485 } 1486 nv = nv_alloc(); 1487 nv_add_uint8(nv, cmd, "cmd"); 1488 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1489 nv_add_uint64(nv, offset, "offset"); 1490 nv_add_uint64(nv, length, "length"); 1491 if (nv_error(nv) != 0) { 1492 hio->hio_errors[ncomp] = nv_error(nv); 1493 pjdlog_debug(2, 1494 "remote_send: (%p) Unable to prepare header to send.", 1495 hio); 1496 reqlog(LOG_ERR, 0, ggio, 1497 "Unable to prepare header to send (%s): ", 1498 strerror(nv_error(nv))); 1499 /* Move failed request immediately to the done queue. 
*/ 1500 goto done_queue; 1501 } 1502 /* 1503 * Protect connection from disappearing. 1504 */ 1505 rw_rlock(&hio_remote_lock[ncomp]); 1506 if (!ISCONNECTED(res, ncomp)) { 1507 rw_unlock(&hio_remote_lock[ncomp]); 1508 hio->hio_errors[ncomp] = ENOTCONN; 1509 goto done_queue; 1510 } 1511 /* 1512 * Move the request to recv queue before sending it, because 1513 * in different order we can get reply before we move request 1514 * to recv queue. 1515 */ 1516 pjdlog_debug(2, 1517 "remote_send: (%p) Moving request to the recv queue.", 1518 hio); 1519 mtx_lock(&hio_recv_list_lock[ncomp]); 1520 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1521 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1522 mtx_unlock(&hio_recv_list_lock[ncomp]); 1523 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1524 data != NULL ? length : 0) == -1) { 1525 hio->hio_errors[ncomp] = errno; 1526 rw_unlock(&hio_remote_lock[ncomp]); 1527 pjdlog_debug(2, 1528 "remote_send: (%p) Unable to send request.", hio); 1529 reqlog(LOG_ERR, 0, ggio, 1530 "Unable to send request (%s): ", 1531 strerror(hio->hio_errors[ncomp])); 1532 remote_close(res, ncomp); 1533 /* 1534 * Take request back from the receive queue and move 1535 * it immediately to the done queue. 
1536 */ 1537 mtx_lock(&hio_recv_list_lock[ncomp]); 1538 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1539 hio_next[ncomp]); 1540 mtx_unlock(&hio_recv_list_lock[ncomp]); 1541 goto done_queue; 1542 } 1543 rw_unlock(&hio_remote_lock[ncomp]); 1544 nv_free(nv); 1545 if (wakeup) 1546 cv_signal(&hio_recv_list_cond[ncomp]); 1547 continue; 1548 done_queue: 1549 nv_free(nv); 1550 if (ISSYNCREQ(hio)) { 1551 if (!refcount_release(&hio->hio_countdown)) 1552 continue; 1553 mtx_lock(&sync_lock); 1554 SYNCREQDONE(hio); 1555 mtx_unlock(&sync_lock); 1556 cv_signal(&sync_cond); 1557 continue; 1558 } 1559 if (ggio->gctl_cmd == BIO_WRITE) { 1560 mtx_lock(&res->hr_amp_lock); 1561 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1562 ggio->gctl_length)) { 1563 (void)hast_activemap_flush(res); 1564 } 1565 mtx_unlock(&res->hr_amp_lock); 1566 } 1567 if (!refcount_release(&hio->hio_countdown)) 1568 continue; 1569 pjdlog_debug(2, 1570 "remote_send: (%p) Moving request to the done queue.", 1571 hio); 1572 QUEUE_INSERT2(hio, done); 1573 } 1574 /* NOTREACHED */ 1575 return (NULL); 1576 } 1577 1578 /* 1579 * Thread receives answer from secondary node and passes it to ggate_send 1580 * thread. 1581 */ 1582 static void * 1583 remote_recv_thread(void *arg) 1584 { 1585 struct hast_resource *res = arg; 1586 struct g_gate_ctl_io *ggio; 1587 struct hio *hio; 1588 struct nv *nv; 1589 unsigned int ncomp; 1590 uint64_t seq; 1591 int error; 1592 1593 /* Remote component is 1 for now. */ 1594 ncomp = 1; 1595 1596 for (;;) { 1597 /* Wait until there is anything to receive. 
*/ 1598 mtx_lock(&hio_recv_list_lock[ncomp]); 1599 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1600 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1601 cv_wait(&hio_recv_list_cond[ncomp], 1602 &hio_recv_list_lock[ncomp]); 1603 } 1604 mtx_unlock(&hio_recv_list_lock[ncomp]); 1605 1606 rw_rlock(&hio_remote_lock[ncomp]); 1607 if (!ISCONNECTED(res, ncomp)) { 1608 rw_unlock(&hio_remote_lock[ncomp]); 1609 /* 1610 * Connection is dead, so move all pending requests to 1611 * the done queue (one-by-one). 1612 */ 1613 mtx_lock(&hio_recv_list_lock[ncomp]); 1614 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1615 PJDLOG_ASSERT(hio != NULL); 1616 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1617 hio_next[ncomp]); 1618 mtx_unlock(&hio_recv_list_lock[ncomp]); 1619 goto done_queue; 1620 } 1621 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1622 pjdlog_errno(LOG_ERR, 1623 "Unable to receive reply header"); 1624 rw_unlock(&hio_remote_lock[ncomp]); 1625 remote_close(res, ncomp); 1626 continue; 1627 } 1628 rw_unlock(&hio_remote_lock[ncomp]); 1629 seq = nv_get_uint64(nv, "seq"); 1630 if (seq == 0) { 1631 pjdlog_error("Header contains no 'seq' field."); 1632 nv_free(nv); 1633 continue; 1634 } 1635 mtx_lock(&hio_recv_list_lock[ncomp]); 1636 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1637 if (hio->hio_ggio.gctl_seq == seq) { 1638 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1639 hio_next[ncomp]); 1640 break; 1641 } 1642 } 1643 mtx_unlock(&hio_recv_list_lock[ncomp]); 1644 if (hio == NULL) { 1645 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1646 (uintmax_t)seq); 1647 nv_free(nv); 1648 continue; 1649 } 1650 ggio = &hio->hio_ggio; 1651 error = nv_get_int16(nv, "error"); 1652 if (error != 0) { 1653 /* Request failed on remote side. 
*/ 1654 hio->hio_errors[ncomp] = error; 1655 reqlog(LOG_WARNING, 0, ggio, 1656 "Remote request failed (%s): ", strerror(error)); 1657 nv_free(nv); 1658 goto done_queue; 1659 } 1660 switch (ggio->gctl_cmd) { 1661 case BIO_READ: 1662 rw_rlock(&hio_remote_lock[ncomp]); 1663 if (!ISCONNECTED(res, ncomp)) { 1664 rw_unlock(&hio_remote_lock[ncomp]); 1665 nv_free(nv); 1666 goto done_queue; 1667 } 1668 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1669 ggio->gctl_data, ggio->gctl_length) == -1) { 1670 hio->hio_errors[ncomp] = errno; 1671 pjdlog_errno(LOG_ERR, 1672 "Unable to receive reply data"); 1673 rw_unlock(&hio_remote_lock[ncomp]); 1674 nv_free(nv); 1675 remote_close(res, ncomp); 1676 goto done_queue; 1677 } 1678 rw_unlock(&hio_remote_lock[ncomp]); 1679 break; 1680 case BIO_WRITE: 1681 case BIO_DELETE: 1682 case BIO_FLUSH: 1683 break; 1684 default: 1685 PJDLOG_ABORT("invalid condition"); 1686 } 1687 hio->hio_errors[ncomp] = 0; 1688 nv_free(nv); 1689 done_queue: 1690 if (!refcount_release(&hio->hio_countdown)) 1691 continue; 1692 if (ISSYNCREQ(hio)) { 1693 mtx_lock(&sync_lock); 1694 SYNCREQDONE(hio); 1695 mtx_unlock(&sync_lock); 1696 cv_signal(&sync_cond); 1697 } else { 1698 pjdlog_debug(2, 1699 "remote_recv: (%p) Moving request to the done queue.", 1700 hio); 1701 QUEUE_INSERT2(hio, done); 1702 } 1703 } 1704 /* NOTREACHED */ 1705 return (NULL); 1706 } 1707 1708 /* 1709 * Thread sends answer to the kernel. 
1710 */ 1711 static void * 1712 ggate_send_thread(void *arg) 1713 { 1714 struct hast_resource *res = arg; 1715 struct g_gate_ctl_io *ggio; 1716 struct hio *hio; 1717 unsigned int ii, ncomps; 1718 1719 ncomps = HAST_NCOMPONENTS; 1720 1721 for (;;) { 1722 pjdlog_debug(2, "ggate_send: Taking request."); 1723 QUEUE_TAKE2(hio, done); 1724 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1725 ggio = &hio->hio_ggio; 1726 for (ii = 0; ii < ncomps; ii++) { 1727 if (hio->hio_errors[ii] == 0) { 1728 /* 1729 * One successful request is enough to declare 1730 * success. 1731 */ 1732 ggio->gctl_error = 0; 1733 break; 1734 } 1735 } 1736 if (ii == ncomps) { 1737 /* 1738 * None of the requests were successful. 1739 * Use the error from local component except the 1740 * case when we did only remote request. 1741 */ 1742 if (ggio->gctl_cmd == BIO_READ && 1743 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1744 ggio->gctl_error = hio->hio_errors[1]; 1745 else 1746 ggio->gctl_error = hio->hio_errors[0]; 1747 } 1748 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1749 mtx_lock(&res->hr_amp_lock); 1750 if (activemap_write_complete(res->hr_amp, 1751 ggio->gctl_offset, ggio->gctl_length)) { 1752 res->hr_stat_activemap_update++; 1753 (void)hast_activemap_flush(res); 1754 } 1755 mtx_unlock(&res->hr_amp_lock); 1756 } 1757 if (ggio->gctl_cmd == BIO_WRITE) { 1758 /* 1759 * Unlock range we locked. 
1760 */ 1761 mtx_lock(&range_lock); 1762 rangelock_del(range_regular, ggio->gctl_offset, 1763 ggio->gctl_length); 1764 if (range_sync_wait) 1765 cv_signal(&range_sync_cond); 1766 mtx_unlock(&range_lock); 1767 if (!hio->hio_done) 1768 write_complete(res, hio); 1769 } else { 1770 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1771 primary_exit(EX_OSERR, 1772 "G_GATE_CMD_DONE failed"); 1773 } 1774 } 1775 pjdlog_debug(2, 1776 "ggate_send: (%p) Moving request to the free queue.", hio); 1777 QUEUE_INSERT2(hio, free); 1778 } 1779 /* NOTREACHED */ 1780 return (NULL); 1781 } 1782 1783 /* 1784 * Thread synchronize local and remote components. 1785 */ 1786 static void * 1787 sync_thread(void *arg __unused) 1788 { 1789 struct hast_resource *res = arg; 1790 struct hio *hio; 1791 struct g_gate_ctl_io *ggio; 1792 struct timeval tstart, tend, tdiff; 1793 unsigned int ii, ncomp, ncomps; 1794 off_t offset, length, synced; 1795 bool dorewind; 1796 int syncext; 1797 1798 ncomps = HAST_NCOMPONENTS; 1799 dorewind = true; 1800 synced = 0; 1801 offset = -1; 1802 1803 for (;;) { 1804 mtx_lock(&sync_lock); 1805 if (offset >= 0 && !sync_inprogress) { 1806 gettimeofday(&tend, NULL); 1807 timersub(&tend, &tstart, &tdiff); 1808 pjdlog_info("Synchronization interrupted after %#.0T. " 1809 "%NB synchronized so far.", &tdiff, 1810 (intmax_t)synced); 1811 event_send(res, EVENT_SYNCINTR); 1812 } 1813 while (!sync_inprogress) { 1814 dorewind = true; 1815 synced = 0; 1816 cv_wait(&sync_cond, &sync_lock); 1817 } 1818 mtx_unlock(&sync_lock); 1819 /* 1820 * Obtain offset at which we should synchronize. 1821 * Rewind synchronization if needed. 1822 */ 1823 mtx_lock(&res->hr_amp_lock); 1824 if (dorewind) 1825 activemap_sync_rewind(res->hr_amp); 1826 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1827 if (syncext != -1) { 1828 /* 1829 * We synchronized entire syncext extent, we can mark 1830 * it as clean now. 
1831 */ 1832 if (activemap_extent_complete(res->hr_amp, syncext)) 1833 (void)hast_activemap_flush(res); 1834 } 1835 mtx_unlock(&res->hr_amp_lock); 1836 if (dorewind) { 1837 dorewind = false; 1838 if (offset == -1) 1839 pjdlog_info("Nodes are in sync."); 1840 else { 1841 pjdlog_info("Synchronization started. %NB to go.", 1842 (intmax_t)(res->hr_extentsize * 1843 activemap_ndirty(res->hr_amp))); 1844 event_send(res, EVENT_SYNCSTART); 1845 gettimeofday(&tstart, NULL); 1846 } 1847 } 1848 if (offset == -1) { 1849 sync_stop(); 1850 pjdlog_debug(1, "Nothing to synchronize."); 1851 /* 1852 * Synchronization complete, make both localcnt and 1853 * remotecnt equal. 1854 */ 1855 ncomp = 1; 1856 rw_rlock(&hio_remote_lock[ncomp]); 1857 if (ISCONNECTED(res, ncomp)) { 1858 if (synced > 0) { 1859 int64_t bps; 1860 1861 gettimeofday(&tend, NULL); 1862 timersub(&tend, &tstart, &tdiff); 1863 bps = (int64_t)((double)synced / 1864 ((double)tdiff.tv_sec + 1865 (double)tdiff.tv_usec / 1000000)); 1866 pjdlog_info("Synchronization complete. " 1867 "%NB synchronized in %#.0lT (%NB/sec).", 1868 (intmax_t)synced, &tdiff, 1869 (intmax_t)bps); 1870 event_send(res, EVENT_SYNCDONE); 1871 } 1872 mtx_lock(&metadata_lock); 1873 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1874 res->hr_primary_localcnt = 1875 res->hr_secondary_remotecnt; 1876 res->hr_primary_remotecnt = 1877 res->hr_secondary_localcnt; 1878 pjdlog_debug(1, 1879 "Setting localcnt to %ju and remotecnt to %ju.", 1880 (uintmax_t)res->hr_primary_localcnt, 1881 (uintmax_t)res->hr_primary_remotecnt); 1882 (void)metadata_write(res); 1883 mtx_unlock(&metadata_lock); 1884 } 1885 rw_unlock(&hio_remote_lock[ncomp]); 1886 continue; 1887 } 1888 pjdlog_debug(2, "sync: Taking free request."); 1889 QUEUE_TAKE2(hio, free); 1890 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1891 /* 1892 * Lock the range we are going to synchronize. We don't want 1893 * race where someone writes between our read and write. 
1894 */ 1895 for (;;) { 1896 mtx_lock(&range_lock); 1897 if (rangelock_islocked(range_regular, offset, length)) { 1898 pjdlog_debug(2, 1899 "sync: Range offset=%jd length=%jd locked.", 1900 (intmax_t)offset, (intmax_t)length); 1901 range_sync_wait = true; 1902 cv_wait(&range_sync_cond, &range_lock); 1903 range_sync_wait = false; 1904 mtx_unlock(&range_lock); 1905 continue; 1906 } 1907 if (rangelock_add(range_sync, offset, length) == -1) { 1908 mtx_unlock(&range_lock); 1909 pjdlog_debug(2, 1910 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1911 (intmax_t)offset, (intmax_t)length); 1912 sleep(1); 1913 continue; 1914 } 1915 mtx_unlock(&range_lock); 1916 break; 1917 } 1918 /* 1919 * First read the data from synchronization source. 1920 */ 1921 SYNCREQ(hio); 1922 ggio = &hio->hio_ggio; 1923 ggio->gctl_cmd = BIO_READ; 1924 ggio->gctl_offset = offset; 1925 ggio->gctl_length = length; 1926 ggio->gctl_error = 0; 1927 hio->hio_done = false; 1928 hio->hio_replication = res->hr_replication; 1929 for (ii = 0; ii < ncomps; ii++) 1930 hio->hio_errors[ii] = EINVAL; 1931 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1932 hio); 1933 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1934 hio); 1935 mtx_lock(&metadata_lock); 1936 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1937 /* 1938 * This range is up-to-date on local component, 1939 * so handle request locally. 1940 */ 1941 /* Local component is 0 for now. */ 1942 ncomp = 0; 1943 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1944 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1945 /* 1946 * This range is out-of-date on local component, 1947 * so send request to the remote node. 1948 */ 1949 /* Remote component is 1 for now. */ 1950 ncomp = 1; 1951 } 1952 mtx_unlock(&metadata_lock); 1953 refcount_init(&hio->hio_countdown, 1); 1954 QUEUE_INSERT1(hio, send, ncomp); 1955 1956 /* 1957 * Let's wait for READ to finish. 
1958 */ 1959 mtx_lock(&sync_lock); 1960 while (!ISSYNCREQDONE(hio)) 1961 cv_wait(&sync_cond, &sync_lock); 1962 mtx_unlock(&sync_lock); 1963 1964 if (hio->hio_errors[ncomp] != 0) { 1965 pjdlog_error("Unable to read synchronization data: %s.", 1966 strerror(hio->hio_errors[ncomp])); 1967 goto free_queue; 1968 } 1969 1970 /* 1971 * We read the data from synchronization source, now write it 1972 * to synchronization target. 1973 */ 1974 SYNCREQ(hio); 1975 ggio->gctl_cmd = BIO_WRITE; 1976 for (ii = 0; ii < ncomps; ii++) 1977 hio->hio_errors[ii] = EINVAL; 1978 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1979 hio); 1980 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1981 hio); 1982 mtx_lock(&metadata_lock); 1983 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1984 /* 1985 * This range is up-to-date on local component, 1986 * so we update remote component. 1987 */ 1988 /* Remote component is 1 for now. */ 1989 ncomp = 1; 1990 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1991 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1992 /* 1993 * This range is out-of-date on local component, 1994 * so we update it. 1995 */ 1996 /* Local component is 0 for now. */ 1997 ncomp = 0; 1998 } 1999 mtx_unlock(&metadata_lock); 2000 2001 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2002 hio); 2003 refcount_init(&hio->hio_countdown, 1); 2004 QUEUE_INSERT1(hio, send, ncomp); 2005 2006 /* 2007 * Let's wait for WRITE to finish. 
2008 */ 2009 mtx_lock(&sync_lock); 2010 while (!ISSYNCREQDONE(hio)) 2011 cv_wait(&sync_cond, &sync_lock); 2012 mtx_unlock(&sync_lock); 2013 2014 if (hio->hio_errors[ncomp] != 0) { 2015 pjdlog_error("Unable to write synchronization data: %s.", 2016 strerror(hio->hio_errors[ncomp])); 2017 goto free_queue; 2018 } 2019 2020 synced += length; 2021 free_queue: 2022 mtx_lock(&range_lock); 2023 rangelock_del(range_sync, offset, length); 2024 if (range_regular_wait) 2025 cv_signal(&range_regular_cond); 2026 mtx_unlock(&range_lock); 2027 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2028 hio); 2029 QUEUE_INSERT2(hio, free); 2030 } 2031 /* NOTREACHED */ 2032 return (NULL); 2033 } 2034 2035 void 2036 primary_config_reload(struct hast_resource *res, struct nv *nv) 2037 { 2038 unsigned int ii, ncomps; 2039 int modified, vint; 2040 const char *vstr; 2041 2042 pjdlog_info("Reloading configuration..."); 2043 2044 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2045 PJDLOG_ASSERT(gres == res); 2046 nv_assert(nv, "remoteaddr"); 2047 nv_assert(nv, "sourceaddr"); 2048 nv_assert(nv, "replication"); 2049 nv_assert(nv, "checksum"); 2050 nv_assert(nv, "compression"); 2051 nv_assert(nv, "timeout"); 2052 nv_assert(nv, "exec"); 2053 nv_assert(nv, "metaflush"); 2054 2055 ncomps = HAST_NCOMPONENTS; 2056 2057 #define MODIFIED_REMOTEADDR 0x01 2058 #define MODIFIED_SOURCEADDR 0x02 2059 #define MODIFIED_REPLICATION 0x04 2060 #define MODIFIED_CHECKSUM 0x08 2061 #define MODIFIED_COMPRESSION 0x10 2062 #define MODIFIED_TIMEOUT 0x20 2063 #define MODIFIED_EXEC 0x40 2064 #define MODIFIED_METAFLUSH 0x80 2065 modified = 0; 2066 2067 vstr = nv_get_string(nv, "remoteaddr"); 2068 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2069 /* 2070 * Don't copy res->hr_remoteaddr to gres just yet. 2071 * We want remote_close() to log disconnect from the old 2072 * addresses, not from the new ones. 
2073 */ 2074 modified |= MODIFIED_REMOTEADDR; 2075 } 2076 vstr = nv_get_string(nv, "sourceaddr"); 2077 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2078 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2079 modified |= MODIFIED_SOURCEADDR; 2080 } 2081 vint = nv_get_int32(nv, "replication"); 2082 if (gres->hr_replication != vint) { 2083 gres->hr_replication = vint; 2084 modified |= MODIFIED_REPLICATION; 2085 } 2086 vint = nv_get_int32(nv, "checksum"); 2087 if (gres->hr_checksum != vint) { 2088 gres->hr_checksum = vint; 2089 modified |= MODIFIED_CHECKSUM; 2090 } 2091 vint = nv_get_int32(nv, "compression"); 2092 if (gres->hr_compression != vint) { 2093 gres->hr_compression = vint; 2094 modified |= MODIFIED_COMPRESSION; 2095 } 2096 vint = nv_get_int32(nv, "timeout"); 2097 if (gres->hr_timeout != vint) { 2098 gres->hr_timeout = vint; 2099 modified |= MODIFIED_TIMEOUT; 2100 } 2101 vstr = nv_get_string(nv, "exec"); 2102 if (strcmp(gres->hr_exec, vstr) != 0) { 2103 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2104 modified |= MODIFIED_EXEC; 2105 } 2106 vint = nv_get_int32(nv, "metaflush"); 2107 if (gres->hr_metaflush != vint) { 2108 gres->hr_metaflush = vint; 2109 modified |= MODIFIED_METAFLUSH; 2110 } 2111 2112 /* 2113 * Change timeout for connected sockets. 2114 * Don't bother if we need to reconnect. 
*/
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			/*
			 * NOTE(review): hr_remotein/hr_remoteout are used
			 * below after hio_remote_lock has been released.
			 * This is only safe if no other thread can
			 * disconnect this component concurrently -- confirm.
			 */
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_SOURCEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC
#undef	MODIFIED_METAFLUSH

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check the connection to the remote node for component 'ncomp' and try
 * to re-establish it if it is down.
 *
 * Returns immediately for components that are not remote (!ISREMOTE) and
 * when real_remote(res) is false.  If the component is connected, only a
 * debug message is logged.  Otherwise init_remote() is called without
 * hio_remote_lock held.  On success the new connections are stored in
 * res->hr_remotein/res->hr_remoteout under the write lock, and then
 * sync_start() is called.  On failure only a debug message is logged.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		/* A connected component must have both directions set up. */
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	/* Disconnected: neither direction may be set. */
	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 * The lock is deliberately not held across init_remote() below
	 * (presumably because establishing a connection can block).
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out) == 0) {
		rw_wlock(&hio_remote_lock[ncomp]);
		/* Still disconnected: nobody else may have connected us. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		/* Publish the new connections under the write lock. */
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Guard thread: watches the remote connections, reconnects them when
 * needed, and handles termination signals.
 *
 * Waits in sigtimedwait() for SIGINT or SIGTERM for up to HAST_KEEPALIVE
 * seconds at a time.  On either signal it sets sigexit_received and calls
 * primary_exitx(EX_OK, ...).  Once 'fullystarted' is set, guard_one() is
 * run for every component roughly every HAST_KEEPALIVE seconds.  The loop
 * never terminates on its own.
2221 */ 2222 static void * 2223 guard_thread(void *arg) 2224 { 2225 struct hast_resource *res = arg; 2226 unsigned int ii, ncomps; 2227 struct timespec timeout; 2228 time_t lastcheck, now; 2229 sigset_t mask; 2230 int signo; 2231 2232 ncomps = HAST_NCOMPONENTS; 2233 lastcheck = time(NULL); 2234 2235 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2236 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2237 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2238 2239 timeout.tv_sec = HAST_KEEPALIVE; 2240 timeout.tv_nsec = 0; 2241 signo = -1; 2242 2243 for (;;) { 2244 switch (signo) { 2245 case SIGINT: 2246 case SIGTERM: 2247 sigexit_received = true; 2248 primary_exitx(EX_OK, 2249 "Termination signal received, exiting."); 2250 break; 2251 default: 2252 break; 2253 } 2254 2255 /* 2256 * Don't check connections until we fully started, 2257 * as we may still be looping, waiting for remote node 2258 * to switch from primary to secondary. 2259 */ 2260 if (fullystarted) { 2261 pjdlog_debug(2, "remote_guard: Checking connections."); 2262 now = time(NULL); 2263 if (lastcheck + HAST_KEEPALIVE <= now) { 2264 for (ii = 0; ii < ncomps; ii++) 2265 guard_one(res, ii); 2266 lastcheck = now; 2267 } 2268 } 2269 signo = sigtimedwait(&mask, NULL, &timeout); 2270 } 2271 /* NOTREACHED */ 2272 return (NULL); 2273 } 2274