1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/types.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <geom/gate/g_gate.h> 41 42 #include <err.h> 43 #include <errno.h> 44 #include <fcntl.h> 45 #include <libgeom.h> 46 #include <pthread.h> 47 #include <signal.h> 48 #include <stdint.h> 49 #include <stdio.h> 50 #include <string.h> 51 #include <sysexits.h> 52 #include <unistd.h> 53 54 #include <activemap.h> 55 #include <nv.h> 56 #include <rangelock.h> 57 58 #include "control.h" 59 #include "event.h" 60 #include "hast.h" 61 #include "hast_proto.h" 62 #include "hastd.h" 63 #include "hooks.h" 64 #include "metadata.h" 65 #include "proto.h" 66 #include "pjdlog.h" 67 #include "refcnt.h" 68 #include "subr.h" 69 #include "synch.h" 70 71 /* The is only one remote component for now. */ 72 #define ISREMOTE(no) ((no) == 1) 73 74 struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 unsigned int hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Remember replication from the time the request was initiated, 98 * so we won't get confused when replication changes on reload. 99 */ 100 int hio_replication; 101 TAILQ_ENTRY(hio) *hio_next; 102 }; 103 #define hio_free_next hio_next[0] 104 #define hio_done_next hio_next[0] 105 106 /* 107 * Free list holds unused structures. When free list is empty, we have to wait 108 * until some in-progress requests are freed. 109 */ 110 static TAILQ_HEAD(, hio) hio_free_list; 111 static pthread_mutex_t hio_free_list_lock; 112 static pthread_cond_t hio_free_list_cond; 113 /* 114 * There is one send list for every component. One requests is placed on all 115 * send lists - each component gets the same request, but each component is 116 * responsible for managing his own send list. 117 */ 118 static TAILQ_HEAD(, hio) *hio_send_list; 119 static pthread_mutex_t *hio_send_list_lock; 120 static pthread_cond_t *hio_send_list_cond; 121 /* 122 * There is one recv list for every component, although local components don't 123 * use recv lists as local requests are done synchronously. 124 */ 125 static TAILQ_HEAD(, hio) *hio_recv_list; 126 static pthread_mutex_t *hio_recv_list_lock; 127 static pthread_cond_t *hio_recv_list_cond; 128 /* 129 * Request is placed on done list by the slowest component (the one that 130 * decreased hio_countdown from 1 to 0). 131 */ 132 static TAILQ_HEAD(, hio) hio_done_list; 133 static pthread_mutex_t hio_done_list_lock; 134 static pthread_cond_t hio_done_list_cond; 135 /* 136 * Structure below are for interaction with sync thread. 137 */ 138 static bool sync_inprogress; 139 static pthread_mutex_t sync_lock; 140 static pthread_cond_t sync_cond; 141 /* 142 * The lock below allows to synchornize access to remote connections. 143 */ 144 static pthread_rwlock_t *hio_remote_lock; 145 146 /* 147 * Lock to synchronize metadata updates. Also synchronize access to 148 * hr_primary_localcnt and hr_primary_remotecnt fields. 149 */ 150 static pthread_mutex_t metadata_lock; 151 152 /* 153 * Maximum number of outstanding I/O requests. 154 */ 155 #define HAST_HIO_MAX 256 156 /* 157 * Number of components. At this point there are only two components: local 158 * and remote, but in the future it might be possible to use multiple local 159 * and remote components. 160 */ 161 #define HAST_NCOMPONENTS 2 162 163 #define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 176 } while (0) 177 #define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_signal(&hio_##name##_list_cond); \ 186 } while (0) 187 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203 } while (0) 204 #define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212 } while (0) 213 214 #define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217 } while (0) 218 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 219 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221 222 static struct hast_resource *gres; 223 224 static pthread_mutex_t range_lock; 225 static struct rangelocks *range_regular; 226 static bool range_regular_wait; 227 static pthread_cond_t range_regular_cond; 228 static struct rangelocks *range_sync; 229 static bool range_sync_wait; 230 static pthread_cond_t range_sync_cond; 231 static bool fullystarted; 232 233 static void *ggate_recv_thread(void *arg); 234 static void *local_send_thread(void *arg); 235 static void *remote_send_thread(void *arg); 236 static void *remote_recv_thread(void *arg); 237 static void *ggate_send_thread(void *arg); 238 static void *sync_thread(void *arg); 239 static void *guard_thread(void *arg); 240 241 static void 242 cleanup(struct hast_resource *res) 243 { 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. */ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267 } 268 269 static __dead2 void 270 primary_exit(int exitcode, const char *fmt, ...) 271 { 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280 } 281 282 static __dead2 void 283 primary_exitx(int exitcode, const char *fmt, ...) 284 { 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292 } 293 294 static int 295 hast_activemap_flush(struct hast_resource *res) 296 { 297 const unsigned char *buf; 298 size_t size; 299 300 buf = activemap_bitmap(res->hr_amp, &size); 301 PJDLOG_ASSERT(buf != NULL); 302 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 303 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 304 (ssize_t)size) { 305 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 306 res->hr_stat_activemap_write_error++; 307 return (-1); 308 } 309 if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 310 if (errno == EOPNOTSUPP) { 311 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 312 res->hr_localpath); 313 res->hr_metaflush = 0; 314 } else { 315 pjdlog_errno(LOG_ERR, 316 "Unable to flush disk cache on activemap update"); 317 res->hr_stat_activemap_flush_error++; 318 return (-1); 319 } 320 } 321 return (0); 322 } 323 324 static bool 325 real_remote(const struct hast_resource *res) 326 { 327 328 return (strcmp(res->hr_remoteaddr, "none") != 0); 329 } 330 331 static void 332 init_environment(struct hast_resource *res __unused) 333 { 334 struct hio *hio; 335 unsigned int ii, ncomps; 336 337 /* 338 * In the future it might be per-resource value. 339 */ 340 ncomps = HAST_NCOMPONENTS; 341 342 /* 343 * Allocate memory needed by lists. 344 */ 345 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 346 if (hio_send_list == NULL) { 347 primary_exitx(EX_TEMPFAIL, 348 "Unable to allocate %zu bytes of memory for send lists.", 349 sizeof(hio_send_list[0]) * ncomps); 350 } 351 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 352 if (hio_send_list_lock == NULL) { 353 primary_exitx(EX_TEMPFAIL, 354 "Unable to allocate %zu bytes of memory for send list locks.", 355 sizeof(hio_send_list_lock[0]) * ncomps); 356 } 357 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 358 if (hio_send_list_cond == NULL) { 359 primary_exitx(EX_TEMPFAIL, 360 "Unable to allocate %zu bytes of memory for send list condition variables.", 361 sizeof(hio_send_list_cond[0]) * ncomps); 362 } 363 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 364 if (hio_recv_list == NULL) { 365 primary_exitx(EX_TEMPFAIL, 366 "Unable to allocate %zu bytes of memory for recv lists.", 367 sizeof(hio_recv_list[0]) * ncomps); 368 } 369 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 370 if (hio_recv_list_lock == NULL) { 371 primary_exitx(EX_TEMPFAIL, 372 "Unable to allocate %zu bytes of memory for recv list locks.", 373 sizeof(hio_recv_list_lock[0]) * ncomps); 374 } 375 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 376 if (hio_recv_list_cond == NULL) { 377 primary_exitx(EX_TEMPFAIL, 378 "Unable to allocate %zu bytes of memory for recv list condition variables.", 379 sizeof(hio_recv_list_cond[0]) * ncomps); 380 } 381 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 382 if (hio_remote_lock == NULL) { 383 primary_exitx(EX_TEMPFAIL, 384 "Unable to allocate %zu bytes of memory for remote connections locks.", 385 sizeof(hio_remote_lock[0]) * ncomps); 386 } 387 388 /* 389 * Initialize lists, their locks and theirs condition variables. 390 */ 391 TAILQ_INIT(&hio_free_list); 392 mtx_init(&hio_free_list_lock); 393 cv_init(&hio_free_list_cond); 394 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 395 TAILQ_INIT(&hio_send_list[ii]); 396 mtx_init(&hio_send_list_lock[ii]); 397 cv_init(&hio_send_list_cond[ii]); 398 TAILQ_INIT(&hio_recv_list[ii]); 399 mtx_init(&hio_recv_list_lock[ii]); 400 cv_init(&hio_recv_list_cond[ii]); 401 rw_init(&hio_remote_lock[ii]); 402 } 403 TAILQ_INIT(&hio_done_list); 404 mtx_init(&hio_done_list_lock); 405 cv_init(&hio_done_list_cond); 406 mtx_init(&metadata_lock); 407 408 /* 409 * Allocate requests pool and initialize requests. 410 */ 411 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 412 hio = malloc(sizeof(*hio)); 413 if (hio == NULL) { 414 primary_exitx(EX_TEMPFAIL, 415 "Unable to allocate %zu bytes of memory for hio request.", 416 sizeof(*hio)); 417 } 418 hio->hio_countdown = 0; 419 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 420 if (hio->hio_errors == NULL) { 421 primary_exitx(EX_TEMPFAIL, 422 "Unable allocate %zu bytes of memory for hio errors.", 423 sizeof(hio->hio_errors[0]) * ncomps); 424 } 425 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 426 if (hio->hio_next == NULL) { 427 primary_exitx(EX_TEMPFAIL, 428 "Unable allocate %zu bytes of memory for hio_next field.", 429 sizeof(hio->hio_next[0]) * ncomps); 430 } 431 hio->hio_ggio.gctl_version = G_GATE_VERSION; 432 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 433 if (hio->hio_ggio.gctl_data == NULL) { 434 primary_exitx(EX_TEMPFAIL, 435 "Unable to allocate %zu bytes of memory for gctl_data.", 436 MAXPHYS); 437 } 438 hio->hio_ggio.gctl_length = MAXPHYS; 439 hio->hio_ggio.gctl_error = 0; 440 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 441 } 442 } 443 444 static bool 445 init_resuid(struct hast_resource *res) 446 { 447 448 mtx_lock(&metadata_lock); 449 if (res->hr_resuid != 0) { 450 mtx_unlock(&metadata_lock); 451 return (false); 452 } else { 453 /* Initialize unique resource identifier. */ 454 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 455 mtx_unlock(&metadata_lock); 456 if (metadata_write(res) == -1) 457 exit(EX_NOINPUT); 458 return (true); 459 } 460 } 461 462 static void 463 init_local(struct hast_resource *res) 464 { 465 unsigned char *buf; 466 size_t mapsize; 467 468 if (metadata_read(res, true) == -1) 469 exit(EX_NOINPUT); 470 mtx_init(&res->hr_amp_lock); 471 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 472 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 473 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 474 } 475 mtx_init(&range_lock); 476 cv_init(&range_regular_cond); 477 if (rangelock_init(&range_regular) == -1) 478 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 479 cv_init(&range_sync_cond); 480 if (rangelock_init(&range_sync) == -1) 481 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 482 mapsize = activemap_ondisk_size(res->hr_amp); 483 buf = calloc(1, mapsize); 484 if (buf == NULL) { 485 primary_exitx(EX_TEMPFAIL, 486 "Unable to allocate buffer for activemap."); 487 } 488 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 489 (ssize_t)mapsize) { 490 primary_exit(EX_NOINPUT, "Unable to read activemap"); 491 } 492 activemap_copyin(res->hr_amp, buf, mapsize); 493 free(buf); 494 if (res->hr_resuid != 0) 495 return; 496 /* 497 * We're using provider for the first time. Initialize local and remote 498 * counters. We don't initialize resuid here, as we want to do it just 499 * in time. The reason for this is that we want to inform secondary 500 * that there were no writes yet, so there is no need to synchronize 501 * anything. 502 */ 503 res->hr_primary_localcnt = 0; 504 res->hr_primary_remotecnt = 0; 505 if (metadata_write(res) == -1) 506 exit(EX_NOINPUT); 507 } 508 509 static int 510 primary_connect(struct hast_resource *res, struct proto_conn **connp) 511 { 512 struct proto_conn *conn; 513 int16_t val; 514 515 val = 1; 516 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 517 primary_exit(EX_TEMPFAIL, 518 "Unable to send connection request to parent"); 519 } 520 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 521 primary_exit(EX_TEMPFAIL, 522 "Unable to receive reply to connection request from parent"); 523 } 524 if (val != 0) { 525 errno = val; 526 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 527 res->hr_remoteaddr); 528 return (-1); 529 } 530 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 531 primary_exit(EX_TEMPFAIL, 532 "Unable to receive connection from parent"); 533 } 534 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 535 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 536 res->hr_remoteaddr); 537 proto_close(conn); 538 return (-1); 539 } 540 /* Error in setting timeout is not critical, but why should it fail? */ 541 if (proto_timeout(conn, res->hr_timeout) == -1) 542 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 543 544 *connp = conn; 545 546 return (0); 547 } 548 549 /* 550 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 551 */ 552 static void 553 enable_direct_reads(struct hast_resource *res) 554 { 555 struct g_gate_ctl_modify ggiomodify; 556 557 bzero(&ggiomodify, sizeof(ggiomodify)); 558 ggiomodify.gctl_version = G_GATE_VERSION; 559 ggiomodify.gctl_unit = res->hr_ggateunit; 560 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 561 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 562 sizeof(ggiomodify.gctl_readprov)); 563 ggiomodify.gctl_readoffset = res->hr_localoff; 564 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 565 pjdlog_debug(1, "Direct reads enabled."); 566 else 567 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 568 } 569 570 static int 571 init_remote(struct hast_resource *res, struct proto_conn **inp, 572 struct proto_conn **outp) 573 { 574 struct proto_conn *in, *out; 575 struct nv *nvout, *nvin; 576 const unsigned char *token; 577 unsigned char *map; 578 const char *errmsg; 579 int32_t extentsize; 580 int64_t datasize; 581 uint32_t mapsize; 582 uint8_t version; 583 size_t size; 584 int error; 585 586 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 587 PJDLOG_ASSERT(real_remote(res)); 588 589 in = out = NULL; 590 errmsg = NULL; 591 592 if (primary_connect(res, &out) == -1) 593 return (ECONNREFUSED); 594 595 error = ECONNABORTED; 596 597 /* 598 * First handshake step. 599 * Setup outgoing connection with remote node. 600 */ 601 nvout = nv_alloc(); 602 nv_add_string(nvout, res->hr_name, "resource"); 603 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 604 if (nv_error(nvout) != 0) { 605 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 606 "Unable to allocate header for connection with %s", 607 res->hr_remoteaddr); 608 nv_free(nvout); 609 goto close; 610 } 611 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 612 pjdlog_errno(LOG_WARNING, 613 "Unable to send handshake header to %s", 614 res->hr_remoteaddr); 615 nv_free(nvout); 616 goto close; 617 } 618 nv_free(nvout); 619 if (hast_proto_recv_hdr(out, &nvin) == -1) { 620 pjdlog_errno(LOG_WARNING, 621 "Unable to receive handshake header from %s", 622 res->hr_remoteaddr); 623 goto close; 624 } 625 errmsg = nv_get_string(nvin, "errmsg"); 626 if (errmsg != NULL) { 627 pjdlog_warning("%s", errmsg); 628 if (nv_exists(nvin, "wait")) 629 error = EBUSY; 630 nv_free(nvin); 631 goto close; 632 } 633 version = nv_get_uint8(nvin, "version"); 634 if (version == 0) { 635 /* 636 * If no version is sent, it means this is protocol version 1. 637 */ 638 version = 1; 639 } 640 if (version > HAST_PROTO_VERSION) { 641 pjdlog_warning("Invalid version received (%hhu).", version); 642 nv_free(nvin); 643 goto close; 644 } 645 res->hr_version = version; 646 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 647 token = nv_get_uint8_array(nvin, &size, "token"); 648 if (token == NULL) { 649 pjdlog_warning("Handshake header from %s has no 'token' field.", 650 res->hr_remoteaddr); 651 nv_free(nvin); 652 goto close; 653 } 654 if (size != sizeof(res->hr_token)) { 655 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 656 res->hr_remoteaddr, size, sizeof(res->hr_token)); 657 nv_free(nvin); 658 goto close; 659 } 660 bcopy(token, res->hr_token, sizeof(res->hr_token)); 661 nv_free(nvin); 662 663 /* 664 * Second handshake step. 665 * Setup incoming connection with remote node. 666 */ 667 if (primary_connect(res, &in) == -1) 668 goto close; 669 670 nvout = nv_alloc(); 671 nv_add_string(nvout, res->hr_name, "resource"); 672 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 673 "token"); 674 if (res->hr_resuid == 0) { 675 /* 676 * The resuid field was not yet initialized. 677 * Because we do synchronization inside init_resuid(), it is 678 * possible that someone already initialized it, the function 679 * will return false then, but if we successfully initialized 680 * it, we will get true. True means that there were no writes 681 * to this resource yet and we want to inform secondary that 682 * synchronization is not needed by sending "virgin" argument. 683 */ 684 if (init_resuid(res)) 685 nv_add_int8(nvout, 1, "virgin"); 686 } 687 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 688 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 689 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 690 if (nv_error(nvout) != 0) { 691 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 692 "Unable to allocate header for connection with %s", 693 res->hr_remoteaddr); 694 nv_free(nvout); 695 goto close; 696 } 697 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 698 pjdlog_errno(LOG_WARNING, 699 "Unable to send handshake header to %s", 700 res->hr_remoteaddr); 701 nv_free(nvout); 702 goto close; 703 } 704 nv_free(nvout); 705 if (hast_proto_recv_hdr(out, &nvin) == -1) { 706 pjdlog_errno(LOG_WARNING, 707 "Unable to receive handshake header from %s", 708 res->hr_remoteaddr); 709 goto close; 710 } 711 errmsg = nv_get_string(nvin, "errmsg"); 712 if (errmsg != NULL) { 713 pjdlog_warning("%s", errmsg); 714 nv_free(nvin); 715 goto close; 716 } 717 datasize = nv_get_int64(nvin, "datasize"); 718 if (datasize != res->hr_datasize) { 719 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 720 (intmax_t)res->hr_datasize, (intmax_t)datasize); 721 nv_free(nvin); 722 goto close; 723 } 724 extentsize = nv_get_int32(nvin, "extentsize"); 725 if (extentsize != res->hr_extentsize) { 726 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 727 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 728 nv_free(nvin); 729 goto close; 730 } 731 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 732 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 733 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 734 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 735 enable_direct_reads(res); 736 if (nv_exists(nvin, "virgin")) { 737 /* 738 * Secondary was reinitialized, bump localcnt if it is 0 as 739 * only we have the data. 740 */ 741 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 742 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 743 744 if (res->hr_primary_localcnt == 0) { 745 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 746 747 mtx_lock(&metadata_lock); 748 res->hr_primary_localcnt++; 749 pjdlog_debug(1, "Increasing localcnt to %ju.", 750 (uintmax_t)res->hr_primary_localcnt); 751 (void)metadata_write(res); 752 mtx_unlock(&metadata_lock); 753 } 754 } 755 map = NULL; 756 mapsize = nv_get_uint32(nvin, "mapsize"); 757 if (mapsize > 0) { 758 map = malloc(mapsize); 759 if (map == NULL) { 760 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 761 (uintmax_t)mapsize); 762 nv_free(nvin); 763 goto close; 764 } 765 /* 766 * Remote node have some dirty extents on its own, lets 767 * download its activemap. 768 */ 769 if (hast_proto_recv_data(res, out, nvin, map, 770 mapsize) == -1) { 771 pjdlog_errno(LOG_ERR, 772 "Unable to receive remote activemap"); 773 nv_free(nvin); 774 free(map); 775 goto close; 776 } 777 /* 778 * Merge local and remote bitmaps. 779 */ 780 activemap_merge(res->hr_amp, map, mapsize); 781 free(map); 782 /* 783 * Now that we merged bitmaps from both nodes, flush it to the 784 * disk before we start to synchronize. 785 */ 786 (void)hast_activemap_flush(res); 787 } 788 nv_free(nvin); 789 #ifdef notyet 790 /* Setup directions. */ 791 if (proto_send(out, NULL, 0) == -1) 792 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 793 if (proto_recv(in, NULL, 0) == -1) 794 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 795 #endif 796 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 797 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 798 res->hr_version < 2) { 799 pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 800 res->hr_replication = HAST_REPLICATION_FULLSYNC; 801 } else if (res->hr_replication != res->hr_original_replication) { 802 /* 803 * This is in case hastd disconnected and was upgraded. 804 */ 805 res->hr_replication = res->hr_original_replication; 806 } 807 if (inp != NULL && outp != NULL) { 808 *inp = in; 809 *outp = out; 810 } else { 811 res->hr_remotein = in; 812 res->hr_remoteout = out; 813 } 814 event_send(res, EVENT_CONNECT); 815 return (0); 816 close: 817 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 818 event_send(res, EVENT_SPLITBRAIN); 819 proto_close(out); 820 if (in != NULL) 821 proto_close(in); 822 return (error); 823 } 824 825 static void 826 sync_start(void) 827 { 828 829 mtx_lock(&sync_lock); 830 sync_inprogress = true; 831 mtx_unlock(&sync_lock); 832 cv_signal(&sync_cond); 833 } 834 835 static void 836 sync_stop(void) 837 { 838 839 mtx_lock(&sync_lock); 840 if (sync_inprogress) 841 sync_inprogress = false; 842 mtx_unlock(&sync_lock); 843 } 844 845 static void 846 init_ggate(struct hast_resource *res) 847 { 848 struct g_gate_ctl_create ggiocreate; 849 struct g_gate_ctl_cancel ggiocancel; 850 851 /* 852 * We communicate with ggate via /dev/ggctl. Open it. 853 */ 854 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 855 if (res->hr_ggatefd == -1) 856 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 857 /* 858 * Create provider before trying to connect, as connection failure 859 * is not critical, but may take some time. 860 */ 861 bzero(&ggiocreate, sizeof(ggiocreate)); 862 ggiocreate.gctl_version = G_GATE_VERSION; 863 ggiocreate.gctl_mediasize = res->hr_datasize; 864 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 865 ggiocreate.gctl_flags = 0; 866 ggiocreate.gctl_maxcount = 0; 867 ggiocreate.gctl_timeout = 0; 868 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 869 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 870 res->hr_provname); 871 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 872 pjdlog_info("Device hast/%s created.", res->hr_provname); 873 res->hr_ggateunit = ggiocreate.gctl_unit; 874 return; 875 } 876 if (errno != EEXIST) { 877 primary_exit(EX_OSERR, "Unable to create hast/%s device", 878 res->hr_provname); 879 } 880 pjdlog_debug(1, 881 "Device hast/%s already exists, we will try to take it over.", 882 res->hr_provname); 883 /* 884 * If we received EEXIST, we assume that the process who created the 885 * provider died and didn't clean up. In that case we will start from 886 * where he left of. 887 */ 888 bzero(&ggiocancel, sizeof(ggiocancel)); 889 ggiocancel.gctl_version = G_GATE_VERSION; 890 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 891 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 892 res->hr_provname); 893 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 894 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 895 res->hr_ggateunit = ggiocancel.gctl_unit; 896 return; 897 } 898 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 899 res->hr_provname); 900 } 901 902 void 903 hastd_primary(struct hast_resource *res) 904 { 905 pthread_t td; 906 pid_t pid; 907 int error, mode, debuglevel; 908 909 /* 910 * Create communication channel for sending control commands from 911 * parent to child. 912 */ 913 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 914 /* TODO: There's no need for this to be fatal error. */ 915 KEEP_ERRNO((void)pidfile_remove(pfh)); 916 pjdlog_exit(EX_OSERR, 917 "Unable to create control sockets between parent and child"); 918 } 919 /* 920 * Create communication channel for sending events from child to parent. 921 */ 922 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 923 /* TODO: There's no need for this to be fatal error. */ 924 KEEP_ERRNO((void)pidfile_remove(pfh)); 925 pjdlog_exit(EX_OSERR, 926 "Unable to create event sockets between child and parent"); 927 } 928 /* 929 * Create communication channel for sending connection requests from 930 * child to parent. 931 */ 932 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 933 /* TODO: There's no need for this to be fatal error. */ 934 KEEP_ERRNO((void)pidfile_remove(pfh)); 935 pjdlog_exit(EX_OSERR, 936 "Unable to create connection sockets between child and parent"); 937 } 938 939 pid = fork(); 940 if (pid == -1) { 941 /* TODO: There's no need for this to be fatal error. */ 942 KEEP_ERRNO((void)pidfile_remove(pfh)); 943 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 944 } 945 946 if (pid > 0) { 947 /* This is parent. */ 948 /* Declare that we are receiver. */ 949 proto_recv(res->hr_event, NULL, 0); 950 proto_recv(res->hr_conn, NULL, 0); 951 /* Declare that we are sender. */ 952 proto_send(res->hr_ctrl, NULL, 0); 953 res->hr_workerpid = pid; 954 return; 955 } 956 957 gres = res; 958 mode = pjdlog_mode_get(); 959 debuglevel = pjdlog_debug_get(); 960 961 /* Declare that we are sender. */ 962 proto_send(res->hr_event, NULL, 0); 963 proto_send(res->hr_conn, NULL, 0); 964 /* Declare that we are receiver. */ 965 proto_recv(res->hr_ctrl, NULL, 0); 966 descriptors_cleanup(res); 967 968 descriptors_assert(res, mode); 969 970 pjdlog_init(mode); 971 pjdlog_debug_set(debuglevel); 972 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 973 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 974 975 init_local(res); 976 init_ggate(res); 977 init_environment(res); 978 979 if (drop_privs(res) != 0) { 980 cleanup(res); 981 exit(EX_CONFIG); 982 } 983 pjdlog_info("Privileges successfully dropped."); 984 985 /* 986 * Create the guard thread first, so we can handle signals from the 987 * very beginning. 988 */ 989 error = pthread_create(&td, NULL, guard_thread, res); 990 PJDLOG_ASSERT(error == 0); 991 /* 992 * Create the control thread before sending any event to the parent, 993 * as we can deadlock when parent sends control request to worker, 994 * but worker has no control thread started yet, so parent waits. 995 * In the meantime worker sends an event to the parent, but parent 996 * is unable to handle the event, because it waits for control 997 * request response. 998 */ 999 error = pthread_create(&td, NULL, ctrl_thread, res); 1000 PJDLOG_ASSERT(error == 0); 1001 if (real_remote(res)) { 1002 error = init_remote(res, NULL, NULL); 1003 if (error == 0) { 1004 sync_start(); 1005 } else if (error == EBUSY) { 1006 time_t start = time(NULL); 1007 1008 pjdlog_warning("Waiting for remote node to become %s for %ds.", 1009 role2str(HAST_ROLE_SECONDARY), 1010 res->hr_timeout); 1011 for (;;) { 1012 sleep(1); 1013 error = init_remote(res, NULL, NULL); 1014 if (error != EBUSY) 1015 break; 1016 if (time(NULL) > start + res->hr_timeout) 1017 break; 1018 } 1019 if (error == EBUSY) { 1020 pjdlog_warning("Remote node is still %s, starting anyway.", 1021 role2str(HAST_ROLE_PRIMARY)); 1022 } 1023 } 1024 } 1025 error = pthread_create(&td, NULL, ggate_recv_thread, res); 1026 PJDLOG_ASSERT(error == 0); 1027 error = pthread_create(&td, NULL, local_send_thread, res); 1028 PJDLOG_ASSERT(error == 0); 1029 error = pthread_create(&td, NULL, remote_send_thread, res); 1030 PJDLOG_ASSERT(error == 0); 1031 error = pthread_create(&td, NULL, remote_recv_thread, res); 1032 PJDLOG_ASSERT(error == 0); 1033 error = pthread_create(&td, NULL, ggate_send_thread, res); 1034 PJDLOG_ASSERT(error == 0); 1035 fullystarted = true; 1036 (void)sync_thread(res); 1037 } 1038 1039 static void 1040 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1041 const char *fmt, ...) 1042 { 1043 char msg[1024]; 1044 va_list ap; 1045 1046 va_start(ap, fmt); 1047 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1048 va_end(ap); 1049 switch (ggio->gctl_cmd) { 1050 case BIO_READ: 1051 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1052 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1053 break; 1054 case BIO_DELETE: 1055 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1056 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1057 break; 1058 case BIO_FLUSH: 1059 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1060 break; 1061 case BIO_WRITE: 1062 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1063 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1064 break; 1065 default: 1066 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1067 (unsigned int)ggio->gctl_cmd); 1068 break; 1069 } 1070 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1071 } 1072 1073 static void 1074 remote_close(struct hast_resource *res, int ncomp) 1075 { 1076 1077 rw_wlock(&hio_remote_lock[ncomp]); 1078 /* 1079 * Check for a race between dropping rlock and acquiring wlock - 1080 * another thread can close connection in-between. 1081 */ 1082 if (!ISCONNECTED(res, ncomp)) { 1083 PJDLOG_ASSERT(res->hr_remotein == NULL); 1084 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1085 rw_unlock(&hio_remote_lock[ncomp]); 1086 return; 1087 } 1088 1089 PJDLOG_ASSERT(res->hr_remotein != NULL); 1090 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1091 1092 pjdlog_debug(2, "Closing incoming connection to %s.", 1093 res->hr_remoteaddr); 1094 proto_close(res->hr_remotein); 1095 res->hr_remotein = NULL; 1096 pjdlog_debug(2, "Closing outgoing connection to %s.", 1097 res->hr_remoteaddr); 1098 proto_close(res->hr_remoteout); 1099 res->hr_remoteout = NULL; 1100 1101 rw_unlock(&hio_remote_lock[ncomp]); 1102 1103 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1104 1105 /* 1106 * Stop synchronization if in-progress. 1107 */ 1108 sync_stop(); 1109 1110 event_send(res, EVENT_DISCONNECT); 1111 } 1112 1113 /* 1114 * Acknowledge write completion to the kernel, but don't update activemap yet. 1115 */ 1116 static void 1117 write_complete(struct hast_resource *res, struct hio *hio) 1118 { 1119 struct g_gate_ctl_io *ggio; 1120 unsigned int ncomp; 1121 1122 PJDLOG_ASSERT(!hio->hio_done); 1123 1124 ggio = &hio->hio_ggio; 1125 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1126 1127 /* 1128 * Bump local count if this is first write after 1129 * connection failure with remote node. 1130 */ 1131 ncomp = 1; 1132 rw_rlock(&hio_remote_lock[ncomp]); 1133 if (!ISCONNECTED(res, ncomp)) { 1134 mtx_lock(&metadata_lock); 1135 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1136 res->hr_primary_localcnt++; 1137 pjdlog_debug(1, "Increasing localcnt to %ju.", 1138 (uintmax_t)res->hr_primary_localcnt); 1139 (void)metadata_write(res); 1140 } 1141 mtx_unlock(&metadata_lock); 1142 } 1143 rw_unlock(&hio_remote_lock[ncomp]); 1144 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1145 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1146 hio->hio_done = true; 1147 } 1148 1149 /* 1150 * Thread receives ggate I/O requests from the kernel and passes them to 1151 * appropriate threads: 1152 * WRITE - always goes to both local_send and remote_send threads 1153 * READ (when the block is up-to-date on local component) - 1154 * only local_send thread 1155 * READ (when the block isn't up-to-date on local component) - 1156 * only remote_send thread 1157 * DELETE - always goes to both local_send and remote_send threads 1158 * FLUSH - always goes to both local_send and remote_send threads 1159 */ 1160 static void * 1161 ggate_recv_thread(void *arg) 1162 { 1163 struct hast_resource *res = arg; 1164 struct g_gate_ctl_io *ggio; 1165 struct hio *hio; 1166 unsigned int ii, ncomp, ncomps; 1167 int error; 1168 1169 for (;;) { 1170 pjdlog_debug(2, "ggate_recv: Taking free request."); 1171 QUEUE_TAKE2(hio, free); 1172 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1173 ggio = &hio->hio_ggio; 1174 ggio->gctl_unit = res->hr_ggateunit; 1175 ggio->gctl_length = MAXPHYS; 1176 ggio->gctl_error = 0; 1177 hio->hio_done = false; 1178 hio->hio_replication = res->hr_replication; 1179 pjdlog_debug(2, 1180 "ggate_recv: (%p) Waiting for request from the kernel.", 1181 hio); 1182 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1183 if (sigexit_received) 1184 pthread_exit(NULL); 1185 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1186 } 1187 error = ggio->gctl_error; 1188 switch (error) { 1189 case 0: 1190 break; 1191 case ECANCELED: 1192 /* Exit gracefully. */ 1193 if (!sigexit_received) { 1194 pjdlog_debug(2, 1195 "ggate_recv: (%p) Received cancel from the kernel.", 1196 hio); 1197 pjdlog_info("Received cancel from the kernel, exiting."); 1198 } 1199 pthread_exit(NULL); 1200 case ENOMEM: 1201 /* 1202 * Buffer too small? Impossible, we allocate MAXPHYS 1203 * bytes - request can't be bigger than that. 1204 */ 1205 /* FALLTHROUGH */ 1206 case ENXIO: 1207 default: 1208 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1209 strerror(error)); 1210 } 1211 1212 ncomp = 0; 1213 ncomps = HAST_NCOMPONENTS; 1214 1215 for (ii = 0; ii < ncomps; ii++) 1216 hio->hio_errors[ii] = EINVAL; 1217 reqlog(LOG_DEBUG, 2, ggio, 1218 "ggate_recv: (%p) Request received from the kernel: ", 1219 hio); 1220 1221 /* 1222 * Inform all components about new write request. 1223 * For read request prefer local component unless the given 1224 * range is out-of-date, then use remote component. 1225 */ 1226 switch (ggio->gctl_cmd) { 1227 case BIO_READ: 1228 res->hr_stat_read++; 1229 ncomps = 1; 1230 mtx_lock(&metadata_lock); 1231 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1232 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1233 /* 1234 * This range is up-to-date on local component, 1235 * so handle request locally. 1236 */ 1237 /* Local component is 0 for now. */ 1238 ncomp = 0; 1239 } else /* if (res->hr_syncsrc == 1240 HAST_SYNCSRC_SECONDARY) */ { 1241 PJDLOG_ASSERT(res->hr_syncsrc == 1242 HAST_SYNCSRC_SECONDARY); 1243 /* 1244 * This range is out-of-date on local component, 1245 * so send request to the remote node. 1246 */ 1247 /* Remote component is 1 for now. */ 1248 ncomp = 1; 1249 } 1250 mtx_unlock(&metadata_lock); 1251 break; 1252 case BIO_WRITE: 1253 res->hr_stat_write++; 1254 if (res->hr_resuid == 0 && 1255 res->hr_primary_localcnt == 0) { 1256 /* This is first write. */ 1257 res->hr_primary_localcnt = 1; 1258 } 1259 for (;;) { 1260 mtx_lock(&range_lock); 1261 if (rangelock_islocked(range_sync, 1262 ggio->gctl_offset, ggio->gctl_length)) { 1263 pjdlog_debug(2, 1264 "regular: Range offset=%jd length=%zu locked.", 1265 (intmax_t)ggio->gctl_offset, 1266 (size_t)ggio->gctl_length); 1267 range_regular_wait = true; 1268 cv_wait(&range_regular_cond, &range_lock); 1269 range_regular_wait = false; 1270 mtx_unlock(&range_lock); 1271 continue; 1272 } 1273 if (rangelock_add(range_regular, 1274 ggio->gctl_offset, ggio->gctl_length) == -1) { 1275 mtx_unlock(&range_lock); 1276 pjdlog_debug(2, 1277 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1278 (intmax_t)ggio->gctl_offset, 1279 (size_t)ggio->gctl_length); 1280 sleep(1); 1281 continue; 1282 } 1283 mtx_unlock(&range_lock); 1284 break; 1285 } 1286 mtx_lock(&res->hr_amp_lock); 1287 if (activemap_write_start(res->hr_amp, 1288 ggio->gctl_offset, ggio->gctl_length)) { 1289 res->hr_stat_activemap_update++; 1290 (void)hast_activemap_flush(res); 1291 } 1292 mtx_unlock(&res->hr_amp_lock); 1293 break; 1294 case BIO_DELETE: 1295 res->hr_stat_delete++; 1296 break; 1297 case BIO_FLUSH: 1298 res->hr_stat_flush++; 1299 break; 1300 } 1301 pjdlog_debug(2, 1302 "ggate_recv: (%p) Moving request to the send queues.", hio); 1303 hio->hio_countdown = ncomps; 1304 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1305 ggio->gctl_cmd == BIO_WRITE) { 1306 /* Each remote request needs two responses in memsync. */ 1307 hio->hio_countdown++; 1308 } 1309 for (ii = ncomp; ii < ncomps; ii++) 1310 QUEUE_INSERT1(hio, send, ii); 1311 } 1312 /* NOTREACHED */ 1313 return (NULL); 1314 } 1315 1316 /* 1317 * Thread reads from or writes to local component. 1318 * If local read fails, it redirects it to remote_send thread. 1319 */ 1320 static void * 1321 local_send_thread(void *arg) 1322 { 1323 struct hast_resource *res = arg; 1324 struct g_gate_ctl_io *ggio; 1325 struct hio *hio; 1326 unsigned int ncomp, rncomp; 1327 ssize_t ret; 1328 1329 /* Local component is 0 for now. */ 1330 ncomp = 0; 1331 /* Remote component is 1 for now. */ 1332 rncomp = 1; 1333 1334 for (;;) { 1335 pjdlog_debug(2, "local_send: Taking request."); 1336 QUEUE_TAKE1(hio, send, ncomp, 0); 1337 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1338 ggio = &hio->hio_ggio; 1339 switch (ggio->gctl_cmd) { 1340 case BIO_READ: 1341 ret = pread(res->hr_localfd, ggio->gctl_data, 1342 ggio->gctl_length, 1343 ggio->gctl_offset + res->hr_localoff); 1344 if (ret == ggio->gctl_length) 1345 hio->hio_errors[ncomp] = 0; 1346 else if (!ISSYNCREQ(hio)) { 1347 /* 1348 * If READ failed, try to read from remote node. 1349 */ 1350 if (ret == -1) { 1351 reqlog(LOG_WARNING, 0, ggio, 1352 "Local request failed (%s), trying remote node. ", 1353 strerror(errno)); 1354 } else if (ret != ggio->gctl_length) { 1355 reqlog(LOG_WARNING, 0, ggio, 1356 "Local request failed (%zd != %jd), trying remote node. ", 1357 ret, (intmax_t)ggio->gctl_length); 1358 } 1359 QUEUE_INSERT1(hio, send, rncomp); 1360 continue; 1361 } 1362 break; 1363 case BIO_WRITE: 1364 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1365 ggio->gctl_length, 1366 ggio->gctl_offset + res->hr_localoff); 1367 if (ret == -1) { 1368 hio->hio_errors[ncomp] = errno; 1369 reqlog(LOG_WARNING, 0, ggio, 1370 "Local request failed (%s): ", 1371 strerror(errno)); 1372 } else if (ret != ggio->gctl_length) { 1373 hio->hio_errors[ncomp] = EIO; 1374 reqlog(LOG_WARNING, 0, ggio, 1375 "Local request failed (%zd != %jd): ", 1376 ret, (intmax_t)ggio->gctl_length); 1377 } else { 1378 hio->hio_errors[ncomp] = 0; 1379 if (hio->hio_replication == 1380 HAST_REPLICATION_ASYNC) { 1381 ggio->gctl_error = 0; 1382 write_complete(res, hio); 1383 } 1384 } 1385 break; 1386 case BIO_DELETE: 1387 ret = g_delete(res->hr_localfd, 1388 ggio->gctl_offset + res->hr_localoff, 1389 ggio->gctl_length); 1390 if (ret == -1) { 1391 hio->hio_errors[ncomp] = errno; 1392 reqlog(LOG_WARNING, 0, ggio, 1393 "Local request failed (%s): ", 1394 strerror(errno)); 1395 } else { 1396 hio->hio_errors[ncomp] = 0; 1397 } 1398 break; 1399 case BIO_FLUSH: 1400 if (!res->hr_localflush) { 1401 ret = -1; 1402 errno = EOPNOTSUPP; 1403 break; 1404 } 1405 ret = g_flush(res->hr_localfd); 1406 if (ret == -1) { 1407 if (errno == EOPNOTSUPP) 1408 res->hr_localflush = false; 1409 hio->hio_errors[ncomp] = errno; 1410 reqlog(LOG_WARNING, 0, ggio, 1411 "Local request failed (%s): ", 1412 strerror(errno)); 1413 } else { 1414 hio->hio_errors[ncomp] = 0; 1415 } 1416 break; 1417 } 1418 1419 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1420 ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1421 if (refcnt_release(&hio->hio_countdown) > 0) 1422 continue; 1423 } else { 1424 /* 1425 * Depending on hio_countdown value, requests finished 1426 * in the following order: 1427 * 0: remote memsync, remote final, local write 1428 * 1: remote memsync, local write, (remote final) 1429 * 2: local write, (remote memsync), (remote final) 1430 */ 1431 switch (refcnt_release(&hio->hio_countdown)) { 1432 case 0: 1433 /* 1434 * Local write finished as last. 1435 */ 1436 break; 1437 case 1: 1438 /* 1439 * Local write finished after remote memsync 1440 * reply arrvied. We can complete the write now. 1441 */ 1442 if (hio->hio_errors[0] == 0) 1443 write_complete(res, hio); 1444 continue; 1445 case 2: 1446 /* 1447 * Local write finished as first. 1448 */ 1449 continue; 1450 default: 1451 PJDLOG_ABORT("Invalid hio_countdown."); 1452 } 1453 } 1454 if (ISSYNCREQ(hio)) { 1455 mtx_lock(&sync_lock); 1456 SYNCREQDONE(hio); 1457 mtx_unlock(&sync_lock); 1458 cv_signal(&sync_cond); 1459 } else { 1460 pjdlog_debug(2, 1461 "local_send: (%p) Moving request to the done queue.", 1462 hio); 1463 QUEUE_INSERT2(hio, done); 1464 } 1465 } 1466 /* NOTREACHED */ 1467 return (NULL); 1468 } 1469 1470 static void 1471 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1472 { 1473 struct nv *nv; 1474 1475 rw_rlock(&hio_remote_lock[ncomp]); 1476 1477 if (!ISCONNECTED(res, ncomp)) { 1478 rw_unlock(&hio_remote_lock[ncomp]); 1479 return; 1480 } 1481 1482 PJDLOG_ASSERT(res->hr_remotein != NULL); 1483 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1484 1485 nv = nv_alloc(); 1486 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1487 if (nv_error(nv) != 0) { 1488 rw_unlock(&hio_remote_lock[ncomp]); 1489 nv_free(nv); 1490 pjdlog_debug(1, 1491 "keepalive_send: Unable to prepare header to send."); 1492 return; 1493 } 1494 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1495 rw_unlock(&hio_remote_lock[ncomp]); 1496 pjdlog_common(LOG_DEBUG, 1, errno, 1497 "keepalive_send: Unable to send request"); 1498 nv_free(nv); 1499 remote_close(res, ncomp); 1500 return; 1501 } 1502 1503 rw_unlock(&hio_remote_lock[ncomp]); 1504 nv_free(nv); 1505 pjdlog_debug(2, "keepalive_send: Request sent."); 1506 } 1507 1508 /* 1509 * Thread sends request to secondary node. 1510 */ 1511 static void * 1512 remote_send_thread(void *arg) 1513 { 1514 struct hast_resource *res = arg; 1515 struct g_gate_ctl_io *ggio; 1516 time_t lastcheck, now; 1517 struct hio *hio; 1518 struct nv *nv; 1519 unsigned int ncomp; 1520 bool wakeup; 1521 uint64_t offset, length; 1522 uint8_t cmd; 1523 void *data; 1524 1525 /* Remote component is 1 for now. */ 1526 ncomp = 1; 1527 lastcheck = time(NULL); 1528 1529 for (;;) { 1530 pjdlog_debug(2, "remote_send: Taking request."); 1531 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1532 if (hio == NULL) { 1533 now = time(NULL); 1534 if (lastcheck + HAST_KEEPALIVE <= now) { 1535 keepalive_send(res, ncomp); 1536 lastcheck = now; 1537 } 1538 continue; 1539 } 1540 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1541 ggio = &hio->hio_ggio; 1542 switch (ggio->gctl_cmd) { 1543 case BIO_READ: 1544 cmd = HIO_READ; 1545 data = NULL; 1546 offset = ggio->gctl_offset; 1547 length = ggio->gctl_length; 1548 break; 1549 case BIO_WRITE: 1550 cmd = HIO_WRITE; 1551 data = ggio->gctl_data; 1552 offset = ggio->gctl_offset; 1553 length = ggio->gctl_length; 1554 break; 1555 case BIO_DELETE: 1556 cmd = HIO_DELETE; 1557 data = NULL; 1558 offset = ggio->gctl_offset; 1559 length = ggio->gctl_length; 1560 break; 1561 case BIO_FLUSH: 1562 cmd = HIO_FLUSH; 1563 data = NULL; 1564 offset = 0; 1565 length = 0; 1566 break; 1567 default: 1568 PJDLOG_ABORT("invalid condition"); 1569 } 1570 nv = nv_alloc(); 1571 nv_add_uint8(nv, cmd, "cmd"); 1572 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1573 nv_add_uint64(nv, offset, "offset"); 1574 nv_add_uint64(nv, length, "length"); 1575 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1576 ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { 1577 nv_add_uint8(nv, 1, "memsync"); 1578 } 1579 if (nv_error(nv) != 0) { 1580 hio->hio_errors[ncomp] = nv_error(nv); 1581 pjdlog_debug(2, 1582 "remote_send: (%p) Unable to prepare header to send.", 1583 hio); 1584 reqlog(LOG_ERR, 0, ggio, 1585 "Unable to prepare header to send (%s): ", 1586 strerror(nv_error(nv))); 1587 /* Move failed request immediately to the done queue. */ 1588 goto done_queue; 1589 } 1590 /* 1591 * Protect connection from disappearing. 1592 */ 1593 rw_rlock(&hio_remote_lock[ncomp]); 1594 if (!ISCONNECTED(res, ncomp)) { 1595 rw_unlock(&hio_remote_lock[ncomp]); 1596 hio->hio_errors[ncomp] = ENOTCONN; 1597 goto done_queue; 1598 } 1599 /* 1600 * Move the request to recv queue before sending it, because 1601 * in different order we can get reply before we move request 1602 * to recv queue. 1603 */ 1604 pjdlog_debug(2, 1605 "remote_send: (%p) Moving request to the recv queue.", 1606 hio); 1607 mtx_lock(&hio_recv_list_lock[ncomp]); 1608 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1609 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1610 mtx_unlock(&hio_recv_list_lock[ncomp]); 1611 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1612 data != NULL ? length : 0) == -1) { 1613 hio->hio_errors[ncomp] = errno; 1614 rw_unlock(&hio_remote_lock[ncomp]); 1615 pjdlog_debug(2, 1616 "remote_send: (%p) Unable to send request.", hio); 1617 reqlog(LOG_ERR, 0, ggio, 1618 "Unable to send request (%s): ", 1619 strerror(hio->hio_errors[ncomp])); 1620 remote_close(res, ncomp); 1621 /* 1622 * Take request back from the receive queue and move 1623 * it immediately to the done queue. 1624 */ 1625 mtx_lock(&hio_recv_list_lock[ncomp]); 1626 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1627 hio_next[ncomp]); 1628 mtx_unlock(&hio_recv_list_lock[ncomp]); 1629 goto done_queue; 1630 } 1631 rw_unlock(&hio_remote_lock[ncomp]); 1632 nv_free(nv); 1633 if (wakeup) 1634 cv_signal(&hio_recv_list_cond[ncomp]); 1635 continue; 1636 done_queue: 1637 nv_free(nv); 1638 if (ISSYNCREQ(hio)) { 1639 if (refcnt_release(&hio->hio_countdown) > 0) 1640 continue; 1641 mtx_lock(&sync_lock); 1642 SYNCREQDONE(hio); 1643 mtx_unlock(&sync_lock); 1644 cv_signal(&sync_cond); 1645 continue; 1646 } 1647 if (ggio->gctl_cmd == BIO_WRITE) { 1648 mtx_lock(&res->hr_amp_lock); 1649 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1650 ggio->gctl_length)) { 1651 (void)hast_activemap_flush(res); 1652 } 1653 mtx_unlock(&res->hr_amp_lock); 1654 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) 1655 (void)refcnt_release(&hio->hio_countdown); 1656 } 1657 if (refcnt_release(&hio->hio_countdown) > 0) 1658 continue; 1659 pjdlog_debug(2, 1660 "remote_send: (%p) Moving request to the done queue.", 1661 hio); 1662 QUEUE_INSERT2(hio, done); 1663 } 1664 /* NOTREACHED */ 1665 return (NULL); 1666 } 1667 1668 /* 1669 * Thread receives answer from secondary node and passes it to ggate_send 1670 * thread. 1671 */ 1672 static void * 1673 remote_recv_thread(void *arg) 1674 { 1675 struct hast_resource *res = arg; 1676 struct g_gate_ctl_io *ggio; 1677 struct hio *hio; 1678 struct nv *nv; 1679 unsigned int ncomp; 1680 uint64_t seq; 1681 bool memsyncack; 1682 int error; 1683 1684 /* Remote component is 1 for now. */ 1685 ncomp = 1; 1686 1687 for (;;) { 1688 /* Wait until there is anything to receive. */ 1689 mtx_lock(&hio_recv_list_lock[ncomp]); 1690 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1691 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1692 cv_wait(&hio_recv_list_cond[ncomp], 1693 &hio_recv_list_lock[ncomp]); 1694 } 1695 mtx_unlock(&hio_recv_list_lock[ncomp]); 1696 1697 memsyncack = false; 1698 1699 rw_rlock(&hio_remote_lock[ncomp]); 1700 if (!ISCONNECTED(res, ncomp)) { 1701 rw_unlock(&hio_remote_lock[ncomp]); 1702 /* 1703 * Connection is dead, so move all pending requests to 1704 * the done queue (one-by-one). 1705 */ 1706 mtx_lock(&hio_recv_list_lock[ncomp]); 1707 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1708 PJDLOG_ASSERT(hio != NULL); 1709 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1710 hio_next[ncomp]); 1711 mtx_unlock(&hio_recv_list_lock[ncomp]); 1712 goto done_queue; 1713 } 1714 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1715 pjdlog_errno(LOG_ERR, 1716 "Unable to receive reply header"); 1717 rw_unlock(&hio_remote_lock[ncomp]); 1718 remote_close(res, ncomp); 1719 continue; 1720 } 1721 rw_unlock(&hio_remote_lock[ncomp]); 1722 seq = nv_get_uint64(nv, "seq"); 1723 if (seq == 0) { 1724 pjdlog_error("Header contains no 'seq' field."); 1725 nv_free(nv); 1726 continue; 1727 } 1728 memsyncack = nv_exists(nv, "received"); 1729 mtx_lock(&hio_recv_list_lock[ncomp]); 1730 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1731 if (hio->hio_ggio.gctl_seq == seq) { 1732 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1733 hio_next[ncomp]); 1734 break; 1735 } 1736 } 1737 mtx_unlock(&hio_recv_list_lock[ncomp]); 1738 if (hio == NULL) { 1739 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1740 (uintmax_t)seq); 1741 nv_free(nv); 1742 continue; 1743 } 1744 ggio = &hio->hio_ggio; 1745 error = nv_get_int16(nv, "error"); 1746 if (error != 0) { 1747 /* Request failed on remote side. */ 1748 hio->hio_errors[ncomp] = error; 1749 reqlog(LOG_WARNING, 0, ggio, 1750 "Remote request failed (%s): ", strerror(error)); 1751 nv_free(nv); 1752 goto done_queue; 1753 } 1754 switch (ggio->gctl_cmd) { 1755 case BIO_READ: 1756 rw_rlock(&hio_remote_lock[ncomp]); 1757 if (!ISCONNECTED(res, ncomp)) { 1758 rw_unlock(&hio_remote_lock[ncomp]); 1759 nv_free(nv); 1760 goto done_queue; 1761 } 1762 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1763 ggio->gctl_data, ggio->gctl_length) == -1) { 1764 hio->hio_errors[ncomp] = errno; 1765 pjdlog_errno(LOG_ERR, 1766 "Unable to receive reply data"); 1767 rw_unlock(&hio_remote_lock[ncomp]); 1768 nv_free(nv); 1769 remote_close(res, ncomp); 1770 goto done_queue; 1771 } 1772 rw_unlock(&hio_remote_lock[ncomp]); 1773 break; 1774 case BIO_WRITE: 1775 case BIO_DELETE: 1776 case BIO_FLUSH: 1777 break; 1778 default: 1779 PJDLOG_ABORT("invalid condition"); 1780 } 1781 hio->hio_errors[ncomp] = 0; 1782 nv_free(nv); 1783 done_queue: 1784 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1785 hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1786 if (refcnt_release(&hio->hio_countdown) > 0) 1787 continue; 1788 } else { 1789 /* 1790 * Depending on hio_countdown value, requests finished 1791 * in the following order: 1792 * 1793 * 0: local write, remote memsync, remote final 1794 * or 1795 * 0: remote memsync, local write, remote final 1796 * 1797 * 1: local write, remote memsync, (remote final) 1798 * or 1799 * 1: remote memsync, remote final, (local write) 1800 * 1801 * 2: remote memsync, (local write), (remote final) 1802 * or 1803 * 2: remote memsync, (remote final), (local write) 1804 */ 1805 switch (refcnt_release(&hio->hio_countdown)) { 1806 case 0: 1807 /* 1808 * Remote final reply arrived. 1809 */ 1810 PJDLOG_ASSERT(!memsyncack); 1811 break; 1812 case 1: 1813 if (memsyncack) { 1814 /* 1815 * Local request already finished, so we 1816 * can complete the write. 1817 */ 1818 if (hio->hio_errors[0] == 0) 1819 write_complete(res, hio); 1820 /* 1821 * We still need to wait for final 1822 * remote reply. 1823 */ 1824 pjdlog_debug(2, 1825 "remote_recv: (%p) Moving request back to the recv queue.", 1826 hio); 1827 mtx_lock(&hio_recv_list_lock[ncomp]); 1828 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1829 hio, hio_next[ncomp]); 1830 mtx_unlock(&hio_recv_list_lock[ncomp]); 1831 } else { 1832 /* 1833 * Remote final reply arrived before 1834 * local write finished. 1835 * Nothing to do in such case. 1836 */ 1837 } 1838 continue; 1839 case 2: 1840 /* 1841 * We received remote memsync reply even before 1842 * local write finished. 1843 */ 1844 PJDLOG_ASSERT(memsyncack); 1845 1846 pjdlog_debug(2, 1847 "remote_recv: (%p) Moving request back to the recv queue.", 1848 hio); 1849 mtx_lock(&hio_recv_list_lock[ncomp]); 1850 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, 1851 hio_next[ncomp]); 1852 mtx_unlock(&hio_recv_list_lock[ncomp]); 1853 continue; 1854 default: 1855 PJDLOG_ABORT("Invalid hio_countdown."); 1856 } 1857 } 1858 if (ISSYNCREQ(hio)) { 1859 mtx_lock(&sync_lock); 1860 SYNCREQDONE(hio); 1861 mtx_unlock(&sync_lock); 1862 cv_signal(&sync_cond); 1863 } else { 1864 pjdlog_debug(2, 1865 "remote_recv: (%p) Moving request to the done queue.", 1866 hio); 1867 QUEUE_INSERT2(hio, done); 1868 } 1869 } 1870 /* NOTREACHED */ 1871 return (NULL); 1872 } 1873 1874 /* 1875 * Thread sends answer to the kernel. 1876 */ 1877 static void * 1878 ggate_send_thread(void *arg) 1879 { 1880 struct hast_resource *res = arg; 1881 struct g_gate_ctl_io *ggio; 1882 struct hio *hio; 1883 unsigned int ii, ncomps; 1884 1885 ncomps = HAST_NCOMPONENTS; 1886 1887 for (;;) { 1888 pjdlog_debug(2, "ggate_send: Taking request."); 1889 QUEUE_TAKE2(hio, done); 1890 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1891 ggio = &hio->hio_ggio; 1892 for (ii = 0; ii < ncomps; ii++) { 1893 if (hio->hio_errors[ii] == 0) { 1894 /* 1895 * One successful request is enough to declare 1896 * success. 1897 */ 1898 ggio->gctl_error = 0; 1899 break; 1900 } 1901 } 1902 if (ii == ncomps) { 1903 /* 1904 * None of the requests were successful. 1905 * Use the error from local component except the 1906 * case when we did only remote request. 1907 */ 1908 if (ggio->gctl_cmd == BIO_READ && 1909 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1910 ggio->gctl_error = hio->hio_errors[1]; 1911 else 1912 ggio->gctl_error = hio->hio_errors[0]; 1913 } 1914 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1915 mtx_lock(&res->hr_amp_lock); 1916 if (activemap_write_complete(res->hr_amp, 1917 ggio->gctl_offset, ggio->gctl_length)) { 1918 res->hr_stat_activemap_update++; 1919 (void)hast_activemap_flush(res); 1920 } 1921 mtx_unlock(&res->hr_amp_lock); 1922 } 1923 if (ggio->gctl_cmd == BIO_WRITE) { 1924 /* 1925 * Unlock range we locked. 1926 */ 1927 mtx_lock(&range_lock); 1928 rangelock_del(range_regular, ggio->gctl_offset, 1929 ggio->gctl_length); 1930 if (range_sync_wait) 1931 cv_signal(&range_sync_cond); 1932 mtx_unlock(&range_lock); 1933 if (!hio->hio_done) 1934 write_complete(res, hio); 1935 } else { 1936 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1937 primary_exit(EX_OSERR, 1938 "G_GATE_CMD_DONE failed"); 1939 } 1940 } 1941 if (hio->hio_errors[0]) { 1942 switch (ggio->gctl_cmd) { 1943 case BIO_READ: 1944 res->hr_stat_read_error++; 1945 break; 1946 case BIO_WRITE: 1947 res->hr_stat_write_error++; 1948 break; 1949 case BIO_DELETE: 1950 res->hr_stat_delete_error++; 1951 break; 1952 case BIO_FLUSH: 1953 res->hr_stat_flush_error++; 1954 break; 1955 } 1956 } 1957 pjdlog_debug(2, 1958 "ggate_send: (%p) Moving request to the free queue.", hio); 1959 QUEUE_INSERT2(hio, free); 1960 } 1961 /* NOTREACHED */ 1962 return (NULL); 1963 } 1964 1965 /* 1966 * Thread synchronize local and remote components. 1967 */ 1968 static void * 1969 sync_thread(void *arg __unused) 1970 { 1971 struct hast_resource *res = arg; 1972 struct hio *hio; 1973 struct g_gate_ctl_io *ggio; 1974 struct timeval tstart, tend, tdiff; 1975 unsigned int ii, ncomp, ncomps; 1976 off_t offset, length, synced; 1977 bool dorewind, directreads; 1978 int syncext; 1979 1980 ncomps = HAST_NCOMPONENTS; 1981 dorewind = true; 1982 synced = 0; 1983 offset = -1; 1984 directreads = false; 1985 1986 for (;;) { 1987 mtx_lock(&sync_lock); 1988 if (offset >= 0 && !sync_inprogress) { 1989 gettimeofday(&tend, NULL); 1990 timersub(&tend, &tstart, &tdiff); 1991 pjdlog_info("Synchronization interrupted after %#.0T. " 1992 "%NB synchronized so far.", &tdiff, 1993 (intmax_t)synced); 1994 event_send(res, EVENT_SYNCINTR); 1995 } 1996 while (!sync_inprogress) { 1997 dorewind = true; 1998 synced = 0; 1999 cv_wait(&sync_cond, &sync_lock); 2000 } 2001 mtx_unlock(&sync_lock); 2002 /* 2003 * Obtain offset at which we should synchronize. 2004 * Rewind synchronization if needed. 2005 */ 2006 mtx_lock(&res->hr_amp_lock); 2007 if (dorewind) 2008 activemap_sync_rewind(res->hr_amp); 2009 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 2010 if (syncext != -1) { 2011 /* 2012 * We synchronized entire syncext extent, we can mark 2013 * it as clean now. 2014 */ 2015 if (activemap_extent_complete(res->hr_amp, syncext)) 2016 (void)hast_activemap_flush(res); 2017 } 2018 mtx_unlock(&res->hr_amp_lock); 2019 if (dorewind) { 2020 dorewind = false; 2021 if (offset == -1) 2022 pjdlog_info("Nodes are in sync."); 2023 else { 2024 pjdlog_info("Synchronization started. %NB to go.", 2025 (intmax_t)(res->hr_extentsize * 2026 activemap_ndirty(res->hr_amp))); 2027 event_send(res, EVENT_SYNCSTART); 2028 gettimeofday(&tstart, NULL); 2029 } 2030 } 2031 if (offset == -1) { 2032 sync_stop(); 2033 pjdlog_debug(1, "Nothing to synchronize."); 2034 /* 2035 * Synchronization complete, make both localcnt and 2036 * remotecnt equal. 2037 */ 2038 ncomp = 1; 2039 rw_rlock(&hio_remote_lock[ncomp]); 2040 if (ISCONNECTED(res, ncomp)) { 2041 if (synced > 0) { 2042 int64_t bps; 2043 2044 gettimeofday(&tend, NULL); 2045 timersub(&tend, &tstart, &tdiff); 2046 bps = (int64_t)((double)synced / 2047 ((double)tdiff.tv_sec + 2048 (double)tdiff.tv_usec / 1000000)); 2049 pjdlog_info("Synchronization complete. " 2050 "%NB synchronized in %#.0lT (%NB/sec).", 2051 (intmax_t)synced, &tdiff, 2052 (intmax_t)bps); 2053 event_send(res, EVENT_SYNCDONE); 2054 } 2055 mtx_lock(&metadata_lock); 2056 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 2057 directreads = true; 2058 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 2059 res->hr_primary_localcnt = 2060 res->hr_secondary_remotecnt; 2061 res->hr_primary_remotecnt = 2062 res->hr_secondary_localcnt; 2063 pjdlog_debug(1, 2064 "Setting localcnt to %ju and remotecnt to %ju.", 2065 (uintmax_t)res->hr_primary_localcnt, 2066 (uintmax_t)res->hr_primary_remotecnt); 2067 (void)metadata_write(res); 2068 mtx_unlock(&metadata_lock); 2069 } 2070 rw_unlock(&hio_remote_lock[ncomp]); 2071 if (directreads) { 2072 directreads = false; 2073 enable_direct_reads(res); 2074 } 2075 continue; 2076 } 2077 pjdlog_debug(2, "sync: Taking free request."); 2078 QUEUE_TAKE2(hio, free); 2079 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 2080 /* 2081 * Lock the range we are going to synchronize. We don't want 2082 * race where someone writes between our read and write. 2083 */ 2084 for (;;) { 2085 mtx_lock(&range_lock); 2086 if (rangelock_islocked(range_regular, offset, length)) { 2087 pjdlog_debug(2, 2088 "sync: Range offset=%jd length=%jd locked.", 2089 (intmax_t)offset, (intmax_t)length); 2090 range_sync_wait = true; 2091 cv_wait(&range_sync_cond, &range_lock); 2092 range_sync_wait = false; 2093 mtx_unlock(&range_lock); 2094 continue; 2095 } 2096 if (rangelock_add(range_sync, offset, length) == -1) { 2097 mtx_unlock(&range_lock); 2098 pjdlog_debug(2, 2099 "sync: Range offset=%jd length=%jd is already locked, waiting.", 2100 (intmax_t)offset, (intmax_t)length); 2101 sleep(1); 2102 continue; 2103 } 2104 mtx_unlock(&range_lock); 2105 break; 2106 } 2107 /* 2108 * First read the data from synchronization source. 2109 */ 2110 SYNCREQ(hio); 2111 ggio = &hio->hio_ggio; 2112 ggio->gctl_cmd = BIO_READ; 2113 ggio->gctl_offset = offset; 2114 ggio->gctl_length = length; 2115 ggio->gctl_error = 0; 2116 hio->hio_done = false; 2117 hio->hio_replication = res->hr_replication; 2118 for (ii = 0; ii < ncomps; ii++) 2119 hio->hio_errors[ii] = EINVAL; 2120 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2121 hio); 2122 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2123 hio); 2124 mtx_lock(&metadata_lock); 2125 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2126 /* 2127 * This range is up-to-date on local component, 2128 * so handle request locally. 2129 */ 2130 /* Local component is 0 for now. */ 2131 ncomp = 0; 2132 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2133 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2134 /* 2135 * This range is out-of-date on local component, 2136 * so send request to the remote node. 2137 */ 2138 /* Remote component is 1 for now. */ 2139 ncomp = 1; 2140 } 2141 mtx_unlock(&metadata_lock); 2142 hio->hio_countdown = 1; 2143 QUEUE_INSERT1(hio, send, ncomp); 2144 2145 /* 2146 * Let's wait for READ to finish. 2147 */ 2148 mtx_lock(&sync_lock); 2149 while (!ISSYNCREQDONE(hio)) 2150 cv_wait(&sync_cond, &sync_lock); 2151 mtx_unlock(&sync_lock); 2152 2153 if (hio->hio_errors[ncomp] != 0) { 2154 pjdlog_error("Unable to read synchronization data: %s.", 2155 strerror(hio->hio_errors[ncomp])); 2156 goto free_queue; 2157 } 2158 2159 /* 2160 * We read the data from synchronization source, now write it 2161 * to synchronization target. 2162 */ 2163 SYNCREQ(hio); 2164 ggio->gctl_cmd = BIO_WRITE; 2165 for (ii = 0; ii < ncomps; ii++) 2166 hio->hio_errors[ii] = EINVAL; 2167 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2168 hio); 2169 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2170 hio); 2171 mtx_lock(&metadata_lock); 2172 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2173 /* 2174 * This range is up-to-date on local component, 2175 * so we update remote component. 2176 */ 2177 /* Remote component is 1 for now. */ 2178 ncomp = 1; 2179 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2180 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2181 /* 2182 * This range is out-of-date on local component, 2183 * so we update it. 2184 */ 2185 /* Local component is 0 for now. */ 2186 ncomp = 0; 2187 } 2188 mtx_unlock(&metadata_lock); 2189 2190 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2191 hio); 2192 hio->hio_countdown = 1; 2193 QUEUE_INSERT1(hio, send, ncomp); 2194 2195 /* 2196 * Let's wait for WRITE to finish. 2197 */ 2198 mtx_lock(&sync_lock); 2199 while (!ISSYNCREQDONE(hio)) 2200 cv_wait(&sync_cond, &sync_lock); 2201 mtx_unlock(&sync_lock); 2202 2203 if (hio->hio_errors[ncomp] != 0) { 2204 pjdlog_error("Unable to write synchronization data: %s.", 2205 strerror(hio->hio_errors[ncomp])); 2206 goto free_queue; 2207 } 2208 2209 synced += length; 2210 free_queue: 2211 mtx_lock(&range_lock); 2212 rangelock_del(range_sync, offset, length); 2213 if (range_regular_wait) 2214 cv_signal(&range_regular_cond); 2215 mtx_unlock(&range_lock); 2216 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2217 hio); 2218 QUEUE_INSERT2(hio, free); 2219 } 2220 /* NOTREACHED */ 2221 return (NULL); 2222 } 2223 2224 void 2225 primary_config_reload(struct hast_resource *res, struct nv *nv) 2226 { 2227 unsigned int ii, ncomps; 2228 int modified, vint; 2229 const char *vstr; 2230 2231 pjdlog_info("Reloading configuration..."); 2232 2233 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2234 PJDLOG_ASSERT(gres == res); 2235 nv_assert(nv, "remoteaddr"); 2236 nv_assert(nv, "sourceaddr"); 2237 nv_assert(nv, "replication"); 2238 nv_assert(nv, "checksum"); 2239 nv_assert(nv, "compression"); 2240 nv_assert(nv, "timeout"); 2241 nv_assert(nv, "exec"); 2242 nv_assert(nv, "metaflush"); 2243 2244 ncomps = HAST_NCOMPONENTS; 2245 2246 #define MODIFIED_REMOTEADDR 0x01 2247 #define MODIFIED_SOURCEADDR 0x02 2248 #define MODIFIED_REPLICATION 0x04 2249 #define MODIFIED_CHECKSUM 0x08 2250 #define MODIFIED_COMPRESSION 0x10 2251 #define MODIFIED_TIMEOUT 0x20 2252 #define MODIFIED_EXEC 0x40 2253 #define MODIFIED_METAFLUSH 0x80 2254 modified = 0; 2255 2256 vstr = nv_get_string(nv, "remoteaddr"); 2257 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2258 /* 2259 * Don't copy res->hr_remoteaddr to gres just yet. 2260 * We want remote_close() to log disconnect from the old 2261 * addresses, not from the new ones. 2262 */ 2263 modified |= MODIFIED_REMOTEADDR; 2264 } 2265 vstr = nv_get_string(nv, "sourceaddr"); 2266 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2267 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2268 modified |= MODIFIED_SOURCEADDR; 2269 } 2270 vint = nv_get_int32(nv, "replication"); 2271 if (gres->hr_replication != vint) { 2272 gres->hr_replication = vint; 2273 modified |= MODIFIED_REPLICATION; 2274 } 2275 vint = nv_get_int32(nv, "checksum"); 2276 if (gres->hr_checksum != vint) { 2277 gres->hr_checksum = vint; 2278 modified |= MODIFIED_CHECKSUM; 2279 } 2280 vint = nv_get_int32(nv, "compression"); 2281 if (gres->hr_compression != vint) { 2282 gres->hr_compression = vint; 2283 modified |= MODIFIED_COMPRESSION; 2284 } 2285 vint = nv_get_int32(nv, "timeout"); 2286 if (gres->hr_timeout != vint) { 2287 gres->hr_timeout = vint; 2288 modified |= MODIFIED_TIMEOUT; 2289 } 2290 vstr = nv_get_string(nv, "exec"); 2291 if (strcmp(gres->hr_exec, vstr) != 0) { 2292 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2293 modified |= MODIFIED_EXEC; 2294 } 2295 vint = nv_get_int32(nv, "metaflush"); 2296 if (gres->hr_metaflush != vint) { 2297 gres->hr_metaflush = vint; 2298 modified |= MODIFIED_METAFLUSH; 2299 } 2300 2301 /* 2302 * Change timeout for connected sockets. 2303 * Don't bother if we need to reconnect. 2304 */ 2305 if ((modified & MODIFIED_TIMEOUT) != 0 && 2306 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2307 for (ii = 0; ii < ncomps; ii++) { 2308 if (!ISREMOTE(ii)) 2309 continue; 2310 rw_rlock(&hio_remote_lock[ii]); 2311 if (!ISCONNECTED(gres, ii)) { 2312 rw_unlock(&hio_remote_lock[ii]); 2313 continue; 2314 } 2315 rw_unlock(&hio_remote_lock[ii]); 2316 if (proto_timeout(gres->hr_remotein, 2317 gres->hr_timeout) == -1) { 2318 pjdlog_errno(LOG_WARNING, 2319 "Unable to set connection timeout"); 2320 } 2321 if (proto_timeout(gres->hr_remoteout, 2322 gres->hr_timeout) == -1) { 2323 pjdlog_errno(LOG_WARNING, 2324 "Unable to set connection timeout"); 2325 } 2326 } 2327 } 2328 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2329 for (ii = 0; ii < ncomps; ii++) { 2330 if (!ISREMOTE(ii)) 2331 continue; 2332 remote_close(gres, ii); 2333 } 2334 if (modified & MODIFIED_REMOTEADDR) { 2335 vstr = nv_get_string(nv, "remoteaddr"); 2336 strlcpy(gres->hr_remoteaddr, vstr, 2337 sizeof(gres->hr_remoteaddr)); 2338 } 2339 } 2340 #undef MODIFIED_REMOTEADDR 2341 #undef MODIFIED_SOURCEADDR 2342 #undef MODIFIED_REPLICATION 2343 #undef MODIFIED_CHECKSUM 2344 #undef MODIFIED_COMPRESSION 2345 #undef MODIFIED_TIMEOUT 2346 #undef MODIFIED_EXEC 2347 #undef MODIFIED_METAFLUSH 2348 2349 pjdlog_info("Configuration reloaded successfully."); 2350 } 2351 2352 static void 2353 guard_one(struct hast_resource *res, unsigned int ncomp) 2354 { 2355 struct proto_conn *in, *out; 2356 2357 if (!ISREMOTE(ncomp)) 2358 return; 2359 2360 rw_rlock(&hio_remote_lock[ncomp]); 2361 2362 if (!real_remote(res)) { 2363 rw_unlock(&hio_remote_lock[ncomp]); 2364 return; 2365 } 2366 2367 if (ISCONNECTED(res, ncomp)) { 2368 PJDLOG_ASSERT(res->hr_remotein != NULL); 2369 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2370 rw_unlock(&hio_remote_lock[ncomp]); 2371 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2372 res->hr_remoteaddr); 2373 return; 2374 } 2375 2376 PJDLOG_ASSERT(res->hr_remotein == NULL); 2377 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2378 /* 2379 * Upgrade the lock. It doesn't have to be atomic as no other thread 2380 * can change connection status from disconnected to connected. 2381 */ 2382 rw_unlock(&hio_remote_lock[ncomp]); 2383 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2384 res->hr_remoteaddr); 2385 in = out = NULL; 2386 if (init_remote(res, &in, &out) == 0) { 2387 rw_wlock(&hio_remote_lock[ncomp]); 2388 PJDLOG_ASSERT(res->hr_remotein == NULL); 2389 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2390 PJDLOG_ASSERT(in != NULL && out != NULL); 2391 res->hr_remotein = in; 2392 res->hr_remoteout = out; 2393 rw_unlock(&hio_remote_lock[ncomp]); 2394 pjdlog_info("Successfully reconnected to %s.", 2395 res->hr_remoteaddr); 2396 sync_start(); 2397 } else { 2398 /* Both connections should be NULL. */ 2399 PJDLOG_ASSERT(res->hr_remotein == NULL); 2400 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2401 PJDLOG_ASSERT(in == NULL && out == NULL); 2402 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2403 res->hr_remoteaddr); 2404 } 2405 } 2406 2407 /* 2408 * Thread guards remote connections and reconnects when needed, handles 2409 * signals, etc. 2410 */ 2411 static void * 2412 guard_thread(void *arg) 2413 { 2414 struct hast_resource *res = arg; 2415 unsigned int ii, ncomps; 2416 struct timespec timeout; 2417 time_t lastcheck, now; 2418 sigset_t mask; 2419 int signo; 2420 2421 ncomps = HAST_NCOMPONENTS; 2422 lastcheck = time(NULL); 2423 2424 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2425 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2426 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2427 2428 timeout.tv_sec = HAST_KEEPALIVE; 2429 timeout.tv_nsec = 0; 2430 signo = -1; 2431 2432 for (;;) { 2433 switch (signo) { 2434 case SIGINT: 2435 case SIGTERM: 2436 sigexit_received = true; 2437 primary_exitx(EX_OK, 2438 "Termination signal received, exiting."); 2439 break; 2440 default: 2441 break; 2442 } 2443 2444 /* 2445 * Don't check connections until we fully started, 2446 * as we may still be looping, waiting for remote node 2447 * to switch from primary to secondary. 2448 */ 2449 if (fullystarted) { 2450 pjdlog_debug(2, "remote_guard: Checking connections."); 2451 now = time(NULL); 2452 if (lastcheck + HAST_KEEPALIVE <= now) { 2453 for (ii = 0; ii < ncomps; ii++) 2454 guard_one(res, ii); 2455 lastcheck = now; 2456 } 2457 } 2458 signo = sigtimedwait(&mask, NULL, &timeout); 2459 } 2460 /* NOTREACHED */ 2461 return (NULL); 2462 } 2463