1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/types.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/refcount.h> 39 #include <sys/stat.h> 40 41 #include <geom/gate/g_gate.h> 42 43 #include <err.h> 44 #include <errno.h> 45 #include <fcntl.h> 46 #include <libgeom.h> 47 #include <pthread.h> 48 #include <signal.h> 49 #include <stdint.h> 50 #include <stdio.h> 51 #include <string.h> 52 #include <sysexits.h> 53 #include <unistd.h> 54 55 #include <activemap.h> 56 #include <nv.h> 57 #include <rangelock.h> 58 59 #include "control.h" 60 #include "event.h" 61 #include "hast.h" 62 #include "hast_proto.h" 63 #include "hastd.h" 64 #include "hooks.h" 65 #include "metadata.h" 66 #include "proto.h" 67 #include "pjdlog.h" 68 #include "subr.h" 69 #include "synch.h" 70 71 /* The is only one remote component for now. */ 72 #define ISREMOTE(no) ((no) == 1) 73 74 struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 unsigned int hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Remember replication from the time the request was initiated, 98 * so we won't get confused when replication changes on reload. 99 */ 100 int hio_replication; 101 TAILQ_ENTRY(hio) *hio_next; 102 }; 103 #define hio_free_next hio_next[0] 104 #define hio_done_next hio_next[0] 105 106 /* 107 * Free list holds unused structures. 
When free list is empty, we have to wait 108 * until some in-progress requests are freed. 109 */ 110 static TAILQ_HEAD(, hio) hio_free_list; 111 static pthread_mutex_t hio_free_list_lock; 112 static pthread_cond_t hio_free_list_cond; 113 /* 114 * There is one send list for every component. One requests is placed on all 115 * send lists - each component gets the same request, but each component is 116 * responsible for managing his own send list. 117 */ 118 static TAILQ_HEAD(, hio) *hio_send_list; 119 static pthread_mutex_t *hio_send_list_lock; 120 static pthread_cond_t *hio_send_list_cond; 121 /* 122 * There is one recv list for every component, although local components don't 123 * use recv lists as local requests are done synchronously. 124 */ 125 static TAILQ_HEAD(, hio) *hio_recv_list; 126 static pthread_mutex_t *hio_recv_list_lock; 127 static pthread_cond_t *hio_recv_list_cond; 128 /* 129 * Request is placed on done list by the slowest component (the one that 130 * decreased hio_countdown from 1 to 0). 131 */ 132 static TAILQ_HEAD(, hio) hio_done_list; 133 static pthread_mutex_t hio_done_list_lock; 134 static pthread_cond_t hio_done_list_cond; 135 /* 136 * Structure below are for interaction with sync thread. 137 */ 138 static bool sync_inprogress; 139 static pthread_mutex_t sync_lock; 140 static pthread_cond_t sync_cond; 141 /* 142 * The lock below allows to synchornize access to remote connections. 143 */ 144 static pthread_rwlock_t *hio_remote_lock; 145 146 /* 147 * Lock to synchronize metadata updates. Also synchronize access to 148 * hr_primary_localcnt and hr_primary_remotecnt fields. 149 */ 150 static pthread_mutex_t metadata_lock; 151 152 /* 153 * Maximum number of outstanding I/O requests. 154 */ 155 #define HAST_HIO_MAX 256 156 /* 157 * Number of components. At this point there are only two components: local 158 * and remote, but in the future it might be possible to use multiple local 159 * and remote components. 
160 */ 161 #define HAST_NCOMPONENTS 2 162 163 #define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 176 } while (0) 177 #define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_signal(&hio_##name##_list_cond); \ 186 } while (0) 187 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203 } while (0) 204 #define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212 } while (0) 213 214 #define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217 } while (0) 218 #define ISSYNCREQ(hio) 
((hio)->hio_ggio.gctl_unit == -1) 219 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221 222 static struct hast_resource *gres; 223 224 static pthread_mutex_t range_lock; 225 static struct rangelocks *range_regular; 226 static bool range_regular_wait; 227 static pthread_cond_t range_regular_cond; 228 static struct rangelocks *range_sync; 229 static bool range_sync_wait; 230 static pthread_cond_t range_sync_cond; 231 static bool fullystarted; 232 233 static void *ggate_recv_thread(void *arg); 234 static void *local_send_thread(void *arg); 235 static void *remote_send_thread(void *arg); 236 static void *remote_recv_thread(void *arg); 237 static void *ggate_send_thread(void *arg); 238 static void *sync_thread(void *arg); 239 static void *guard_thread(void *arg); 240 241 static void 242 cleanup(struct hast_resource *res) 243 { 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. */ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267 } 268 269 static __dead2 void 270 primary_exit(int exitcode, const char *fmt, ...) 271 { 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280 } 281 282 static __dead2 void 283 primary_exitx(int exitcode, const char *fmt, ...) 284 { 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292 } 293 294 static int 295 hast_activemap_flush(struct hast_resource *res) 296 { 297 const unsigned char *buf; 298 size_t size; 299 300 buf = activemap_bitmap(res->hr_amp, &size); 301 PJDLOG_ASSERT(buf != NULL); 302 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 303 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 304 (ssize_t)size) { 305 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 306 return (-1); 307 } 308 if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 309 if (errno == EOPNOTSUPP) { 310 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 311 res->hr_localpath); 312 res->hr_metaflush = 0; 313 } else { 314 pjdlog_errno(LOG_ERR, 315 "Unable to flush disk cache on activemap update"); 316 return (-1); 317 } 318 } 319 return (0); 320 } 321 322 static bool 323 real_remote(const struct hast_resource *res) 324 { 325 326 return (strcmp(res->hr_remoteaddr, "none") != 0); 327 } 328 329 static void 330 init_environment(struct hast_resource *res __unused) 331 { 332 struct hio *hio; 333 unsigned int ii, ncomps; 334 335 /* 336 * In the future it might be per-resource value. 337 */ 338 ncomps = HAST_NCOMPONENTS; 339 340 /* 341 * Allocate memory needed by lists. 
342 */ 343 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 344 if (hio_send_list == NULL) { 345 primary_exitx(EX_TEMPFAIL, 346 "Unable to allocate %zu bytes of memory for send lists.", 347 sizeof(hio_send_list[0]) * ncomps); 348 } 349 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 350 if (hio_send_list_lock == NULL) { 351 primary_exitx(EX_TEMPFAIL, 352 "Unable to allocate %zu bytes of memory for send list locks.", 353 sizeof(hio_send_list_lock[0]) * ncomps); 354 } 355 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 356 if (hio_send_list_cond == NULL) { 357 primary_exitx(EX_TEMPFAIL, 358 "Unable to allocate %zu bytes of memory for send list condition variables.", 359 sizeof(hio_send_list_cond[0]) * ncomps); 360 } 361 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 362 if (hio_recv_list == NULL) { 363 primary_exitx(EX_TEMPFAIL, 364 "Unable to allocate %zu bytes of memory for recv lists.", 365 sizeof(hio_recv_list[0]) * ncomps); 366 } 367 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 368 if (hio_recv_list_lock == NULL) { 369 primary_exitx(EX_TEMPFAIL, 370 "Unable to allocate %zu bytes of memory for recv list locks.", 371 sizeof(hio_recv_list_lock[0]) * ncomps); 372 } 373 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 374 if (hio_recv_list_cond == NULL) { 375 primary_exitx(EX_TEMPFAIL, 376 "Unable to allocate %zu bytes of memory for recv list condition variables.", 377 sizeof(hio_recv_list_cond[0]) * ncomps); 378 } 379 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 380 if (hio_remote_lock == NULL) { 381 primary_exitx(EX_TEMPFAIL, 382 "Unable to allocate %zu bytes of memory for remote connections locks.", 383 sizeof(hio_remote_lock[0]) * ncomps); 384 } 385 386 /* 387 * Initialize lists, their locks and theirs condition variables. 
388 */ 389 TAILQ_INIT(&hio_free_list); 390 mtx_init(&hio_free_list_lock); 391 cv_init(&hio_free_list_cond); 392 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 393 TAILQ_INIT(&hio_send_list[ii]); 394 mtx_init(&hio_send_list_lock[ii]); 395 cv_init(&hio_send_list_cond[ii]); 396 TAILQ_INIT(&hio_recv_list[ii]); 397 mtx_init(&hio_recv_list_lock[ii]); 398 cv_init(&hio_recv_list_cond[ii]); 399 rw_init(&hio_remote_lock[ii]); 400 } 401 TAILQ_INIT(&hio_done_list); 402 mtx_init(&hio_done_list_lock); 403 cv_init(&hio_done_list_cond); 404 mtx_init(&metadata_lock); 405 406 /* 407 * Allocate requests pool and initialize requests. 408 */ 409 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 410 hio = malloc(sizeof(*hio)); 411 if (hio == NULL) { 412 primary_exitx(EX_TEMPFAIL, 413 "Unable to allocate %zu bytes of memory for hio request.", 414 sizeof(*hio)); 415 } 416 hio->hio_countdown = 0; 417 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 418 if (hio->hio_errors == NULL) { 419 primary_exitx(EX_TEMPFAIL, 420 "Unable allocate %zu bytes of memory for hio errors.", 421 sizeof(hio->hio_errors[0]) * ncomps); 422 } 423 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 424 if (hio->hio_next == NULL) { 425 primary_exitx(EX_TEMPFAIL, 426 "Unable allocate %zu bytes of memory for hio_next field.", 427 sizeof(hio->hio_next[0]) * ncomps); 428 } 429 hio->hio_ggio.gctl_version = G_GATE_VERSION; 430 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 431 if (hio->hio_ggio.gctl_data == NULL) { 432 primary_exitx(EX_TEMPFAIL, 433 "Unable to allocate %zu bytes of memory for gctl_data.", 434 MAXPHYS); 435 } 436 hio->hio_ggio.gctl_length = MAXPHYS; 437 hio->hio_ggio.gctl_error = 0; 438 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 439 } 440 } 441 442 static bool 443 init_resuid(struct hast_resource *res) 444 { 445 446 mtx_lock(&metadata_lock); 447 if (res->hr_resuid != 0) { 448 mtx_unlock(&metadata_lock); 449 return (false); 450 } else { 451 /* Initialize unique resource identifier. 
*/ 452 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 453 mtx_unlock(&metadata_lock); 454 if (metadata_write(res) < 0) 455 exit(EX_NOINPUT); 456 return (true); 457 } 458 } 459 460 static void 461 init_local(struct hast_resource *res) 462 { 463 unsigned char *buf; 464 size_t mapsize; 465 466 if (metadata_read(res, true) < 0) 467 exit(EX_NOINPUT); 468 mtx_init(&res->hr_amp_lock); 469 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 470 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 471 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 472 } 473 mtx_init(&range_lock); 474 cv_init(&range_regular_cond); 475 if (rangelock_init(&range_regular) < 0) 476 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 477 cv_init(&range_sync_cond); 478 if (rangelock_init(&range_sync) < 0) 479 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 480 mapsize = activemap_ondisk_size(res->hr_amp); 481 buf = calloc(1, mapsize); 482 if (buf == NULL) { 483 primary_exitx(EX_TEMPFAIL, 484 "Unable to allocate buffer for activemap."); 485 } 486 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 487 (ssize_t)mapsize) { 488 primary_exit(EX_NOINPUT, "Unable to read activemap"); 489 } 490 activemap_copyin(res->hr_amp, buf, mapsize); 491 free(buf); 492 if (res->hr_resuid != 0) 493 return; 494 /* 495 * We're using provider for the first time. Initialize local and remote 496 * counters. We don't initialize resuid here, as we want to do it just 497 * in time. The reason for this is that we want to inform secondary 498 * that there were no writes yet, so there is no need to synchronize 499 * anything. 
500 */ 501 res->hr_primary_localcnt = 0; 502 res->hr_primary_remotecnt = 0; 503 if (metadata_write(res) < 0) 504 exit(EX_NOINPUT); 505 } 506 507 static int 508 primary_connect(struct hast_resource *res, struct proto_conn **connp) 509 { 510 struct proto_conn *conn; 511 int16_t val; 512 513 val = 1; 514 if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { 515 primary_exit(EX_TEMPFAIL, 516 "Unable to send connection request to parent"); 517 } 518 if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { 519 primary_exit(EX_TEMPFAIL, 520 "Unable to receive reply to connection request from parent"); 521 } 522 if (val != 0) { 523 errno = val; 524 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 525 res->hr_remoteaddr); 526 return (-1); 527 } 528 if (proto_connection_recv(res->hr_conn, true, &conn) < 0) { 529 primary_exit(EX_TEMPFAIL, 530 "Unable to receive connection from parent"); 531 } 532 if (proto_connect_wait(conn, res->hr_timeout) < 0) { 533 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534 res->hr_remoteaddr); 535 proto_close(conn); 536 return (-1); 537 } 538 /* Error in setting timeout is not critical, but why should it fail? */ 539 if (proto_timeout(conn, res->hr_timeout) < 0) 540 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 541 542 *connp = conn; 543 544 return (0); 545 } 546 547 static int 548 init_remote(struct hast_resource *res, struct proto_conn **inp, 549 struct proto_conn **outp) 550 { 551 struct proto_conn *in, *out; 552 struct nv *nvout, *nvin; 553 const unsigned char *token; 554 unsigned char *map; 555 const char *errmsg; 556 int32_t extentsize; 557 int64_t datasize; 558 uint32_t mapsize; 559 size_t size; 560 int error; 561 562 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 563 PJDLOG_ASSERT(real_remote(res)); 564 565 in = out = NULL; 566 errmsg = NULL; 567 568 if (primary_connect(res, &out) == -1) 569 return (ECONNREFUSED); 570 571 error = ECONNABORTED; 572 573 /* 574 * First handshake step. 
575 * Setup outgoing connection with remote node. 576 */ 577 nvout = nv_alloc(); 578 nv_add_string(nvout, res->hr_name, "resource"); 579 if (nv_error(nvout) != 0) { 580 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 581 "Unable to allocate header for connection with %s", 582 res->hr_remoteaddr); 583 nv_free(nvout); 584 goto close; 585 } 586 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 587 pjdlog_errno(LOG_WARNING, 588 "Unable to send handshake header to %s", 589 res->hr_remoteaddr); 590 nv_free(nvout); 591 goto close; 592 } 593 nv_free(nvout); 594 if (hast_proto_recv_hdr(out, &nvin) < 0) { 595 pjdlog_errno(LOG_WARNING, 596 "Unable to receive handshake header from %s", 597 res->hr_remoteaddr); 598 goto close; 599 } 600 errmsg = nv_get_string(nvin, "errmsg"); 601 if (errmsg != NULL) { 602 pjdlog_warning("%s", errmsg); 603 if (nv_exists(nvin, "wait")) 604 error = EBUSY; 605 nv_free(nvin); 606 goto close; 607 } 608 token = nv_get_uint8_array(nvin, &size, "token"); 609 if (token == NULL) { 610 pjdlog_warning("Handshake header from %s has no 'token' field.", 611 res->hr_remoteaddr); 612 nv_free(nvin); 613 goto close; 614 } 615 if (size != sizeof(res->hr_token)) { 616 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 617 res->hr_remoteaddr, size, sizeof(res->hr_token)); 618 nv_free(nvin); 619 goto close; 620 } 621 bcopy(token, res->hr_token, sizeof(res->hr_token)); 622 nv_free(nvin); 623 624 /* 625 * Second handshake step. 626 * Setup incoming connection with remote node. 627 */ 628 if (primary_connect(res, &in) == -1) 629 goto close; 630 631 nvout = nv_alloc(); 632 nv_add_string(nvout, res->hr_name, "resource"); 633 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 634 "token"); 635 if (res->hr_resuid == 0) { 636 /* 637 * The resuid field was not yet initialized. 
638 * Because we do synchronization inside init_resuid(), it is 639 * possible that someone already initialized it, the function 640 * will return false then, but if we successfully initialized 641 * it, we will get true. True means that there were no writes 642 * to this resource yet and we want to inform secondary that 643 * synchronization is not needed by sending "virgin" argument. 644 */ 645 if (init_resuid(res)) 646 nv_add_int8(nvout, 1, "virgin"); 647 } 648 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 649 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 650 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 651 if (nv_error(nvout) != 0) { 652 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 653 "Unable to allocate header for connection with %s", 654 res->hr_remoteaddr); 655 nv_free(nvout); 656 goto close; 657 } 658 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 659 pjdlog_errno(LOG_WARNING, 660 "Unable to send handshake header to %s", 661 res->hr_remoteaddr); 662 nv_free(nvout); 663 goto close; 664 } 665 nv_free(nvout); 666 if (hast_proto_recv_hdr(out, &nvin) < 0) { 667 pjdlog_errno(LOG_WARNING, 668 "Unable to receive handshake header from %s", 669 res->hr_remoteaddr); 670 goto close; 671 } 672 errmsg = nv_get_string(nvin, "errmsg"); 673 if (errmsg != NULL) { 674 pjdlog_warning("%s", errmsg); 675 nv_free(nvin); 676 goto close; 677 } 678 datasize = nv_get_int64(nvin, "datasize"); 679 if (datasize != res->hr_datasize) { 680 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 681 (intmax_t)res->hr_datasize, (intmax_t)datasize); 682 nv_free(nvin); 683 goto close; 684 } 685 extentsize = nv_get_int32(nvin, "extentsize"); 686 if (extentsize != res->hr_extentsize) { 687 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 688 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 689 nv_free(nvin); 690 goto close; 691 } 692 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 693 
res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 694 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 695 if (nv_exists(nvin, "virgin")) { 696 /* 697 * Secondary was reinitialized, bump localcnt if it is 0 as 698 * only we have the data. 699 */ 700 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 701 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 702 703 if (res->hr_primary_localcnt == 0) { 704 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 705 706 mtx_lock(&metadata_lock); 707 res->hr_primary_localcnt++; 708 pjdlog_debug(1, "Increasing localcnt to %ju.", 709 (uintmax_t)res->hr_primary_localcnt); 710 (void)metadata_write(res); 711 mtx_unlock(&metadata_lock); 712 } 713 } 714 map = NULL; 715 mapsize = nv_get_uint32(nvin, "mapsize"); 716 if (mapsize > 0) { 717 map = malloc(mapsize); 718 if (map == NULL) { 719 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 720 (uintmax_t)mapsize); 721 nv_free(nvin); 722 goto close; 723 } 724 /* 725 * Remote node have some dirty extents on its own, lets 726 * download its activemap. 727 */ 728 if (hast_proto_recv_data(res, out, nvin, map, 729 mapsize) < 0) { 730 pjdlog_errno(LOG_ERR, 731 "Unable to receive remote activemap"); 732 nv_free(nvin); 733 free(map); 734 goto close; 735 } 736 /* 737 * Merge local and remote bitmaps. 738 */ 739 activemap_merge(res->hr_amp, map, mapsize); 740 free(map); 741 /* 742 * Now that we merged bitmaps from both nodes, flush it to the 743 * disk before we start to synchronize. 744 */ 745 (void)hast_activemap_flush(res); 746 } 747 nv_free(nvin); 748 #ifdef notyet 749 /* Setup directions. 
*/ 750 if (proto_send(out, NULL, 0) == -1) 751 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 752 if (proto_recv(in, NULL, 0) == -1) 753 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 754 #endif 755 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 756 if (inp != NULL && outp != NULL) { 757 *inp = in; 758 *outp = out; 759 } else { 760 res->hr_remotein = in; 761 res->hr_remoteout = out; 762 } 763 event_send(res, EVENT_CONNECT); 764 return (0); 765 close: 766 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 767 event_send(res, EVENT_SPLITBRAIN); 768 proto_close(out); 769 if (in != NULL) 770 proto_close(in); 771 return (error); 772 } 773 774 static void 775 sync_start(void) 776 { 777 778 mtx_lock(&sync_lock); 779 sync_inprogress = true; 780 mtx_unlock(&sync_lock); 781 cv_signal(&sync_cond); 782 } 783 784 static void 785 sync_stop(void) 786 { 787 788 mtx_lock(&sync_lock); 789 if (sync_inprogress) 790 sync_inprogress = false; 791 mtx_unlock(&sync_lock); 792 } 793 794 static void 795 init_ggate(struct hast_resource *res) 796 { 797 struct g_gate_ctl_create ggiocreate; 798 struct g_gate_ctl_cancel ggiocancel; 799 800 /* 801 * We communicate with ggate via /dev/ggctl. Open it. 802 */ 803 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 804 if (res->hr_ggatefd < 0) 805 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 806 /* 807 * Create provider before trying to connect, as connection failure 808 * is not critical, but may take some time. 
809 */ 810 bzero(&ggiocreate, sizeof(ggiocreate)); 811 ggiocreate.gctl_version = G_GATE_VERSION; 812 ggiocreate.gctl_mediasize = res->hr_datasize; 813 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 814 ggiocreate.gctl_flags = 0; 815 ggiocreate.gctl_maxcount = 0; 816 ggiocreate.gctl_timeout = 0; 817 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 818 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 819 res->hr_provname); 820 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 821 pjdlog_info("Device hast/%s created.", res->hr_provname); 822 res->hr_ggateunit = ggiocreate.gctl_unit; 823 return; 824 } 825 if (errno != EEXIST) { 826 primary_exit(EX_OSERR, "Unable to create hast/%s device", 827 res->hr_provname); 828 } 829 pjdlog_debug(1, 830 "Device hast/%s already exists, we will try to take it over.", 831 res->hr_provname); 832 /* 833 * If we received EEXIST, we assume that the process who created the 834 * provider died and didn't clean up. In that case we will start from 835 * where he left of. 836 */ 837 bzero(&ggiocancel, sizeof(ggiocancel)); 838 ggiocancel.gctl_version = G_GATE_VERSION; 839 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 840 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 841 res->hr_provname); 842 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 843 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 844 res->hr_ggateunit = ggiocancel.gctl_unit; 845 return; 846 } 847 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 848 res->hr_provname); 849 } 850 851 void 852 hastd_primary(struct hast_resource *res) 853 { 854 pthread_t td; 855 pid_t pid; 856 int error, mode, debuglevel; 857 858 /* 859 * Create communication channel for sending control commands from 860 * parent to child. 861 */ 862 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 863 /* TODO: There's no need for this to be fatal error. 
*/ 864 KEEP_ERRNO((void)pidfile_remove(pfh)); 865 pjdlog_exit(EX_OSERR, 866 "Unable to create control sockets between parent and child"); 867 } 868 /* 869 * Create communication channel for sending events from child to parent. 870 */ 871 if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 872 /* TODO: There's no need for this to be fatal error. */ 873 KEEP_ERRNO((void)pidfile_remove(pfh)); 874 pjdlog_exit(EX_OSERR, 875 "Unable to create event sockets between child and parent"); 876 } 877 /* 878 * Create communication channel for sending connection requests from 879 * child to parent. 880 */ 881 if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) { 882 /* TODO: There's no need for this to be fatal error. */ 883 KEEP_ERRNO((void)pidfile_remove(pfh)); 884 pjdlog_exit(EX_OSERR, 885 "Unable to create connection sockets between child and parent"); 886 } 887 888 pid = fork(); 889 if (pid < 0) { 890 /* TODO: There's no need for this to be fatal error. */ 891 KEEP_ERRNO((void)pidfile_remove(pfh)); 892 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 893 } 894 895 if (pid > 0) { 896 /* This is parent. */ 897 /* Declare that we are receiver. */ 898 proto_recv(res->hr_event, NULL, 0); 899 proto_recv(res->hr_conn, NULL, 0); 900 /* Declare that we are sender. */ 901 proto_send(res->hr_ctrl, NULL, 0); 902 res->hr_workerpid = pid; 903 return; 904 } 905 906 gres = res; 907 mode = pjdlog_mode_get(); 908 debuglevel = pjdlog_debug_get(); 909 910 /* Declare that we are sender. */ 911 proto_send(res->hr_event, NULL, 0); 912 proto_send(res->hr_conn, NULL, 0); 913 /* Declare that we are receiver. 
*/ 914 proto_recv(res->hr_ctrl, NULL, 0); 915 descriptors_cleanup(res); 916 917 descriptors_assert(res, mode); 918 919 pjdlog_init(mode); 920 pjdlog_debug_set(debuglevel); 921 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 922 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 923 924 init_local(res); 925 init_ggate(res); 926 init_environment(res); 927 928 if (drop_privs(res) != 0) { 929 cleanup(res); 930 exit(EX_CONFIG); 931 } 932 pjdlog_info("Privileges successfully dropped."); 933 934 /* 935 * Create the guard thread first, so we can handle signals from the 936 * very begining. 937 */ 938 error = pthread_create(&td, NULL, guard_thread, res); 939 PJDLOG_ASSERT(error == 0); 940 /* 941 * Create the control thread before sending any event to the parent, 942 * as we can deadlock when parent sends control request to worker, 943 * but worker has no control thread started yet, so parent waits. 944 * In the meantime worker sends an event to the parent, but parent 945 * is unable to handle the event, because it waits for control 946 * request response. 
947 */ 948 error = pthread_create(&td, NULL, ctrl_thread, res); 949 PJDLOG_ASSERT(error == 0); 950 if (real_remote(res)) { 951 error = init_remote(res, NULL, NULL); 952 if (error == 0) { 953 sync_start(); 954 } else if (error == EBUSY) { 955 time_t start = time(NULL); 956 957 pjdlog_warning("Waiting for remote node to become %s for %ds.", 958 role2str(HAST_ROLE_SECONDARY), 959 res->hr_timeout); 960 for (;;) { 961 sleep(1); 962 error = init_remote(res, NULL, NULL); 963 if (error != EBUSY) 964 break; 965 if (time(NULL) > start + res->hr_timeout) 966 break; 967 } 968 if (error == EBUSY) { 969 pjdlog_warning("Remote node is still %s, starting anyway.", 970 role2str(HAST_ROLE_PRIMARY)); 971 } 972 } 973 } 974 error = pthread_create(&td, NULL, ggate_recv_thread, res); 975 PJDLOG_ASSERT(error == 0); 976 error = pthread_create(&td, NULL, local_send_thread, res); 977 PJDLOG_ASSERT(error == 0); 978 error = pthread_create(&td, NULL, remote_send_thread, res); 979 PJDLOG_ASSERT(error == 0); 980 error = pthread_create(&td, NULL, remote_recv_thread, res); 981 PJDLOG_ASSERT(error == 0); 982 error = pthread_create(&td, NULL, ggate_send_thread, res); 983 PJDLOG_ASSERT(error == 0); 984 fullystarted = true; 985 (void)sync_thread(res); 986 } 987 988 static void 989 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
990 { 991 char msg[1024]; 992 va_list ap; 993 int len; 994 995 va_start(ap, fmt); 996 len = vsnprintf(msg, sizeof(msg), fmt, ap); 997 va_end(ap); 998 if ((size_t)len < sizeof(msg)) { 999 switch (ggio->gctl_cmd) { 1000 case BIO_READ: 1001 (void)snprintf(msg + len, sizeof(msg) - len, 1002 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 1003 (uintmax_t)ggio->gctl_length); 1004 break; 1005 case BIO_DELETE: 1006 (void)snprintf(msg + len, sizeof(msg) - len, 1007 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 1008 (uintmax_t)ggio->gctl_length); 1009 break; 1010 case BIO_FLUSH: 1011 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 1012 break; 1013 case BIO_WRITE: 1014 (void)snprintf(msg + len, sizeof(msg) - len, 1015 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 1016 (uintmax_t)ggio->gctl_length); 1017 break; 1018 default: 1019 (void)snprintf(msg + len, sizeof(msg) - len, 1020 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 1021 break; 1022 } 1023 } 1024 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1025 } 1026 1027 static void 1028 remote_close(struct hast_resource *res, int ncomp) 1029 { 1030 1031 rw_wlock(&hio_remote_lock[ncomp]); 1032 /* 1033 * Check for a race between dropping rlock and acquiring wlock - 1034 * another thread can close connection in-between. 
1035 */ 1036 if (!ISCONNECTED(res, ncomp)) { 1037 PJDLOG_ASSERT(res->hr_remotein == NULL); 1038 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1039 rw_unlock(&hio_remote_lock[ncomp]); 1040 return; 1041 } 1042 1043 PJDLOG_ASSERT(res->hr_remotein != NULL); 1044 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1045 1046 pjdlog_debug(2, "Closing incoming connection to %s.", 1047 res->hr_remoteaddr); 1048 proto_close(res->hr_remotein); 1049 res->hr_remotein = NULL; 1050 pjdlog_debug(2, "Closing outgoing connection to %s.", 1051 res->hr_remoteaddr); 1052 proto_close(res->hr_remoteout); 1053 res->hr_remoteout = NULL; 1054 1055 rw_unlock(&hio_remote_lock[ncomp]); 1056 1057 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1058 1059 /* 1060 * Stop synchronization if in-progress. 1061 */ 1062 sync_stop(); 1063 1064 event_send(res, EVENT_DISCONNECT); 1065 } 1066 1067 /* 1068 * Acknowledge write completion to the kernel, but don't update activemap yet. 1069 */ 1070 static void 1071 write_complete(struct hast_resource *res, struct hio *hio) 1072 { 1073 struct g_gate_ctl_io *ggio; 1074 unsigned int ncomp; 1075 1076 PJDLOG_ASSERT(!hio->hio_done); 1077 1078 ggio = &hio->hio_ggio; 1079 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1080 1081 /* 1082 * Bump local count if this is first write after 1083 * connection failure with remote node. 
1084 */ 1085 ncomp = 1; 1086 rw_rlock(&hio_remote_lock[ncomp]); 1087 if (!ISCONNECTED(res, ncomp)) { 1088 mtx_lock(&metadata_lock); 1089 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1090 res->hr_primary_localcnt++; 1091 pjdlog_debug(1, "Increasing localcnt to %ju.", 1092 (uintmax_t)res->hr_primary_localcnt); 1093 (void)metadata_write(res); 1094 } 1095 mtx_unlock(&metadata_lock); 1096 } 1097 rw_unlock(&hio_remote_lock[ncomp]); 1098 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1099 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1100 hio->hio_done = true; 1101 } 1102 1103 /* 1104 * Thread receives ggate I/O requests from the kernel and passes them to 1105 * appropriate threads: 1106 * WRITE - always goes to both local_send and remote_send threads 1107 * READ (when the block is up-to-date on local component) - 1108 * only local_send thread 1109 * READ (when the block isn't up-to-date on local component) - 1110 * only remote_send thread 1111 * DELETE - always goes to both local_send and remote_send threads 1112 * FLUSH - always goes to both local_send and remote_send threads 1113 */ 1114 static void * 1115 ggate_recv_thread(void *arg) 1116 { 1117 struct hast_resource *res = arg; 1118 struct g_gate_ctl_io *ggio; 1119 struct hio *hio; 1120 unsigned int ii, ncomp, ncomps; 1121 int error; 1122 1123 for (;;) { 1124 pjdlog_debug(2, "ggate_recv: Taking free request."); 1125 QUEUE_TAKE2(hio, free); 1126 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1127 ggio = &hio->hio_ggio; 1128 ggio->gctl_unit = res->hr_ggateunit; 1129 ggio->gctl_length = MAXPHYS; 1130 ggio->gctl_error = 0; 1131 hio->hio_done = false; 1132 hio->hio_replication = res->hr_replication; 1133 pjdlog_debug(2, 1134 "ggate_recv: (%p) Waiting for request from the kernel.", 1135 hio); 1136 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 1137 if (sigexit_received) 1138 pthread_exit(NULL); 1139 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1140 } 1141 error = 
ggio->gctl_error; 1142 switch (error) { 1143 case 0: 1144 break; 1145 case ECANCELED: 1146 /* Exit gracefully. */ 1147 if (!sigexit_received) { 1148 pjdlog_debug(2, 1149 "ggate_recv: (%p) Received cancel from the kernel.", 1150 hio); 1151 pjdlog_info("Received cancel from the kernel, exiting."); 1152 } 1153 pthread_exit(NULL); 1154 case ENOMEM: 1155 /* 1156 * Buffer too small? Impossible, we allocate MAXPHYS 1157 * bytes - request can't be bigger than that. 1158 */ 1159 /* FALLTHROUGH */ 1160 case ENXIO: 1161 default: 1162 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1163 strerror(error)); 1164 } 1165 1166 ncomp = 0; 1167 ncomps = HAST_NCOMPONENTS; 1168 1169 for (ii = 0; ii < ncomps; ii++) 1170 hio->hio_errors[ii] = EINVAL; 1171 reqlog(LOG_DEBUG, 2, ggio, 1172 "ggate_recv: (%p) Request received from the kernel: ", 1173 hio); 1174 1175 /* 1176 * Inform all components about new write request. 1177 * For read request prefer local component unless the given 1178 * range is out-of-date, then use remote component. 1179 */ 1180 switch (ggio->gctl_cmd) { 1181 case BIO_READ: 1182 res->hr_stat_read++; 1183 ncomps = 1; 1184 mtx_lock(&metadata_lock); 1185 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1186 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1187 /* 1188 * This range is up-to-date on local component, 1189 * so handle request locally. 1190 */ 1191 /* Local component is 0 for now. */ 1192 ncomp = 0; 1193 } else /* if (res->hr_syncsrc == 1194 HAST_SYNCSRC_SECONDARY) */ { 1195 PJDLOG_ASSERT(res->hr_syncsrc == 1196 HAST_SYNCSRC_SECONDARY); 1197 /* 1198 * This range is out-of-date on local component, 1199 * so send request to the remote node. 1200 */ 1201 /* Remote component is 1 for now. */ 1202 ncomp = 1; 1203 } 1204 mtx_unlock(&metadata_lock); 1205 break; 1206 case BIO_WRITE: 1207 res->hr_stat_write++; 1208 if (res->hr_resuid == 0 && 1209 res->hr_primary_localcnt == 0) { 1210 /* This is first write. 
*/ 1211 res->hr_primary_localcnt = 1; 1212 } 1213 for (;;) { 1214 mtx_lock(&range_lock); 1215 if (rangelock_islocked(range_sync, 1216 ggio->gctl_offset, ggio->gctl_length)) { 1217 pjdlog_debug(2, 1218 "regular: Range offset=%jd length=%zu locked.", 1219 (intmax_t)ggio->gctl_offset, 1220 (size_t)ggio->gctl_length); 1221 range_regular_wait = true; 1222 cv_wait(&range_regular_cond, &range_lock); 1223 range_regular_wait = false; 1224 mtx_unlock(&range_lock); 1225 continue; 1226 } 1227 if (rangelock_add(range_regular, 1228 ggio->gctl_offset, ggio->gctl_length) < 0) { 1229 mtx_unlock(&range_lock); 1230 pjdlog_debug(2, 1231 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1232 (intmax_t)ggio->gctl_offset, 1233 (size_t)ggio->gctl_length); 1234 sleep(1); 1235 continue; 1236 } 1237 mtx_unlock(&range_lock); 1238 break; 1239 } 1240 mtx_lock(&res->hr_amp_lock); 1241 if (activemap_write_start(res->hr_amp, 1242 ggio->gctl_offset, ggio->gctl_length)) { 1243 res->hr_stat_activemap_update++; 1244 (void)hast_activemap_flush(res); 1245 } 1246 mtx_unlock(&res->hr_amp_lock); 1247 break; 1248 case BIO_DELETE: 1249 res->hr_stat_delete++; 1250 break; 1251 case BIO_FLUSH: 1252 res->hr_stat_flush++; 1253 break; 1254 } 1255 pjdlog_debug(2, 1256 "ggate_recv: (%p) Moving request to the send queues.", hio); 1257 refcount_init(&hio->hio_countdown, ncomps); 1258 for (ii = ncomp; ii < ncomps; ii++) 1259 QUEUE_INSERT1(hio, send, ii); 1260 } 1261 /* NOTREACHED */ 1262 return (NULL); 1263 } 1264 1265 /* 1266 * Thread reads from or writes to local component. 1267 * If local read fails, it redirects it to remote_send thread. 1268 */ 1269 static void * 1270 local_send_thread(void *arg) 1271 { 1272 struct hast_resource *res = arg; 1273 struct g_gate_ctl_io *ggio; 1274 struct hio *hio; 1275 unsigned int ncomp, rncomp; 1276 ssize_t ret; 1277 1278 /* Local component is 0 for now. */ 1279 ncomp = 0; 1280 /* Remote component is 1 for now. 
*/ 1281 rncomp = 1; 1282 1283 for (;;) { 1284 pjdlog_debug(2, "local_send: Taking request."); 1285 QUEUE_TAKE1(hio, send, ncomp, 0); 1286 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1287 ggio = &hio->hio_ggio; 1288 switch (ggio->gctl_cmd) { 1289 case BIO_READ: 1290 ret = pread(res->hr_localfd, ggio->gctl_data, 1291 ggio->gctl_length, 1292 ggio->gctl_offset + res->hr_localoff); 1293 if (ret == ggio->gctl_length) 1294 hio->hio_errors[ncomp] = 0; 1295 else if (!ISSYNCREQ(hio)) { 1296 /* 1297 * If READ failed, try to read from remote node. 1298 */ 1299 if (ret < 0) { 1300 reqlog(LOG_WARNING, 0, ggio, 1301 "Local request failed (%s), trying remote node. ", 1302 strerror(errno)); 1303 } else if (ret != ggio->gctl_length) { 1304 reqlog(LOG_WARNING, 0, ggio, 1305 "Local request failed (%zd != %jd), trying remote node. ", 1306 ret, (intmax_t)ggio->gctl_length); 1307 } 1308 QUEUE_INSERT1(hio, send, rncomp); 1309 continue; 1310 } 1311 break; 1312 case BIO_WRITE: 1313 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1314 ggio->gctl_length, 1315 ggio->gctl_offset + res->hr_localoff); 1316 if (ret < 0) { 1317 hio->hio_errors[ncomp] = errno; 1318 reqlog(LOG_WARNING, 0, ggio, 1319 "Local request failed (%s): ", 1320 strerror(errno)); 1321 } else if (ret != ggio->gctl_length) { 1322 hio->hio_errors[ncomp] = EIO; 1323 reqlog(LOG_WARNING, 0, ggio, 1324 "Local request failed (%zd != %jd): ", 1325 ret, (intmax_t)ggio->gctl_length); 1326 } else { 1327 hio->hio_errors[ncomp] = 0; 1328 if (hio->hio_replication == 1329 HAST_REPLICATION_ASYNC) { 1330 ggio->gctl_error = 0; 1331 write_complete(res, hio); 1332 } 1333 } 1334 break; 1335 case BIO_DELETE: 1336 ret = g_delete(res->hr_localfd, 1337 ggio->gctl_offset + res->hr_localoff, 1338 ggio->gctl_length); 1339 if (ret < 0) { 1340 hio->hio_errors[ncomp] = errno; 1341 reqlog(LOG_WARNING, 0, ggio, 1342 "Local request failed (%s): ", 1343 strerror(errno)); 1344 } else { 1345 hio->hio_errors[ncomp] = 0; 1346 } 1347 break; 1348 case 
BIO_FLUSH: 1349 if (!res->hr_localflush) { 1350 ret = -1; 1351 errno = EOPNOTSUPP; 1352 break; 1353 } 1354 ret = g_flush(res->hr_localfd); 1355 if (ret < 0) { 1356 if (errno == EOPNOTSUPP) 1357 res->hr_localflush = false; 1358 hio->hio_errors[ncomp] = errno; 1359 reqlog(LOG_WARNING, 0, ggio, 1360 "Local request failed (%s): ", 1361 strerror(errno)); 1362 } else { 1363 hio->hio_errors[ncomp] = 0; 1364 } 1365 break; 1366 } 1367 if (!refcount_release(&hio->hio_countdown)) 1368 continue; 1369 if (ISSYNCREQ(hio)) { 1370 mtx_lock(&sync_lock); 1371 SYNCREQDONE(hio); 1372 mtx_unlock(&sync_lock); 1373 cv_signal(&sync_cond); 1374 } else { 1375 pjdlog_debug(2, 1376 "local_send: (%p) Moving request to the done queue.", 1377 hio); 1378 QUEUE_INSERT2(hio, done); 1379 } 1380 } 1381 /* NOTREACHED */ 1382 return (NULL); 1383 } 1384 1385 static void 1386 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1387 { 1388 struct nv *nv; 1389 1390 rw_rlock(&hio_remote_lock[ncomp]); 1391 1392 if (!ISCONNECTED(res, ncomp)) { 1393 rw_unlock(&hio_remote_lock[ncomp]); 1394 return; 1395 } 1396 1397 PJDLOG_ASSERT(res->hr_remotein != NULL); 1398 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1399 1400 nv = nv_alloc(); 1401 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1402 if (nv_error(nv) != 0) { 1403 rw_unlock(&hio_remote_lock[ncomp]); 1404 nv_free(nv); 1405 pjdlog_debug(1, 1406 "keepalive_send: Unable to prepare header to send."); 1407 return; 1408 } 1409 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1410 rw_unlock(&hio_remote_lock[ncomp]); 1411 pjdlog_common(LOG_DEBUG, 1, errno, 1412 "keepalive_send: Unable to send request"); 1413 nv_free(nv); 1414 remote_close(res, ncomp); 1415 return; 1416 } 1417 1418 rw_unlock(&hio_remote_lock[ncomp]); 1419 nv_free(nv); 1420 pjdlog_debug(2, "keepalive_send: Request sent."); 1421 } 1422 1423 /* 1424 * Thread sends request to secondary node. 
*/
/*
 * Requests are taken from send queue ncomp (the remote component).  When
 * QUEUE_TAKE1() returns NULL (presumably after waiting up to HAST_KEEPALIVE
 * seconds; QUEUE_TAKE1 is defined elsewhere), a keepalive is sent at most
 * once per HAST_KEEPALIVE seconds.  A request that was sent stays on the
 * receive list for remote_recv_thread; a failed one goes through done_queue.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			/* Idle: keep the connection alive if it is time. */
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the BIO command into a HIO protocol command. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			/* Only writes carry a data payload. */
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		/* "seq" lets remote_recv_thread match the reply to this hio. */
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		mtx_lock(&hio_recv_list_lock[ncomp]);
		/* remote_recv_thread sleeps only while the list is empty. */
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		/* Failure path: the request never reached the remote node. */
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * The remote copy of this range was not updated;
			 * activemap_need_sync() presumably records that the
			 * range needs synchronization (defined elsewhere).
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 *
 * Replies are matched to pending requests on the receive list by their
 * "seq" field.  When the connection is gone, pending requests are drained
 * one per loop iteration and completed with whatever error is already in
 * their hio_errors slot.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);

		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			/* Pending requests are drained on later iterations. */
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		/* A zero "seq" is treated as a missing field. */
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		/* TAILQ_FOREACH leaves hio NULL when nothing matched. */
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		ggio = &hio->hio_ggio;
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* Only READ replies carry data after the header. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) < 0) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/* The last component to finish completes the request. */
		if (!refcount_release(&hio->hio_countdown))
			continue;
		if (ISSYNCREQ(hio)) {
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "remote_recv: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
 *
 * A request succeeds if any component succeeded.  For successful writes the
 * activemap is updated.  For every write the regular range lock is released,
 * and the kernel is acknowledged unless that already happened (hio_done).
 * The hio is then returned to the free queue.
 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use the error from local component except the
			 * case when we did only remote request.
			 *
			 * NOTE(review): hr_syncsrc is read here without
			 * metadata_lock (ggate_recv_thread takes it), and it
			 * may have changed since the request was dispatched.
			 */
			if (ggio->gctl_cmd == BIO_READ &&
			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				ggio->gctl_error = hio->hio_errors[1];
			else
				ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/* Async writes may already have been acknowledged. */
			if (!hio->hio_done)
				write_complete(res, hio);
		} else {
			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) {
				primary_exit(EX_OSERR,
				    "G_GATE_CMD_DONE failed");
			}
		}
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread synchronizes local and remote components.
1784 */ 1785 static void * 1786 sync_thread(void *arg __unused) 1787 { 1788 struct hast_resource *res = arg; 1789 struct hio *hio; 1790 struct g_gate_ctl_io *ggio; 1791 struct timeval tstart, tend, tdiff; 1792 unsigned int ii, ncomp, ncomps; 1793 off_t offset, length, synced; 1794 bool dorewind; 1795 int syncext; 1796 1797 ncomps = HAST_NCOMPONENTS; 1798 dorewind = true; 1799 synced = 0; 1800 offset = -1; 1801 1802 for (;;) { 1803 mtx_lock(&sync_lock); 1804 if (offset >= 0 && !sync_inprogress) { 1805 gettimeofday(&tend, NULL); 1806 timersub(&tend, &tstart, &tdiff); 1807 pjdlog_info("Synchronization interrupted after %#.0T. " 1808 "%NB synchronized so far.", &tdiff, 1809 (intmax_t)synced); 1810 event_send(res, EVENT_SYNCINTR); 1811 } 1812 while (!sync_inprogress) { 1813 dorewind = true; 1814 synced = 0; 1815 cv_wait(&sync_cond, &sync_lock); 1816 } 1817 mtx_unlock(&sync_lock); 1818 /* 1819 * Obtain offset at which we should synchronize. 1820 * Rewind synchronization if needed. 1821 */ 1822 mtx_lock(&res->hr_amp_lock); 1823 if (dorewind) 1824 activemap_sync_rewind(res->hr_amp); 1825 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1826 if (syncext != -1) { 1827 /* 1828 * We synchronized entire syncext extent, we can mark 1829 * it as clean now. 1830 */ 1831 if (activemap_extent_complete(res->hr_amp, syncext)) 1832 (void)hast_activemap_flush(res); 1833 } 1834 mtx_unlock(&res->hr_amp_lock); 1835 if (dorewind) { 1836 dorewind = false; 1837 if (offset < 0) 1838 pjdlog_info("Nodes are in sync."); 1839 else { 1840 pjdlog_info("Synchronization started. %NB to go.", 1841 (intmax_t)(res->hr_extentsize * 1842 activemap_ndirty(res->hr_amp))); 1843 event_send(res, EVENT_SYNCSTART); 1844 gettimeofday(&tstart, NULL); 1845 } 1846 } 1847 if (offset < 0) { 1848 sync_stop(); 1849 pjdlog_debug(1, "Nothing to synchronize."); 1850 /* 1851 * Synchronization complete, make both localcnt and 1852 * remotecnt equal. 
1853 */ 1854 ncomp = 1; 1855 rw_rlock(&hio_remote_lock[ncomp]); 1856 if (ISCONNECTED(res, ncomp)) { 1857 if (synced > 0) { 1858 int64_t bps; 1859 1860 gettimeofday(&tend, NULL); 1861 timersub(&tend, &tstart, &tdiff); 1862 bps = (int64_t)((double)synced / 1863 ((double)tdiff.tv_sec + 1864 (double)tdiff.tv_usec / 1000000)); 1865 pjdlog_info("Synchronization complete. " 1866 "%NB synchronized in %#.0lT (%NB/sec).", 1867 (intmax_t)synced, &tdiff, 1868 (intmax_t)bps); 1869 event_send(res, EVENT_SYNCDONE); 1870 } 1871 mtx_lock(&metadata_lock); 1872 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1873 res->hr_primary_localcnt = 1874 res->hr_secondary_remotecnt; 1875 res->hr_primary_remotecnt = 1876 res->hr_secondary_localcnt; 1877 pjdlog_debug(1, 1878 "Setting localcnt to %ju and remotecnt to %ju.", 1879 (uintmax_t)res->hr_primary_localcnt, 1880 (uintmax_t)res->hr_primary_remotecnt); 1881 (void)metadata_write(res); 1882 mtx_unlock(&metadata_lock); 1883 } 1884 rw_unlock(&hio_remote_lock[ncomp]); 1885 continue; 1886 } 1887 pjdlog_debug(2, "sync: Taking free request."); 1888 QUEUE_TAKE2(hio, free); 1889 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1890 /* 1891 * Lock the range we are going to synchronize. We don't want 1892 * race where someone writes between our read and write. 
1893 */ 1894 for (;;) { 1895 mtx_lock(&range_lock); 1896 if (rangelock_islocked(range_regular, offset, length)) { 1897 pjdlog_debug(2, 1898 "sync: Range offset=%jd length=%jd locked.", 1899 (intmax_t)offset, (intmax_t)length); 1900 range_sync_wait = true; 1901 cv_wait(&range_sync_cond, &range_lock); 1902 range_sync_wait = false; 1903 mtx_unlock(&range_lock); 1904 continue; 1905 } 1906 if (rangelock_add(range_sync, offset, length) < 0) { 1907 mtx_unlock(&range_lock); 1908 pjdlog_debug(2, 1909 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1910 (intmax_t)offset, (intmax_t)length); 1911 sleep(1); 1912 continue; 1913 } 1914 mtx_unlock(&range_lock); 1915 break; 1916 } 1917 /* 1918 * First read the data from synchronization source. 1919 */ 1920 SYNCREQ(hio); 1921 ggio = &hio->hio_ggio; 1922 ggio->gctl_cmd = BIO_READ; 1923 ggio->gctl_offset = offset; 1924 ggio->gctl_length = length; 1925 ggio->gctl_error = 0; 1926 hio->hio_done = false; 1927 hio->hio_replication = res->hr_replication; 1928 for (ii = 0; ii < ncomps; ii++) 1929 hio->hio_errors[ii] = EINVAL; 1930 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1931 hio); 1932 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1933 hio); 1934 mtx_lock(&metadata_lock); 1935 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1936 /* 1937 * This range is up-to-date on local component, 1938 * so handle request locally. 1939 */ 1940 /* Local component is 0 for now. */ 1941 ncomp = 0; 1942 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1943 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1944 /* 1945 * This range is out-of-date on local component, 1946 * so send request to the remote node. 1947 */ 1948 /* Remote component is 1 for now. */ 1949 ncomp = 1; 1950 } 1951 mtx_unlock(&metadata_lock); 1952 refcount_init(&hio->hio_countdown, 1); 1953 QUEUE_INSERT1(hio, send, ncomp); 1954 1955 /* 1956 * Let's wait for READ to finish. 
1957 */ 1958 mtx_lock(&sync_lock); 1959 while (!ISSYNCREQDONE(hio)) 1960 cv_wait(&sync_cond, &sync_lock); 1961 mtx_unlock(&sync_lock); 1962 1963 if (hio->hio_errors[ncomp] != 0) { 1964 pjdlog_error("Unable to read synchronization data: %s.", 1965 strerror(hio->hio_errors[ncomp])); 1966 goto free_queue; 1967 } 1968 1969 /* 1970 * We read the data from synchronization source, now write it 1971 * to synchronization target. 1972 */ 1973 SYNCREQ(hio); 1974 ggio->gctl_cmd = BIO_WRITE; 1975 for (ii = 0; ii < ncomps; ii++) 1976 hio->hio_errors[ii] = EINVAL; 1977 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1978 hio); 1979 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1980 hio); 1981 mtx_lock(&metadata_lock); 1982 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1983 /* 1984 * This range is up-to-date on local component, 1985 * so we update remote component. 1986 */ 1987 /* Remote component is 1 for now. */ 1988 ncomp = 1; 1989 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1990 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1991 /* 1992 * This range is out-of-date on local component, 1993 * so we update it. 1994 */ 1995 /* Local component is 0 for now. */ 1996 ncomp = 0; 1997 } 1998 mtx_unlock(&metadata_lock); 1999 2000 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2001 hio); 2002 refcount_init(&hio->hio_countdown, 1); 2003 QUEUE_INSERT1(hio, send, ncomp); 2004 2005 /* 2006 * Let's wait for WRITE to finish. 
2007 */ 2008 mtx_lock(&sync_lock); 2009 while (!ISSYNCREQDONE(hio)) 2010 cv_wait(&sync_cond, &sync_lock); 2011 mtx_unlock(&sync_lock); 2012 2013 if (hio->hio_errors[ncomp] != 0) { 2014 pjdlog_error("Unable to write synchronization data: %s.", 2015 strerror(hio->hio_errors[ncomp])); 2016 goto free_queue; 2017 } 2018 2019 synced += length; 2020 free_queue: 2021 mtx_lock(&range_lock); 2022 rangelock_del(range_sync, offset, length); 2023 if (range_regular_wait) 2024 cv_signal(&range_regular_cond); 2025 mtx_unlock(&range_lock); 2026 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2027 hio); 2028 QUEUE_INSERT2(hio, free); 2029 } 2030 /* NOTREACHED */ 2031 return (NULL); 2032 } 2033 2034 void 2035 primary_config_reload(struct hast_resource *res, struct nv *nv) 2036 { 2037 unsigned int ii, ncomps; 2038 int modified, vint; 2039 const char *vstr; 2040 2041 pjdlog_info("Reloading configuration..."); 2042 2043 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2044 PJDLOG_ASSERT(gres == res); 2045 nv_assert(nv, "remoteaddr"); 2046 nv_assert(nv, "sourceaddr"); 2047 nv_assert(nv, "replication"); 2048 nv_assert(nv, "checksum"); 2049 nv_assert(nv, "compression"); 2050 nv_assert(nv, "timeout"); 2051 nv_assert(nv, "exec"); 2052 nv_assert(nv, "metaflush"); 2053 2054 ncomps = HAST_NCOMPONENTS; 2055 2056 #define MODIFIED_REMOTEADDR 0x01 2057 #define MODIFIED_SOURCEADDR 0x02 2058 #define MODIFIED_REPLICATION 0x04 2059 #define MODIFIED_CHECKSUM 0x08 2060 #define MODIFIED_COMPRESSION 0x10 2061 #define MODIFIED_TIMEOUT 0x20 2062 #define MODIFIED_EXEC 0x40 2063 #define MODIFIED_METAFLUSH 0x80 2064 modified = 0; 2065 2066 vstr = nv_get_string(nv, "remoteaddr"); 2067 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2068 /* 2069 * Don't copy res->hr_remoteaddr to gres just yet. 2070 * We want remote_close() to log disconnect from the old 2071 * addresses, not from the new ones. 
2072 */ 2073 modified |= MODIFIED_REMOTEADDR; 2074 } 2075 vstr = nv_get_string(nv, "sourceaddr"); 2076 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2077 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2078 modified |= MODIFIED_SOURCEADDR; 2079 } 2080 vint = nv_get_int32(nv, "replication"); 2081 if (gres->hr_replication != vint) { 2082 gres->hr_replication = vint; 2083 modified |= MODIFIED_REPLICATION; 2084 } 2085 vint = nv_get_int32(nv, "checksum"); 2086 if (gres->hr_checksum != vint) { 2087 gres->hr_checksum = vint; 2088 modified |= MODIFIED_CHECKSUM; 2089 } 2090 vint = nv_get_int32(nv, "compression"); 2091 if (gres->hr_compression != vint) { 2092 gres->hr_compression = vint; 2093 modified |= MODIFIED_COMPRESSION; 2094 } 2095 vint = nv_get_int32(nv, "timeout"); 2096 if (gres->hr_timeout != vint) { 2097 gres->hr_timeout = vint; 2098 modified |= MODIFIED_TIMEOUT; 2099 } 2100 vstr = nv_get_string(nv, "exec"); 2101 if (strcmp(gres->hr_exec, vstr) != 0) { 2102 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2103 modified |= MODIFIED_EXEC; 2104 } 2105 vint = nv_get_int32(nv, "metaflush"); 2106 if (gres->hr_metaflush != vint) { 2107 gres->hr_metaflush = vint; 2108 modified |= MODIFIED_METAFLUSH; 2109 } 2110 2111 /* 2112 * Change timeout for connected sockets. 2113 * Don't bother if we need to reconnect. 
2114 */ 2115 if ((modified & MODIFIED_TIMEOUT) != 0 && 2116 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2117 for (ii = 0; ii < ncomps; ii++) { 2118 if (!ISREMOTE(ii)) 2119 continue; 2120 rw_rlock(&hio_remote_lock[ii]); 2121 if (!ISCONNECTED(gres, ii)) { 2122 rw_unlock(&hio_remote_lock[ii]); 2123 continue; 2124 } 2125 rw_unlock(&hio_remote_lock[ii]); 2126 if (proto_timeout(gres->hr_remotein, 2127 gres->hr_timeout) < 0) { 2128 pjdlog_errno(LOG_WARNING, 2129 "Unable to set connection timeout"); 2130 } 2131 if (proto_timeout(gres->hr_remoteout, 2132 gres->hr_timeout) < 0) { 2133 pjdlog_errno(LOG_WARNING, 2134 "Unable to set connection timeout"); 2135 } 2136 } 2137 } 2138 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2139 for (ii = 0; ii < ncomps; ii++) { 2140 if (!ISREMOTE(ii)) 2141 continue; 2142 remote_close(gres, ii); 2143 } 2144 if (modified & MODIFIED_REMOTEADDR) { 2145 vstr = nv_get_string(nv, "remoteaddr"); 2146 strlcpy(gres->hr_remoteaddr, vstr, 2147 sizeof(gres->hr_remoteaddr)); 2148 } 2149 } 2150 #undef MODIFIED_REMOTEADDR 2151 #undef MODIFIED_SOURCEADDR 2152 #undef MODIFIED_REPLICATION 2153 #undef MODIFIED_CHECKSUM 2154 #undef MODIFIED_COMPRESSION 2155 #undef MODIFIED_TIMEOUT 2156 #undef MODIFIED_EXEC 2157 #undef MODIFIED_METAFLUSH 2158 2159 pjdlog_info("Configuration reloaded successfully."); 2160 } 2161 2162 static void 2163 guard_one(struct hast_resource *res, unsigned int ncomp) 2164 { 2165 struct proto_conn *in, *out; 2166 2167 if (!ISREMOTE(ncomp)) 2168 return; 2169 2170 rw_rlock(&hio_remote_lock[ncomp]); 2171 2172 if (!real_remote(res)) { 2173 rw_unlock(&hio_remote_lock[ncomp]); 2174 return; 2175 } 2176 2177 if (ISCONNECTED(res, ncomp)) { 2178 PJDLOG_ASSERT(res->hr_remotein != NULL); 2179 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2180 rw_unlock(&hio_remote_lock[ncomp]); 2181 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2182 res->hr_remoteaddr); 2183 return; 2184 } 2185 2186 
PJDLOG_ASSERT(res->hr_remotein == NULL); 2187 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2188 /* 2189 * Upgrade the lock. It doesn't have to be atomic as no other thread 2190 * can change connection status from disconnected to connected. 2191 */ 2192 rw_unlock(&hio_remote_lock[ncomp]); 2193 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2194 res->hr_remoteaddr); 2195 in = out = NULL; 2196 if (init_remote(res, &in, &out) == 0) { 2197 rw_wlock(&hio_remote_lock[ncomp]); 2198 PJDLOG_ASSERT(res->hr_remotein == NULL); 2199 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2200 PJDLOG_ASSERT(in != NULL && out != NULL); 2201 res->hr_remotein = in; 2202 res->hr_remoteout = out; 2203 rw_unlock(&hio_remote_lock[ncomp]); 2204 pjdlog_info("Successfully reconnected to %s.", 2205 res->hr_remoteaddr); 2206 sync_start(); 2207 } else { 2208 /* Both connections should be NULL. */ 2209 PJDLOG_ASSERT(res->hr_remotein == NULL); 2210 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2211 PJDLOG_ASSERT(in == NULL && out == NULL); 2212 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2213 res->hr_remoteaddr); 2214 } 2215 } 2216 2217 /* 2218 * Thread guards remote connections and reconnects when needed, handles 2219 * signals, etc. 
2220 */ 2221 static void * 2222 guard_thread(void *arg) 2223 { 2224 struct hast_resource *res = arg; 2225 unsigned int ii, ncomps; 2226 struct timespec timeout; 2227 time_t lastcheck, now; 2228 sigset_t mask; 2229 int signo; 2230 2231 ncomps = HAST_NCOMPONENTS; 2232 lastcheck = time(NULL); 2233 2234 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2235 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2236 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2237 2238 timeout.tv_sec = HAST_KEEPALIVE; 2239 timeout.tv_nsec = 0; 2240 signo = -1; 2241 2242 for (;;) { 2243 switch (signo) { 2244 case SIGINT: 2245 case SIGTERM: 2246 sigexit_received = true; 2247 primary_exitx(EX_OK, 2248 "Termination signal received, exiting."); 2249 break; 2250 default: 2251 break; 2252 } 2253 2254 /* 2255 * Don't check connections until we fully started, 2256 * as we may still be looping, waiting for remote node 2257 * to switch from primary to secondary. 2258 */ 2259 if (fullystarted) { 2260 pjdlog_debug(2, "remote_guard: Checking connections."); 2261 now = time(NULL); 2262 if (lastcheck + HAST_KEEPALIVE <= now) { 2263 for (ii = 0; ii < ncomps; ii++) 2264 guard_one(res, ii); 2265 lastcheck = now; 2266 } 2267 } 2268 signo = sigtimedwait(&mask, NULL, &timeout); 2269 } 2270 /* NOTREACHED */ 2271 return (NULL); 2272 } 2273