1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/types.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/refcount.h> 39 #include <sys/stat.h> 40 41 #include <geom/gate/g_gate.h> 42 43 #include <err.h> 44 #include <errno.h> 45 #include <fcntl.h> 46 #include <libgeom.h> 47 #include <pthread.h> 48 #include <signal.h> 49 #include <stdint.h> 50 #include <stdio.h> 51 #include <string.h> 52 #include <sysexits.h> 53 #include <unistd.h> 54 55 #include <activemap.h> 56 #include <nv.h> 57 #include <rangelock.h> 58 59 #include "control.h" 60 #include "event.h" 61 #include "hast.h" 62 #include "hast_proto.h" 63 #include "hastd.h" 64 #include "hooks.h" 65 #include "metadata.h" 66 #include "proto.h" 67 #include "pjdlog.h" 68 #include "subr.h" 69 #include "synch.h" 70 71 /* The is only one remote component for now. */ 72 #define ISREMOTE(no) ((no) == 1) 73 74 struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 unsigned int hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Remember replication from the time the request was initiated, 98 * so we won't get confused when replication changes on reload. 99 */ 100 int hio_replication; 101 TAILQ_ENTRY(hio) *hio_next; 102 }; 103 #define hio_free_next hio_next[0] 104 #define hio_done_next hio_next[0] 105 106 /* 107 * Free list holds unused structures. When free list is empty, we have to wait 108 * until some in-progress requests are freed. 109 */ 110 static TAILQ_HEAD(, hio) hio_free_list; 111 static pthread_mutex_t hio_free_list_lock; 112 static pthread_cond_t hio_free_list_cond; 113 /* 114 * There is one send list for every component. One requests is placed on all 115 * send lists - each component gets the same request, but each component is 116 * responsible for managing his own send list. 117 */ 118 static TAILQ_HEAD(, hio) *hio_send_list; 119 static pthread_mutex_t *hio_send_list_lock; 120 static pthread_cond_t *hio_send_list_cond; 121 /* 122 * There is one recv list for every component, although local components don't 123 * use recv lists as local requests are done synchronously. 124 */ 125 static TAILQ_HEAD(, hio) *hio_recv_list; 126 static pthread_mutex_t *hio_recv_list_lock; 127 static pthread_cond_t *hio_recv_list_cond; 128 /* 129 * Request is placed on done list by the slowest component (the one that 130 * decreased hio_countdown from 1 to 0). 131 */ 132 static TAILQ_HEAD(, hio) hio_done_list; 133 static pthread_mutex_t hio_done_list_lock; 134 static pthread_cond_t hio_done_list_cond; 135 /* 136 * Structure below are for interaction with sync thread. 137 */ 138 static bool sync_inprogress; 139 static pthread_mutex_t sync_lock; 140 static pthread_cond_t sync_cond; 141 /* 142 * The lock below allows to synchornize access to remote connections. 143 */ 144 static pthread_rwlock_t *hio_remote_lock; 145 146 /* 147 * Lock to synchronize metadata updates. Also synchronize access to 148 * hr_primary_localcnt and hr_primary_remotecnt fields. 149 */ 150 static pthread_mutex_t metadata_lock; 151 152 /* 153 * Maximum number of outstanding I/O requests. 154 */ 155 #define HAST_HIO_MAX 256 156 /* 157 * Number of components. At this point there are only two components: local 158 * and remote, but in the future it might be possible to use multiple local 159 * and remote components. 160 */ 161 #define HAST_NCOMPONENTS 2 162 163 #define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 176 } while (0) 177 #define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_signal(&hio_##name##_list_cond); \ 186 } while (0) 187 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203 } while (0) 204 #define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212 } while (0) 213 214 #define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217 } while (0) 218 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 219 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221 222 static struct hast_resource *gres; 223 224 static pthread_mutex_t range_lock; 225 static struct rangelocks *range_regular; 226 static bool range_regular_wait; 227 static pthread_cond_t range_regular_cond; 228 static struct rangelocks *range_sync; 229 static bool range_sync_wait; 230 static pthread_cond_t range_sync_cond; 231 static bool fullystarted; 232 233 static void *ggate_recv_thread(void *arg); 234 static void *local_send_thread(void *arg); 235 static void *remote_send_thread(void *arg); 236 static void *remote_recv_thread(void *arg); 237 static void *ggate_send_thread(void *arg); 238 static void *sync_thread(void *arg); 239 static void *guard_thread(void *arg); 240 241 static void 242 cleanup(struct hast_resource *res) 243 { 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. */ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267 } 268 269 static __dead2 void 270 primary_exit(int exitcode, const char *fmt, ...) 271 { 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280 } 281 282 static __dead2 void 283 primary_exitx(int exitcode, const char *fmt, ...) 284 { 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292 } 293 294 static int 295 hast_activemap_flush(struct hast_resource *res) 296 { 297 const unsigned char *buf; 298 size_t size; 299 300 buf = activemap_bitmap(res->hr_amp, &size); 301 PJDLOG_ASSERT(buf != NULL); 302 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 303 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 304 (ssize_t)size) { 305 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 306 return (-1); 307 } 308 if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 309 if (errno == EOPNOTSUPP) { 310 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 311 res->hr_localpath); 312 res->hr_metaflush = 0; 313 } else { 314 pjdlog_errno(LOG_ERR, 315 "Unable to flush disk cache on activemap update"); 316 return (-1); 317 } 318 } 319 return (0); 320 } 321 322 static bool 323 real_remote(const struct hast_resource *res) 324 { 325 326 return (strcmp(res->hr_remoteaddr, "none") != 0); 327 } 328 329 static void 330 init_environment(struct hast_resource *res __unused) 331 { 332 struct hio *hio; 333 unsigned int ii, ncomps; 334 335 /* 336 * In the future it might be per-resource value. 337 */ 338 ncomps = HAST_NCOMPONENTS; 339 340 /* 341 * Allocate memory needed by lists. 342 */ 343 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 344 if (hio_send_list == NULL) { 345 primary_exitx(EX_TEMPFAIL, 346 "Unable to allocate %zu bytes of memory for send lists.", 347 sizeof(hio_send_list[0]) * ncomps); 348 } 349 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 350 if (hio_send_list_lock == NULL) { 351 primary_exitx(EX_TEMPFAIL, 352 "Unable to allocate %zu bytes of memory for send list locks.", 353 sizeof(hio_send_list_lock[0]) * ncomps); 354 } 355 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 356 if (hio_send_list_cond == NULL) { 357 primary_exitx(EX_TEMPFAIL, 358 "Unable to allocate %zu bytes of memory for send list condition variables.", 359 sizeof(hio_send_list_cond[0]) * ncomps); 360 } 361 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 362 if (hio_recv_list == NULL) { 363 primary_exitx(EX_TEMPFAIL, 364 "Unable to allocate %zu bytes of memory for recv lists.", 365 sizeof(hio_recv_list[0]) * ncomps); 366 } 367 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 368 if (hio_recv_list_lock == NULL) { 369 primary_exitx(EX_TEMPFAIL, 370 "Unable to allocate %zu bytes of memory for recv list locks.", 371 sizeof(hio_recv_list_lock[0]) * ncomps); 372 } 373 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 374 if (hio_recv_list_cond == NULL) { 375 primary_exitx(EX_TEMPFAIL, 376 "Unable to allocate %zu bytes of memory for recv list condition variables.", 377 sizeof(hio_recv_list_cond[0]) * ncomps); 378 } 379 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 380 if (hio_remote_lock == NULL) { 381 primary_exitx(EX_TEMPFAIL, 382 "Unable to allocate %zu bytes of memory for remote connections locks.", 383 sizeof(hio_remote_lock[0]) * ncomps); 384 } 385 386 /* 387 * Initialize lists, their locks and theirs condition variables. 388 */ 389 TAILQ_INIT(&hio_free_list); 390 mtx_init(&hio_free_list_lock); 391 cv_init(&hio_free_list_cond); 392 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 393 TAILQ_INIT(&hio_send_list[ii]); 394 mtx_init(&hio_send_list_lock[ii]); 395 cv_init(&hio_send_list_cond[ii]); 396 TAILQ_INIT(&hio_recv_list[ii]); 397 mtx_init(&hio_recv_list_lock[ii]); 398 cv_init(&hio_recv_list_cond[ii]); 399 rw_init(&hio_remote_lock[ii]); 400 } 401 TAILQ_INIT(&hio_done_list); 402 mtx_init(&hio_done_list_lock); 403 cv_init(&hio_done_list_cond); 404 mtx_init(&metadata_lock); 405 406 /* 407 * Allocate requests pool and initialize requests. 408 */ 409 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 410 hio = malloc(sizeof(*hio)); 411 if (hio == NULL) { 412 primary_exitx(EX_TEMPFAIL, 413 "Unable to allocate %zu bytes of memory for hio request.", 414 sizeof(*hio)); 415 } 416 hio->hio_countdown = 0; 417 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 418 if (hio->hio_errors == NULL) { 419 primary_exitx(EX_TEMPFAIL, 420 "Unable allocate %zu bytes of memory for hio errors.", 421 sizeof(hio->hio_errors[0]) * ncomps); 422 } 423 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 424 if (hio->hio_next == NULL) { 425 primary_exitx(EX_TEMPFAIL, 426 "Unable allocate %zu bytes of memory for hio_next field.", 427 sizeof(hio->hio_next[0]) * ncomps); 428 } 429 hio->hio_ggio.gctl_version = G_GATE_VERSION; 430 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 431 if (hio->hio_ggio.gctl_data == NULL) { 432 primary_exitx(EX_TEMPFAIL, 433 "Unable to allocate %zu bytes of memory for gctl_data.", 434 MAXPHYS); 435 } 436 hio->hio_ggio.gctl_length = MAXPHYS; 437 hio->hio_ggio.gctl_error = 0; 438 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 439 } 440 } 441 442 static bool 443 init_resuid(struct hast_resource *res) 444 { 445 446 mtx_lock(&metadata_lock); 447 if (res->hr_resuid != 0) { 448 mtx_unlock(&metadata_lock); 449 return (false); 450 } else { 451 /* Initialize unique resource identifier. */ 452 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 453 mtx_unlock(&metadata_lock); 454 if (metadata_write(res) == -1) 455 exit(EX_NOINPUT); 456 return (true); 457 } 458 } 459 460 static void 461 init_local(struct hast_resource *res) 462 { 463 unsigned char *buf; 464 size_t mapsize; 465 466 if (metadata_read(res, true) == -1) 467 exit(EX_NOINPUT); 468 mtx_init(&res->hr_amp_lock); 469 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 470 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 471 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 472 } 473 mtx_init(&range_lock); 474 cv_init(&range_regular_cond); 475 if (rangelock_init(&range_regular) == -1) 476 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 477 cv_init(&range_sync_cond); 478 if (rangelock_init(&range_sync) == -1) 479 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 480 mapsize = activemap_ondisk_size(res->hr_amp); 481 buf = calloc(1, mapsize); 482 if (buf == NULL) { 483 primary_exitx(EX_TEMPFAIL, 484 "Unable to allocate buffer for activemap."); 485 } 486 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 487 (ssize_t)mapsize) { 488 primary_exit(EX_NOINPUT, "Unable to read activemap"); 489 } 490 activemap_copyin(res->hr_amp, buf, mapsize); 491 free(buf); 492 if (res->hr_resuid != 0) 493 return; 494 /* 495 * We're using provider for the first time. Initialize local and remote 496 * counters. We don't initialize resuid here, as we want to do it just 497 * in time. The reason for this is that we want to inform secondary 498 * that there were no writes yet, so there is no need to synchronize 499 * anything. 500 */ 501 res->hr_primary_localcnt = 0; 502 res->hr_primary_remotecnt = 0; 503 if (metadata_write(res) == -1) 504 exit(EX_NOINPUT); 505 } 506 507 static int 508 primary_connect(struct hast_resource *res, struct proto_conn **connp) 509 { 510 struct proto_conn *conn; 511 int16_t val; 512 513 val = 1; 514 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 515 primary_exit(EX_TEMPFAIL, 516 "Unable to send connection request to parent"); 517 } 518 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 519 primary_exit(EX_TEMPFAIL, 520 "Unable to receive reply to connection request from parent"); 521 } 522 if (val != 0) { 523 errno = val; 524 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 525 res->hr_remoteaddr); 526 return (-1); 527 } 528 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 529 primary_exit(EX_TEMPFAIL, 530 "Unable to receive connection from parent"); 531 } 532 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 533 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534 res->hr_remoteaddr); 535 proto_close(conn); 536 return (-1); 537 } 538 /* Error in setting timeout is not critical, but why should it fail? */ 539 if (proto_timeout(conn, res->hr_timeout) == -1) 540 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 541 542 *connp = conn; 543 544 return (0); 545 } 546 547 static int 548 init_remote(struct hast_resource *res, struct proto_conn **inp, 549 struct proto_conn **outp) 550 { 551 struct proto_conn *in, *out; 552 struct nv *nvout, *nvin; 553 const unsigned char *token; 554 unsigned char *map; 555 const char *errmsg; 556 int32_t extentsize; 557 int64_t datasize; 558 uint32_t mapsize; 559 size_t size; 560 int error; 561 562 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 563 PJDLOG_ASSERT(real_remote(res)); 564 565 in = out = NULL; 566 errmsg = NULL; 567 568 if (primary_connect(res, &out) == -1) 569 return (ECONNREFUSED); 570 571 error = ECONNABORTED; 572 573 /* 574 * First handshake step. 575 * Setup outgoing connection with remote node. 576 */ 577 nvout = nv_alloc(); 578 nv_add_string(nvout, res->hr_name, "resource"); 579 if (nv_error(nvout) != 0) { 580 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 581 "Unable to allocate header for connection with %s", 582 res->hr_remoteaddr); 583 nv_free(nvout); 584 goto close; 585 } 586 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 587 pjdlog_errno(LOG_WARNING, 588 "Unable to send handshake header to %s", 589 res->hr_remoteaddr); 590 nv_free(nvout); 591 goto close; 592 } 593 nv_free(nvout); 594 if (hast_proto_recv_hdr(out, &nvin) == -1) { 595 pjdlog_errno(LOG_WARNING, 596 "Unable to receive handshake header from %s", 597 res->hr_remoteaddr); 598 goto close; 599 } 600 errmsg = nv_get_string(nvin, "errmsg"); 601 if (errmsg != NULL) { 602 pjdlog_warning("%s", errmsg); 603 if (nv_exists(nvin, "wait")) 604 error = EBUSY; 605 nv_free(nvin); 606 goto close; 607 } 608 token = nv_get_uint8_array(nvin, &size, "token"); 609 if (token == NULL) { 610 pjdlog_warning("Handshake header from %s has no 'token' field.", 611 res->hr_remoteaddr); 612 nv_free(nvin); 613 goto close; 614 } 615 if (size != sizeof(res->hr_token)) { 616 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 617 res->hr_remoteaddr, size, sizeof(res->hr_token)); 618 nv_free(nvin); 619 goto close; 620 } 621 bcopy(token, res->hr_token, sizeof(res->hr_token)); 622 nv_free(nvin); 623 624 /* 625 * Second handshake step. 626 * Setup incoming connection with remote node. 627 */ 628 if (primary_connect(res, &in) == -1) 629 goto close; 630 631 nvout = nv_alloc(); 632 nv_add_string(nvout, res->hr_name, "resource"); 633 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 634 "token"); 635 if (res->hr_resuid == 0) { 636 /* 637 * The resuid field was not yet initialized. 638 * Because we do synchronization inside init_resuid(), it is 639 * possible that someone already initialized it, the function 640 * will return false then, but if we successfully initialized 641 * it, we will get true. True means that there were no writes 642 * to this resource yet and we want to inform secondary that 643 * synchronization is not needed by sending "virgin" argument. 644 */ 645 if (init_resuid(res)) 646 nv_add_int8(nvout, 1, "virgin"); 647 } 648 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 649 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 650 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 651 if (nv_error(nvout) != 0) { 652 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 653 "Unable to allocate header for connection with %s", 654 res->hr_remoteaddr); 655 nv_free(nvout); 656 goto close; 657 } 658 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 659 pjdlog_errno(LOG_WARNING, 660 "Unable to send handshake header to %s", 661 res->hr_remoteaddr); 662 nv_free(nvout); 663 goto close; 664 } 665 nv_free(nvout); 666 if (hast_proto_recv_hdr(out, &nvin) == -1) { 667 pjdlog_errno(LOG_WARNING, 668 "Unable to receive handshake header from %s", 669 res->hr_remoteaddr); 670 goto close; 671 } 672 errmsg = nv_get_string(nvin, "errmsg"); 673 if (errmsg != NULL) { 674 pjdlog_warning("%s", errmsg); 675 nv_free(nvin); 676 goto close; 677 } 678 datasize = nv_get_int64(nvin, "datasize"); 679 if (datasize != res->hr_datasize) { 680 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 681 (intmax_t)res->hr_datasize, (intmax_t)datasize); 682 nv_free(nvin); 683 goto close; 684 } 685 extentsize = nv_get_int32(nvin, "extentsize"); 686 if (extentsize != res->hr_extentsize) { 687 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 688 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 689 nv_free(nvin); 690 goto close; 691 } 692 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 693 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 694 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 695 if (nv_exists(nvin, "virgin")) { 696 /* 697 * Secondary was reinitialized, bump localcnt if it is 0 as 698 * only we have the data. 699 */ 700 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 701 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 702 703 if (res->hr_primary_localcnt == 0) { 704 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 705 706 mtx_lock(&metadata_lock); 707 res->hr_primary_localcnt++; 708 pjdlog_debug(1, "Increasing localcnt to %ju.", 709 (uintmax_t)res->hr_primary_localcnt); 710 (void)metadata_write(res); 711 mtx_unlock(&metadata_lock); 712 } 713 } 714 map = NULL; 715 mapsize = nv_get_uint32(nvin, "mapsize"); 716 if (mapsize > 0) { 717 map = malloc(mapsize); 718 if (map == NULL) { 719 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 720 (uintmax_t)mapsize); 721 nv_free(nvin); 722 goto close; 723 } 724 /* 725 * Remote node have some dirty extents on its own, lets 726 * download its activemap. 727 */ 728 if (hast_proto_recv_data(res, out, nvin, map, 729 mapsize) == -1) { 730 pjdlog_errno(LOG_ERR, 731 "Unable to receive remote activemap"); 732 nv_free(nvin); 733 free(map); 734 goto close; 735 } 736 /* 737 * Merge local and remote bitmaps. 738 */ 739 activemap_merge(res->hr_amp, map, mapsize); 740 free(map); 741 /* 742 * Now that we merged bitmaps from both nodes, flush it to the 743 * disk before we start to synchronize. 744 */ 745 (void)hast_activemap_flush(res); 746 } 747 nv_free(nvin); 748 #ifdef notyet 749 /* Setup directions. */ 750 if (proto_send(out, NULL, 0) == -1) 751 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 752 if (proto_recv(in, NULL, 0) == -1) 753 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 754 #endif 755 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 756 if (inp != NULL && outp != NULL) { 757 *inp = in; 758 *outp = out; 759 } else { 760 res->hr_remotein = in; 761 res->hr_remoteout = out; 762 } 763 event_send(res, EVENT_CONNECT); 764 return (0); 765 close: 766 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 767 event_send(res, EVENT_SPLITBRAIN); 768 proto_close(out); 769 if (in != NULL) 770 proto_close(in); 771 return (error); 772 } 773 774 static void 775 sync_start(void) 776 { 777 778 mtx_lock(&sync_lock); 779 sync_inprogress = true; 780 mtx_unlock(&sync_lock); 781 cv_signal(&sync_cond); 782 } 783 784 static void 785 sync_stop(void) 786 { 787 788 mtx_lock(&sync_lock); 789 if (sync_inprogress) 790 sync_inprogress = false; 791 mtx_unlock(&sync_lock); 792 } 793 794 static void 795 init_ggate(struct hast_resource *res) 796 { 797 struct g_gate_ctl_create ggiocreate; 798 struct g_gate_ctl_cancel ggiocancel; 799 800 /* 801 * We communicate with ggate via /dev/ggctl. Open it. 802 */ 803 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 804 if (res->hr_ggatefd == -1) 805 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 806 /* 807 * Create provider before trying to connect, as connection failure 808 * is not critical, but may take some time. 809 */ 810 bzero(&ggiocreate, sizeof(ggiocreate)); 811 ggiocreate.gctl_version = G_GATE_VERSION; 812 ggiocreate.gctl_mediasize = res->hr_datasize; 813 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 814 ggiocreate.gctl_flags = 0; 815 ggiocreate.gctl_maxcount = 0; 816 ggiocreate.gctl_timeout = 0; 817 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 818 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 819 res->hr_provname); 820 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 821 pjdlog_info("Device hast/%s created.", res->hr_provname); 822 res->hr_ggateunit = ggiocreate.gctl_unit; 823 return; 824 } 825 if (errno != EEXIST) { 826 primary_exit(EX_OSERR, "Unable to create hast/%s device", 827 res->hr_provname); 828 } 829 pjdlog_debug(1, 830 "Device hast/%s already exists, we will try to take it over.", 831 res->hr_provname); 832 /* 833 * If we received EEXIST, we assume that the process who created the 834 * provider died and didn't clean up. In that case we will start from 835 * where he left of. 836 */ 837 bzero(&ggiocancel, sizeof(ggiocancel)); 838 ggiocancel.gctl_version = G_GATE_VERSION; 839 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 840 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 841 res->hr_provname); 842 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 843 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 844 res->hr_ggateunit = ggiocancel.gctl_unit; 845 return; 846 } 847 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 848 res->hr_provname); 849 } 850 851 void 852 hastd_primary(struct hast_resource *res) 853 { 854 pthread_t td; 855 pid_t pid; 856 int error, mode, debuglevel; 857 858 /* 859 * Create communication channel for sending control commands from 860 * parent to child. 861 */ 862 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 863 /* TODO: There's no need for this to be fatal error. */ 864 KEEP_ERRNO((void)pidfile_remove(pfh)); 865 pjdlog_exit(EX_OSERR, 866 "Unable to create control sockets between parent and child"); 867 } 868 /* 869 * Create communication channel for sending events from child to parent. 870 */ 871 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 872 /* TODO: There's no need for this to be fatal error. */ 873 KEEP_ERRNO((void)pidfile_remove(pfh)); 874 pjdlog_exit(EX_OSERR, 875 "Unable to create event sockets between child and parent"); 876 } 877 /* 878 * Create communication channel for sending connection requests from 879 * child to parent. 880 */ 881 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 882 /* TODO: There's no need for this to be fatal error. */ 883 KEEP_ERRNO((void)pidfile_remove(pfh)); 884 pjdlog_exit(EX_OSERR, 885 "Unable to create connection sockets between child and parent"); 886 } 887 888 pid = fork(); 889 if (pid == -1) { 890 /* TODO: There's no need for this to be fatal error. */ 891 KEEP_ERRNO((void)pidfile_remove(pfh)); 892 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 893 } 894 895 if (pid > 0) { 896 /* This is parent. */ 897 /* Declare that we are receiver. */ 898 proto_recv(res->hr_event, NULL, 0); 899 proto_recv(res->hr_conn, NULL, 0); 900 /* Declare that we are sender. */ 901 proto_send(res->hr_ctrl, NULL, 0); 902 res->hr_workerpid = pid; 903 return; 904 } 905 906 gres = res; 907 mode = pjdlog_mode_get(); 908 debuglevel = pjdlog_debug_get(); 909 910 /* Declare that we are sender. */ 911 proto_send(res->hr_event, NULL, 0); 912 proto_send(res->hr_conn, NULL, 0); 913 /* Declare that we are receiver. */ 914 proto_recv(res->hr_ctrl, NULL, 0); 915 descriptors_cleanup(res); 916 917 descriptors_assert(res, mode); 918 919 pjdlog_init(mode); 920 pjdlog_debug_set(debuglevel); 921 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 922 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 923 924 init_local(res); 925 init_ggate(res); 926 init_environment(res); 927 928 if (drop_privs(res) != 0) { 929 cleanup(res); 930 exit(EX_CONFIG); 931 } 932 pjdlog_info("Privileges successfully dropped."); 933 934 /* 935 * Create the guard thread first, so we can handle signals from the 936 * very beginning. 937 */ 938 error = pthread_create(&td, NULL, guard_thread, res); 939 PJDLOG_ASSERT(error == 0); 940 /* 941 * Create the control thread before sending any event to the parent, 942 * as we can deadlock when parent sends control request to worker, 943 * but worker has no control thread started yet, so parent waits. 944 * In the meantime worker sends an event to the parent, but parent 945 * is unable to handle the event, because it waits for control 946 * request response. 947 */ 948 error = pthread_create(&td, NULL, ctrl_thread, res); 949 PJDLOG_ASSERT(error == 0); 950 if (real_remote(res)) { 951 error = init_remote(res, NULL, NULL); 952 if (error == 0) { 953 sync_start(); 954 } else if (error == EBUSY) { 955 time_t start = time(NULL); 956 957 pjdlog_warning("Waiting for remote node to become %s for %ds.", 958 role2str(HAST_ROLE_SECONDARY), 959 res->hr_timeout); 960 for (;;) { 961 sleep(1); 962 error = init_remote(res, NULL, NULL); 963 if (error != EBUSY) 964 break; 965 if (time(NULL) > start + res->hr_timeout) 966 break; 967 } 968 if (error == EBUSY) { 969 pjdlog_warning("Remote node is still %s, starting anyway.", 970 role2str(HAST_ROLE_PRIMARY)); 971 } 972 } 973 } 974 error = pthread_create(&td, NULL, ggate_recv_thread, res); 975 PJDLOG_ASSERT(error == 0); 976 error = pthread_create(&td, NULL, local_send_thread, res); 977 PJDLOG_ASSERT(error == 0); 978 error = pthread_create(&td, NULL, remote_send_thread, res); 979 PJDLOG_ASSERT(error == 0); 980 error = pthread_create(&td, NULL, remote_recv_thread, res); 981 PJDLOG_ASSERT(error == 0); 982 error = pthread_create(&td, NULL, ggate_send_thread, res); 983 PJDLOG_ASSERT(error == 0); 984 fullystarted = true; 985 (void)sync_thread(res); 986 } 987 988 static void 989 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 990 { 991 char msg[1024]; 992 va_list ap; 993 994 va_start(ap, fmt); 995 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 996 va_end(ap); 997 switch (ggio->gctl_cmd) { 998 case BIO_READ: 999 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1000 (uintmax_t)ggio->gctl_offset, 1001 (uintmax_t)ggio->gctl_length); 1002 break; 1003 case BIO_DELETE: 1004 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1005 (uintmax_t)ggio->gctl_offset, 1006 (uintmax_t)ggio->gctl_length); 1007 break; 1008 case BIO_FLUSH: 1009 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1010 break; 1011 case BIO_WRITE: 1012 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1013 (uintmax_t)ggio->gctl_offset, 1014 (uintmax_t)ggio->gctl_length); 1015 break; 1016 default: 1017 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1018 (unsigned int)ggio->gctl_cmd); 1019 break; 1020 } 1021 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1022 } 1023 1024 static void 1025 remote_close(struct hast_resource *res, int ncomp) 1026 { 1027 1028 rw_wlock(&hio_remote_lock[ncomp]); 1029 /* 1030 * Check for a race between dropping rlock and acquiring wlock - 1031 * another thread can close connection in-between. 1032 */ 1033 if (!ISCONNECTED(res, ncomp)) { 1034 PJDLOG_ASSERT(res->hr_remotein == NULL); 1035 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1036 rw_unlock(&hio_remote_lock[ncomp]); 1037 return; 1038 } 1039 1040 PJDLOG_ASSERT(res->hr_remotein != NULL); 1041 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1042 1043 pjdlog_debug(2, "Closing incoming connection to %s.", 1044 res->hr_remoteaddr); 1045 proto_close(res->hr_remotein); 1046 res->hr_remotein = NULL; 1047 pjdlog_debug(2, "Closing outgoing connection to %s.", 1048 res->hr_remoteaddr); 1049 proto_close(res->hr_remoteout); 1050 res->hr_remoteout = NULL; 1051 1052 rw_unlock(&hio_remote_lock[ncomp]); 1053 1054 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1055 1056 /* 1057 * Stop synchronization if in-progress. 1058 */ 1059 sync_stop(); 1060 1061 event_send(res, EVENT_DISCONNECT); 1062 } 1063 1064 /* 1065 * Acknowledge write completion to the kernel, but don't update activemap yet. 1066 */ 1067 static void 1068 write_complete(struct hast_resource *res, struct hio *hio) 1069 { 1070 struct g_gate_ctl_io *ggio; 1071 unsigned int ncomp; 1072 1073 PJDLOG_ASSERT(!hio->hio_done); 1074 1075 ggio = &hio->hio_ggio; 1076 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1077 1078 /* 1079 * Bump local count if this is first write after 1080 * connection failure with remote node. 1081 */ 1082 ncomp = 1; 1083 rw_rlock(&hio_remote_lock[ncomp]); 1084 if (!ISCONNECTED(res, ncomp)) { 1085 mtx_lock(&metadata_lock); 1086 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1087 res->hr_primary_localcnt++; 1088 pjdlog_debug(1, "Increasing localcnt to %ju.", 1089 (uintmax_t)res->hr_primary_localcnt); 1090 (void)metadata_write(res); 1091 } 1092 mtx_unlock(&metadata_lock); 1093 } 1094 rw_unlock(&hio_remote_lock[ncomp]); 1095 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1096 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1097 hio->hio_done = true; 1098 } 1099 1100 /* 1101 * Thread receives ggate I/O requests from the kernel and passes them to 1102 * appropriate threads: 1103 * WRITE - always goes to both local_send and remote_send threads 1104 * READ (when the block is up-to-date on local component) - 1105 * only local_send thread 1106 * READ (when the block isn't up-to-date on local component) - 1107 * only remote_send thread 1108 * DELETE - always goes to both local_send and remote_send threads 1109 * FLUSH - always goes to both local_send and remote_send threads 1110 */ 1111 static void * 1112 ggate_recv_thread(void *arg) 1113 { 1114 struct hast_resource *res = arg; 1115 struct g_gate_ctl_io *ggio; 1116 struct hio *hio; 1117 unsigned int ii, ncomp, ncomps; 1118 int error; 1119 1120 for (;;) { 1121 pjdlog_debug(2, "ggate_recv: Taking free request."); 1122 QUEUE_TAKE2(hio, free); 1123 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1124 ggio = &hio->hio_ggio; 1125 ggio->gctl_unit = res->hr_ggateunit; 1126 ggio->gctl_length = MAXPHYS; 1127 ggio->gctl_error = 0; 1128 hio->hio_done = false; 1129 hio->hio_replication = res->hr_replication; 1130 pjdlog_debug(2, 1131 "ggate_recv: (%p) Waiting for request from the kernel.", 1132 hio); 1133 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1134 if (sigexit_received) 1135 pthread_exit(NULL); 1136 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1137 } 1138 error = ggio->gctl_error; 1139 switch (error) { 1140 case 0: 1141 break; 1142 case ECANCELED: 1143 /* Exit gracefully. */ 1144 if (!sigexit_received) { 1145 pjdlog_debug(2, 1146 "ggate_recv: (%p) Received cancel from the kernel.", 1147 hio); 1148 pjdlog_info("Received cancel from the kernel, exiting."); 1149 } 1150 pthread_exit(NULL); 1151 case ENOMEM: 1152 /* 1153 * Buffer too small? Impossible, we allocate MAXPHYS 1154 * bytes - request can't be bigger than that. 1155 */ 1156 /* FALLTHROUGH */ 1157 case ENXIO: 1158 default: 1159 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1160 strerror(error)); 1161 } 1162 1163 ncomp = 0; 1164 ncomps = HAST_NCOMPONENTS; 1165 1166 for (ii = 0; ii < ncomps; ii++) 1167 hio->hio_errors[ii] = EINVAL; 1168 reqlog(LOG_DEBUG, 2, ggio, 1169 "ggate_recv: (%p) Request received from the kernel: ", 1170 hio); 1171 1172 /* 1173 * Inform all components about new write request. 1174 * For read request prefer local component unless the given 1175 * range is out-of-date, then use remote component. 1176 */ 1177 switch (ggio->gctl_cmd) { 1178 case BIO_READ: 1179 res->hr_stat_read++; 1180 ncomps = 1; 1181 mtx_lock(&metadata_lock); 1182 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1183 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1184 /* 1185 * This range is up-to-date on local component, 1186 * so handle request locally. 1187 */ 1188 /* Local component is 0 for now. */ 1189 ncomp = 0; 1190 } else /* if (res->hr_syncsrc == 1191 HAST_SYNCSRC_SECONDARY) */ { 1192 PJDLOG_ASSERT(res->hr_syncsrc == 1193 HAST_SYNCSRC_SECONDARY); 1194 /* 1195 * This range is out-of-date on local component, 1196 * so send request to the remote node. 1197 */ 1198 /* Remote component is 1 for now. */ 1199 ncomp = 1; 1200 } 1201 mtx_unlock(&metadata_lock); 1202 break; 1203 case BIO_WRITE: 1204 res->hr_stat_write++; 1205 if (res->hr_resuid == 0 && 1206 res->hr_primary_localcnt == 0) { 1207 /* This is first write. */ 1208 res->hr_primary_localcnt = 1; 1209 } 1210 for (;;) { 1211 mtx_lock(&range_lock); 1212 if (rangelock_islocked(range_sync, 1213 ggio->gctl_offset, ggio->gctl_length)) { 1214 pjdlog_debug(2, 1215 "regular: Range offset=%jd length=%zu locked.", 1216 (intmax_t)ggio->gctl_offset, 1217 (size_t)ggio->gctl_length); 1218 range_regular_wait = true; 1219 cv_wait(&range_regular_cond, &range_lock); 1220 range_regular_wait = false; 1221 mtx_unlock(&range_lock); 1222 continue; 1223 } 1224 if (rangelock_add(range_regular, 1225 ggio->gctl_offset, ggio->gctl_length) == -1) { 1226 mtx_unlock(&range_lock); 1227 pjdlog_debug(2, 1228 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1229 (intmax_t)ggio->gctl_offset, 1230 (size_t)ggio->gctl_length); 1231 sleep(1); 1232 continue; 1233 } 1234 mtx_unlock(&range_lock); 1235 break; 1236 } 1237 mtx_lock(&res->hr_amp_lock); 1238 if (activemap_write_start(res->hr_amp, 1239 ggio->gctl_offset, ggio->gctl_length)) { 1240 res->hr_stat_activemap_update++; 1241 (void)hast_activemap_flush(res); 1242 } 1243 mtx_unlock(&res->hr_amp_lock); 1244 break; 1245 case BIO_DELETE: 1246 res->hr_stat_delete++; 1247 break; 1248 case BIO_FLUSH: 1249 res->hr_stat_flush++; 1250 break; 1251 } 1252 pjdlog_debug(2, 1253 "ggate_recv: (%p) Moving request to the send queues.", hio); 1254 refcount_init(&hio->hio_countdown, ncomps); 1255 for (ii = ncomp; ii < ncomp + ncomps; ii++) 1256 QUEUE_INSERT1(hio, send, ii); 1257 } 1258 /* NOTREACHED */ 1259 return (NULL); 1260 } 1261 1262 /* 1263 * Thread reads from or writes to local component. 1264 * If local read fails, it redirects it to remote_send thread. 1265 */ 1266 static void * 1267 local_send_thread(void *arg) 1268 { 1269 struct hast_resource *res = arg; 1270 struct g_gate_ctl_io *ggio; 1271 struct hio *hio; 1272 unsigned int ncomp, rncomp; 1273 ssize_t ret; 1274 1275 /* Local component is 0 for now. */ 1276 ncomp = 0; 1277 /* Remote component is 1 for now. */ 1278 rncomp = 1; 1279 1280 for (;;) { 1281 pjdlog_debug(2, "local_send: Taking request."); 1282 QUEUE_TAKE1(hio, send, ncomp, 0); 1283 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1284 ggio = &hio->hio_ggio; 1285 switch (ggio->gctl_cmd) { 1286 case BIO_READ: 1287 ret = pread(res->hr_localfd, ggio->gctl_data, 1288 ggio->gctl_length, 1289 ggio->gctl_offset + res->hr_localoff); 1290 if (ret == ggio->gctl_length) 1291 hio->hio_errors[ncomp] = 0; 1292 else if (!ISSYNCREQ(hio)) { 1293 /* 1294 * If READ failed, try to read from remote node. 1295 */ 1296 if (ret == -1) { 1297 reqlog(LOG_WARNING, 0, ggio, 1298 "Local request failed (%s), trying remote node. ", 1299 strerror(errno)); 1300 } else if (ret != ggio->gctl_length) { 1301 reqlog(LOG_WARNING, 0, ggio, 1302 "Local request failed (%zd != %jd), trying remote node. ", 1303 ret, (intmax_t)ggio->gctl_length); 1304 } 1305 QUEUE_INSERT1(hio, send, rncomp); 1306 continue; 1307 } 1308 break; 1309 case BIO_WRITE: 1310 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1311 ggio->gctl_length, 1312 ggio->gctl_offset + res->hr_localoff); 1313 if (ret == -1) { 1314 hio->hio_errors[ncomp] = errno; 1315 reqlog(LOG_WARNING, 0, ggio, 1316 "Local request failed (%s): ", 1317 strerror(errno)); 1318 } else if (ret != ggio->gctl_length) { 1319 hio->hio_errors[ncomp] = EIO; 1320 reqlog(LOG_WARNING, 0, ggio, 1321 "Local request failed (%zd != %jd): ", 1322 ret, (intmax_t)ggio->gctl_length); 1323 } else { 1324 hio->hio_errors[ncomp] = 0; 1325 if (hio->hio_replication == 1326 HAST_REPLICATION_ASYNC && 1327 !ISSYNCREQ(hio)) { 1328 ggio->gctl_error = 0; 1329 write_complete(res, hio); 1330 } 1331 } 1332 break; 1333 case BIO_DELETE: 1334 ret = g_delete(res->hr_localfd, 1335 ggio->gctl_offset + res->hr_localoff, 1336 ggio->gctl_length); 1337 if (ret == -1) { 1338 hio->hio_errors[ncomp] = errno; 1339 reqlog(LOG_WARNING, 0, ggio, 1340 "Local request failed (%s): ", 1341 strerror(errno)); 1342 } else { 1343 hio->hio_errors[ncomp] = 0; 1344 } 1345 break; 1346 case BIO_FLUSH: 1347 if (!res->hr_localflush) { 1348 ret = -1; 1349 errno = EOPNOTSUPP; 1350 break; 1351 } 1352 ret = g_flush(res->hr_localfd); 1353 if (ret == -1) { 1354 if (errno == EOPNOTSUPP) 1355 res->hr_localflush = false; 1356 hio->hio_errors[ncomp] = errno; 1357 reqlog(LOG_WARNING, 0, ggio, 1358 "Local request failed (%s): ", 1359 strerror(errno)); 1360 } else { 1361 hio->hio_errors[ncomp] = 0; 1362 } 1363 break; 1364 } 1365 if (!refcount_release(&hio->hio_countdown)) 1366 continue; 1367 if (ISSYNCREQ(hio)) { 1368 mtx_lock(&sync_lock); 1369 SYNCREQDONE(hio); 1370 mtx_unlock(&sync_lock); 1371 cv_signal(&sync_cond); 1372 } else { 1373 pjdlog_debug(2, 1374 "local_send: (%p) Moving request to the done queue.", 1375 hio); 1376 QUEUE_INSERT2(hio, done); 1377 } 1378 } 1379 /* NOTREACHED */ 1380 return (NULL); 1381 } 1382 1383 static void 1384 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1385 { 1386 struct nv *nv; 1387 1388 rw_rlock(&hio_remote_lock[ncomp]); 1389 1390 if (!ISCONNECTED(res, ncomp)) { 1391 rw_unlock(&hio_remote_lock[ncomp]); 1392 return; 1393 } 1394 1395 PJDLOG_ASSERT(res->hr_remotein != NULL); 1396 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1397 1398 nv = nv_alloc(); 1399 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1400 if (nv_error(nv) != 0) { 1401 rw_unlock(&hio_remote_lock[ncomp]); 1402 nv_free(nv); 1403 pjdlog_debug(1, 1404 "keepalive_send: Unable to prepare header to send."); 1405 return; 1406 } 1407 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1408 rw_unlock(&hio_remote_lock[ncomp]); 1409 pjdlog_common(LOG_DEBUG, 1, errno, 1410 "keepalive_send: Unable to send request"); 1411 nv_free(nv); 1412 remote_close(res, ncomp); 1413 return; 1414 } 1415 1416 rw_unlock(&hio_remote_lock[ncomp]); 1417 nv_free(nv); 1418 pjdlog_debug(2, "keepalive_send: Request sent."); 1419 } 1420 1421 /* 1422 * Thread sends request to secondary node. 1423 */ 1424 static void * 1425 remote_send_thread(void *arg) 1426 { 1427 struct hast_resource *res = arg; 1428 struct g_gate_ctl_io *ggio; 1429 time_t lastcheck, now; 1430 struct hio *hio; 1431 struct nv *nv; 1432 unsigned int ncomp; 1433 bool wakeup; 1434 uint64_t offset, length; 1435 uint8_t cmd; 1436 void *data; 1437 1438 /* Remote component is 1 for now. */ 1439 ncomp = 1; 1440 lastcheck = time(NULL); 1441 1442 for (;;) { 1443 pjdlog_debug(2, "remote_send: Taking request."); 1444 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1445 if (hio == NULL) { 1446 now = time(NULL); 1447 if (lastcheck + HAST_KEEPALIVE <= now) { 1448 keepalive_send(res, ncomp); 1449 lastcheck = now; 1450 } 1451 continue; 1452 } 1453 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1454 ggio = &hio->hio_ggio; 1455 switch (ggio->gctl_cmd) { 1456 case BIO_READ: 1457 cmd = HIO_READ; 1458 data = NULL; 1459 offset = ggio->gctl_offset; 1460 length = ggio->gctl_length; 1461 break; 1462 case BIO_WRITE: 1463 cmd = HIO_WRITE; 1464 data = ggio->gctl_data; 1465 offset = ggio->gctl_offset; 1466 length = ggio->gctl_length; 1467 break; 1468 case BIO_DELETE: 1469 cmd = HIO_DELETE; 1470 data = NULL; 1471 offset = ggio->gctl_offset; 1472 length = ggio->gctl_length; 1473 break; 1474 case BIO_FLUSH: 1475 cmd = HIO_FLUSH; 1476 data = NULL; 1477 offset = 0; 1478 length = 0; 1479 break; 1480 default: 1481 PJDLOG_ABORT("invalid condition"); 1482 } 1483 nv = nv_alloc(); 1484 nv_add_uint8(nv, cmd, "cmd"); 1485 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1486 nv_add_uint64(nv, offset, "offset"); 1487 nv_add_uint64(nv, length, "length"); 1488 if (nv_error(nv) != 0) { 1489 hio->hio_errors[ncomp] = nv_error(nv); 1490 pjdlog_debug(2, 1491 "remote_send: (%p) Unable to prepare header to send.", 1492 hio); 1493 reqlog(LOG_ERR, 0, ggio, 1494 "Unable to prepare header to send (%s): ", 1495 strerror(nv_error(nv))); 1496 /* Move failed request immediately to the done queue. */ 1497 goto done_queue; 1498 } 1499 /* 1500 * Protect connection from disappearing. 1501 */ 1502 rw_rlock(&hio_remote_lock[ncomp]); 1503 if (!ISCONNECTED(res, ncomp)) { 1504 rw_unlock(&hio_remote_lock[ncomp]); 1505 hio->hio_errors[ncomp] = ENOTCONN; 1506 goto done_queue; 1507 } 1508 /* 1509 * Move the request to recv queue before sending it, because 1510 * in different order we can get reply before we move request 1511 * to recv queue. 1512 */ 1513 pjdlog_debug(2, 1514 "remote_send: (%p) Moving request to the recv queue.", 1515 hio); 1516 mtx_lock(&hio_recv_list_lock[ncomp]); 1517 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1518 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1519 mtx_unlock(&hio_recv_list_lock[ncomp]); 1520 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1521 data != NULL ? length : 0) == -1) { 1522 hio->hio_errors[ncomp] = errno; 1523 rw_unlock(&hio_remote_lock[ncomp]); 1524 pjdlog_debug(2, 1525 "remote_send: (%p) Unable to send request.", hio); 1526 reqlog(LOG_ERR, 0, ggio, 1527 "Unable to send request (%s): ", 1528 strerror(hio->hio_errors[ncomp])); 1529 remote_close(res, ncomp); 1530 /* 1531 * Take request back from the receive queue and move 1532 * it immediately to the done queue. 1533 */ 1534 mtx_lock(&hio_recv_list_lock[ncomp]); 1535 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1536 hio_next[ncomp]); 1537 mtx_unlock(&hio_recv_list_lock[ncomp]); 1538 goto done_queue; 1539 } 1540 rw_unlock(&hio_remote_lock[ncomp]); 1541 nv_free(nv); 1542 if (wakeup) 1543 cv_signal(&hio_recv_list_cond[ncomp]); 1544 continue; 1545 done_queue: 1546 nv_free(nv); 1547 if (ISSYNCREQ(hio)) { 1548 if (!refcount_release(&hio->hio_countdown)) 1549 continue; 1550 mtx_lock(&sync_lock); 1551 SYNCREQDONE(hio); 1552 mtx_unlock(&sync_lock); 1553 cv_signal(&sync_cond); 1554 continue; 1555 } 1556 if (ggio->gctl_cmd == BIO_WRITE) { 1557 mtx_lock(&res->hr_amp_lock); 1558 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1559 ggio->gctl_length)) { 1560 (void)hast_activemap_flush(res); 1561 } 1562 mtx_unlock(&res->hr_amp_lock); 1563 } 1564 if (!refcount_release(&hio->hio_countdown)) 1565 continue; 1566 pjdlog_debug(2, 1567 "remote_send: (%p) Moving request to the done queue.", 1568 hio); 1569 QUEUE_INSERT2(hio, done); 1570 } 1571 /* NOTREACHED */ 1572 return (NULL); 1573 } 1574 1575 /* 1576 * Thread receives answer from secondary node and passes it to ggate_send 1577 * thread. 1578 */ 1579 static void * 1580 remote_recv_thread(void *arg) 1581 { 1582 struct hast_resource *res = arg; 1583 struct g_gate_ctl_io *ggio; 1584 struct hio *hio; 1585 struct nv *nv; 1586 unsigned int ncomp; 1587 uint64_t seq; 1588 int error; 1589 1590 /* Remote component is 1 for now. */ 1591 ncomp = 1; 1592 1593 for (;;) { 1594 /* Wait until there is anything to receive. */ 1595 mtx_lock(&hio_recv_list_lock[ncomp]); 1596 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1597 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1598 cv_wait(&hio_recv_list_cond[ncomp], 1599 &hio_recv_list_lock[ncomp]); 1600 } 1601 mtx_unlock(&hio_recv_list_lock[ncomp]); 1602 1603 rw_rlock(&hio_remote_lock[ncomp]); 1604 if (!ISCONNECTED(res, ncomp)) { 1605 rw_unlock(&hio_remote_lock[ncomp]); 1606 /* 1607 * Connection is dead, so move all pending requests to 1608 * the done queue (one-by-one). 1609 */ 1610 mtx_lock(&hio_recv_list_lock[ncomp]); 1611 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1612 PJDLOG_ASSERT(hio != NULL); 1613 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1614 hio_next[ncomp]); 1615 mtx_unlock(&hio_recv_list_lock[ncomp]); 1616 goto done_queue; 1617 } 1618 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1619 pjdlog_errno(LOG_ERR, 1620 "Unable to receive reply header"); 1621 rw_unlock(&hio_remote_lock[ncomp]); 1622 remote_close(res, ncomp); 1623 continue; 1624 } 1625 rw_unlock(&hio_remote_lock[ncomp]); 1626 seq = nv_get_uint64(nv, "seq"); 1627 if (seq == 0) { 1628 pjdlog_error("Header contains no 'seq' field."); 1629 nv_free(nv); 1630 continue; 1631 } 1632 mtx_lock(&hio_recv_list_lock[ncomp]); 1633 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1634 if (hio->hio_ggio.gctl_seq == seq) { 1635 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1636 hio_next[ncomp]); 1637 break; 1638 } 1639 } 1640 mtx_unlock(&hio_recv_list_lock[ncomp]); 1641 if (hio == NULL) { 1642 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1643 (uintmax_t)seq); 1644 nv_free(nv); 1645 continue; 1646 } 1647 ggio = &hio->hio_ggio; 1648 error = nv_get_int16(nv, "error"); 1649 if (error != 0) { 1650 /* Request failed on remote side. */ 1651 hio->hio_errors[ncomp] = error; 1652 reqlog(LOG_WARNING, 0, ggio, 1653 "Remote request failed (%s): ", strerror(error)); 1654 nv_free(nv); 1655 goto done_queue; 1656 } 1657 switch (ggio->gctl_cmd) { 1658 case BIO_READ: 1659 rw_rlock(&hio_remote_lock[ncomp]); 1660 if (!ISCONNECTED(res, ncomp)) { 1661 rw_unlock(&hio_remote_lock[ncomp]); 1662 nv_free(nv); 1663 goto done_queue; 1664 } 1665 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1666 ggio->gctl_data, ggio->gctl_length) == -1) { 1667 hio->hio_errors[ncomp] = errno; 1668 pjdlog_errno(LOG_ERR, 1669 "Unable to receive reply data"); 1670 rw_unlock(&hio_remote_lock[ncomp]); 1671 nv_free(nv); 1672 remote_close(res, ncomp); 1673 goto done_queue; 1674 } 1675 rw_unlock(&hio_remote_lock[ncomp]); 1676 break; 1677 case BIO_WRITE: 1678 case BIO_DELETE: 1679 case BIO_FLUSH: 1680 break; 1681 default: 1682 PJDLOG_ABORT("invalid condition"); 1683 } 1684 hio->hio_errors[ncomp] = 0; 1685 nv_free(nv); 1686 done_queue: 1687 if (!refcount_release(&hio->hio_countdown)) 1688 continue; 1689 if (ISSYNCREQ(hio)) { 1690 mtx_lock(&sync_lock); 1691 SYNCREQDONE(hio); 1692 mtx_unlock(&sync_lock); 1693 cv_signal(&sync_cond); 1694 } else { 1695 pjdlog_debug(2, 1696 "remote_recv: (%p) Moving request to the done queue.", 1697 hio); 1698 QUEUE_INSERT2(hio, done); 1699 } 1700 } 1701 /* NOTREACHED */ 1702 return (NULL); 1703 } 1704 1705 /* 1706 * Thread sends answer to the kernel. 1707 */ 1708 static void * 1709 ggate_send_thread(void *arg) 1710 { 1711 struct hast_resource *res = arg; 1712 struct g_gate_ctl_io *ggio; 1713 struct hio *hio; 1714 unsigned int ii, ncomps; 1715 1716 ncomps = HAST_NCOMPONENTS; 1717 1718 for (;;) { 1719 pjdlog_debug(2, "ggate_send: Taking request."); 1720 QUEUE_TAKE2(hio, done); 1721 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1722 ggio = &hio->hio_ggio; 1723 for (ii = 0; ii < ncomps; ii++) { 1724 if (hio->hio_errors[ii] == 0) { 1725 /* 1726 * One successful request is enough to declare 1727 * success. 1728 */ 1729 ggio->gctl_error = 0; 1730 break; 1731 } 1732 } 1733 if (ii == ncomps) { 1734 /* 1735 * None of the requests were successful. 1736 * Use the error from local component except the 1737 * case when we did only remote request. 1738 */ 1739 if (ggio->gctl_cmd == BIO_READ && 1740 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1741 ggio->gctl_error = hio->hio_errors[1]; 1742 else 1743 ggio->gctl_error = hio->hio_errors[0]; 1744 } 1745 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1746 mtx_lock(&res->hr_amp_lock); 1747 if (activemap_write_complete(res->hr_amp, 1748 ggio->gctl_offset, ggio->gctl_length)) { 1749 res->hr_stat_activemap_update++; 1750 (void)hast_activemap_flush(res); 1751 } 1752 mtx_unlock(&res->hr_amp_lock); 1753 } 1754 if (ggio->gctl_cmd == BIO_WRITE) { 1755 /* 1756 * Unlock range we locked. 1757 */ 1758 mtx_lock(&range_lock); 1759 rangelock_del(range_regular, ggio->gctl_offset, 1760 ggio->gctl_length); 1761 if (range_sync_wait) 1762 cv_signal(&range_sync_cond); 1763 mtx_unlock(&range_lock); 1764 if (!hio->hio_done) 1765 write_complete(res, hio); 1766 } else { 1767 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1768 primary_exit(EX_OSERR, 1769 "G_GATE_CMD_DONE failed"); 1770 } 1771 } 1772 pjdlog_debug(2, 1773 "ggate_send: (%p) Moving request to the free queue.", hio); 1774 QUEUE_INSERT2(hio, free); 1775 } 1776 /* NOTREACHED */ 1777 return (NULL); 1778 } 1779 1780 /* 1781 * Thread synchronize local and remote components. 1782 */ 1783 static void * 1784 sync_thread(void *arg __unused) 1785 { 1786 struct hast_resource *res = arg; 1787 struct hio *hio; 1788 struct g_gate_ctl_io *ggio; 1789 struct timeval tstart, tend, tdiff; 1790 unsigned int ii, ncomp, ncomps; 1791 off_t offset, length, synced; 1792 bool dorewind; 1793 int syncext; 1794 1795 ncomps = HAST_NCOMPONENTS; 1796 dorewind = true; 1797 synced = 0; 1798 offset = -1; 1799 1800 for (;;) { 1801 mtx_lock(&sync_lock); 1802 if (offset >= 0 && !sync_inprogress) { 1803 gettimeofday(&tend, NULL); 1804 timersub(&tend, &tstart, &tdiff); 1805 pjdlog_info("Synchronization interrupted after %#.0T. " 1806 "%NB synchronized so far.", &tdiff, 1807 (intmax_t)synced); 1808 event_send(res, EVENT_SYNCINTR); 1809 } 1810 while (!sync_inprogress) { 1811 dorewind = true; 1812 synced = 0; 1813 cv_wait(&sync_cond, &sync_lock); 1814 } 1815 mtx_unlock(&sync_lock); 1816 /* 1817 * Obtain offset at which we should synchronize. 1818 * Rewind synchronization if needed. 1819 */ 1820 mtx_lock(&res->hr_amp_lock); 1821 if (dorewind) 1822 activemap_sync_rewind(res->hr_amp); 1823 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1824 if (syncext != -1) { 1825 /* 1826 * We synchronized entire syncext extent, we can mark 1827 * it as clean now. 1828 */ 1829 if (activemap_extent_complete(res->hr_amp, syncext)) 1830 (void)hast_activemap_flush(res); 1831 } 1832 mtx_unlock(&res->hr_amp_lock); 1833 if (dorewind) { 1834 dorewind = false; 1835 if (offset == -1) 1836 pjdlog_info("Nodes are in sync."); 1837 else { 1838 pjdlog_info("Synchronization started. %NB to go.", 1839 (intmax_t)(res->hr_extentsize * 1840 activemap_ndirty(res->hr_amp))); 1841 event_send(res, EVENT_SYNCSTART); 1842 gettimeofday(&tstart, NULL); 1843 } 1844 } 1845 if (offset == -1) { 1846 sync_stop(); 1847 pjdlog_debug(1, "Nothing to synchronize."); 1848 /* 1849 * Synchronization complete, make both localcnt and 1850 * remotecnt equal. 1851 */ 1852 ncomp = 1; 1853 rw_rlock(&hio_remote_lock[ncomp]); 1854 if (ISCONNECTED(res, ncomp)) { 1855 if (synced > 0) { 1856 int64_t bps; 1857 1858 gettimeofday(&tend, NULL); 1859 timersub(&tend, &tstart, &tdiff); 1860 bps = (int64_t)((double)synced / 1861 ((double)tdiff.tv_sec + 1862 (double)tdiff.tv_usec / 1000000)); 1863 pjdlog_info("Synchronization complete. " 1864 "%NB synchronized in %#.0lT (%NB/sec).", 1865 (intmax_t)synced, &tdiff, 1866 (intmax_t)bps); 1867 event_send(res, EVENT_SYNCDONE); 1868 } 1869 mtx_lock(&metadata_lock); 1870 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1871 res->hr_primary_localcnt = 1872 res->hr_secondary_remotecnt; 1873 res->hr_primary_remotecnt = 1874 res->hr_secondary_localcnt; 1875 pjdlog_debug(1, 1876 "Setting localcnt to %ju and remotecnt to %ju.", 1877 (uintmax_t)res->hr_primary_localcnt, 1878 (uintmax_t)res->hr_primary_remotecnt); 1879 (void)metadata_write(res); 1880 mtx_unlock(&metadata_lock); 1881 } 1882 rw_unlock(&hio_remote_lock[ncomp]); 1883 continue; 1884 } 1885 pjdlog_debug(2, "sync: Taking free request."); 1886 QUEUE_TAKE2(hio, free); 1887 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1888 /* 1889 * Lock the range we are going to synchronize. We don't want 1890 * race where someone writes between our read and write. 1891 */ 1892 for (;;) { 1893 mtx_lock(&range_lock); 1894 if (rangelock_islocked(range_regular, offset, length)) { 1895 pjdlog_debug(2, 1896 "sync: Range offset=%jd length=%jd locked.", 1897 (intmax_t)offset, (intmax_t)length); 1898 range_sync_wait = true; 1899 cv_wait(&range_sync_cond, &range_lock); 1900 range_sync_wait = false; 1901 mtx_unlock(&range_lock); 1902 continue; 1903 } 1904 if (rangelock_add(range_sync, offset, length) == -1) { 1905 mtx_unlock(&range_lock); 1906 pjdlog_debug(2, 1907 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1908 (intmax_t)offset, (intmax_t)length); 1909 sleep(1); 1910 continue; 1911 } 1912 mtx_unlock(&range_lock); 1913 break; 1914 } 1915 /* 1916 * First read the data from synchronization source. 1917 */ 1918 SYNCREQ(hio); 1919 ggio = &hio->hio_ggio; 1920 ggio->gctl_cmd = BIO_READ; 1921 ggio->gctl_offset = offset; 1922 ggio->gctl_length = length; 1923 ggio->gctl_error = 0; 1924 hio->hio_done = false; 1925 hio->hio_replication = res->hr_replication; 1926 for (ii = 0; ii < ncomps; ii++) 1927 hio->hio_errors[ii] = EINVAL; 1928 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1929 hio); 1930 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1931 hio); 1932 mtx_lock(&metadata_lock); 1933 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1934 /* 1935 * This range is up-to-date on local component, 1936 * so handle request locally. 1937 */ 1938 /* Local component is 0 for now. */ 1939 ncomp = 0; 1940 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1941 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1942 /* 1943 * This range is out-of-date on local component, 1944 * so send request to the remote node. 1945 */ 1946 /* Remote component is 1 for now. */ 1947 ncomp = 1; 1948 } 1949 mtx_unlock(&metadata_lock); 1950 refcount_init(&hio->hio_countdown, 1); 1951 QUEUE_INSERT1(hio, send, ncomp); 1952 1953 /* 1954 * Let's wait for READ to finish. 1955 */ 1956 mtx_lock(&sync_lock); 1957 while (!ISSYNCREQDONE(hio)) 1958 cv_wait(&sync_cond, &sync_lock); 1959 mtx_unlock(&sync_lock); 1960 1961 if (hio->hio_errors[ncomp] != 0) { 1962 pjdlog_error("Unable to read synchronization data: %s.", 1963 strerror(hio->hio_errors[ncomp])); 1964 goto free_queue; 1965 } 1966 1967 /* 1968 * We read the data from synchronization source, now write it 1969 * to synchronization target. 1970 */ 1971 SYNCREQ(hio); 1972 ggio->gctl_cmd = BIO_WRITE; 1973 for (ii = 0; ii < ncomps; ii++) 1974 hio->hio_errors[ii] = EINVAL; 1975 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1976 hio); 1977 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1978 hio); 1979 mtx_lock(&metadata_lock); 1980 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1981 /* 1982 * This range is up-to-date on local component, 1983 * so we update remote component. 1984 */ 1985 /* Remote component is 1 for now. */ 1986 ncomp = 1; 1987 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1988 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1989 /* 1990 * This range is out-of-date on local component, 1991 * so we update it. 1992 */ 1993 /* Local component is 0 for now. */ 1994 ncomp = 0; 1995 } 1996 mtx_unlock(&metadata_lock); 1997 1998 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1999 hio); 2000 refcount_init(&hio->hio_countdown, 1); 2001 QUEUE_INSERT1(hio, send, ncomp); 2002 2003 /* 2004 * Let's wait for WRITE to finish. 2005 */ 2006 mtx_lock(&sync_lock); 2007 while (!ISSYNCREQDONE(hio)) 2008 cv_wait(&sync_cond, &sync_lock); 2009 mtx_unlock(&sync_lock); 2010 2011 if (hio->hio_errors[ncomp] != 0) { 2012 pjdlog_error("Unable to write synchronization data: %s.", 2013 strerror(hio->hio_errors[ncomp])); 2014 goto free_queue; 2015 } 2016 2017 synced += length; 2018 free_queue: 2019 mtx_lock(&range_lock); 2020 rangelock_del(range_sync, offset, length); 2021 if (range_regular_wait) 2022 cv_signal(&range_regular_cond); 2023 mtx_unlock(&range_lock); 2024 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2025 hio); 2026 QUEUE_INSERT2(hio, free); 2027 } 2028 /* NOTREACHED */ 2029 return (NULL); 2030 } 2031 2032 void 2033 primary_config_reload(struct hast_resource *res, struct nv *nv) 2034 { 2035 unsigned int ii, ncomps; 2036 int modified, vint; 2037 const char *vstr; 2038 2039 pjdlog_info("Reloading configuration..."); 2040 2041 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2042 PJDLOG_ASSERT(gres == res); 2043 nv_assert(nv, "remoteaddr"); 2044 nv_assert(nv, "sourceaddr"); 2045 nv_assert(nv, "replication"); 2046 nv_assert(nv, "checksum"); 2047 nv_assert(nv, "compression"); 2048 nv_assert(nv, "timeout"); 2049 nv_assert(nv, "exec"); 2050 nv_assert(nv, "metaflush"); 2051 2052 ncomps = HAST_NCOMPONENTS; 2053 2054 #define MODIFIED_REMOTEADDR 0x01 2055 #define MODIFIED_SOURCEADDR 0x02 2056 #define MODIFIED_REPLICATION 0x04 2057 #define MODIFIED_CHECKSUM 0x08 2058 #define MODIFIED_COMPRESSION 0x10 2059 #define MODIFIED_TIMEOUT 0x20 2060 #define MODIFIED_EXEC 0x40 2061 #define MODIFIED_METAFLUSH 0x80 2062 modified = 0; 2063 2064 vstr = nv_get_string(nv, "remoteaddr"); 2065 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2066 /* 2067 * Don't copy res->hr_remoteaddr to gres just yet. 2068 * We want remote_close() to log disconnect from the old 2069 * addresses, not from the new ones. 2070 */ 2071 modified |= MODIFIED_REMOTEADDR; 2072 } 2073 vstr = nv_get_string(nv, "sourceaddr"); 2074 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2075 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2076 modified |= MODIFIED_SOURCEADDR; 2077 } 2078 vint = nv_get_int32(nv, "replication"); 2079 if (gres->hr_replication != vint) { 2080 gres->hr_replication = vint; 2081 modified |= MODIFIED_REPLICATION; 2082 } 2083 vint = nv_get_int32(nv, "checksum"); 2084 if (gres->hr_checksum != vint) { 2085 gres->hr_checksum = vint; 2086 modified |= MODIFIED_CHECKSUM; 2087 } 2088 vint = nv_get_int32(nv, "compression"); 2089 if (gres->hr_compression != vint) { 2090 gres->hr_compression = vint; 2091 modified |= MODIFIED_COMPRESSION; 2092 } 2093 vint = nv_get_int32(nv, "timeout"); 2094 if (gres->hr_timeout != vint) { 2095 gres->hr_timeout = vint; 2096 modified |= MODIFIED_TIMEOUT; 2097 } 2098 vstr = nv_get_string(nv, "exec"); 2099 if (strcmp(gres->hr_exec, vstr) != 0) { 2100 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2101 modified |= MODIFIED_EXEC; 2102 } 2103 vint = nv_get_int32(nv, "metaflush"); 2104 if (gres->hr_metaflush != vint) { 2105 gres->hr_metaflush = vint; 2106 modified |= MODIFIED_METAFLUSH; 2107 } 2108 2109 /* 2110 * Change timeout for connected sockets. 2111 * Don't bother if we need to reconnect. 2112 */ 2113 if ((modified & MODIFIED_TIMEOUT) != 0 && 2114 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2115 for (ii = 0; ii < ncomps; ii++) { 2116 if (!ISREMOTE(ii)) 2117 continue; 2118 rw_rlock(&hio_remote_lock[ii]); 2119 if (!ISCONNECTED(gres, ii)) { 2120 rw_unlock(&hio_remote_lock[ii]); 2121 continue; 2122 } 2123 rw_unlock(&hio_remote_lock[ii]); 2124 if (proto_timeout(gres->hr_remotein, 2125 gres->hr_timeout) == -1) { 2126 pjdlog_errno(LOG_WARNING, 2127 "Unable to set connection timeout"); 2128 } 2129 if (proto_timeout(gres->hr_remoteout, 2130 gres->hr_timeout) == -1) { 2131 pjdlog_errno(LOG_WARNING, 2132 "Unable to set connection timeout"); 2133 } 2134 } 2135 } 2136 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2137 for (ii = 0; ii < ncomps; ii++) { 2138 if (!ISREMOTE(ii)) 2139 continue; 2140 remote_close(gres, ii); 2141 } 2142 if (modified & MODIFIED_REMOTEADDR) { 2143 vstr = nv_get_string(nv, "remoteaddr"); 2144 strlcpy(gres->hr_remoteaddr, vstr, 2145 sizeof(gres->hr_remoteaddr)); 2146 } 2147 } 2148 #undef MODIFIED_REMOTEADDR 2149 #undef MODIFIED_SOURCEADDR 2150 #undef MODIFIED_REPLICATION 2151 #undef MODIFIED_CHECKSUM 2152 #undef MODIFIED_COMPRESSION 2153 #undef MODIFIED_TIMEOUT 2154 #undef MODIFIED_EXEC 2155 #undef MODIFIED_METAFLUSH 2156 2157 pjdlog_info("Configuration reloaded successfully."); 2158 } 2159 2160 static void 2161 guard_one(struct hast_resource *res, unsigned int ncomp) 2162 { 2163 struct proto_conn *in, *out; 2164 2165 if (!ISREMOTE(ncomp)) 2166 return; 2167 2168 rw_rlock(&hio_remote_lock[ncomp]); 2169 2170 if (!real_remote(res)) { 2171 rw_unlock(&hio_remote_lock[ncomp]); 2172 return; 2173 } 2174 2175 if (ISCONNECTED(res, ncomp)) { 2176 PJDLOG_ASSERT(res->hr_remotein != NULL); 2177 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2178 rw_unlock(&hio_remote_lock[ncomp]); 2179 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2180 res->hr_remoteaddr); 2181 return; 2182 } 2183 2184 PJDLOG_ASSERT(res->hr_remotein == NULL); 2185 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2186 /* 2187 * Upgrade the lock. It doesn't have to be atomic as no other thread 2188 * can change connection status from disconnected to connected. 2189 */ 2190 rw_unlock(&hio_remote_lock[ncomp]); 2191 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2192 res->hr_remoteaddr); 2193 in = out = NULL; 2194 if (init_remote(res, &in, &out) == 0) { 2195 rw_wlock(&hio_remote_lock[ncomp]); 2196 PJDLOG_ASSERT(res->hr_remotein == NULL); 2197 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2198 PJDLOG_ASSERT(in != NULL && out != NULL); 2199 res->hr_remotein = in; 2200 res->hr_remoteout = out; 2201 rw_unlock(&hio_remote_lock[ncomp]); 2202 pjdlog_info("Successfully reconnected to %s.", 2203 res->hr_remoteaddr); 2204 sync_start(); 2205 } else { 2206 /* Both connections should be NULL. */ 2207 PJDLOG_ASSERT(res->hr_remotein == NULL); 2208 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2209 PJDLOG_ASSERT(in == NULL && out == NULL); 2210 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2211 res->hr_remoteaddr); 2212 } 2213 } 2214 2215 /* 2216 * Thread guards remote connections and reconnects when needed, handles 2217 * signals, etc. 2218 */ 2219 static void * 2220 guard_thread(void *arg) 2221 { 2222 struct hast_resource *res = arg; 2223 unsigned int ii, ncomps; 2224 struct timespec timeout; 2225 time_t lastcheck, now; 2226 sigset_t mask; 2227 int signo; 2228 2229 ncomps = HAST_NCOMPONENTS; 2230 lastcheck = time(NULL); 2231 2232 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2233 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2234 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2235 2236 timeout.tv_sec = HAST_KEEPALIVE; 2237 timeout.tv_nsec = 0; 2238 signo = -1; 2239 2240 for (;;) { 2241 switch (signo) { 2242 case SIGINT: 2243 case SIGTERM: 2244 sigexit_received = true; 2245 primary_exitx(EX_OK, 2246 "Termination signal received, exiting."); 2247 break; 2248 default: 2249 break; 2250 } 2251 2252 /* 2253 * Don't check connections until we fully started, 2254 * as we may still be looping, waiting for remote node 2255 * to switch from primary to secondary. 2256 */ 2257 if (fullystarted) { 2258 pjdlog_debug(2, "remote_guard: Checking connections."); 2259 now = time(NULL); 2260 if (lastcheck + HAST_KEEPALIVE <= now) { 2261 for (ii = 0; ii < ncomps; ii++) 2262 guard_one(res, ii); 2263 lastcheck = now; 2264 } 2265 } 2266 signo = sigtimedwait(&mask, NULL, &timeout); 2267 } 2268 /* NOTREACHED */ 2269 return (NULL); 2270 } 2271