1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/types.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/refcount.h> 39 #include <sys/stat.h> 40 41 #include <geom/gate/g_gate.h> 42 43 #include <err.h> 44 #include <errno.h> 45 #include <fcntl.h> 46 #include <libgeom.h> 47 #include <pthread.h> 48 #include <signal.h> 49 #include <stdint.h> 50 #include <stdio.h> 51 #include <string.h> 52 #include <sysexits.h> 53 #include <unistd.h> 54 55 #include <activemap.h> 56 #include <nv.h> 57 #include <rangelock.h> 58 59 #include "control.h" 60 #include "event.h" 61 #include "hast.h" 62 #include "hast_proto.h" 63 #include "hastd.h" 64 #include "hooks.h" 65 #include "metadata.h" 66 #include "proto.h" 67 #include "pjdlog.h" 68 #include "subr.h" 69 #include "synch.h" 70 71 /* The is only one remote component for now. */ 72 #define ISREMOTE(no) ((no) == 1) 73 74 struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 unsigned int hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Remember replication from the time the request was initiated, 98 * so we won't get confused when replication changes on reload. 99 */ 100 int hio_replication; 101 TAILQ_ENTRY(hio) *hio_next; 102 }; 103 #define hio_free_next hio_next[0] 104 #define hio_done_next hio_next[0] 105 106 /* 107 * Free list holds unused structures. When free list is empty, we have to wait 108 * until some in-progress requests are freed. 109 */ 110 static TAILQ_HEAD(, hio) hio_free_list; 111 static pthread_mutex_t hio_free_list_lock; 112 static pthread_cond_t hio_free_list_cond; 113 /* 114 * There is one send list for every component. One requests is placed on all 115 * send lists - each component gets the same request, but each component is 116 * responsible for managing his own send list. 117 */ 118 static TAILQ_HEAD(, hio) *hio_send_list; 119 static pthread_mutex_t *hio_send_list_lock; 120 static pthread_cond_t *hio_send_list_cond; 121 /* 122 * There is one recv list for every component, although local components don't 123 * use recv lists as local requests are done synchronously. 124 */ 125 static TAILQ_HEAD(, hio) *hio_recv_list; 126 static pthread_mutex_t *hio_recv_list_lock; 127 static pthread_cond_t *hio_recv_list_cond; 128 /* 129 * Request is placed on done list by the slowest component (the one that 130 * decreased hio_countdown from 1 to 0). 131 */ 132 static TAILQ_HEAD(, hio) hio_done_list; 133 static pthread_mutex_t hio_done_list_lock; 134 static pthread_cond_t hio_done_list_cond; 135 /* 136 * Structure below are for interaction with sync thread. 137 */ 138 static bool sync_inprogress; 139 static pthread_mutex_t sync_lock; 140 static pthread_cond_t sync_cond; 141 /* 142 * The lock below allows to synchornize access to remote connections. 143 */ 144 static pthread_rwlock_t *hio_remote_lock; 145 146 /* 147 * Lock to synchronize metadata updates. Also synchronize access to 148 * hr_primary_localcnt and hr_primary_remotecnt fields. 149 */ 150 static pthread_mutex_t metadata_lock; 151 152 /* 153 * Maximum number of outstanding I/O requests. 154 */ 155 #define HAST_HIO_MAX 256 156 /* 157 * Number of components. At this point there are only two components: local 158 * and remote, but in the future it might be possible to use multiple local 159 * and remote components. 160 */ 161 #define HAST_NCOMPONENTS 2 162 163 #define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 176 } while (0) 177 #define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_signal(&hio_##name##_list_cond); \ 186 } while (0) 187 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203 } while (0) 204 #define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212 } while (0) 213 214 #define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217 } while (0) 218 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 219 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221 222 static struct hast_resource *gres; 223 224 static pthread_mutex_t range_lock; 225 static struct rangelocks *range_regular; 226 static bool range_regular_wait; 227 static pthread_cond_t range_regular_cond; 228 static struct rangelocks *range_sync; 229 static bool range_sync_wait; 230 static pthread_cond_t range_sync_cond; 231 static bool fullystarted; 232 233 static void *ggate_recv_thread(void *arg); 234 static void *local_send_thread(void *arg); 235 static void *remote_send_thread(void *arg); 236 static void *remote_recv_thread(void *arg); 237 static void *ggate_send_thread(void *arg); 238 static void *sync_thread(void *arg); 239 static void *guard_thread(void *arg); 240 241 static void 242 cleanup(struct hast_resource *res) 243 { 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. */ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267 } 268 269 static __dead2 void 270 primary_exit(int exitcode, const char *fmt, ...) 271 { 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280 } 281 282 static __dead2 void 283 primary_exitx(int exitcode, const char *fmt, ...) 284 { 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292 } 293 294 static int 295 hast_activemap_flush(struct hast_resource *res) 296 { 297 const unsigned char *buf; 298 size_t size; 299 300 buf = activemap_bitmap(res->hr_amp, &size); 301 PJDLOG_ASSERT(buf != NULL); 302 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 303 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 304 (ssize_t)size) { 305 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 306 return (-1); 307 } 308 if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 309 if (errno == EOPNOTSUPP) { 310 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 311 res->hr_localpath); 312 res->hr_metaflush = 0; 313 } else { 314 pjdlog_errno(LOG_ERR, 315 "Unable to flush disk cache on activemap update"); 316 return (-1); 317 } 318 } 319 return (0); 320 } 321 322 static bool 323 real_remote(const struct hast_resource *res) 324 { 325 326 return (strcmp(res->hr_remoteaddr, "none") != 0); 327 } 328 329 static void 330 init_environment(struct hast_resource *res __unused) 331 { 332 struct hio *hio; 333 unsigned int ii, ncomps; 334 335 /* 336 * In the future it might be per-resource value. 337 */ 338 ncomps = HAST_NCOMPONENTS; 339 340 /* 341 * Allocate memory needed by lists. 342 */ 343 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 344 if (hio_send_list == NULL) { 345 primary_exitx(EX_TEMPFAIL, 346 "Unable to allocate %zu bytes of memory for send lists.", 347 sizeof(hio_send_list[0]) * ncomps); 348 } 349 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 350 if (hio_send_list_lock == NULL) { 351 primary_exitx(EX_TEMPFAIL, 352 "Unable to allocate %zu bytes of memory for send list locks.", 353 sizeof(hio_send_list_lock[0]) * ncomps); 354 } 355 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 356 if (hio_send_list_cond == NULL) { 357 primary_exitx(EX_TEMPFAIL, 358 "Unable to allocate %zu bytes of memory for send list condition variables.", 359 sizeof(hio_send_list_cond[0]) * ncomps); 360 } 361 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 362 if (hio_recv_list == NULL) { 363 primary_exitx(EX_TEMPFAIL, 364 "Unable to allocate %zu bytes of memory for recv lists.", 365 sizeof(hio_recv_list[0]) * ncomps); 366 } 367 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 368 if (hio_recv_list_lock == NULL) { 369 primary_exitx(EX_TEMPFAIL, 370 "Unable to allocate %zu bytes of memory for recv list locks.", 371 sizeof(hio_recv_list_lock[0]) * ncomps); 372 } 373 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 374 if (hio_recv_list_cond == NULL) { 375 primary_exitx(EX_TEMPFAIL, 376 "Unable to allocate %zu bytes of memory for recv list condition variables.", 377 sizeof(hio_recv_list_cond[0]) * ncomps); 378 } 379 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 380 if (hio_remote_lock == NULL) { 381 primary_exitx(EX_TEMPFAIL, 382 "Unable to allocate %zu bytes of memory for remote connections locks.", 383 sizeof(hio_remote_lock[0]) * ncomps); 384 } 385 386 /* 387 * Initialize lists, their locks and theirs condition variables. 388 */ 389 TAILQ_INIT(&hio_free_list); 390 mtx_init(&hio_free_list_lock); 391 cv_init(&hio_free_list_cond); 392 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 393 TAILQ_INIT(&hio_send_list[ii]); 394 mtx_init(&hio_send_list_lock[ii]); 395 cv_init(&hio_send_list_cond[ii]); 396 TAILQ_INIT(&hio_recv_list[ii]); 397 mtx_init(&hio_recv_list_lock[ii]); 398 cv_init(&hio_recv_list_cond[ii]); 399 rw_init(&hio_remote_lock[ii]); 400 } 401 TAILQ_INIT(&hio_done_list); 402 mtx_init(&hio_done_list_lock); 403 cv_init(&hio_done_list_cond); 404 mtx_init(&metadata_lock); 405 406 /* 407 * Allocate requests pool and initialize requests. 408 */ 409 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 410 hio = malloc(sizeof(*hio)); 411 if (hio == NULL) { 412 primary_exitx(EX_TEMPFAIL, 413 "Unable to allocate %zu bytes of memory for hio request.", 414 sizeof(*hio)); 415 } 416 hio->hio_countdown = 0; 417 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 418 if (hio->hio_errors == NULL) { 419 primary_exitx(EX_TEMPFAIL, 420 "Unable allocate %zu bytes of memory for hio errors.", 421 sizeof(hio->hio_errors[0]) * ncomps); 422 } 423 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 424 if (hio->hio_next == NULL) { 425 primary_exitx(EX_TEMPFAIL, 426 "Unable allocate %zu bytes of memory for hio_next field.", 427 sizeof(hio->hio_next[0]) * ncomps); 428 } 429 hio->hio_ggio.gctl_version = G_GATE_VERSION; 430 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 431 if (hio->hio_ggio.gctl_data == NULL) { 432 primary_exitx(EX_TEMPFAIL, 433 "Unable to allocate %zu bytes of memory for gctl_data.", 434 MAXPHYS); 435 } 436 hio->hio_ggio.gctl_length = MAXPHYS; 437 hio->hio_ggio.gctl_error = 0; 438 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 439 } 440 } 441 442 static bool 443 init_resuid(struct hast_resource *res) 444 { 445 446 mtx_lock(&metadata_lock); 447 if (res->hr_resuid != 0) { 448 mtx_unlock(&metadata_lock); 449 return (false); 450 } else { 451 /* Initialize unique resource identifier. */ 452 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 453 mtx_unlock(&metadata_lock); 454 if (metadata_write(res) == -1) 455 exit(EX_NOINPUT); 456 return (true); 457 } 458 } 459 460 static void 461 init_local(struct hast_resource *res) 462 { 463 unsigned char *buf; 464 size_t mapsize; 465 466 if (metadata_read(res, true) == -1) 467 exit(EX_NOINPUT); 468 mtx_init(&res->hr_amp_lock); 469 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 470 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 471 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 472 } 473 mtx_init(&range_lock); 474 cv_init(&range_regular_cond); 475 if (rangelock_init(&range_regular) == -1) 476 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 477 cv_init(&range_sync_cond); 478 if (rangelock_init(&range_sync) == -1) 479 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 480 mapsize = activemap_ondisk_size(res->hr_amp); 481 buf = calloc(1, mapsize); 482 if (buf == NULL) { 483 primary_exitx(EX_TEMPFAIL, 484 "Unable to allocate buffer for activemap."); 485 } 486 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 487 (ssize_t)mapsize) { 488 primary_exit(EX_NOINPUT, "Unable to read activemap"); 489 } 490 activemap_copyin(res->hr_amp, buf, mapsize); 491 free(buf); 492 if (res->hr_resuid != 0) 493 return; 494 /* 495 * We're using provider for the first time. Initialize local and remote 496 * counters. We don't initialize resuid here, as we want to do it just 497 * in time. The reason for this is that we want to inform secondary 498 * that there were no writes yet, so there is no need to synchronize 499 * anything. 500 */ 501 res->hr_primary_localcnt = 0; 502 res->hr_primary_remotecnt = 0; 503 if (metadata_write(res) == -1) 504 exit(EX_NOINPUT); 505 } 506 507 static int 508 primary_connect(struct hast_resource *res, struct proto_conn **connp) 509 { 510 struct proto_conn *conn; 511 int16_t val; 512 513 val = 1; 514 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 515 primary_exit(EX_TEMPFAIL, 516 "Unable to send connection request to parent"); 517 } 518 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 519 primary_exit(EX_TEMPFAIL, 520 "Unable to receive reply to connection request from parent"); 521 } 522 if (val != 0) { 523 errno = val; 524 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 525 res->hr_remoteaddr); 526 return (-1); 527 } 528 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 529 primary_exit(EX_TEMPFAIL, 530 "Unable to receive connection from parent"); 531 } 532 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 533 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534 res->hr_remoteaddr); 535 proto_close(conn); 536 return (-1); 537 } 538 /* Error in setting timeout is not critical, but why should it fail? */ 539 if (proto_timeout(conn, res->hr_timeout) == -1) 540 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 541 542 *connp = conn; 543 544 return (0); 545 } 546 547 /* 548 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 549 */ 550 static void 551 enable_direct_reads(struct hast_resource *res) 552 { 553 struct g_gate_ctl_modify ggiomodify; 554 555 bzero(&ggiomodify, sizeof(ggiomodify)); 556 ggiomodify.gctl_version = G_GATE_VERSION; 557 ggiomodify.gctl_unit = res->hr_ggateunit; 558 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 559 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 560 sizeof(ggiomodify.gctl_readprov)); 561 ggiomodify.gctl_readoffset = res->hr_localoff; 562 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 563 pjdlog_debug(1, "Direct reads enabled."); 564 else 565 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 566 } 567 568 static int 569 init_remote(struct hast_resource *res, struct proto_conn **inp, 570 struct proto_conn **outp) 571 { 572 struct proto_conn *in, *out; 573 struct nv *nvout, *nvin; 574 const unsigned char *token; 575 unsigned char *map; 576 const char *errmsg; 577 int32_t extentsize; 578 int64_t datasize; 579 uint32_t mapsize; 580 size_t size; 581 int error; 582 583 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 584 PJDLOG_ASSERT(real_remote(res)); 585 586 in = out = NULL; 587 errmsg = NULL; 588 589 if (primary_connect(res, &out) == -1) 590 return (ECONNREFUSED); 591 592 error = ECONNABORTED; 593 594 /* 595 * First handshake step. 596 * Setup outgoing connection with remote node. 597 */ 598 nvout = nv_alloc(); 599 nv_add_string(nvout, res->hr_name, "resource"); 600 if (nv_error(nvout) != 0) { 601 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 602 "Unable to allocate header for connection with %s", 603 res->hr_remoteaddr); 604 nv_free(nvout); 605 goto close; 606 } 607 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 608 pjdlog_errno(LOG_WARNING, 609 "Unable to send handshake header to %s", 610 res->hr_remoteaddr); 611 nv_free(nvout); 612 goto close; 613 } 614 nv_free(nvout); 615 if (hast_proto_recv_hdr(out, &nvin) == -1) { 616 pjdlog_errno(LOG_WARNING, 617 "Unable to receive handshake header from %s", 618 res->hr_remoteaddr); 619 goto close; 620 } 621 errmsg = nv_get_string(nvin, "errmsg"); 622 if (errmsg != NULL) { 623 pjdlog_warning("%s", errmsg); 624 if (nv_exists(nvin, "wait")) 625 error = EBUSY; 626 nv_free(nvin); 627 goto close; 628 } 629 token = nv_get_uint8_array(nvin, &size, "token"); 630 if (token == NULL) { 631 pjdlog_warning("Handshake header from %s has no 'token' field.", 632 res->hr_remoteaddr); 633 nv_free(nvin); 634 goto close; 635 } 636 if (size != sizeof(res->hr_token)) { 637 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 638 res->hr_remoteaddr, size, sizeof(res->hr_token)); 639 nv_free(nvin); 640 goto close; 641 } 642 bcopy(token, res->hr_token, sizeof(res->hr_token)); 643 nv_free(nvin); 644 645 /* 646 * Second handshake step. 647 * Setup incoming connection with remote node. 648 */ 649 if (primary_connect(res, &in) == -1) 650 goto close; 651 652 nvout = nv_alloc(); 653 nv_add_string(nvout, res->hr_name, "resource"); 654 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 655 "token"); 656 if (res->hr_resuid == 0) { 657 /* 658 * The resuid field was not yet initialized. 659 * Because we do synchronization inside init_resuid(), it is 660 * possible that someone already initialized it, the function 661 * will return false then, but if we successfully initialized 662 * it, we will get true. True means that there were no writes 663 * to this resource yet and we want to inform secondary that 664 * synchronization is not needed by sending "virgin" argument. 665 */ 666 if (init_resuid(res)) 667 nv_add_int8(nvout, 1, "virgin"); 668 } 669 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 670 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 671 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 672 if (nv_error(nvout) != 0) { 673 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 674 "Unable to allocate header for connection with %s", 675 res->hr_remoteaddr); 676 nv_free(nvout); 677 goto close; 678 } 679 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 680 pjdlog_errno(LOG_WARNING, 681 "Unable to send handshake header to %s", 682 res->hr_remoteaddr); 683 nv_free(nvout); 684 goto close; 685 } 686 nv_free(nvout); 687 if (hast_proto_recv_hdr(out, &nvin) == -1) { 688 pjdlog_errno(LOG_WARNING, 689 "Unable to receive handshake header from %s", 690 res->hr_remoteaddr); 691 goto close; 692 } 693 errmsg = nv_get_string(nvin, "errmsg"); 694 if (errmsg != NULL) { 695 pjdlog_warning("%s", errmsg); 696 nv_free(nvin); 697 goto close; 698 } 699 datasize = nv_get_int64(nvin, "datasize"); 700 if (datasize != res->hr_datasize) { 701 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 702 (intmax_t)res->hr_datasize, (intmax_t)datasize); 703 nv_free(nvin); 704 goto close; 705 } 706 extentsize = nv_get_int32(nvin, "extentsize"); 707 if (extentsize != res->hr_extentsize) { 708 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 709 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 710 nv_free(nvin); 711 goto close; 712 } 713 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 714 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 715 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 716 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 717 enable_direct_reads(res); 718 if (nv_exists(nvin, "virgin")) { 719 /* 720 * Secondary was reinitialized, bump localcnt if it is 0 as 721 * only we have the data. 722 */ 723 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 724 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 725 726 if (res->hr_primary_localcnt == 0) { 727 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 728 729 mtx_lock(&metadata_lock); 730 res->hr_primary_localcnt++; 731 pjdlog_debug(1, "Increasing localcnt to %ju.", 732 (uintmax_t)res->hr_primary_localcnt); 733 (void)metadata_write(res); 734 mtx_unlock(&metadata_lock); 735 } 736 } 737 map = NULL; 738 mapsize = nv_get_uint32(nvin, "mapsize"); 739 if (mapsize > 0) { 740 map = malloc(mapsize); 741 if (map == NULL) { 742 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 743 (uintmax_t)mapsize); 744 nv_free(nvin); 745 goto close; 746 } 747 /* 748 * Remote node have some dirty extents on its own, lets 749 * download its activemap. 750 */ 751 if (hast_proto_recv_data(res, out, nvin, map, 752 mapsize) == -1) { 753 pjdlog_errno(LOG_ERR, 754 "Unable to receive remote activemap"); 755 nv_free(nvin); 756 free(map); 757 goto close; 758 } 759 /* 760 * Merge local and remote bitmaps. 761 */ 762 activemap_merge(res->hr_amp, map, mapsize); 763 free(map); 764 /* 765 * Now that we merged bitmaps from both nodes, flush it to the 766 * disk before we start to synchronize. 767 */ 768 (void)hast_activemap_flush(res); 769 } 770 nv_free(nvin); 771 #ifdef notyet 772 /* Setup directions. */ 773 if (proto_send(out, NULL, 0) == -1) 774 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 775 if (proto_recv(in, NULL, 0) == -1) 776 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 777 #endif 778 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 779 if (inp != NULL && outp != NULL) { 780 *inp = in; 781 *outp = out; 782 } else { 783 res->hr_remotein = in; 784 res->hr_remoteout = out; 785 } 786 event_send(res, EVENT_CONNECT); 787 return (0); 788 close: 789 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 790 event_send(res, EVENT_SPLITBRAIN); 791 proto_close(out); 792 if (in != NULL) 793 proto_close(in); 794 return (error); 795 } 796 797 static void 798 sync_start(void) 799 { 800 801 mtx_lock(&sync_lock); 802 sync_inprogress = true; 803 mtx_unlock(&sync_lock); 804 cv_signal(&sync_cond); 805 } 806 807 static void 808 sync_stop(void) 809 { 810 811 mtx_lock(&sync_lock); 812 if (sync_inprogress) 813 sync_inprogress = false; 814 mtx_unlock(&sync_lock); 815 } 816 817 static void 818 init_ggate(struct hast_resource *res) 819 { 820 struct g_gate_ctl_create ggiocreate; 821 struct g_gate_ctl_cancel ggiocancel; 822 823 /* 824 * We communicate with ggate via /dev/ggctl. Open it. 825 */ 826 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 827 if (res->hr_ggatefd == -1) 828 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 829 /* 830 * Create provider before trying to connect, as connection failure 831 * is not critical, but may take some time. 832 */ 833 bzero(&ggiocreate, sizeof(ggiocreate)); 834 ggiocreate.gctl_version = G_GATE_VERSION; 835 ggiocreate.gctl_mediasize = res->hr_datasize; 836 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 837 ggiocreate.gctl_flags = 0; 838 ggiocreate.gctl_maxcount = 0; 839 ggiocreate.gctl_timeout = 0; 840 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 841 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 842 res->hr_provname); 843 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 844 pjdlog_info("Device hast/%s created.", res->hr_provname); 845 res->hr_ggateunit = ggiocreate.gctl_unit; 846 return; 847 } 848 if (errno != EEXIST) { 849 primary_exit(EX_OSERR, "Unable to create hast/%s device", 850 res->hr_provname); 851 } 852 pjdlog_debug(1, 853 "Device hast/%s already exists, we will try to take it over.", 854 res->hr_provname); 855 /* 856 * If we received EEXIST, we assume that the process who created the 857 * provider died and didn't clean up. In that case we will start from 858 * where he left of. 859 */ 860 bzero(&ggiocancel, sizeof(ggiocancel)); 861 ggiocancel.gctl_version = G_GATE_VERSION; 862 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 863 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 864 res->hr_provname); 865 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 866 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 867 res->hr_ggateunit = ggiocancel.gctl_unit; 868 return; 869 } 870 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 871 res->hr_provname); 872 } 873 874 void 875 hastd_primary(struct hast_resource *res) 876 { 877 pthread_t td; 878 pid_t pid; 879 int error, mode, debuglevel; 880 881 /* 882 * Create communication channel for sending control commands from 883 * parent to child. 884 */ 885 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 886 /* TODO: There's no need for this to be fatal error. */ 887 KEEP_ERRNO((void)pidfile_remove(pfh)); 888 pjdlog_exit(EX_OSERR, 889 "Unable to create control sockets between parent and child"); 890 } 891 /* 892 * Create communication channel for sending events from child to parent. 893 */ 894 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 895 /* TODO: There's no need for this to be fatal error. */ 896 KEEP_ERRNO((void)pidfile_remove(pfh)); 897 pjdlog_exit(EX_OSERR, 898 "Unable to create event sockets between child and parent"); 899 } 900 /* 901 * Create communication channel for sending connection requests from 902 * child to parent. 903 */ 904 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 905 /* TODO: There's no need for this to be fatal error. */ 906 KEEP_ERRNO((void)pidfile_remove(pfh)); 907 pjdlog_exit(EX_OSERR, 908 "Unable to create connection sockets between child and parent"); 909 } 910 911 pid = fork(); 912 if (pid == -1) { 913 /* TODO: There's no need for this to be fatal error. */ 914 KEEP_ERRNO((void)pidfile_remove(pfh)); 915 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 916 } 917 918 if (pid > 0) { 919 /* This is parent. */ 920 /* Declare that we are receiver. */ 921 proto_recv(res->hr_event, NULL, 0); 922 proto_recv(res->hr_conn, NULL, 0); 923 /* Declare that we are sender. */ 924 proto_send(res->hr_ctrl, NULL, 0); 925 res->hr_workerpid = pid; 926 return; 927 } 928 929 gres = res; 930 mode = pjdlog_mode_get(); 931 debuglevel = pjdlog_debug_get(); 932 933 /* Declare that we are sender. */ 934 proto_send(res->hr_event, NULL, 0); 935 proto_send(res->hr_conn, NULL, 0); 936 /* Declare that we are receiver. */ 937 proto_recv(res->hr_ctrl, NULL, 0); 938 descriptors_cleanup(res); 939 940 descriptors_assert(res, mode); 941 942 pjdlog_init(mode); 943 pjdlog_debug_set(debuglevel); 944 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 945 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 946 947 init_local(res); 948 init_ggate(res); 949 init_environment(res); 950 951 if (drop_privs(res) != 0) { 952 cleanup(res); 953 exit(EX_CONFIG); 954 } 955 pjdlog_info("Privileges successfully dropped."); 956 957 /* 958 * Create the guard thread first, so we can handle signals from the 959 * very beginning. 960 */ 961 error = pthread_create(&td, NULL, guard_thread, res); 962 PJDLOG_ASSERT(error == 0); 963 /* 964 * Create the control thread before sending any event to the parent, 965 * as we can deadlock when parent sends control request to worker, 966 * but worker has no control thread started yet, so parent waits. 967 * In the meantime worker sends an event to the parent, but parent 968 * is unable to handle the event, because it waits for control 969 * request response. 970 */ 971 error = pthread_create(&td, NULL, ctrl_thread, res); 972 PJDLOG_ASSERT(error == 0); 973 if (real_remote(res)) { 974 error = init_remote(res, NULL, NULL); 975 if (error == 0) { 976 sync_start(); 977 } else if (error == EBUSY) { 978 time_t start = time(NULL); 979 980 pjdlog_warning("Waiting for remote node to become %s for %ds.", 981 role2str(HAST_ROLE_SECONDARY), 982 res->hr_timeout); 983 for (;;) { 984 sleep(1); 985 error = init_remote(res, NULL, NULL); 986 if (error != EBUSY) 987 break; 988 if (time(NULL) > start + res->hr_timeout) 989 break; 990 } 991 if (error == EBUSY) { 992 pjdlog_warning("Remote node is still %s, starting anyway.", 993 role2str(HAST_ROLE_PRIMARY)); 994 } 995 } 996 } 997 error = pthread_create(&td, NULL, ggate_recv_thread, res); 998 PJDLOG_ASSERT(error == 0); 999 error = pthread_create(&td, NULL, local_send_thread, res); 1000 PJDLOG_ASSERT(error == 0); 1001 error = pthread_create(&td, NULL, remote_send_thread, res); 1002 PJDLOG_ASSERT(error == 0); 1003 error = pthread_create(&td, NULL, remote_recv_thread, res); 1004 PJDLOG_ASSERT(error == 0); 1005 error = pthread_create(&td, NULL, ggate_send_thread, res); 1006 PJDLOG_ASSERT(error == 0); 1007 fullystarted = true; 1008 (void)sync_thread(res); 1009 } 1010 1011 static void 1012 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 1013 { 1014 char msg[1024]; 1015 va_list ap; 1016 1017 va_start(ap, fmt); 1018 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1019 va_end(ap); 1020 switch (ggio->gctl_cmd) { 1021 case BIO_READ: 1022 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1023 (uintmax_t)ggio->gctl_offset, 1024 (uintmax_t)ggio->gctl_length); 1025 break; 1026 case BIO_DELETE: 1027 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1028 (uintmax_t)ggio->gctl_offset, 1029 (uintmax_t)ggio->gctl_length); 1030 break; 1031 case BIO_FLUSH: 1032 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1033 break; 1034 case BIO_WRITE: 1035 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1036 (uintmax_t)ggio->gctl_offset, 1037 (uintmax_t)ggio->gctl_length); 1038 break; 1039 default: 1040 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1041 (unsigned int)ggio->gctl_cmd); 1042 break; 1043 } 1044 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1045 } 1046 1047 static void 1048 remote_close(struct hast_resource *res, int ncomp) 1049 { 1050 1051 rw_wlock(&hio_remote_lock[ncomp]); 1052 /* 1053 * Check for a race between dropping rlock and acquiring wlock - 1054 * another thread can close connection in-between. 1055 */ 1056 if (!ISCONNECTED(res, ncomp)) { 1057 PJDLOG_ASSERT(res->hr_remotein == NULL); 1058 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1059 rw_unlock(&hio_remote_lock[ncomp]); 1060 return; 1061 } 1062 1063 PJDLOG_ASSERT(res->hr_remotein != NULL); 1064 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1065 1066 pjdlog_debug(2, "Closing incoming connection to %s.", 1067 res->hr_remoteaddr); 1068 proto_close(res->hr_remotein); 1069 res->hr_remotein = NULL; 1070 pjdlog_debug(2, "Closing outgoing connection to %s.", 1071 res->hr_remoteaddr); 1072 proto_close(res->hr_remoteout); 1073 res->hr_remoteout = NULL; 1074 1075 rw_unlock(&hio_remote_lock[ncomp]); 1076 1077 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1078 1079 /* 1080 * Stop synchronization if in-progress. 1081 */ 1082 sync_stop(); 1083 1084 event_send(res, EVENT_DISCONNECT); 1085 } 1086 1087 /* 1088 * Acknowledge write completion to the kernel, but don't update activemap yet. 1089 */ 1090 static void 1091 write_complete(struct hast_resource *res, struct hio *hio) 1092 { 1093 struct g_gate_ctl_io *ggio; 1094 unsigned int ncomp; 1095 1096 PJDLOG_ASSERT(!hio->hio_done); 1097 1098 ggio = &hio->hio_ggio; 1099 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1100 1101 /* 1102 * Bump local count if this is first write after 1103 * connection failure with remote node. 1104 */ 1105 ncomp = 1; 1106 rw_rlock(&hio_remote_lock[ncomp]); 1107 if (!ISCONNECTED(res, ncomp)) { 1108 mtx_lock(&metadata_lock); 1109 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1110 res->hr_primary_localcnt++; 1111 pjdlog_debug(1, "Increasing localcnt to %ju.", 1112 (uintmax_t)res->hr_primary_localcnt); 1113 (void)metadata_write(res); 1114 } 1115 mtx_unlock(&metadata_lock); 1116 } 1117 rw_unlock(&hio_remote_lock[ncomp]); 1118 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1119 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1120 hio->hio_done = true; 1121 } 1122 1123 /* 1124 * Thread receives ggate I/O requests from the kernel and passes them to 1125 * appropriate threads: 1126 * WRITE - always goes to both local_send and remote_send threads 1127 * READ (when the block is up-to-date on local component) - 1128 * only local_send thread 1129 * READ (when the block isn't up-to-date on local component) - 1130 * only remote_send thread 1131 * DELETE - always goes to both local_send and remote_send threads 1132 * FLUSH - always goes to both local_send and remote_send threads 1133 */ 1134 static void * 1135 ggate_recv_thread(void *arg) 1136 { 1137 struct hast_resource *res = arg; 1138 struct g_gate_ctl_io *ggio; 1139 struct hio *hio; 1140 unsigned int ii, ncomp, ncomps; 1141 int error; 1142 1143 for (;;) { 1144 pjdlog_debug(2, "ggate_recv: Taking free request."); 1145 QUEUE_TAKE2(hio, free); 1146 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1147 ggio = &hio->hio_ggio; 1148 ggio->gctl_unit = res->hr_ggateunit; 1149 ggio->gctl_length = MAXPHYS; 1150 ggio->gctl_error = 0; 1151 hio->hio_done = false; 1152 hio->hio_replication = res->hr_replication; 1153 pjdlog_debug(2, 1154 "ggate_recv: (%p) Waiting for request from the kernel.", 1155 hio); 1156 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1157 if (sigexit_received) 1158 pthread_exit(NULL); 1159 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1160 } 1161 error = ggio->gctl_error; 1162 switch (error) { 1163 case 0: 1164 break; 1165 case ECANCELED: 1166 /* Exit gracefully. */ 1167 if (!sigexit_received) { 1168 pjdlog_debug(2, 1169 "ggate_recv: (%p) Received cancel from the kernel.", 1170 hio); 1171 pjdlog_info("Received cancel from the kernel, exiting."); 1172 } 1173 pthread_exit(NULL); 1174 case ENOMEM: 1175 /* 1176 * Buffer too small? Impossible, we allocate MAXPHYS 1177 * bytes - request can't be bigger than that. 1178 */ 1179 /* FALLTHROUGH */ 1180 case ENXIO: 1181 default: 1182 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1183 strerror(error)); 1184 } 1185 1186 ncomp = 0; 1187 ncomps = HAST_NCOMPONENTS; 1188 1189 for (ii = 0; ii < ncomps; ii++) 1190 hio->hio_errors[ii] = EINVAL; 1191 reqlog(LOG_DEBUG, 2, ggio, 1192 "ggate_recv: (%p) Request received from the kernel: ", 1193 hio); 1194 1195 /* 1196 * Inform all components about new write request. 1197 * For read request prefer local component unless the given 1198 * range is out-of-date, then use remote component. 1199 */ 1200 switch (ggio->gctl_cmd) { 1201 case BIO_READ: 1202 res->hr_stat_read++; 1203 ncomps = 1; 1204 mtx_lock(&metadata_lock); 1205 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1206 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1207 /* 1208 * This range is up-to-date on local component, 1209 * so handle request locally. 1210 */ 1211 /* Local component is 0 for now. */ 1212 ncomp = 0; 1213 } else /* if (res->hr_syncsrc == 1214 HAST_SYNCSRC_SECONDARY) */ { 1215 PJDLOG_ASSERT(res->hr_syncsrc == 1216 HAST_SYNCSRC_SECONDARY); 1217 /* 1218 * This range is out-of-date on local component, 1219 * so send request to the remote node. 1220 */ 1221 /* Remote component is 1 for now. */ 1222 ncomp = 1; 1223 } 1224 mtx_unlock(&metadata_lock); 1225 break; 1226 case BIO_WRITE: 1227 res->hr_stat_write++; 1228 if (res->hr_resuid == 0 && 1229 res->hr_primary_localcnt == 0) { 1230 /* This is first write. */ 1231 res->hr_primary_localcnt = 1; 1232 } 1233 for (;;) { 1234 mtx_lock(&range_lock); 1235 if (rangelock_islocked(range_sync, 1236 ggio->gctl_offset, ggio->gctl_length)) { 1237 pjdlog_debug(2, 1238 "regular: Range offset=%jd length=%zu locked.", 1239 (intmax_t)ggio->gctl_offset, 1240 (size_t)ggio->gctl_length); 1241 range_regular_wait = true; 1242 cv_wait(&range_regular_cond, &range_lock); 1243 range_regular_wait = false; 1244 mtx_unlock(&range_lock); 1245 continue; 1246 } 1247 if (rangelock_add(range_regular, 1248 ggio->gctl_offset, ggio->gctl_length) == -1) { 1249 mtx_unlock(&range_lock); 1250 pjdlog_debug(2, 1251 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1252 (intmax_t)ggio->gctl_offset, 1253 (size_t)ggio->gctl_length); 1254 sleep(1); 1255 continue; 1256 } 1257 mtx_unlock(&range_lock); 1258 break; 1259 } 1260 mtx_lock(&res->hr_amp_lock); 1261 if (activemap_write_start(res->hr_amp, 1262 ggio->gctl_offset, ggio->gctl_length)) { 1263 res->hr_stat_activemap_update++; 1264 (void)hast_activemap_flush(res); 1265 } 1266 mtx_unlock(&res->hr_amp_lock); 1267 break; 1268 case BIO_DELETE: 1269 res->hr_stat_delete++; 1270 break; 1271 case BIO_FLUSH: 1272 res->hr_stat_flush++; 1273 break; 1274 } 1275 pjdlog_debug(2, 1276 "ggate_recv: (%p) Moving request to the send queues.", hio); 1277 refcount_init(&hio->hio_countdown, ncomps); 1278 for (ii = ncomp; ii < ncomp + ncomps; ii++) 1279 QUEUE_INSERT1(hio, send, ii); 1280 } 1281 /* NOTREACHED */ 1282 return (NULL); 1283 } 1284 1285 /* 1286 * Thread reads from or writes to local component. 1287 * If local read fails, it redirects it to remote_send thread. 1288 */ 1289 static void * 1290 local_send_thread(void *arg) 1291 { 1292 struct hast_resource *res = arg; 1293 struct g_gate_ctl_io *ggio; 1294 struct hio *hio; 1295 unsigned int ncomp, rncomp; 1296 ssize_t ret; 1297 1298 /* Local component is 0 for now. */ 1299 ncomp = 0; 1300 /* Remote component is 1 for now. */ 1301 rncomp = 1; 1302 1303 for (;;) { 1304 pjdlog_debug(2, "local_send: Taking request."); 1305 QUEUE_TAKE1(hio, send, ncomp, 0); 1306 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1307 ggio = &hio->hio_ggio; 1308 switch (ggio->gctl_cmd) { 1309 case BIO_READ: 1310 ret = pread(res->hr_localfd, ggio->gctl_data, 1311 ggio->gctl_length, 1312 ggio->gctl_offset + res->hr_localoff); 1313 if (ret == ggio->gctl_length) 1314 hio->hio_errors[ncomp] = 0; 1315 else if (!ISSYNCREQ(hio)) { 1316 /* 1317 * If READ failed, try to read from remote node. 1318 */ 1319 if (ret == -1) { 1320 reqlog(LOG_WARNING, 0, ggio, 1321 "Local request failed (%s), trying remote node. ", 1322 strerror(errno)); 1323 } else if (ret != ggio->gctl_length) { 1324 reqlog(LOG_WARNING, 0, ggio, 1325 "Local request failed (%zd != %jd), trying remote node. ", 1326 ret, (intmax_t)ggio->gctl_length); 1327 } 1328 QUEUE_INSERT1(hio, send, rncomp); 1329 continue; 1330 } 1331 break; 1332 case BIO_WRITE: 1333 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1334 ggio->gctl_length, 1335 ggio->gctl_offset + res->hr_localoff); 1336 if (ret == -1) { 1337 hio->hio_errors[ncomp] = errno; 1338 reqlog(LOG_WARNING, 0, ggio, 1339 "Local request failed (%s): ", 1340 strerror(errno)); 1341 } else if (ret != ggio->gctl_length) { 1342 hio->hio_errors[ncomp] = EIO; 1343 reqlog(LOG_WARNING, 0, ggio, 1344 "Local request failed (%zd != %jd): ", 1345 ret, (intmax_t)ggio->gctl_length); 1346 } else { 1347 hio->hio_errors[ncomp] = 0; 1348 if (hio->hio_replication == 1349 HAST_REPLICATION_ASYNC && 1350 !ISSYNCREQ(hio)) { 1351 ggio->gctl_error = 0; 1352 write_complete(res, hio); 1353 } 1354 } 1355 break; 1356 case BIO_DELETE: 1357 ret = g_delete(res->hr_localfd, 1358 ggio->gctl_offset + res->hr_localoff, 1359 ggio->gctl_length); 1360 if (ret == -1) { 1361 hio->hio_errors[ncomp] = errno; 1362 reqlog(LOG_WARNING, 0, ggio, 1363 "Local request failed (%s): ", 1364 strerror(errno)); 1365 } else { 1366 hio->hio_errors[ncomp] = 0; 1367 } 1368 break; 1369 case BIO_FLUSH: 1370 if (!res->hr_localflush) { 1371 ret = -1; 1372 errno = EOPNOTSUPP; 1373 break; 1374 } 1375 ret = g_flush(res->hr_localfd); 1376 if (ret == -1) { 1377 if (errno == EOPNOTSUPP) 1378 res->hr_localflush = false; 1379 hio->hio_errors[ncomp] = errno; 1380 reqlog(LOG_WARNING, 0, ggio, 1381 "Local request failed (%s): ", 1382 strerror(errno)); 1383 } else { 1384 hio->hio_errors[ncomp] = 0; 1385 } 1386 break; 1387 } 1388 if (!refcount_release(&hio->hio_countdown)) 1389 continue; 1390 if (ISSYNCREQ(hio)) { 1391 mtx_lock(&sync_lock); 1392 SYNCREQDONE(hio); 1393 mtx_unlock(&sync_lock); 1394 cv_signal(&sync_cond); 1395 } else { 1396 pjdlog_debug(2, 1397 "local_send: (%p) Moving request to the done queue.", 1398 hio); 1399 QUEUE_INSERT2(hio, done); 1400 } 1401 } 1402 /* NOTREACHED */ 1403 return (NULL); 1404 } 1405 1406 static void 1407 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1408 { 1409 struct nv *nv; 1410 1411 rw_rlock(&hio_remote_lock[ncomp]); 1412 1413 if (!ISCONNECTED(res, ncomp)) { 1414 rw_unlock(&hio_remote_lock[ncomp]); 1415 return; 1416 } 1417 1418 PJDLOG_ASSERT(res->hr_remotein != NULL); 1419 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1420 1421 nv = nv_alloc(); 1422 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1423 if (nv_error(nv) != 0) { 1424 rw_unlock(&hio_remote_lock[ncomp]); 1425 nv_free(nv); 1426 pjdlog_debug(1, 1427 "keepalive_send: Unable to prepare header to send."); 1428 return; 1429 } 1430 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1431 rw_unlock(&hio_remote_lock[ncomp]); 1432 pjdlog_common(LOG_DEBUG, 1, errno, 1433 "keepalive_send: Unable to send request"); 1434 nv_free(nv); 1435 remote_close(res, ncomp); 1436 return; 1437 } 1438 1439 rw_unlock(&hio_remote_lock[ncomp]); 1440 nv_free(nv); 1441 pjdlog_debug(2, "keepalive_send: Request sent."); 1442 } 1443 1444 /* 1445 * Thread sends request to secondary node. 1446 */ 1447 static void * 1448 remote_send_thread(void *arg) 1449 { 1450 struct hast_resource *res = arg; 1451 struct g_gate_ctl_io *ggio; 1452 time_t lastcheck, now; 1453 struct hio *hio; 1454 struct nv *nv; 1455 unsigned int ncomp; 1456 bool wakeup; 1457 uint64_t offset, length; 1458 uint8_t cmd; 1459 void *data; 1460 1461 /* Remote component is 1 for now. */ 1462 ncomp = 1; 1463 lastcheck = time(NULL); 1464 1465 for (;;) { 1466 pjdlog_debug(2, "remote_send: Taking request."); 1467 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1468 if (hio == NULL) { 1469 now = time(NULL); 1470 if (lastcheck + HAST_KEEPALIVE <= now) { 1471 keepalive_send(res, ncomp); 1472 lastcheck = now; 1473 } 1474 continue; 1475 } 1476 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1477 ggio = &hio->hio_ggio; 1478 switch (ggio->gctl_cmd) { 1479 case BIO_READ: 1480 cmd = HIO_READ; 1481 data = NULL; 1482 offset = ggio->gctl_offset; 1483 length = ggio->gctl_length; 1484 break; 1485 case BIO_WRITE: 1486 cmd = HIO_WRITE; 1487 data = ggio->gctl_data; 1488 offset = ggio->gctl_offset; 1489 length = ggio->gctl_length; 1490 break; 1491 case BIO_DELETE: 1492 cmd = HIO_DELETE; 1493 data = NULL; 1494 offset = ggio->gctl_offset; 1495 length = ggio->gctl_length; 1496 break; 1497 case BIO_FLUSH: 1498 cmd = HIO_FLUSH; 1499 data = NULL; 1500 offset = 0; 1501 length = 0; 1502 break; 1503 default: 1504 PJDLOG_ABORT("invalid condition"); 1505 } 1506 nv = nv_alloc(); 1507 nv_add_uint8(nv, cmd, "cmd"); 1508 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1509 nv_add_uint64(nv, offset, "offset"); 1510 nv_add_uint64(nv, length, "length"); 1511 if (nv_error(nv) != 0) { 1512 hio->hio_errors[ncomp] = nv_error(nv); 1513 pjdlog_debug(2, 1514 "remote_send: (%p) Unable to prepare header to send.", 1515 hio); 1516 reqlog(LOG_ERR, 0, ggio, 1517 "Unable to prepare header to send (%s): ", 1518 strerror(nv_error(nv))); 1519 /* Move failed request immediately to the done queue. */ 1520 goto done_queue; 1521 } 1522 /* 1523 * Protect connection from disappearing. 1524 */ 1525 rw_rlock(&hio_remote_lock[ncomp]); 1526 if (!ISCONNECTED(res, ncomp)) { 1527 rw_unlock(&hio_remote_lock[ncomp]); 1528 hio->hio_errors[ncomp] = ENOTCONN; 1529 goto done_queue; 1530 } 1531 /* 1532 * Move the request to recv queue before sending it, because 1533 * in different order we can get reply before we move request 1534 * to recv queue. 1535 */ 1536 pjdlog_debug(2, 1537 "remote_send: (%p) Moving request to the recv queue.", 1538 hio); 1539 mtx_lock(&hio_recv_list_lock[ncomp]); 1540 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1541 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1542 mtx_unlock(&hio_recv_list_lock[ncomp]); 1543 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1544 data != NULL ? length : 0) == -1) { 1545 hio->hio_errors[ncomp] = errno; 1546 rw_unlock(&hio_remote_lock[ncomp]); 1547 pjdlog_debug(2, 1548 "remote_send: (%p) Unable to send request.", hio); 1549 reqlog(LOG_ERR, 0, ggio, 1550 "Unable to send request (%s): ", 1551 strerror(hio->hio_errors[ncomp])); 1552 remote_close(res, ncomp); 1553 /* 1554 * Take request back from the receive queue and move 1555 * it immediately to the done queue. 1556 */ 1557 mtx_lock(&hio_recv_list_lock[ncomp]); 1558 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1559 hio_next[ncomp]); 1560 mtx_unlock(&hio_recv_list_lock[ncomp]); 1561 goto done_queue; 1562 } 1563 rw_unlock(&hio_remote_lock[ncomp]); 1564 nv_free(nv); 1565 if (wakeup) 1566 cv_signal(&hio_recv_list_cond[ncomp]); 1567 continue; 1568 done_queue: 1569 nv_free(nv); 1570 if (ISSYNCREQ(hio)) { 1571 if (!refcount_release(&hio->hio_countdown)) 1572 continue; 1573 mtx_lock(&sync_lock); 1574 SYNCREQDONE(hio); 1575 mtx_unlock(&sync_lock); 1576 cv_signal(&sync_cond); 1577 continue; 1578 } 1579 if (ggio->gctl_cmd == BIO_WRITE) { 1580 mtx_lock(&res->hr_amp_lock); 1581 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1582 ggio->gctl_length)) { 1583 (void)hast_activemap_flush(res); 1584 } 1585 mtx_unlock(&res->hr_amp_lock); 1586 } 1587 if (!refcount_release(&hio->hio_countdown)) 1588 continue; 1589 pjdlog_debug(2, 1590 "remote_send: (%p) Moving request to the done queue.", 1591 hio); 1592 QUEUE_INSERT2(hio, done); 1593 } 1594 /* NOTREACHED */ 1595 return (NULL); 1596 } 1597 1598 /* 1599 * Thread receives answer from secondary node and passes it to ggate_send 1600 * thread. 1601 */ 1602 static void * 1603 remote_recv_thread(void *arg) 1604 { 1605 struct hast_resource *res = arg; 1606 struct g_gate_ctl_io *ggio; 1607 struct hio *hio; 1608 struct nv *nv; 1609 unsigned int ncomp; 1610 uint64_t seq; 1611 int error; 1612 1613 /* Remote component is 1 for now. */ 1614 ncomp = 1; 1615 1616 for (;;) { 1617 /* Wait until there is anything to receive. */ 1618 mtx_lock(&hio_recv_list_lock[ncomp]); 1619 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1620 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1621 cv_wait(&hio_recv_list_cond[ncomp], 1622 &hio_recv_list_lock[ncomp]); 1623 } 1624 mtx_unlock(&hio_recv_list_lock[ncomp]); 1625 1626 rw_rlock(&hio_remote_lock[ncomp]); 1627 if (!ISCONNECTED(res, ncomp)) { 1628 rw_unlock(&hio_remote_lock[ncomp]); 1629 /* 1630 * Connection is dead, so move all pending requests to 1631 * the done queue (one-by-one). 1632 */ 1633 mtx_lock(&hio_recv_list_lock[ncomp]); 1634 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1635 PJDLOG_ASSERT(hio != NULL); 1636 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1637 hio_next[ncomp]); 1638 mtx_unlock(&hio_recv_list_lock[ncomp]); 1639 goto done_queue; 1640 } 1641 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1642 pjdlog_errno(LOG_ERR, 1643 "Unable to receive reply header"); 1644 rw_unlock(&hio_remote_lock[ncomp]); 1645 remote_close(res, ncomp); 1646 continue; 1647 } 1648 rw_unlock(&hio_remote_lock[ncomp]); 1649 seq = nv_get_uint64(nv, "seq"); 1650 if (seq == 0) { 1651 pjdlog_error("Header contains no 'seq' field."); 1652 nv_free(nv); 1653 continue; 1654 } 1655 mtx_lock(&hio_recv_list_lock[ncomp]); 1656 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1657 if (hio->hio_ggio.gctl_seq == seq) { 1658 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1659 hio_next[ncomp]); 1660 break; 1661 } 1662 } 1663 mtx_unlock(&hio_recv_list_lock[ncomp]); 1664 if (hio == NULL) { 1665 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1666 (uintmax_t)seq); 1667 nv_free(nv); 1668 continue; 1669 } 1670 ggio = &hio->hio_ggio; 1671 error = nv_get_int16(nv, "error"); 1672 if (error != 0) { 1673 /* Request failed on remote side. */ 1674 hio->hio_errors[ncomp] = error; 1675 reqlog(LOG_WARNING, 0, ggio, 1676 "Remote request failed (%s): ", strerror(error)); 1677 nv_free(nv); 1678 goto done_queue; 1679 } 1680 switch (ggio->gctl_cmd) { 1681 case BIO_READ: 1682 rw_rlock(&hio_remote_lock[ncomp]); 1683 if (!ISCONNECTED(res, ncomp)) { 1684 rw_unlock(&hio_remote_lock[ncomp]); 1685 nv_free(nv); 1686 goto done_queue; 1687 } 1688 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1689 ggio->gctl_data, ggio->gctl_length) == -1) { 1690 hio->hio_errors[ncomp] = errno; 1691 pjdlog_errno(LOG_ERR, 1692 "Unable to receive reply data"); 1693 rw_unlock(&hio_remote_lock[ncomp]); 1694 nv_free(nv); 1695 remote_close(res, ncomp); 1696 goto done_queue; 1697 } 1698 rw_unlock(&hio_remote_lock[ncomp]); 1699 break; 1700 case BIO_WRITE: 1701 case BIO_DELETE: 1702 case BIO_FLUSH: 1703 break; 1704 default: 1705 PJDLOG_ABORT("invalid condition"); 1706 } 1707 hio->hio_errors[ncomp] = 0; 1708 nv_free(nv); 1709 done_queue: 1710 if (!refcount_release(&hio->hio_countdown)) 1711 continue; 1712 if (ISSYNCREQ(hio)) { 1713 mtx_lock(&sync_lock); 1714 SYNCREQDONE(hio); 1715 mtx_unlock(&sync_lock); 1716 cv_signal(&sync_cond); 1717 } else { 1718 pjdlog_debug(2, 1719 "remote_recv: (%p) Moving request to the done queue.", 1720 hio); 1721 QUEUE_INSERT2(hio, done); 1722 } 1723 } 1724 /* NOTREACHED */ 1725 return (NULL); 1726 } 1727 1728 /* 1729 * Thread sends answer to the kernel. 1730 */ 1731 static void * 1732 ggate_send_thread(void *arg) 1733 { 1734 struct hast_resource *res = arg; 1735 struct g_gate_ctl_io *ggio; 1736 struct hio *hio; 1737 unsigned int ii, ncomps; 1738 1739 ncomps = HAST_NCOMPONENTS; 1740 1741 for (;;) { 1742 pjdlog_debug(2, "ggate_send: Taking request."); 1743 QUEUE_TAKE2(hio, done); 1744 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1745 ggio = &hio->hio_ggio; 1746 for (ii = 0; ii < ncomps; ii++) { 1747 if (hio->hio_errors[ii] == 0) { 1748 /* 1749 * One successful request is enough to declare 1750 * success. 1751 */ 1752 ggio->gctl_error = 0; 1753 break; 1754 } 1755 } 1756 if (ii == ncomps) { 1757 /* 1758 * None of the requests were successful. 1759 * Use the error from local component except the 1760 * case when we did only remote request. 1761 */ 1762 if (ggio->gctl_cmd == BIO_READ && 1763 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1764 ggio->gctl_error = hio->hio_errors[1]; 1765 else 1766 ggio->gctl_error = hio->hio_errors[0]; 1767 } 1768 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1769 mtx_lock(&res->hr_amp_lock); 1770 if (activemap_write_complete(res->hr_amp, 1771 ggio->gctl_offset, ggio->gctl_length)) { 1772 res->hr_stat_activemap_update++; 1773 (void)hast_activemap_flush(res); 1774 } 1775 mtx_unlock(&res->hr_amp_lock); 1776 } 1777 if (ggio->gctl_cmd == BIO_WRITE) { 1778 /* 1779 * Unlock range we locked. 1780 */ 1781 mtx_lock(&range_lock); 1782 rangelock_del(range_regular, ggio->gctl_offset, 1783 ggio->gctl_length); 1784 if (range_sync_wait) 1785 cv_signal(&range_sync_cond); 1786 mtx_unlock(&range_lock); 1787 if (!hio->hio_done) 1788 write_complete(res, hio); 1789 } else { 1790 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1791 primary_exit(EX_OSERR, 1792 "G_GATE_CMD_DONE failed"); 1793 } 1794 } 1795 pjdlog_debug(2, 1796 "ggate_send: (%p) Moving request to the free queue.", hio); 1797 QUEUE_INSERT2(hio, free); 1798 } 1799 /* NOTREACHED */ 1800 return (NULL); 1801 } 1802 1803 /* 1804 * Thread synchronize local and remote components. 1805 */ 1806 static void * 1807 sync_thread(void *arg __unused) 1808 { 1809 struct hast_resource *res = arg; 1810 struct hio *hio; 1811 struct g_gate_ctl_io *ggio; 1812 struct timeval tstart, tend, tdiff; 1813 unsigned int ii, ncomp, ncomps; 1814 off_t offset, length, synced; 1815 bool dorewind, directreads; 1816 int syncext; 1817 1818 ncomps = HAST_NCOMPONENTS; 1819 dorewind = true; 1820 synced = 0; 1821 offset = -1; 1822 directreads = false; 1823 1824 for (;;) { 1825 mtx_lock(&sync_lock); 1826 if (offset >= 0 && !sync_inprogress) { 1827 gettimeofday(&tend, NULL); 1828 timersub(&tend, &tstart, &tdiff); 1829 pjdlog_info("Synchronization interrupted after %#.0T. " 1830 "%NB synchronized so far.", &tdiff, 1831 (intmax_t)synced); 1832 event_send(res, EVENT_SYNCINTR); 1833 } 1834 while (!sync_inprogress) { 1835 dorewind = true; 1836 synced = 0; 1837 cv_wait(&sync_cond, &sync_lock); 1838 } 1839 mtx_unlock(&sync_lock); 1840 /* 1841 * Obtain offset at which we should synchronize. 1842 * Rewind synchronization if needed. 1843 */ 1844 mtx_lock(&res->hr_amp_lock); 1845 if (dorewind) 1846 activemap_sync_rewind(res->hr_amp); 1847 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1848 if (syncext != -1) { 1849 /* 1850 * We synchronized entire syncext extent, we can mark 1851 * it as clean now. 1852 */ 1853 if (activemap_extent_complete(res->hr_amp, syncext)) 1854 (void)hast_activemap_flush(res); 1855 } 1856 mtx_unlock(&res->hr_amp_lock); 1857 if (dorewind) { 1858 dorewind = false; 1859 if (offset == -1) 1860 pjdlog_info("Nodes are in sync."); 1861 else { 1862 pjdlog_info("Synchronization started. %NB to go.", 1863 (intmax_t)(res->hr_extentsize * 1864 activemap_ndirty(res->hr_amp))); 1865 event_send(res, EVENT_SYNCSTART); 1866 gettimeofday(&tstart, NULL); 1867 } 1868 } 1869 if (offset == -1) { 1870 sync_stop(); 1871 pjdlog_debug(1, "Nothing to synchronize."); 1872 /* 1873 * Synchronization complete, make both localcnt and 1874 * remotecnt equal. 1875 */ 1876 ncomp = 1; 1877 rw_rlock(&hio_remote_lock[ncomp]); 1878 if (ISCONNECTED(res, ncomp)) { 1879 if (synced > 0) { 1880 int64_t bps; 1881 1882 gettimeofday(&tend, NULL); 1883 timersub(&tend, &tstart, &tdiff); 1884 bps = (int64_t)((double)synced / 1885 ((double)tdiff.tv_sec + 1886 (double)tdiff.tv_usec / 1000000)); 1887 pjdlog_info("Synchronization complete. " 1888 "%NB synchronized in %#.0lT (%NB/sec).", 1889 (intmax_t)synced, &tdiff, 1890 (intmax_t)bps); 1891 event_send(res, EVENT_SYNCDONE); 1892 } 1893 mtx_lock(&metadata_lock); 1894 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1895 directreads = true; 1896 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1897 res->hr_primary_localcnt = 1898 res->hr_secondary_remotecnt; 1899 res->hr_primary_remotecnt = 1900 res->hr_secondary_localcnt; 1901 pjdlog_debug(1, 1902 "Setting localcnt to %ju and remotecnt to %ju.", 1903 (uintmax_t)res->hr_primary_localcnt, 1904 (uintmax_t)res->hr_primary_remotecnt); 1905 (void)metadata_write(res); 1906 mtx_unlock(&metadata_lock); 1907 } 1908 rw_unlock(&hio_remote_lock[ncomp]); 1909 if (directreads) { 1910 directreads = false; 1911 enable_direct_reads(res); 1912 } 1913 continue; 1914 } 1915 pjdlog_debug(2, "sync: Taking free request."); 1916 QUEUE_TAKE2(hio, free); 1917 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1918 /* 1919 * Lock the range we are going to synchronize. We don't want 1920 * race where someone writes between our read and write. 1921 */ 1922 for (;;) { 1923 mtx_lock(&range_lock); 1924 if (rangelock_islocked(range_regular, offset, length)) { 1925 pjdlog_debug(2, 1926 "sync: Range offset=%jd length=%jd locked.", 1927 (intmax_t)offset, (intmax_t)length); 1928 range_sync_wait = true; 1929 cv_wait(&range_sync_cond, &range_lock); 1930 range_sync_wait = false; 1931 mtx_unlock(&range_lock); 1932 continue; 1933 } 1934 if (rangelock_add(range_sync, offset, length) == -1) { 1935 mtx_unlock(&range_lock); 1936 pjdlog_debug(2, 1937 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1938 (intmax_t)offset, (intmax_t)length); 1939 sleep(1); 1940 continue; 1941 } 1942 mtx_unlock(&range_lock); 1943 break; 1944 } 1945 /* 1946 * First read the data from synchronization source. 1947 */ 1948 SYNCREQ(hio); 1949 ggio = &hio->hio_ggio; 1950 ggio->gctl_cmd = BIO_READ; 1951 ggio->gctl_offset = offset; 1952 ggio->gctl_length = length; 1953 ggio->gctl_error = 0; 1954 hio->hio_done = false; 1955 hio->hio_replication = res->hr_replication; 1956 for (ii = 0; ii < ncomps; ii++) 1957 hio->hio_errors[ii] = EINVAL; 1958 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1959 hio); 1960 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1961 hio); 1962 mtx_lock(&metadata_lock); 1963 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1964 /* 1965 * This range is up-to-date on local component, 1966 * so handle request locally. 1967 */ 1968 /* Local component is 0 for now. */ 1969 ncomp = 0; 1970 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1971 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1972 /* 1973 * This range is out-of-date on local component, 1974 * so send request to the remote node. 1975 */ 1976 /* Remote component is 1 for now. */ 1977 ncomp = 1; 1978 } 1979 mtx_unlock(&metadata_lock); 1980 refcount_init(&hio->hio_countdown, 1); 1981 QUEUE_INSERT1(hio, send, ncomp); 1982 1983 /* 1984 * Let's wait for READ to finish. 1985 */ 1986 mtx_lock(&sync_lock); 1987 while (!ISSYNCREQDONE(hio)) 1988 cv_wait(&sync_cond, &sync_lock); 1989 mtx_unlock(&sync_lock); 1990 1991 if (hio->hio_errors[ncomp] != 0) { 1992 pjdlog_error("Unable to read synchronization data: %s.", 1993 strerror(hio->hio_errors[ncomp])); 1994 goto free_queue; 1995 } 1996 1997 /* 1998 * We read the data from synchronization source, now write it 1999 * to synchronization target. 2000 */ 2001 SYNCREQ(hio); 2002 ggio->gctl_cmd = BIO_WRITE; 2003 for (ii = 0; ii < ncomps; ii++) 2004 hio->hio_errors[ii] = EINVAL; 2005 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2006 hio); 2007 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2008 hio); 2009 mtx_lock(&metadata_lock); 2010 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2011 /* 2012 * This range is up-to-date on local component, 2013 * so we update remote component. 2014 */ 2015 /* Remote component is 1 for now. */ 2016 ncomp = 1; 2017 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2018 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2019 /* 2020 * This range is out-of-date on local component, 2021 * so we update it. 2022 */ 2023 /* Local component is 0 for now. */ 2024 ncomp = 0; 2025 } 2026 mtx_unlock(&metadata_lock); 2027 2028 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2029 hio); 2030 refcount_init(&hio->hio_countdown, 1); 2031 QUEUE_INSERT1(hio, send, ncomp); 2032 2033 /* 2034 * Let's wait for WRITE to finish. 2035 */ 2036 mtx_lock(&sync_lock); 2037 while (!ISSYNCREQDONE(hio)) 2038 cv_wait(&sync_cond, &sync_lock); 2039 mtx_unlock(&sync_lock); 2040 2041 if (hio->hio_errors[ncomp] != 0) { 2042 pjdlog_error("Unable to write synchronization data: %s.", 2043 strerror(hio->hio_errors[ncomp])); 2044 goto free_queue; 2045 } 2046 2047 synced += length; 2048 free_queue: 2049 mtx_lock(&range_lock); 2050 rangelock_del(range_sync, offset, length); 2051 if (range_regular_wait) 2052 cv_signal(&range_regular_cond); 2053 mtx_unlock(&range_lock); 2054 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2055 hio); 2056 QUEUE_INSERT2(hio, free); 2057 } 2058 /* NOTREACHED */ 2059 return (NULL); 2060 } 2061 2062 void 2063 primary_config_reload(struct hast_resource *res, struct nv *nv) 2064 { 2065 unsigned int ii, ncomps; 2066 int modified, vint; 2067 const char *vstr; 2068 2069 pjdlog_info("Reloading configuration..."); 2070 2071 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2072 PJDLOG_ASSERT(gres == res); 2073 nv_assert(nv, "remoteaddr"); 2074 nv_assert(nv, "sourceaddr"); 2075 nv_assert(nv, "replication"); 2076 nv_assert(nv, "checksum"); 2077 nv_assert(nv, "compression"); 2078 nv_assert(nv, "timeout"); 2079 nv_assert(nv, "exec"); 2080 nv_assert(nv, "metaflush"); 2081 2082 ncomps = HAST_NCOMPONENTS; 2083 2084 #define MODIFIED_REMOTEADDR 0x01 2085 #define MODIFIED_SOURCEADDR 0x02 2086 #define MODIFIED_REPLICATION 0x04 2087 #define MODIFIED_CHECKSUM 0x08 2088 #define MODIFIED_COMPRESSION 0x10 2089 #define MODIFIED_TIMEOUT 0x20 2090 #define MODIFIED_EXEC 0x40 2091 #define MODIFIED_METAFLUSH 0x80 2092 modified = 0; 2093 2094 vstr = nv_get_string(nv, "remoteaddr"); 2095 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2096 /* 2097 * Don't copy res->hr_remoteaddr to gres just yet. 2098 * We want remote_close() to log disconnect from the old 2099 * addresses, not from the new ones. 2100 */ 2101 modified |= MODIFIED_REMOTEADDR; 2102 } 2103 vstr = nv_get_string(nv, "sourceaddr"); 2104 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2105 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2106 modified |= MODIFIED_SOURCEADDR; 2107 } 2108 vint = nv_get_int32(nv, "replication"); 2109 if (gres->hr_replication != vint) { 2110 gres->hr_replication = vint; 2111 modified |= MODIFIED_REPLICATION; 2112 } 2113 vint = nv_get_int32(nv, "checksum"); 2114 if (gres->hr_checksum != vint) { 2115 gres->hr_checksum = vint; 2116 modified |= MODIFIED_CHECKSUM; 2117 } 2118 vint = nv_get_int32(nv, "compression"); 2119 if (gres->hr_compression != vint) { 2120 gres->hr_compression = vint; 2121 modified |= MODIFIED_COMPRESSION; 2122 } 2123 vint = nv_get_int32(nv, "timeout"); 2124 if (gres->hr_timeout != vint) { 2125 gres->hr_timeout = vint; 2126 modified |= MODIFIED_TIMEOUT; 2127 } 2128 vstr = nv_get_string(nv, "exec"); 2129 if (strcmp(gres->hr_exec, vstr) != 0) { 2130 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2131 modified |= MODIFIED_EXEC; 2132 } 2133 vint = nv_get_int32(nv, "metaflush"); 2134 if (gres->hr_metaflush != vint) { 2135 gres->hr_metaflush = vint; 2136 modified |= MODIFIED_METAFLUSH; 2137 } 2138 2139 /* 2140 * Change timeout for connected sockets. 2141 * Don't bother if we need to reconnect. 2142 */ 2143 if ((modified & MODIFIED_TIMEOUT) != 0 && 2144 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2145 for (ii = 0; ii < ncomps; ii++) { 2146 if (!ISREMOTE(ii)) 2147 continue; 2148 rw_rlock(&hio_remote_lock[ii]); 2149 if (!ISCONNECTED(gres, ii)) { 2150 rw_unlock(&hio_remote_lock[ii]); 2151 continue; 2152 } 2153 rw_unlock(&hio_remote_lock[ii]); 2154 if (proto_timeout(gres->hr_remotein, 2155 gres->hr_timeout) == -1) { 2156 pjdlog_errno(LOG_WARNING, 2157 "Unable to set connection timeout"); 2158 } 2159 if (proto_timeout(gres->hr_remoteout, 2160 gres->hr_timeout) == -1) { 2161 pjdlog_errno(LOG_WARNING, 2162 "Unable to set connection timeout"); 2163 } 2164 } 2165 } 2166 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2167 for (ii = 0; ii < ncomps; ii++) { 2168 if (!ISREMOTE(ii)) 2169 continue; 2170 remote_close(gres, ii); 2171 } 2172 if (modified & MODIFIED_REMOTEADDR) { 2173 vstr = nv_get_string(nv, "remoteaddr"); 2174 strlcpy(gres->hr_remoteaddr, vstr, 2175 sizeof(gres->hr_remoteaddr)); 2176 } 2177 } 2178 #undef MODIFIED_REMOTEADDR 2179 #undef MODIFIED_SOURCEADDR 2180 #undef MODIFIED_REPLICATION 2181 #undef MODIFIED_CHECKSUM 2182 #undef MODIFIED_COMPRESSION 2183 #undef MODIFIED_TIMEOUT 2184 #undef MODIFIED_EXEC 2185 #undef MODIFIED_METAFLUSH 2186 2187 pjdlog_info("Configuration reloaded successfully."); 2188 } 2189 2190 static void 2191 guard_one(struct hast_resource *res, unsigned int ncomp) 2192 { 2193 struct proto_conn *in, *out; 2194 2195 if (!ISREMOTE(ncomp)) 2196 return; 2197 2198 rw_rlock(&hio_remote_lock[ncomp]); 2199 2200 if (!real_remote(res)) { 2201 rw_unlock(&hio_remote_lock[ncomp]); 2202 return; 2203 } 2204 2205 if (ISCONNECTED(res, ncomp)) { 2206 PJDLOG_ASSERT(res->hr_remotein != NULL); 2207 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2208 rw_unlock(&hio_remote_lock[ncomp]); 2209 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2210 res->hr_remoteaddr); 2211 return; 2212 } 2213 2214 PJDLOG_ASSERT(res->hr_remotein == NULL); 2215 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2216 /* 2217 * Upgrade the lock. It doesn't have to be atomic as no other thread 2218 * can change connection status from disconnected to connected. 2219 */ 2220 rw_unlock(&hio_remote_lock[ncomp]); 2221 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2222 res->hr_remoteaddr); 2223 in = out = NULL; 2224 if (init_remote(res, &in, &out) == 0) { 2225 rw_wlock(&hio_remote_lock[ncomp]); 2226 PJDLOG_ASSERT(res->hr_remotein == NULL); 2227 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2228 PJDLOG_ASSERT(in != NULL && out != NULL); 2229 res->hr_remotein = in; 2230 res->hr_remoteout = out; 2231 rw_unlock(&hio_remote_lock[ncomp]); 2232 pjdlog_info("Successfully reconnected to %s.", 2233 res->hr_remoteaddr); 2234 sync_start(); 2235 } else { 2236 /* Both connections should be NULL. */ 2237 PJDLOG_ASSERT(res->hr_remotein == NULL); 2238 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2239 PJDLOG_ASSERT(in == NULL && out == NULL); 2240 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2241 res->hr_remoteaddr); 2242 } 2243 } 2244 2245 /* 2246 * Thread guards remote connections and reconnects when needed, handles 2247 * signals, etc. 2248 */ 2249 static void * 2250 guard_thread(void *arg) 2251 { 2252 struct hast_resource *res = arg; 2253 unsigned int ii, ncomps; 2254 struct timespec timeout; 2255 time_t lastcheck, now; 2256 sigset_t mask; 2257 int signo; 2258 2259 ncomps = HAST_NCOMPONENTS; 2260 lastcheck = time(NULL); 2261 2262 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2263 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2264 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2265 2266 timeout.tv_sec = HAST_KEEPALIVE; 2267 timeout.tv_nsec = 0; 2268 signo = -1; 2269 2270 for (;;) { 2271 switch (signo) { 2272 case SIGINT: 2273 case SIGTERM: 2274 sigexit_received = true; 2275 primary_exitx(EX_OK, 2276 "Termination signal received, exiting."); 2277 break; 2278 default: 2279 break; 2280 } 2281 2282 /* 2283 * Don't check connections until we fully started, 2284 * as we may still be looping, waiting for remote node 2285 * to switch from primary to secondary. 2286 */ 2287 if (fullystarted) { 2288 pjdlog_debug(2, "remote_guard: Checking connections."); 2289 now = time(NULL); 2290 if (lastcheck + HAST_KEEPALIVE <= now) { 2291 for (ii = 0; ii < ncomps; ii++) 2292 guard_one(res, ii); 2293 lastcheck = now; 2294 } 2295 } 2296 signo = sigtimedwait(&mask, NULL, &timeout); 2297 } 2298 /* NOTREACHED */ 2299 return (NULL); 2300 } 2301