1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/types.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <geom/gate/g_gate.h> 41 42 #include <err.h> 43 #include <errno.h> 44 #include <fcntl.h> 45 #include <libgeom.h> 46 #include <pthread.h> 47 #include <signal.h> 48 #include <stdint.h> 49 #include <stdio.h> 50 #include <string.h> 51 #include <sysexits.h> 52 #include <unistd.h> 53 54 #include <activemap.h> 55 #include <nv.h> 56 #include <rangelock.h> 57 58 #include "control.h" 59 #include "event.h" 60 #include "hast.h" 61 #include "hast_proto.h" 62 #include "hastd.h" 63 #include "hooks.h" 64 #include "metadata.h" 65 #include "proto.h" 66 #include "pjdlog.h" 67 #include "refcnt.h" 68 #include "subr.h" 69 #include "synch.h" 70 71 /* The is only one remote component for now. */ 72 #define ISREMOTE(no) ((no) == 1) 73 74 struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 refcnt_t hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Remember replication from the time the request was initiated, 98 * so we won't get confused when replication changes on reload. 99 */ 100 int hio_replication; 101 TAILQ_ENTRY(hio) *hio_next; 102 }; 103 #define hio_free_next hio_next[0] 104 #define hio_done_next hio_next[0] 105 106 /* 107 * Free list holds unused structures. When free list is empty, we have to wait 108 * until some in-progress requests are freed. 109 */ 110 static TAILQ_HEAD(, hio) hio_free_list; 111 static size_t hio_free_list_size; 112 static pthread_mutex_t hio_free_list_lock; 113 static pthread_cond_t hio_free_list_cond; 114 /* 115 * There is one send list for every component. One requests is placed on all 116 * send lists - each component gets the same request, but each component is 117 * responsible for managing his own send list. 118 */ 119 static TAILQ_HEAD(, hio) *hio_send_list; 120 static size_t *hio_send_list_size; 121 static pthread_mutex_t *hio_send_list_lock; 122 static pthread_cond_t *hio_send_list_cond; 123 #define hio_send_local_list_size hio_send_list_size[0] 124 #define hio_send_remote_list_size hio_send_list_size[1] 125 /* 126 * There is one recv list for every component, although local components don't 127 * use recv lists as local requests are done synchronously. 128 */ 129 static TAILQ_HEAD(, hio) *hio_recv_list; 130 static size_t *hio_recv_list_size; 131 static pthread_mutex_t *hio_recv_list_lock; 132 static pthread_cond_t *hio_recv_list_cond; 133 #define hio_recv_remote_list_size hio_recv_list_size[1] 134 /* 135 * Request is placed on done list by the slowest component (the one that 136 * decreased hio_countdown from 1 to 0). 137 */ 138 static TAILQ_HEAD(, hio) hio_done_list; 139 static size_t hio_done_list_size; 140 static pthread_mutex_t hio_done_list_lock; 141 static pthread_cond_t hio_done_list_cond; 142 /* 143 * Structure below are for interaction with sync thread. 144 */ 145 static bool sync_inprogress; 146 static pthread_mutex_t sync_lock; 147 static pthread_cond_t sync_cond; 148 /* 149 * The lock below allows to synchornize access to remote connections. 150 */ 151 static pthread_rwlock_t *hio_remote_lock; 152 153 /* 154 * Lock to synchronize metadata updates. Also synchronize access to 155 * hr_primary_localcnt and hr_primary_remotecnt fields. 156 */ 157 static pthread_mutex_t metadata_lock; 158 159 /* 160 * Maximum number of outstanding I/O requests. 161 */ 162 #define HAST_HIO_MAX 256 163 /* 164 * Number of components. At this point there are only two components: local 165 * and remote, but in the future it might be possible to use multiple local 166 * and remote components. 167 */ 168 #define HAST_NCOMPONENTS 2 169 170 #define ISCONNECTED(res, no) \ 171 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 172 173 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 174 bool _wakeup; \ 175 \ 176 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 177 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 178 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 179 hio_next[(ncomp)]); \ 180 hio_##name##_list_size[(ncomp)]++; \ 181 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 182 if (_wakeup) \ 183 cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \ 184 } while (0) 185 #define QUEUE_INSERT2(hio, name) do { \ 186 bool _wakeup; \ 187 \ 188 mtx_lock(&hio_##name##_list_lock); \ 189 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 190 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 191 hio_##name##_list_size++; \ 192 mtx_unlock(&hio_##name##_list_lock); \ 193 if (_wakeup) \ 194 cv_broadcast(&hio_##name##_list_cond); \ 195 } while (0) 196 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 197 bool _last; \ 198 \ 199 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 200 _last = false; \ 201 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 202 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 203 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 204 if ((timeout) != 0) \ 205 _last = true; \ 206 } \ 207 if (hio != NULL) { \ 208 PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0); \ 209 hio_##name##_list_size[(ncomp)]--; \ 210 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 211 hio_next[(ncomp)]); \ 212 } \ 213 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 214 } while (0) 215 #define QUEUE_TAKE2(hio, name) do { \ 216 mtx_lock(&hio_##name##_list_lock); \ 217 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 218 cv_wait(&hio_##name##_list_cond, \ 219 &hio_##name##_list_lock); \ 220 } \ 221 PJDLOG_ASSERT(hio_##name##_list_size != 0); \ 222 hio_##name##_list_size--; \ 223 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 224 mtx_unlock(&hio_##name##_list_lock); \ 225 } while (0) 226 227 #define SYNCREQ(hio) do { \ 228 (hio)->hio_ggio.gctl_unit = -1; \ 229 (hio)->hio_ggio.gctl_seq = 1; \ 230 } while (0) 231 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 232 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 233 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 234 235 static struct hast_resource *gres; 236 237 static pthread_mutex_t range_lock; 238 static struct rangelocks *range_regular; 239 static bool range_regular_wait; 240 static pthread_cond_t range_regular_cond; 241 static struct rangelocks *range_sync; 242 static bool range_sync_wait; 243 static pthread_cond_t range_sync_cond; 244 static bool fullystarted; 245 246 static void *ggate_recv_thread(void *arg); 247 static void *local_send_thread(void *arg); 248 static void *remote_send_thread(void *arg); 249 static void *remote_recv_thread(void *arg); 250 static void *ggate_send_thread(void *arg); 251 static void *sync_thread(void *arg); 252 static void *guard_thread(void *arg); 253 254 static void 255 output_status_aux(struct nv *nvout) 256 { 257 258 nv_add_uint64(nvout, (uint64_t)hio_free_list_size, 259 "idle_queue_size"); 260 nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size, 261 "local_queue_size"); 262 nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size, 263 "send_queue_size"); 264 nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size, 265 "recv_queue_size"); 266 nv_add_uint64(nvout, (uint64_t)hio_done_list_size, 267 "done_queue_size"); 268 } 269 270 static void 271 cleanup(struct hast_resource *res) 272 { 273 int rerrno; 274 275 /* Remember errno. */ 276 rerrno = errno; 277 278 /* Destroy ggate provider if we created one. */ 279 if (res->hr_ggateunit >= 0) { 280 struct g_gate_ctl_destroy ggiod; 281 282 bzero(&ggiod, sizeof(ggiod)); 283 ggiod.gctl_version = G_GATE_VERSION; 284 ggiod.gctl_unit = res->hr_ggateunit; 285 ggiod.gctl_force = 1; 286 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 287 pjdlog_errno(LOG_WARNING, 288 "Unable to destroy hast/%s device", 289 res->hr_provname); 290 } 291 res->hr_ggateunit = -1; 292 } 293 294 /* Restore errno. */ 295 errno = rerrno; 296 } 297 298 static __dead2 void 299 primary_exit(int exitcode, const char *fmt, ...) 300 { 301 va_list ap; 302 303 PJDLOG_ASSERT(exitcode != EX_OK); 304 va_start(ap, fmt); 305 pjdlogv_errno(LOG_ERR, fmt, ap); 306 va_end(ap); 307 cleanup(gres); 308 exit(exitcode); 309 } 310 311 static __dead2 void 312 primary_exitx(int exitcode, const char *fmt, ...) 313 { 314 va_list ap; 315 316 va_start(ap, fmt); 317 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 318 va_end(ap); 319 cleanup(gres); 320 exit(exitcode); 321 } 322 323 /* Expects res->hr_amp locked, returns unlocked. */ 324 static int 325 hast_activemap_flush(struct hast_resource *res) 326 { 327 const unsigned char *buf; 328 size_t size; 329 int ret; 330 331 mtx_lock(&res->hr_amp_diskmap_lock); 332 buf = activemap_bitmap(res->hr_amp, &size); 333 mtx_unlock(&res->hr_amp_lock); 334 PJDLOG_ASSERT(buf != NULL); 335 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 336 ret = 0; 337 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 338 (ssize_t)size) { 339 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 340 res->hr_stat_activemap_write_error++; 341 ret = -1; 342 } 343 if (ret == 0 && res->hr_metaflush == 1 && 344 g_flush(res->hr_localfd) == -1) { 345 if (errno == EOPNOTSUPP) { 346 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 347 res->hr_localpath); 348 res->hr_metaflush = 0; 349 } else { 350 pjdlog_errno(LOG_ERR, 351 "Unable to flush disk cache on activemap update"); 352 res->hr_stat_activemap_flush_error++; 353 ret = -1; 354 } 355 } 356 mtx_unlock(&res->hr_amp_diskmap_lock); 357 return (ret); 358 } 359 360 static bool 361 real_remote(const struct hast_resource *res) 362 { 363 364 return (strcmp(res->hr_remoteaddr, "none") != 0); 365 } 366 367 static void 368 init_environment(struct hast_resource *res __unused) 369 { 370 struct hio *hio; 371 unsigned int ii, ncomps; 372 373 /* 374 * In the future it might be per-resource value. 375 */ 376 ncomps = HAST_NCOMPONENTS; 377 378 /* 379 * Allocate memory needed by lists. 380 */ 381 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 382 if (hio_send_list == NULL) { 383 primary_exitx(EX_TEMPFAIL, 384 "Unable to allocate %zu bytes of memory for send lists.", 385 sizeof(hio_send_list[0]) * ncomps); 386 } 387 hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps); 388 if (hio_send_list_size == NULL) { 389 primary_exitx(EX_TEMPFAIL, 390 "Unable to allocate %zu bytes of memory for send list counters.", 391 sizeof(hio_send_list_size[0]) * ncomps); 392 } 393 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 394 if (hio_send_list_lock == NULL) { 395 primary_exitx(EX_TEMPFAIL, 396 "Unable to allocate %zu bytes of memory for send list locks.", 397 sizeof(hio_send_list_lock[0]) * ncomps); 398 } 399 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 400 if (hio_send_list_cond == NULL) { 401 primary_exitx(EX_TEMPFAIL, 402 "Unable to allocate %zu bytes of memory for send list condition variables.", 403 sizeof(hio_send_list_cond[0]) * ncomps); 404 } 405 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 406 if (hio_recv_list == NULL) { 407 primary_exitx(EX_TEMPFAIL, 408 "Unable to allocate %zu bytes of memory for recv lists.", 409 sizeof(hio_recv_list[0]) * ncomps); 410 } 411 hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps); 412 if (hio_recv_list_size == NULL) { 413 primary_exitx(EX_TEMPFAIL, 414 "Unable to allocate %zu bytes of memory for recv list counters.", 415 sizeof(hio_recv_list_size[0]) * ncomps); 416 } 417 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 418 if (hio_recv_list_lock == NULL) { 419 primary_exitx(EX_TEMPFAIL, 420 "Unable to allocate %zu bytes of memory for recv list locks.", 421 sizeof(hio_recv_list_lock[0]) * ncomps); 422 } 423 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 424 if (hio_recv_list_cond == NULL) { 425 primary_exitx(EX_TEMPFAIL, 426 "Unable to allocate %zu bytes of memory for recv list condition variables.", 427 sizeof(hio_recv_list_cond[0]) * ncomps); 428 } 429 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 430 if (hio_remote_lock == NULL) { 431 primary_exitx(EX_TEMPFAIL, 432 "Unable to allocate %zu bytes of memory for remote connections locks.", 433 sizeof(hio_remote_lock[0]) * ncomps); 434 } 435 436 /* 437 * Initialize lists, their counters, locks and condition variables. 438 */ 439 TAILQ_INIT(&hio_free_list); 440 mtx_init(&hio_free_list_lock); 441 cv_init(&hio_free_list_cond); 442 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 443 TAILQ_INIT(&hio_send_list[ii]); 444 hio_send_list_size[ii] = 0; 445 mtx_init(&hio_send_list_lock[ii]); 446 cv_init(&hio_send_list_cond[ii]); 447 TAILQ_INIT(&hio_recv_list[ii]); 448 hio_recv_list_size[ii] = 0; 449 mtx_init(&hio_recv_list_lock[ii]); 450 cv_init(&hio_recv_list_cond[ii]); 451 rw_init(&hio_remote_lock[ii]); 452 } 453 TAILQ_INIT(&hio_done_list); 454 mtx_init(&hio_done_list_lock); 455 cv_init(&hio_done_list_cond); 456 mtx_init(&metadata_lock); 457 458 /* 459 * Allocate requests pool and initialize requests. 460 */ 461 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 462 hio = malloc(sizeof(*hio)); 463 if (hio == NULL) { 464 primary_exitx(EX_TEMPFAIL, 465 "Unable to allocate %zu bytes of memory for hio request.", 466 sizeof(*hio)); 467 } 468 refcnt_init(&hio->hio_countdown, 0); 469 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 470 if (hio->hio_errors == NULL) { 471 primary_exitx(EX_TEMPFAIL, 472 "Unable allocate %zu bytes of memory for hio errors.", 473 sizeof(hio->hio_errors[0]) * ncomps); 474 } 475 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 476 if (hio->hio_next == NULL) { 477 primary_exitx(EX_TEMPFAIL, 478 "Unable allocate %zu bytes of memory for hio_next field.", 479 sizeof(hio->hio_next[0]) * ncomps); 480 } 481 hio->hio_ggio.gctl_version = G_GATE_VERSION; 482 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 483 if (hio->hio_ggio.gctl_data == NULL) { 484 primary_exitx(EX_TEMPFAIL, 485 "Unable to allocate %zu bytes of memory for gctl_data.", 486 MAXPHYS); 487 } 488 hio->hio_ggio.gctl_length = MAXPHYS; 489 hio->hio_ggio.gctl_error = 0; 490 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 491 hio_free_list_size++; 492 } 493 } 494 495 static bool 496 init_resuid(struct hast_resource *res) 497 { 498 499 mtx_lock(&metadata_lock); 500 if (res->hr_resuid != 0) { 501 mtx_unlock(&metadata_lock); 502 return (false); 503 } else { 504 /* Initialize unique resource identifier. */ 505 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 506 mtx_unlock(&metadata_lock); 507 if (metadata_write(res) == -1) 508 exit(EX_NOINPUT); 509 return (true); 510 } 511 } 512 513 static void 514 init_local(struct hast_resource *res) 515 { 516 unsigned char *buf; 517 size_t mapsize; 518 519 if (metadata_read(res, true) == -1) 520 exit(EX_NOINPUT); 521 mtx_init(&res->hr_amp_lock); 522 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 523 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 524 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 525 } 526 mtx_init(&range_lock); 527 cv_init(&range_regular_cond); 528 if (rangelock_init(&range_regular) == -1) 529 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 530 cv_init(&range_sync_cond); 531 if (rangelock_init(&range_sync) == -1) 532 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 533 mapsize = activemap_ondisk_size(res->hr_amp); 534 buf = calloc(1, mapsize); 535 if (buf == NULL) { 536 primary_exitx(EX_TEMPFAIL, 537 "Unable to allocate buffer for activemap."); 538 } 539 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 540 (ssize_t)mapsize) { 541 primary_exit(EX_NOINPUT, "Unable to read activemap"); 542 } 543 activemap_copyin(res->hr_amp, buf, mapsize); 544 free(buf); 545 if (res->hr_resuid != 0) 546 return; 547 /* 548 * We're using provider for the first time. Initialize local and remote 549 * counters. We don't initialize resuid here, as we want to do it just 550 * in time. The reason for this is that we want to inform secondary 551 * that there were no writes yet, so there is no need to synchronize 552 * anything. 553 */ 554 res->hr_primary_localcnt = 0; 555 res->hr_primary_remotecnt = 0; 556 if (metadata_write(res) == -1) 557 exit(EX_NOINPUT); 558 } 559 560 static int 561 primary_connect(struct hast_resource *res, struct proto_conn **connp) 562 { 563 struct proto_conn *conn; 564 int16_t val; 565 566 val = 1; 567 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 568 primary_exit(EX_TEMPFAIL, 569 "Unable to send connection request to parent"); 570 } 571 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 572 primary_exit(EX_TEMPFAIL, 573 "Unable to receive reply to connection request from parent"); 574 } 575 if (val != 0) { 576 errno = val; 577 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 578 res->hr_remoteaddr); 579 return (-1); 580 } 581 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 582 primary_exit(EX_TEMPFAIL, 583 "Unable to receive connection from parent"); 584 } 585 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 586 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 587 res->hr_remoteaddr); 588 proto_close(conn); 589 return (-1); 590 } 591 /* Error in setting timeout is not critical, but why should it fail? */ 592 if (proto_timeout(conn, res->hr_timeout) == -1) 593 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 594 595 *connp = conn; 596 597 return (0); 598 } 599 600 /* 601 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 602 */ 603 static void 604 enable_direct_reads(struct hast_resource *res) 605 { 606 struct g_gate_ctl_modify ggiomodify; 607 608 bzero(&ggiomodify, sizeof(ggiomodify)); 609 ggiomodify.gctl_version = G_GATE_VERSION; 610 ggiomodify.gctl_unit = res->hr_ggateunit; 611 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 612 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 613 sizeof(ggiomodify.gctl_readprov)); 614 ggiomodify.gctl_readoffset = res->hr_localoff; 615 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 616 pjdlog_debug(1, "Direct reads enabled."); 617 else 618 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 619 } 620 621 static int 622 init_remote(struct hast_resource *res, struct proto_conn **inp, 623 struct proto_conn **outp) 624 { 625 struct proto_conn *in, *out; 626 struct nv *nvout, *nvin; 627 const unsigned char *token; 628 unsigned char *map; 629 const char *errmsg; 630 int32_t extentsize; 631 int64_t datasize; 632 uint32_t mapsize; 633 uint8_t version; 634 size_t size; 635 int error; 636 637 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 638 PJDLOG_ASSERT(real_remote(res)); 639 640 in = out = NULL; 641 errmsg = NULL; 642 643 if (primary_connect(res, &out) == -1) 644 return (ECONNREFUSED); 645 646 error = ECONNABORTED; 647 648 /* 649 * First handshake step. 650 * Setup outgoing connection with remote node. 651 */ 652 nvout = nv_alloc(); 653 nv_add_string(nvout, res->hr_name, "resource"); 654 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 655 if (nv_error(nvout) != 0) { 656 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 657 "Unable to allocate header for connection with %s", 658 res->hr_remoteaddr); 659 nv_free(nvout); 660 goto close; 661 } 662 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 663 pjdlog_errno(LOG_WARNING, 664 "Unable to send handshake header to %s", 665 res->hr_remoteaddr); 666 nv_free(nvout); 667 goto close; 668 } 669 nv_free(nvout); 670 if (hast_proto_recv_hdr(out, &nvin) == -1) { 671 pjdlog_errno(LOG_WARNING, 672 "Unable to receive handshake header from %s", 673 res->hr_remoteaddr); 674 goto close; 675 } 676 errmsg = nv_get_string(nvin, "errmsg"); 677 if (errmsg != NULL) { 678 pjdlog_warning("%s", errmsg); 679 if (nv_exists(nvin, "wait")) 680 error = EBUSY; 681 nv_free(nvin); 682 goto close; 683 } 684 version = nv_get_uint8(nvin, "version"); 685 if (version == 0) { 686 /* 687 * If no version is sent, it means this is protocol version 1. 688 */ 689 version = 1; 690 } 691 if (version > HAST_PROTO_VERSION) { 692 pjdlog_warning("Invalid version received (%hhu).", version); 693 nv_free(nvin); 694 goto close; 695 } 696 res->hr_version = version; 697 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 698 token = nv_get_uint8_array(nvin, &size, "token"); 699 if (token == NULL) { 700 pjdlog_warning("Handshake header from %s has no 'token' field.", 701 res->hr_remoteaddr); 702 nv_free(nvin); 703 goto close; 704 } 705 if (size != sizeof(res->hr_token)) { 706 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 707 res->hr_remoteaddr, size, sizeof(res->hr_token)); 708 nv_free(nvin); 709 goto close; 710 } 711 bcopy(token, res->hr_token, sizeof(res->hr_token)); 712 nv_free(nvin); 713 714 /* 715 * Second handshake step. 716 * Setup incoming connection with remote node. 717 */ 718 if (primary_connect(res, &in) == -1) 719 goto close; 720 721 nvout = nv_alloc(); 722 nv_add_string(nvout, res->hr_name, "resource"); 723 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 724 "token"); 725 if (res->hr_resuid == 0) { 726 /* 727 * The resuid field was not yet initialized. 728 * Because we do synchronization inside init_resuid(), it is 729 * possible that someone already initialized it, the function 730 * will return false then, but if we successfully initialized 731 * it, we will get true. True means that there were no writes 732 * to this resource yet and we want to inform secondary that 733 * synchronization is not needed by sending "virgin" argument. 734 */ 735 if (init_resuid(res)) 736 nv_add_int8(nvout, 1, "virgin"); 737 } 738 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 739 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 740 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 741 if (nv_error(nvout) != 0) { 742 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 743 "Unable to allocate header for connection with %s", 744 res->hr_remoteaddr); 745 nv_free(nvout); 746 goto close; 747 } 748 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 749 pjdlog_errno(LOG_WARNING, 750 "Unable to send handshake header to %s", 751 res->hr_remoteaddr); 752 nv_free(nvout); 753 goto close; 754 } 755 nv_free(nvout); 756 if (hast_proto_recv_hdr(out, &nvin) == -1) { 757 pjdlog_errno(LOG_WARNING, 758 "Unable to receive handshake header from %s", 759 res->hr_remoteaddr); 760 goto close; 761 } 762 errmsg = nv_get_string(nvin, "errmsg"); 763 if (errmsg != NULL) { 764 pjdlog_warning("%s", errmsg); 765 nv_free(nvin); 766 goto close; 767 } 768 datasize = nv_get_int64(nvin, "datasize"); 769 if (datasize != res->hr_datasize) { 770 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 771 (intmax_t)res->hr_datasize, (intmax_t)datasize); 772 nv_free(nvin); 773 goto close; 774 } 775 extentsize = nv_get_int32(nvin, "extentsize"); 776 if (extentsize != res->hr_extentsize) { 777 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 778 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 779 nv_free(nvin); 780 goto close; 781 } 782 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 783 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 784 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 785 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 786 enable_direct_reads(res); 787 if (nv_exists(nvin, "virgin")) { 788 /* 789 * Secondary was reinitialized, bump localcnt if it is 0 as 790 * only we have the data. 791 */ 792 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 793 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 794 795 if (res->hr_primary_localcnt == 0) { 796 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 797 798 mtx_lock(&metadata_lock); 799 res->hr_primary_localcnt++; 800 pjdlog_debug(1, "Increasing localcnt to %ju.", 801 (uintmax_t)res->hr_primary_localcnt); 802 (void)metadata_write(res); 803 mtx_unlock(&metadata_lock); 804 } 805 } 806 map = NULL; 807 mapsize = nv_get_uint32(nvin, "mapsize"); 808 if (mapsize > 0) { 809 map = malloc(mapsize); 810 if (map == NULL) { 811 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 812 (uintmax_t)mapsize); 813 nv_free(nvin); 814 goto close; 815 } 816 /* 817 * Remote node have some dirty extents on its own, lets 818 * download its activemap. 819 */ 820 if (hast_proto_recv_data(res, out, nvin, map, 821 mapsize) == -1) { 822 pjdlog_errno(LOG_ERR, 823 "Unable to receive remote activemap"); 824 nv_free(nvin); 825 free(map); 826 goto close; 827 } 828 mtx_lock(&res->hr_amp_lock); 829 /* 830 * Merge local and remote bitmaps. 831 */ 832 activemap_merge(res->hr_amp, map, mapsize); 833 free(map); 834 /* 835 * Now that we merged bitmaps from both nodes, flush it to the 836 * disk before we start to synchronize. 837 */ 838 (void)hast_activemap_flush(res); 839 } 840 nv_free(nvin); 841 #ifdef notyet 842 /* Setup directions. */ 843 if (proto_send(out, NULL, 0) == -1) 844 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 845 if (proto_recv(in, NULL, 0) == -1) 846 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 847 #endif 848 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 849 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 850 res->hr_version < 2) { 851 pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 852 res->hr_replication = HAST_REPLICATION_FULLSYNC; 853 } else if (res->hr_replication != res->hr_original_replication) { 854 /* 855 * This is in case hastd disconnected and was upgraded. 856 */ 857 res->hr_replication = res->hr_original_replication; 858 } 859 if (inp != NULL && outp != NULL) { 860 *inp = in; 861 *outp = out; 862 } else { 863 res->hr_remotein = in; 864 res->hr_remoteout = out; 865 } 866 event_send(res, EVENT_CONNECT); 867 return (0); 868 close: 869 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 870 event_send(res, EVENT_SPLITBRAIN); 871 proto_close(out); 872 if (in != NULL) 873 proto_close(in); 874 return (error); 875 } 876 877 static void 878 sync_start(void) 879 { 880 881 mtx_lock(&sync_lock); 882 sync_inprogress = true; 883 mtx_unlock(&sync_lock); 884 cv_signal(&sync_cond); 885 } 886 887 static void 888 sync_stop(void) 889 { 890 891 mtx_lock(&sync_lock); 892 if (sync_inprogress) 893 sync_inprogress = false; 894 mtx_unlock(&sync_lock); 895 } 896 897 static void 898 init_ggate(struct hast_resource *res) 899 { 900 struct g_gate_ctl_create ggiocreate; 901 struct g_gate_ctl_cancel ggiocancel; 902 903 /* 904 * We communicate with ggate via /dev/ggctl. Open it. 905 */ 906 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 907 if (res->hr_ggatefd == -1) 908 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 909 /* 910 * Create provider before trying to connect, as connection failure 911 * is not critical, but may take some time. 912 */ 913 bzero(&ggiocreate, sizeof(ggiocreate)); 914 ggiocreate.gctl_version = G_GATE_VERSION; 915 ggiocreate.gctl_mediasize = res->hr_datasize; 916 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 917 ggiocreate.gctl_flags = 0; 918 ggiocreate.gctl_maxcount = 0; 919 ggiocreate.gctl_timeout = 0; 920 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 921 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 922 res->hr_provname); 923 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 924 pjdlog_info("Device hast/%s created.", res->hr_provname); 925 res->hr_ggateunit = ggiocreate.gctl_unit; 926 return; 927 } 928 if (errno != EEXIST) { 929 primary_exit(EX_OSERR, "Unable to create hast/%s device", 930 res->hr_provname); 931 } 932 pjdlog_debug(1, 933 "Device hast/%s already exists, we will try to take it over.", 934 res->hr_provname); 935 /* 936 * If we received EEXIST, we assume that the process who created the 937 * provider died and didn't clean up. In that case we will start from 938 * where he left of. 939 */ 940 bzero(&ggiocancel, sizeof(ggiocancel)); 941 ggiocancel.gctl_version = G_GATE_VERSION; 942 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 943 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 944 res->hr_provname); 945 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 946 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 947 res->hr_ggateunit = ggiocancel.gctl_unit; 948 return; 949 } 950 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 951 res->hr_provname); 952 } 953 954 void 955 hastd_primary(struct hast_resource *res) 956 { 957 pthread_t td; 958 pid_t pid; 959 int error, mode, debuglevel; 960 961 /* 962 * Create communication channel for sending control commands from 963 * parent to child. 964 */ 965 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 966 /* TODO: There's no need for this to be fatal error. */ 967 KEEP_ERRNO((void)pidfile_remove(pfh)); 968 pjdlog_exit(EX_OSERR, 969 "Unable to create control sockets between parent and child"); 970 } 971 /* 972 * Create communication channel for sending events from child to parent. 973 */ 974 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 975 /* TODO: There's no need for this to be fatal error. */ 976 KEEP_ERRNO((void)pidfile_remove(pfh)); 977 pjdlog_exit(EX_OSERR, 978 "Unable to create event sockets between child and parent"); 979 } 980 /* 981 * Create communication channel for sending connection requests from 982 * child to parent. 983 */ 984 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 985 /* TODO: There's no need for this to be fatal error. */ 986 KEEP_ERRNO((void)pidfile_remove(pfh)); 987 pjdlog_exit(EX_OSERR, 988 "Unable to create connection sockets between child and parent"); 989 } 990 991 pid = fork(); 992 if (pid == -1) { 993 /* TODO: There's no need for this to be fatal error. */ 994 KEEP_ERRNO((void)pidfile_remove(pfh)); 995 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 996 } 997 998 if (pid > 0) { 999 /* This is parent. */ 1000 /* Declare that we are receiver. */ 1001 proto_recv(res->hr_event, NULL, 0); 1002 proto_recv(res->hr_conn, NULL, 0); 1003 /* Declare that we are sender. */ 1004 proto_send(res->hr_ctrl, NULL, 0); 1005 res->hr_workerpid = pid; 1006 return; 1007 } 1008 1009 gres = res; 1010 res->output_status_aux = output_status_aux; 1011 mode = pjdlog_mode_get(); 1012 debuglevel = pjdlog_debug_get(); 1013 1014 /* Declare that we are sender. */ 1015 proto_send(res->hr_event, NULL, 0); 1016 proto_send(res->hr_conn, NULL, 0); 1017 /* Declare that we are receiver. */ 1018 proto_recv(res->hr_ctrl, NULL, 0); 1019 descriptors_cleanup(res); 1020 1021 descriptors_assert(res, mode); 1022 1023 pjdlog_init(mode); 1024 pjdlog_debug_set(debuglevel); 1025 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 1026 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 1027 1028 init_local(res); 1029 init_ggate(res); 1030 init_environment(res); 1031 1032 if (drop_privs(res) != 0) { 1033 cleanup(res); 1034 exit(EX_CONFIG); 1035 } 1036 pjdlog_info("Privileges successfully dropped."); 1037 1038 /* 1039 * Create the guard thread first, so we can handle signals from the 1040 * very beginning. 1041 */ 1042 error = pthread_create(&td, NULL, guard_thread, res); 1043 PJDLOG_ASSERT(error == 0); 1044 /* 1045 * Create the control thread before sending any event to the parent, 1046 * as we can deadlock when parent sends control request to worker, 1047 * but worker has no control thread started yet, so parent waits. 1048 * In the meantime worker sends an event to the parent, but parent 1049 * is unable to handle the event, because it waits for control 1050 * request response. 1051 */ 1052 error = pthread_create(&td, NULL, ctrl_thread, res); 1053 PJDLOG_ASSERT(error == 0); 1054 if (real_remote(res)) { 1055 error = init_remote(res, NULL, NULL); 1056 if (error == 0) { 1057 sync_start(); 1058 } else if (error == EBUSY) { 1059 time_t start = time(NULL); 1060 1061 pjdlog_warning("Waiting for remote node to become %s for %ds.", 1062 role2str(HAST_ROLE_SECONDARY), 1063 res->hr_timeout); 1064 for (;;) { 1065 sleep(1); 1066 error = init_remote(res, NULL, NULL); 1067 if (error != EBUSY) 1068 break; 1069 if (time(NULL) > start + res->hr_timeout) 1070 break; 1071 } 1072 if (error == EBUSY) { 1073 pjdlog_warning("Remote node is still %s, starting anyway.", 1074 role2str(HAST_ROLE_PRIMARY)); 1075 } 1076 } 1077 } 1078 error = pthread_create(&td, NULL, ggate_recv_thread, res); 1079 PJDLOG_ASSERT(error == 0); 1080 error = pthread_create(&td, NULL, local_send_thread, res); 1081 PJDLOG_ASSERT(error == 0); 1082 error = pthread_create(&td, NULL, remote_send_thread, res); 1083 PJDLOG_ASSERT(error == 0); 1084 error = pthread_create(&td, NULL, remote_recv_thread, res); 1085 PJDLOG_ASSERT(error == 0); 1086 error = pthread_create(&td, NULL, ggate_send_thread, res); 1087 PJDLOG_ASSERT(error == 0); 1088 fullystarted = true; 1089 (void)sync_thread(res); 1090 } 1091 1092 static void 1093 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1094 const char *fmt, ...) 1095 { 1096 char msg[1024]; 1097 va_list ap; 1098 1099 va_start(ap, fmt); 1100 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1101 va_end(ap); 1102 switch (ggio->gctl_cmd) { 1103 case BIO_READ: 1104 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1105 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1106 break; 1107 case BIO_DELETE: 1108 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1109 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1110 break; 1111 case BIO_FLUSH: 1112 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1113 break; 1114 case BIO_WRITE: 1115 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1116 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1117 break; 1118 default: 1119 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1120 (unsigned int)ggio->gctl_cmd); 1121 break; 1122 } 1123 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1124 } 1125 1126 static void 1127 remote_close(struct hast_resource *res, int ncomp) 1128 { 1129 1130 rw_wlock(&hio_remote_lock[ncomp]); 1131 /* 1132 * Check for a race between dropping rlock and acquiring wlock - 1133 * another thread can close connection in-between. 1134 */ 1135 if (!ISCONNECTED(res, ncomp)) { 1136 PJDLOG_ASSERT(res->hr_remotein == NULL); 1137 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1138 rw_unlock(&hio_remote_lock[ncomp]); 1139 return; 1140 } 1141 1142 PJDLOG_ASSERT(res->hr_remotein != NULL); 1143 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1144 1145 pjdlog_debug(2, "Closing incoming connection to %s.", 1146 res->hr_remoteaddr); 1147 proto_close(res->hr_remotein); 1148 res->hr_remotein = NULL; 1149 pjdlog_debug(2, "Closing outgoing connection to %s.", 1150 res->hr_remoteaddr); 1151 proto_close(res->hr_remoteout); 1152 res->hr_remoteout = NULL; 1153 1154 rw_unlock(&hio_remote_lock[ncomp]); 1155 1156 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1157 1158 /* 1159 * Stop synchronization if in-progress. 1160 */ 1161 sync_stop(); 1162 1163 event_send(res, EVENT_DISCONNECT); 1164 } 1165 1166 /* 1167 * Acknowledge write completion to the kernel, but don't update activemap yet. 1168 */ 1169 static void 1170 write_complete(struct hast_resource *res, struct hio *hio) 1171 { 1172 struct g_gate_ctl_io *ggio; 1173 unsigned int ncomp; 1174 1175 PJDLOG_ASSERT(!hio->hio_done); 1176 1177 ggio = &hio->hio_ggio; 1178 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1179 1180 /* 1181 * Bump local count if this is first write after 1182 * connection failure with remote node. 1183 */ 1184 ncomp = 1; 1185 rw_rlock(&hio_remote_lock[ncomp]); 1186 if (!ISCONNECTED(res, ncomp)) { 1187 mtx_lock(&metadata_lock); 1188 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1189 res->hr_primary_localcnt++; 1190 pjdlog_debug(1, "Increasing localcnt to %ju.", 1191 (uintmax_t)res->hr_primary_localcnt); 1192 (void)metadata_write(res); 1193 } 1194 mtx_unlock(&metadata_lock); 1195 } 1196 rw_unlock(&hio_remote_lock[ncomp]); 1197 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1198 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1199 hio->hio_done = true; 1200 } 1201 1202 /* 1203 * Thread receives ggate I/O requests from the kernel and passes them to 1204 * appropriate threads: 1205 * WRITE - always goes to both local_send and remote_send threads 1206 * READ (when the block is up-to-date on local component) - 1207 * only local_send thread 1208 * READ (when the block isn't up-to-date on local component) - 1209 * only remote_send thread 1210 * DELETE - always goes to both local_send and remote_send threads 1211 * FLUSH - always goes to both local_send and remote_send threads 1212 */ 1213 static void * 1214 ggate_recv_thread(void *arg) 1215 { 1216 struct hast_resource *res = arg; 1217 struct g_gate_ctl_io *ggio; 1218 struct hio *hio; 1219 unsigned int ii, ncomp, ncomps; 1220 int error; 1221 1222 for (;;) { 1223 pjdlog_debug(2, "ggate_recv: Taking free request."); 1224 QUEUE_TAKE2(hio, free); 1225 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1226 ggio = &hio->hio_ggio; 1227 ggio->gctl_unit = res->hr_ggateunit; 1228 ggio->gctl_length = MAXPHYS; 1229 ggio->gctl_error = 0; 1230 hio->hio_done = false; 1231 hio->hio_replication = res->hr_replication; 1232 pjdlog_debug(2, 1233 "ggate_recv: (%p) Waiting for request from the kernel.", 1234 hio); 1235 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1236 if (sigexit_received) 1237 pthread_exit(NULL); 1238 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1239 } 1240 error = ggio->gctl_error; 1241 switch (error) { 1242 case 0: 1243 break; 1244 case ECANCELED: 1245 /* Exit gracefully. */ 1246 if (!sigexit_received) { 1247 pjdlog_debug(2, 1248 "ggate_recv: (%p) Received cancel from the kernel.", 1249 hio); 1250 pjdlog_info("Received cancel from the kernel, exiting."); 1251 } 1252 pthread_exit(NULL); 1253 case ENOMEM: 1254 /* 1255 * Buffer too small? Impossible, we allocate MAXPHYS 1256 * bytes - request can't be bigger than that. 1257 */ 1258 /* FALLTHROUGH */ 1259 case ENXIO: 1260 default: 1261 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1262 strerror(error)); 1263 } 1264 1265 ncomp = 0; 1266 ncomps = HAST_NCOMPONENTS; 1267 1268 for (ii = 0; ii < ncomps; ii++) 1269 hio->hio_errors[ii] = EINVAL; 1270 reqlog(LOG_DEBUG, 2, ggio, 1271 "ggate_recv: (%p) Request received from the kernel: ", 1272 hio); 1273 1274 /* 1275 * Inform all components about new write request. 1276 * For read request prefer local component unless the given 1277 * range is out-of-date, then use remote component. 1278 */ 1279 switch (ggio->gctl_cmd) { 1280 case BIO_READ: 1281 res->hr_stat_read++; 1282 ncomps = 1; 1283 mtx_lock(&metadata_lock); 1284 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1285 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1286 /* 1287 * This range is up-to-date on local component, 1288 * so handle request locally. 1289 */ 1290 /* Local component is 0 for now. */ 1291 ncomp = 0; 1292 } else /* if (res->hr_syncsrc == 1293 HAST_SYNCSRC_SECONDARY) */ { 1294 PJDLOG_ASSERT(res->hr_syncsrc == 1295 HAST_SYNCSRC_SECONDARY); 1296 /* 1297 * This range is out-of-date on local component, 1298 * so send request to the remote node. 1299 */ 1300 /* Remote component is 1 for now. */ 1301 ncomp = 1; 1302 } 1303 mtx_unlock(&metadata_lock); 1304 break; 1305 case BIO_WRITE: 1306 res->hr_stat_write++; 1307 if (res->hr_resuid == 0 && 1308 res->hr_primary_localcnt == 0) { 1309 /* This is first write. */ 1310 res->hr_primary_localcnt = 1; 1311 } 1312 for (;;) { 1313 mtx_lock(&range_lock); 1314 if (rangelock_islocked(range_sync, 1315 ggio->gctl_offset, ggio->gctl_length)) { 1316 pjdlog_debug(2, 1317 "regular: Range offset=%jd length=%zu locked.", 1318 (intmax_t)ggio->gctl_offset, 1319 (size_t)ggio->gctl_length); 1320 range_regular_wait = true; 1321 cv_wait(&range_regular_cond, &range_lock); 1322 range_regular_wait = false; 1323 mtx_unlock(&range_lock); 1324 continue; 1325 } 1326 if (rangelock_add(range_regular, 1327 ggio->gctl_offset, ggio->gctl_length) == -1) { 1328 mtx_unlock(&range_lock); 1329 pjdlog_debug(2, 1330 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1331 (intmax_t)ggio->gctl_offset, 1332 (size_t)ggio->gctl_length); 1333 sleep(1); 1334 continue; 1335 } 1336 mtx_unlock(&range_lock); 1337 break; 1338 } 1339 mtx_lock(&res->hr_amp_lock); 1340 if (activemap_write_start(res->hr_amp, 1341 ggio->gctl_offset, ggio->gctl_length)) { 1342 res->hr_stat_activemap_update++; 1343 (void)hast_activemap_flush(res); 1344 } else { 1345 mtx_unlock(&res->hr_amp_lock); 1346 } 1347 break; 1348 case BIO_DELETE: 1349 res->hr_stat_delete++; 1350 break; 1351 case BIO_FLUSH: 1352 res->hr_stat_flush++; 1353 break; 1354 } 1355 pjdlog_debug(2, 1356 "ggate_recv: (%p) Moving request to the send queues.", hio); 1357 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1358 ggio->gctl_cmd == BIO_WRITE) { 1359 /* Each remote request needs two responses in memsync. */ 1360 refcnt_init(&hio->hio_countdown, ncomps + 1); 1361 } else { 1362 refcnt_init(&hio->hio_countdown, ncomps); 1363 } 1364 for (ii = ncomp; ii < ncomps; ii++) 1365 QUEUE_INSERT1(hio, send, ii); 1366 } 1367 /* NOTREACHED */ 1368 return (NULL); 1369 } 1370 1371 /* 1372 * Thread reads from or writes to local component. 1373 * If local read fails, it redirects it to remote_send thread. 1374 */ 1375 static void * 1376 local_send_thread(void *arg) 1377 { 1378 struct hast_resource *res = arg; 1379 struct g_gate_ctl_io *ggio; 1380 struct hio *hio; 1381 unsigned int ncomp, rncomp; 1382 ssize_t ret; 1383 1384 /* Local component is 0 for now. */ 1385 ncomp = 0; 1386 /* Remote component is 1 for now. */ 1387 rncomp = 1; 1388 1389 for (;;) { 1390 pjdlog_debug(2, "local_send: Taking request."); 1391 QUEUE_TAKE1(hio, send, ncomp, 0); 1392 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1393 ggio = &hio->hio_ggio; 1394 switch (ggio->gctl_cmd) { 1395 case BIO_READ: 1396 ret = pread(res->hr_localfd, ggio->gctl_data, 1397 ggio->gctl_length, 1398 ggio->gctl_offset + res->hr_localoff); 1399 if (ret == ggio->gctl_length) 1400 hio->hio_errors[ncomp] = 0; 1401 else if (!ISSYNCREQ(hio)) { 1402 /* 1403 * If READ failed, try to read from remote node. 1404 */ 1405 if (ret == -1) { 1406 reqlog(LOG_WARNING, 0, ggio, 1407 "Local request failed (%s), trying remote node. ", 1408 strerror(errno)); 1409 } else if (ret != ggio->gctl_length) { 1410 reqlog(LOG_WARNING, 0, ggio, 1411 "Local request failed (%zd != %jd), trying remote node. ", 1412 ret, (intmax_t)ggio->gctl_length); 1413 } 1414 QUEUE_INSERT1(hio, send, rncomp); 1415 continue; 1416 } 1417 break; 1418 case BIO_WRITE: 1419 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1420 ggio->gctl_length, 1421 ggio->gctl_offset + res->hr_localoff); 1422 if (ret == -1) { 1423 hio->hio_errors[ncomp] = errno; 1424 reqlog(LOG_WARNING, 0, ggio, 1425 "Local request failed (%s): ", 1426 strerror(errno)); 1427 } else if (ret != ggio->gctl_length) { 1428 hio->hio_errors[ncomp] = EIO; 1429 reqlog(LOG_WARNING, 0, ggio, 1430 "Local request failed (%zd != %jd): ", 1431 ret, (intmax_t)ggio->gctl_length); 1432 } else { 1433 hio->hio_errors[ncomp] = 0; 1434 if (hio->hio_replication == 1435 HAST_REPLICATION_ASYNC) { 1436 ggio->gctl_error = 0; 1437 write_complete(res, hio); 1438 } 1439 } 1440 break; 1441 case BIO_DELETE: 1442 ret = g_delete(res->hr_localfd, 1443 ggio->gctl_offset + res->hr_localoff, 1444 ggio->gctl_length); 1445 if (ret == -1) { 1446 hio->hio_errors[ncomp] = errno; 1447 reqlog(LOG_WARNING, 0, ggio, 1448 "Local request failed (%s): ", 1449 strerror(errno)); 1450 } else { 1451 hio->hio_errors[ncomp] = 0; 1452 } 1453 break; 1454 case BIO_FLUSH: 1455 if (!res->hr_localflush) { 1456 ret = -1; 1457 errno = EOPNOTSUPP; 1458 break; 1459 } 1460 ret = g_flush(res->hr_localfd); 1461 if (ret == -1) { 1462 if (errno == EOPNOTSUPP) 1463 res->hr_localflush = false; 1464 hio->hio_errors[ncomp] = errno; 1465 reqlog(LOG_WARNING, 0, ggio, 1466 "Local request failed (%s): ", 1467 strerror(errno)); 1468 } else { 1469 hio->hio_errors[ncomp] = 0; 1470 } 1471 break; 1472 } 1473 1474 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1475 ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1476 if (refcnt_release(&hio->hio_countdown) > 0) 1477 continue; 1478 } else { 1479 /* 1480 * Depending on hio_countdown value, requests finished 1481 * in the following order: 1482 * 0: remote memsync, remote final, local write 1483 * 1: remote memsync, local write, (remote final) 1484 * 2: local write, (remote memsync), (remote final) 1485 */ 1486 switch (refcnt_release(&hio->hio_countdown)) { 1487 case 0: 1488 /* 1489 * Local write finished as last. 1490 */ 1491 break; 1492 case 1: 1493 /* 1494 * Local write finished after remote memsync 1495 * reply arrvied. We can complete the write now. 1496 */ 1497 if (hio->hio_errors[0] == 0) 1498 write_complete(res, hio); 1499 continue; 1500 case 2: 1501 /* 1502 * Local write finished as first. 1503 */ 1504 continue; 1505 default: 1506 PJDLOG_ABORT("Invalid hio_countdown."); 1507 } 1508 } 1509 if (ISSYNCREQ(hio)) { 1510 mtx_lock(&sync_lock); 1511 SYNCREQDONE(hio); 1512 mtx_unlock(&sync_lock); 1513 cv_signal(&sync_cond); 1514 } else { 1515 pjdlog_debug(2, 1516 "local_send: (%p) Moving request to the done queue.", 1517 hio); 1518 QUEUE_INSERT2(hio, done); 1519 } 1520 } 1521 /* NOTREACHED */ 1522 return (NULL); 1523 } 1524 1525 static void 1526 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1527 { 1528 struct nv *nv; 1529 1530 rw_rlock(&hio_remote_lock[ncomp]); 1531 1532 if (!ISCONNECTED(res, ncomp)) { 1533 rw_unlock(&hio_remote_lock[ncomp]); 1534 return; 1535 } 1536 1537 PJDLOG_ASSERT(res->hr_remotein != NULL); 1538 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1539 1540 nv = nv_alloc(); 1541 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1542 if (nv_error(nv) != 0) { 1543 rw_unlock(&hio_remote_lock[ncomp]); 1544 nv_free(nv); 1545 pjdlog_debug(1, 1546 "keepalive_send: Unable to prepare header to send."); 1547 return; 1548 } 1549 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1550 rw_unlock(&hio_remote_lock[ncomp]); 1551 pjdlog_common(LOG_DEBUG, 1, errno, 1552 "keepalive_send: Unable to send request"); 1553 nv_free(nv); 1554 remote_close(res, ncomp); 1555 return; 1556 } 1557 1558 rw_unlock(&hio_remote_lock[ncomp]); 1559 nv_free(nv); 1560 pjdlog_debug(2, "keepalive_send: Request sent."); 1561 } 1562 1563 /* 1564 * Thread sends request to secondary node. 1565 */ 1566 static void * 1567 remote_send_thread(void *arg) 1568 { 1569 struct hast_resource *res = arg; 1570 struct g_gate_ctl_io *ggio; 1571 time_t lastcheck, now; 1572 struct hio *hio; 1573 struct nv *nv; 1574 unsigned int ncomp; 1575 bool wakeup; 1576 uint64_t offset, length; 1577 uint8_t cmd; 1578 void *data; 1579 1580 /* Remote component is 1 for now. */ 1581 ncomp = 1; 1582 lastcheck = time(NULL); 1583 1584 for (;;) { 1585 pjdlog_debug(2, "remote_send: Taking request."); 1586 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1587 if (hio == NULL) { 1588 now = time(NULL); 1589 if (lastcheck + HAST_KEEPALIVE <= now) { 1590 keepalive_send(res, ncomp); 1591 lastcheck = now; 1592 } 1593 continue; 1594 } 1595 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1596 ggio = &hio->hio_ggio; 1597 switch (ggio->gctl_cmd) { 1598 case BIO_READ: 1599 cmd = HIO_READ; 1600 data = NULL; 1601 offset = ggio->gctl_offset; 1602 length = ggio->gctl_length; 1603 break; 1604 case BIO_WRITE: 1605 cmd = HIO_WRITE; 1606 data = ggio->gctl_data; 1607 offset = ggio->gctl_offset; 1608 length = ggio->gctl_length; 1609 break; 1610 case BIO_DELETE: 1611 cmd = HIO_DELETE; 1612 data = NULL; 1613 offset = ggio->gctl_offset; 1614 length = ggio->gctl_length; 1615 break; 1616 case BIO_FLUSH: 1617 cmd = HIO_FLUSH; 1618 data = NULL; 1619 offset = 0; 1620 length = 0; 1621 break; 1622 default: 1623 PJDLOG_ABORT("invalid condition"); 1624 } 1625 nv = nv_alloc(); 1626 nv_add_uint8(nv, cmd, "cmd"); 1627 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1628 nv_add_uint64(nv, offset, "offset"); 1629 nv_add_uint64(nv, length, "length"); 1630 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1631 ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { 1632 nv_add_uint8(nv, 1, "memsync"); 1633 } 1634 if (nv_error(nv) != 0) { 1635 hio->hio_errors[ncomp] = nv_error(nv); 1636 pjdlog_debug(2, 1637 "remote_send: (%p) Unable to prepare header to send.", 1638 hio); 1639 reqlog(LOG_ERR, 0, ggio, 1640 "Unable to prepare header to send (%s): ", 1641 strerror(nv_error(nv))); 1642 /* Move failed request immediately to the done queue. */ 1643 goto done_queue; 1644 } 1645 /* 1646 * Protect connection from disappearing. 1647 */ 1648 rw_rlock(&hio_remote_lock[ncomp]); 1649 if (!ISCONNECTED(res, ncomp)) { 1650 rw_unlock(&hio_remote_lock[ncomp]); 1651 hio->hio_errors[ncomp] = ENOTCONN; 1652 goto done_queue; 1653 } 1654 /* 1655 * Move the request to recv queue before sending it, because 1656 * in different order we can get reply before we move request 1657 * to recv queue. 1658 */ 1659 pjdlog_debug(2, 1660 "remote_send: (%p) Moving request to the recv queue.", 1661 hio); 1662 mtx_lock(&hio_recv_list_lock[ncomp]); 1663 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1664 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1665 hio_recv_list_size[ncomp]++; 1666 mtx_unlock(&hio_recv_list_lock[ncomp]); 1667 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1668 data != NULL ? length : 0) == -1) { 1669 hio->hio_errors[ncomp] = errno; 1670 rw_unlock(&hio_remote_lock[ncomp]); 1671 pjdlog_debug(2, 1672 "remote_send: (%p) Unable to send request.", hio); 1673 reqlog(LOG_ERR, 0, ggio, 1674 "Unable to send request (%s): ", 1675 strerror(hio->hio_errors[ncomp])); 1676 remote_close(res, ncomp); 1677 /* 1678 * Take request back from the receive queue and move 1679 * it immediately to the done queue. 1680 */ 1681 mtx_lock(&hio_recv_list_lock[ncomp]); 1682 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1683 hio_next[ncomp]); 1684 hio_recv_list_size[ncomp]--; 1685 mtx_unlock(&hio_recv_list_lock[ncomp]); 1686 goto done_queue; 1687 } 1688 rw_unlock(&hio_remote_lock[ncomp]); 1689 nv_free(nv); 1690 if (wakeup) 1691 cv_signal(&hio_recv_list_cond[ncomp]); 1692 continue; 1693 done_queue: 1694 nv_free(nv); 1695 if (ISSYNCREQ(hio)) { 1696 if (refcnt_release(&hio->hio_countdown) > 0) 1697 continue; 1698 mtx_lock(&sync_lock); 1699 SYNCREQDONE(hio); 1700 mtx_unlock(&sync_lock); 1701 cv_signal(&sync_cond); 1702 continue; 1703 } 1704 if (ggio->gctl_cmd == BIO_WRITE) { 1705 mtx_lock(&res->hr_amp_lock); 1706 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1707 ggio->gctl_length)) { 1708 (void)hast_activemap_flush(res); 1709 } else { 1710 mtx_unlock(&res->hr_amp_lock); 1711 } 1712 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) 1713 (void)refcnt_release(&hio->hio_countdown); 1714 } 1715 if (refcnt_release(&hio->hio_countdown) > 0) 1716 continue; 1717 pjdlog_debug(2, 1718 "remote_send: (%p) Moving request to the done queue.", 1719 hio); 1720 QUEUE_INSERT2(hio, done); 1721 } 1722 /* NOTREACHED */ 1723 return (NULL); 1724 } 1725 1726 /* 1727 * Thread receives answer from secondary node and passes it to ggate_send 1728 * thread. 1729 */ 1730 static void * 1731 remote_recv_thread(void *arg) 1732 { 1733 struct hast_resource *res = arg; 1734 struct g_gate_ctl_io *ggio; 1735 struct hio *hio; 1736 struct nv *nv; 1737 unsigned int ncomp; 1738 uint64_t seq; 1739 bool memsyncack; 1740 int error; 1741 1742 /* Remote component is 1 for now. */ 1743 ncomp = 1; 1744 1745 for (;;) { 1746 /* Wait until there is anything to receive. */ 1747 mtx_lock(&hio_recv_list_lock[ncomp]); 1748 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1749 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1750 cv_wait(&hio_recv_list_cond[ncomp], 1751 &hio_recv_list_lock[ncomp]); 1752 } 1753 mtx_unlock(&hio_recv_list_lock[ncomp]); 1754 1755 memsyncack = false; 1756 1757 rw_rlock(&hio_remote_lock[ncomp]); 1758 if (!ISCONNECTED(res, ncomp)) { 1759 rw_unlock(&hio_remote_lock[ncomp]); 1760 /* 1761 * Connection is dead, so move all pending requests to 1762 * the done queue (one-by-one). 1763 */ 1764 mtx_lock(&hio_recv_list_lock[ncomp]); 1765 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1766 PJDLOG_ASSERT(hio != NULL); 1767 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1768 hio_next[ncomp]); 1769 hio_recv_list_size[ncomp]--; 1770 mtx_unlock(&hio_recv_list_lock[ncomp]); 1771 goto done_queue; 1772 } 1773 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1774 pjdlog_errno(LOG_ERR, 1775 "Unable to receive reply header"); 1776 rw_unlock(&hio_remote_lock[ncomp]); 1777 remote_close(res, ncomp); 1778 continue; 1779 } 1780 rw_unlock(&hio_remote_lock[ncomp]); 1781 seq = nv_get_uint64(nv, "seq"); 1782 if (seq == 0) { 1783 pjdlog_error("Header contains no 'seq' field."); 1784 nv_free(nv); 1785 continue; 1786 } 1787 memsyncack = nv_exists(nv, "received"); 1788 mtx_lock(&hio_recv_list_lock[ncomp]); 1789 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1790 if (hio->hio_ggio.gctl_seq == seq) { 1791 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1792 hio_next[ncomp]); 1793 hio_recv_list_size[ncomp]--; 1794 break; 1795 } 1796 } 1797 mtx_unlock(&hio_recv_list_lock[ncomp]); 1798 if (hio == NULL) { 1799 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1800 (uintmax_t)seq); 1801 nv_free(nv); 1802 continue; 1803 } 1804 ggio = &hio->hio_ggio; 1805 error = nv_get_int16(nv, "error"); 1806 if (error != 0) { 1807 /* Request failed on remote side. */ 1808 hio->hio_errors[ncomp] = error; 1809 reqlog(LOG_WARNING, 0, ggio, 1810 "Remote request failed (%s): ", strerror(error)); 1811 nv_free(nv); 1812 goto done_queue; 1813 } 1814 switch (ggio->gctl_cmd) { 1815 case BIO_READ: 1816 rw_rlock(&hio_remote_lock[ncomp]); 1817 if (!ISCONNECTED(res, ncomp)) { 1818 rw_unlock(&hio_remote_lock[ncomp]); 1819 nv_free(nv); 1820 goto done_queue; 1821 } 1822 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1823 ggio->gctl_data, ggio->gctl_length) == -1) { 1824 hio->hio_errors[ncomp] = errno; 1825 pjdlog_errno(LOG_ERR, 1826 "Unable to receive reply data"); 1827 rw_unlock(&hio_remote_lock[ncomp]); 1828 nv_free(nv); 1829 remote_close(res, ncomp); 1830 goto done_queue; 1831 } 1832 rw_unlock(&hio_remote_lock[ncomp]); 1833 break; 1834 case BIO_WRITE: 1835 case BIO_DELETE: 1836 case BIO_FLUSH: 1837 break; 1838 default: 1839 PJDLOG_ABORT("invalid condition"); 1840 } 1841 hio->hio_errors[ncomp] = 0; 1842 nv_free(nv); 1843 done_queue: 1844 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1845 hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1846 if (refcnt_release(&hio->hio_countdown) > 0) 1847 continue; 1848 } else { 1849 /* 1850 * Depending on hio_countdown value, requests finished 1851 * in the following order: 1852 * 1853 * 0: local write, remote memsync, remote final 1854 * or 1855 * 0: remote memsync, local write, remote final 1856 * 1857 * 1: local write, remote memsync, (remote final) 1858 * or 1859 * 1: remote memsync, remote final, (local write) 1860 * 1861 * 2: remote memsync, (local write), (remote final) 1862 * or 1863 * 2: remote memsync, (remote final), (local write) 1864 */ 1865 switch (refcnt_release(&hio->hio_countdown)) { 1866 case 0: 1867 /* 1868 * Remote final reply arrived. 1869 */ 1870 PJDLOG_ASSERT(!memsyncack); 1871 break; 1872 case 1: 1873 if (memsyncack) { 1874 /* 1875 * Local request already finished, so we 1876 * can complete the write. 1877 */ 1878 if (hio->hio_errors[0] == 0) 1879 write_complete(res, hio); 1880 /* 1881 * We still need to wait for final 1882 * remote reply. 1883 */ 1884 pjdlog_debug(2, 1885 "remote_recv: (%p) Moving request back to the recv queue.", 1886 hio); 1887 mtx_lock(&hio_recv_list_lock[ncomp]); 1888 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1889 hio, hio_next[ncomp]); 1890 hio_recv_list_size[ncomp]++; 1891 mtx_unlock(&hio_recv_list_lock[ncomp]); 1892 } else { 1893 /* 1894 * Remote final reply arrived before 1895 * local write finished. 1896 * Nothing to do in such case. 1897 */ 1898 } 1899 continue; 1900 case 2: 1901 /* 1902 * We received remote memsync reply even before 1903 * local write finished. 1904 */ 1905 PJDLOG_ASSERT(memsyncack); 1906 1907 pjdlog_debug(2, 1908 "remote_recv: (%p) Moving request back to the recv queue.", 1909 hio); 1910 mtx_lock(&hio_recv_list_lock[ncomp]); 1911 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, 1912 hio_next[ncomp]); 1913 hio_recv_list_size[ncomp]++; 1914 mtx_unlock(&hio_recv_list_lock[ncomp]); 1915 continue; 1916 default: 1917 PJDLOG_ABORT("Invalid hio_countdown."); 1918 } 1919 } 1920 if (ISSYNCREQ(hio)) { 1921 mtx_lock(&sync_lock); 1922 SYNCREQDONE(hio); 1923 mtx_unlock(&sync_lock); 1924 cv_signal(&sync_cond); 1925 } else { 1926 pjdlog_debug(2, 1927 "remote_recv: (%p) Moving request to the done queue.", 1928 hio); 1929 QUEUE_INSERT2(hio, done); 1930 } 1931 } 1932 /* NOTREACHED */ 1933 return (NULL); 1934 } 1935 1936 /* 1937 * Thread sends answer to the kernel. 1938 */ 1939 static void * 1940 ggate_send_thread(void *arg) 1941 { 1942 struct hast_resource *res = arg; 1943 struct g_gate_ctl_io *ggio; 1944 struct hio *hio; 1945 unsigned int ii, ncomps; 1946 1947 ncomps = HAST_NCOMPONENTS; 1948 1949 for (;;) { 1950 pjdlog_debug(2, "ggate_send: Taking request."); 1951 QUEUE_TAKE2(hio, done); 1952 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1953 ggio = &hio->hio_ggio; 1954 for (ii = 0; ii < ncomps; ii++) { 1955 if (hio->hio_errors[ii] == 0) { 1956 /* 1957 * One successful request is enough to declare 1958 * success. 1959 */ 1960 ggio->gctl_error = 0; 1961 break; 1962 } 1963 } 1964 if (ii == ncomps) { 1965 /* 1966 * None of the requests were successful. 1967 * Use the error from local component except the 1968 * case when we did only remote request. 1969 */ 1970 if (ggio->gctl_cmd == BIO_READ && 1971 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1972 ggio->gctl_error = hio->hio_errors[1]; 1973 else 1974 ggio->gctl_error = hio->hio_errors[0]; 1975 } 1976 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1977 mtx_lock(&res->hr_amp_lock); 1978 if (activemap_write_complete(res->hr_amp, 1979 ggio->gctl_offset, ggio->gctl_length)) { 1980 res->hr_stat_activemap_update++; 1981 (void)hast_activemap_flush(res); 1982 } else { 1983 mtx_unlock(&res->hr_amp_lock); 1984 } 1985 } 1986 if (ggio->gctl_cmd == BIO_WRITE) { 1987 /* 1988 * Unlock range we locked. 1989 */ 1990 mtx_lock(&range_lock); 1991 rangelock_del(range_regular, ggio->gctl_offset, 1992 ggio->gctl_length); 1993 if (range_sync_wait) 1994 cv_signal(&range_sync_cond); 1995 mtx_unlock(&range_lock); 1996 if (!hio->hio_done) 1997 write_complete(res, hio); 1998 } else { 1999 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 2000 primary_exit(EX_OSERR, 2001 "G_GATE_CMD_DONE failed"); 2002 } 2003 } 2004 if (hio->hio_errors[0]) { 2005 switch (ggio->gctl_cmd) { 2006 case BIO_READ: 2007 res->hr_stat_read_error++; 2008 break; 2009 case BIO_WRITE: 2010 res->hr_stat_write_error++; 2011 break; 2012 case BIO_DELETE: 2013 res->hr_stat_delete_error++; 2014 break; 2015 case BIO_FLUSH: 2016 res->hr_stat_flush_error++; 2017 break; 2018 } 2019 } 2020 pjdlog_debug(2, 2021 "ggate_send: (%p) Moving request to the free queue.", hio); 2022 QUEUE_INSERT2(hio, free); 2023 } 2024 /* NOTREACHED */ 2025 return (NULL); 2026 } 2027 2028 /* 2029 * Thread synchronize local and remote components. 2030 */ 2031 static void * 2032 sync_thread(void *arg __unused) 2033 { 2034 struct hast_resource *res = arg; 2035 struct hio *hio; 2036 struct g_gate_ctl_io *ggio; 2037 struct timeval tstart, tend, tdiff; 2038 unsigned int ii, ncomp, ncomps; 2039 off_t offset, length, synced; 2040 bool dorewind, directreads; 2041 int syncext; 2042 2043 ncomps = HAST_NCOMPONENTS; 2044 dorewind = true; 2045 synced = 0; 2046 offset = -1; 2047 directreads = false; 2048 2049 for (;;) { 2050 mtx_lock(&sync_lock); 2051 if (offset >= 0 && !sync_inprogress) { 2052 gettimeofday(&tend, NULL); 2053 timersub(&tend, &tstart, &tdiff); 2054 pjdlog_info("Synchronization interrupted after %#.0T. " 2055 "%NB synchronized so far.", &tdiff, 2056 (intmax_t)synced); 2057 event_send(res, EVENT_SYNCINTR); 2058 } 2059 while (!sync_inprogress) { 2060 dorewind = true; 2061 synced = 0; 2062 cv_wait(&sync_cond, &sync_lock); 2063 } 2064 mtx_unlock(&sync_lock); 2065 /* 2066 * Obtain offset at which we should synchronize. 2067 * Rewind synchronization if needed. 2068 */ 2069 mtx_lock(&res->hr_amp_lock); 2070 if (dorewind) 2071 activemap_sync_rewind(res->hr_amp); 2072 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 2073 if (syncext != -1) { 2074 /* 2075 * We synchronized entire syncext extent, we can mark 2076 * it as clean now. 2077 */ 2078 if (activemap_extent_complete(res->hr_amp, syncext)) 2079 (void)hast_activemap_flush(res); 2080 else 2081 mtx_unlock(&res->hr_amp_lock); 2082 } else { 2083 mtx_unlock(&res->hr_amp_lock); 2084 } 2085 if (dorewind) { 2086 dorewind = false; 2087 if (offset == -1) 2088 pjdlog_info("Nodes are in sync."); 2089 else { 2090 pjdlog_info("Synchronization started. %NB to go.", 2091 (intmax_t)(res->hr_extentsize * 2092 activemap_ndirty(res->hr_amp))); 2093 event_send(res, EVENT_SYNCSTART); 2094 gettimeofday(&tstart, NULL); 2095 } 2096 } 2097 if (offset == -1) { 2098 sync_stop(); 2099 pjdlog_debug(1, "Nothing to synchronize."); 2100 /* 2101 * Synchronization complete, make both localcnt and 2102 * remotecnt equal. 2103 */ 2104 ncomp = 1; 2105 rw_rlock(&hio_remote_lock[ncomp]); 2106 if (ISCONNECTED(res, ncomp)) { 2107 if (synced > 0) { 2108 int64_t bps; 2109 2110 gettimeofday(&tend, NULL); 2111 timersub(&tend, &tstart, &tdiff); 2112 bps = (int64_t)((double)synced / 2113 ((double)tdiff.tv_sec + 2114 (double)tdiff.tv_usec / 1000000)); 2115 pjdlog_info("Synchronization complete. " 2116 "%NB synchronized in %#.0lT (%NB/sec).", 2117 (intmax_t)synced, &tdiff, 2118 (intmax_t)bps); 2119 event_send(res, EVENT_SYNCDONE); 2120 } 2121 mtx_lock(&metadata_lock); 2122 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 2123 directreads = true; 2124 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 2125 res->hr_primary_localcnt = 2126 res->hr_secondary_remotecnt; 2127 res->hr_primary_remotecnt = 2128 res->hr_secondary_localcnt; 2129 pjdlog_debug(1, 2130 "Setting localcnt to %ju and remotecnt to %ju.", 2131 (uintmax_t)res->hr_primary_localcnt, 2132 (uintmax_t)res->hr_primary_remotecnt); 2133 (void)metadata_write(res); 2134 mtx_unlock(&metadata_lock); 2135 } 2136 rw_unlock(&hio_remote_lock[ncomp]); 2137 if (directreads) { 2138 directreads = false; 2139 enable_direct_reads(res); 2140 } 2141 continue; 2142 } 2143 pjdlog_debug(2, "sync: Taking free request."); 2144 QUEUE_TAKE2(hio, free); 2145 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 2146 /* 2147 * Lock the range we are going to synchronize. We don't want 2148 * race where someone writes between our read and write. 2149 */ 2150 for (;;) { 2151 mtx_lock(&range_lock); 2152 if (rangelock_islocked(range_regular, offset, length)) { 2153 pjdlog_debug(2, 2154 "sync: Range offset=%jd length=%jd locked.", 2155 (intmax_t)offset, (intmax_t)length); 2156 range_sync_wait = true; 2157 cv_wait(&range_sync_cond, &range_lock); 2158 range_sync_wait = false; 2159 mtx_unlock(&range_lock); 2160 continue; 2161 } 2162 if (rangelock_add(range_sync, offset, length) == -1) { 2163 mtx_unlock(&range_lock); 2164 pjdlog_debug(2, 2165 "sync: Range offset=%jd length=%jd is already locked, waiting.", 2166 (intmax_t)offset, (intmax_t)length); 2167 sleep(1); 2168 continue; 2169 } 2170 mtx_unlock(&range_lock); 2171 break; 2172 } 2173 /* 2174 * First read the data from synchronization source. 2175 */ 2176 SYNCREQ(hio); 2177 ggio = &hio->hio_ggio; 2178 ggio->gctl_cmd = BIO_READ; 2179 ggio->gctl_offset = offset; 2180 ggio->gctl_length = length; 2181 ggio->gctl_error = 0; 2182 hio->hio_done = false; 2183 hio->hio_replication = res->hr_replication; 2184 for (ii = 0; ii < ncomps; ii++) 2185 hio->hio_errors[ii] = EINVAL; 2186 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2187 hio); 2188 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2189 hio); 2190 mtx_lock(&metadata_lock); 2191 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2192 /* 2193 * This range is up-to-date on local component, 2194 * so handle request locally. 2195 */ 2196 /* Local component is 0 for now. */ 2197 ncomp = 0; 2198 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2199 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2200 /* 2201 * This range is out-of-date on local component, 2202 * so send request to the remote node. 2203 */ 2204 /* Remote component is 1 for now. */ 2205 ncomp = 1; 2206 } 2207 mtx_unlock(&metadata_lock); 2208 refcnt_init(&hio->hio_countdown, 1); 2209 QUEUE_INSERT1(hio, send, ncomp); 2210 2211 /* 2212 * Let's wait for READ to finish. 2213 */ 2214 mtx_lock(&sync_lock); 2215 while (!ISSYNCREQDONE(hio)) 2216 cv_wait(&sync_cond, &sync_lock); 2217 mtx_unlock(&sync_lock); 2218 2219 if (hio->hio_errors[ncomp] != 0) { 2220 pjdlog_error("Unable to read synchronization data: %s.", 2221 strerror(hio->hio_errors[ncomp])); 2222 goto free_queue; 2223 } 2224 2225 /* 2226 * We read the data from synchronization source, now write it 2227 * to synchronization target. 2228 */ 2229 SYNCREQ(hio); 2230 ggio->gctl_cmd = BIO_WRITE; 2231 for (ii = 0; ii < ncomps; ii++) 2232 hio->hio_errors[ii] = EINVAL; 2233 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2234 hio); 2235 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2236 hio); 2237 mtx_lock(&metadata_lock); 2238 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2239 /* 2240 * This range is up-to-date on local component, 2241 * so we update remote component. 2242 */ 2243 /* Remote component is 1 for now. */ 2244 ncomp = 1; 2245 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2246 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2247 /* 2248 * This range is out-of-date on local component, 2249 * so we update it. 2250 */ 2251 /* Local component is 0 for now. */ 2252 ncomp = 0; 2253 } 2254 mtx_unlock(&metadata_lock); 2255 2256 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2257 hio); 2258 refcnt_init(&hio->hio_countdown, 1); 2259 QUEUE_INSERT1(hio, send, ncomp); 2260 2261 /* 2262 * Let's wait for WRITE to finish. 2263 */ 2264 mtx_lock(&sync_lock); 2265 while (!ISSYNCREQDONE(hio)) 2266 cv_wait(&sync_cond, &sync_lock); 2267 mtx_unlock(&sync_lock); 2268 2269 if (hio->hio_errors[ncomp] != 0) { 2270 pjdlog_error("Unable to write synchronization data: %s.", 2271 strerror(hio->hio_errors[ncomp])); 2272 goto free_queue; 2273 } 2274 2275 synced += length; 2276 free_queue: 2277 mtx_lock(&range_lock); 2278 rangelock_del(range_sync, offset, length); 2279 if (range_regular_wait) 2280 cv_signal(&range_regular_cond); 2281 mtx_unlock(&range_lock); 2282 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2283 hio); 2284 QUEUE_INSERT2(hio, free); 2285 } 2286 /* NOTREACHED */ 2287 return (NULL); 2288 } 2289 2290 void 2291 primary_config_reload(struct hast_resource *res, struct nv *nv) 2292 { 2293 unsigned int ii, ncomps; 2294 int modified, vint; 2295 const char *vstr; 2296 2297 pjdlog_info("Reloading configuration..."); 2298 2299 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2300 PJDLOG_ASSERT(gres == res); 2301 nv_assert(nv, "remoteaddr"); 2302 nv_assert(nv, "sourceaddr"); 2303 nv_assert(nv, "replication"); 2304 nv_assert(nv, "checksum"); 2305 nv_assert(nv, "compression"); 2306 nv_assert(nv, "timeout"); 2307 nv_assert(nv, "exec"); 2308 nv_assert(nv, "metaflush"); 2309 2310 ncomps = HAST_NCOMPONENTS; 2311 2312 #define MODIFIED_REMOTEADDR 0x01 2313 #define MODIFIED_SOURCEADDR 0x02 2314 #define MODIFIED_REPLICATION 0x04 2315 #define MODIFIED_CHECKSUM 0x08 2316 #define MODIFIED_COMPRESSION 0x10 2317 #define MODIFIED_TIMEOUT 0x20 2318 #define MODIFIED_EXEC 0x40 2319 #define MODIFIED_METAFLUSH 0x80 2320 modified = 0; 2321 2322 vstr = nv_get_string(nv, "remoteaddr"); 2323 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2324 /* 2325 * Don't copy res->hr_remoteaddr to gres just yet. 2326 * We want remote_close() to log disconnect from the old 2327 * addresses, not from the new ones. 2328 */ 2329 modified |= MODIFIED_REMOTEADDR; 2330 } 2331 vstr = nv_get_string(nv, "sourceaddr"); 2332 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2333 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2334 modified |= MODIFIED_SOURCEADDR; 2335 } 2336 vint = nv_get_int32(nv, "replication"); 2337 if (gres->hr_replication != vint) { 2338 gres->hr_replication = vint; 2339 modified |= MODIFIED_REPLICATION; 2340 } 2341 vint = nv_get_int32(nv, "checksum"); 2342 if (gres->hr_checksum != vint) { 2343 gres->hr_checksum = vint; 2344 modified |= MODIFIED_CHECKSUM; 2345 } 2346 vint = nv_get_int32(nv, "compression"); 2347 if (gres->hr_compression != vint) { 2348 gres->hr_compression = vint; 2349 modified |= MODIFIED_COMPRESSION; 2350 } 2351 vint = nv_get_int32(nv, "timeout"); 2352 if (gres->hr_timeout != vint) { 2353 gres->hr_timeout = vint; 2354 modified |= MODIFIED_TIMEOUT; 2355 } 2356 vstr = nv_get_string(nv, "exec"); 2357 if (strcmp(gres->hr_exec, vstr) != 0) { 2358 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2359 modified |= MODIFIED_EXEC; 2360 } 2361 vint = nv_get_int32(nv, "metaflush"); 2362 if (gres->hr_metaflush != vint) { 2363 gres->hr_metaflush = vint; 2364 modified |= MODIFIED_METAFLUSH; 2365 } 2366 2367 /* 2368 * Change timeout for connected sockets. 2369 * Don't bother if we need to reconnect. 2370 */ 2371 if ((modified & MODIFIED_TIMEOUT) != 0 && 2372 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2373 for (ii = 0; ii < ncomps; ii++) { 2374 if (!ISREMOTE(ii)) 2375 continue; 2376 rw_rlock(&hio_remote_lock[ii]); 2377 if (!ISCONNECTED(gres, ii)) { 2378 rw_unlock(&hio_remote_lock[ii]); 2379 continue; 2380 } 2381 rw_unlock(&hio_remote_lock[ii]); 2382 if (proto_timeout(gres->hr_remotein, 2383 gres->hr_timeout) == -1) { 2384 pjdlog_errno(LOG_WARNING, 2385 "Unable to set connection timeout"); 2386 } 2387 if (proto_timeout(gres->hr_remoteout, 2388 gres->hr_timeout) == -1) { 2389 pjdlog_errno(LOG_WARNING, 2390 "Unable to set connection timeout"); 2391 } 2392 } 2393 } 2394 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2395 for (ii = 0; ii < ncomps; ii++) { 2396 if (!ISREMOTE(ii)) 2397 continue; 2398 remote_close(gres, ii); 2399 } 2400 if (modified & MODIFIED_REMOTEADDR) { 2401 vstr = nv_get_string(nv, "remoteaddr"); 2402 strlcpy(gres->hr_remoteaddr, vstr, 2403 sizeof(gres->hr_remoteaddr)); 2404 } 2405 } 2406 #undef MODIFIED_REMOTEADDR 2407 #undef MODIFIED_SOURCEADDR 2408 #undef MODIFIED_REPLICATION 2409 #undef MODIFIED_CHECKSUM 2410 #undef MODIFIED_COMPRESSION 2411 #undef MODIFIED_TIMEOUT 2412 #undef MODIFIED_EXEC 2413 #undef MODIFIED_METAFLUSH 2414 2415 pjdlog_info("Configuration reloaded successfully."); 2416 } 2417 2418 static void 2419 guard_one(struct hast_resource *res, unsigned int ncomp) 2420 { 2421 struct proto_conn *in, *out; 2422 2423 if (!ISREMOTE(ncomp)) 2424 return; 2425 2426 rw_rlock(&hio_remote_lock[ncomp]); 2427 2428 if (!real_remote(res)) { 2429 rw_unlock(&hio_remote_lock[ncomp]); 2430 return; 2431 } 2432 2433 if (ISCONNECTED(res, ncomp)) { 2434 PJDLOG_ASSERT(res->hr_remotein != NULL); 2435 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2436 rw_unlock(&hio_remote_lock[ncomp]); 2437 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2438 res->hr_remoteaddr); 2439 return; 2440 } 2441 2442 PJDLOG_ASSERT(res->hr_remotein == NULL); 2443 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2444 /* 2445 * Upgrade the lock. It doesn't have to be atomic as no other thread 2446 * can change connection status from disconnected to connected. 2447 */ 2448 rw_unlock(&hio_remote_lock[ncomp]); 2449 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2450 res->hr_remoteaddr); 2451 in = out = NULL; 2452 if (init_remote(res, &in, &out) == 0) { 2453 rw_wlock(&hio_remote_lock[ncomp]); 2454 PJDLOG_ASSERT(res->hr_remotein == NULL); 2455 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2456 PJDLOG_ASSERT(in != NULL && out != NULL); 2457 res->hr_remotein = in; 2458 res->hr_remoteout = out; 2459 rw_unlock(&hio_remote_lock[ncomp]); 2460 pjdlog_info("Successfully reconnected to %s.", 2461 res->hr_remoteaddr); 2462 sync_start(); 2463 } else { 2464 /* Both connections should be NULL. */ 2465 PJDLOG_ASSERT(res->hr_remotein == NULL); 2466 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2467 PJDLOG_ASSERT(in == NULL && out == NULL); 2468 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2469 res->hr_remoteaddr); 2470 } 2471 } 2472 2473 /* 2474 * Thread guards remote connections and reconnects when needed, handles 2475 * signals, etc. 2476 */ 2477 static void * 2478 guard_thread(void *arg) 2479 { 2480 struct hast_resource *res = arg; 2481 unsigned int ii, ncomps; 2482 struct timespec timeout; 2483 time_t lastcheck, now; 2484 sigset_t mask; 2485 int signo; 2486 2487 ncomps = HAST_NCOMPONENTS; 2488 lastcheck = time(NULL); 2489 2490 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2491 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2492 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2493 2494 timeout.tv_sec = HAST_KEEPALIVE; 2495 timeout.tv_nsec = 0; 2496 signo = -1; 2497 2498 for (;;) { 2499 switch (signo) { 2500 case SIGINT: 2501 case SIGTERM: 2502 sigexit_received = true; 2503 primary_exitx(EX_OK, 2504 "Termination signal received, exiting."); 2505 break; 2506 default: 2507 break; 2508 } 2509 2510 /* 2511 * Don't check connections until we fully started, 2512 * as we may still be looping, waiting for remote node 2513 * to switch from primary to secondary. 2514 */ 2515 if (fullystarted) { 2516 pjdlog_debug(2, "remote_guard: Checking connections."); 2517 now = time(NULL); 2518 if (lastcheck + HAST_KEEPALIVE <= now) { 2519 for (ii = 0; ii < ncomps; ii++) 2520 guard_one(res, ii); 2521 lastcheck = now; 2522 } 2523 } 2524 signo = sigtimedwait(&mask, NULL, &timeout); 2525 } 2526 /* NOTREACHED */ 2527 return (NULL); 2528 } 2529