/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "refcnt.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	refcnt_t		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * Request was already confirmed to GEOM Gate.
	 */
	bool			 hio_done;
	/*
	 * Remember replication from the time the request was initiated,
	 * so we won't get confused when replication changes on reload.
	 */
	int			 hio_replication;
	TAILQ_ENTRY(hio)	*hio_next;
};
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures.
 * When the free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on the done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with the sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below synchronizes access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. It also synchronizes access to the
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
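/*
 * Added overview (a sketch of the request lifecycle implied by the queues
 * above, not authoritative): a request starts on the free list, is taken by
 * ggate_recv_thread() and placed on every component's send list, sits on the
 * remote recv list while a reply from the secondary is outstanding, is moved
 * to the done list by whichever component drops hio_countdown to 0, and is
 * finally returned to the free list by ggate_send_thread().
 */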
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if (hio != NULL) {						\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

static struct hast_resource *gres;

static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;
static bool fullystarted;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		bzero(&ggiod, sizeof(ggiod));
		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) {
			pjdlog_errno(LOG_WARNING,
			    "Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}
static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	PJDLOG_ASSERT(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

static __dead2 void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	PJDLOG_ASSERT(buf != NULL);
	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk");
		res->hr_stat_activemap_write_error++;
		return (-1);
	}
	if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) {
		if (errno == EOPNOTSUPP) {
			pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.",
			    res->hr_localpath);
			res->hr_metaflush = 0;
		} else {
			pjdlog_errno(LOG_ERR,
			    "Unable to flush disk cache on activemap update");
			res->hr_stat_activemap_flush_error++;
			return (-1);
		}
	}
	return (0);
}

static bool
real_remote(const struct hast_resource *res)
{

	return (strcmp(res->hr_remoteaddr, "none") != 0);
}
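/*
 * Added note on the on-disk layout assumed by hast_activemap_flush() above
 * (a sketch, inferred from the offsets used in this file): the first
 * METADATA_SIZE bytes of the local provider hold the static metadata, the
 * activemap bitmap is stored directly after it (hence the pwrite() at offset
 * METADATA_SIZE), and the data area starts at hr_localoff, which is added to
 * every ggate offset in the I/O paths below.
 */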
344 */ 345 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 346 if (hio_send_list == NULL) { 347 primary_exitx(EX_TEMPFAIL, 348 "Unable to allocate %zu bytes of memory for send lists.", 349 sizeof(hio_send_list[0]) * ncomps); 350 } 351 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 352 if (hio_send_list_lock == NULL) { 353 primary_exitx(EX_TEMPFAIL, 354 "Unable to allocate %zu bytes of memory for send list locks.", 355 sizeof(hio_send_list_lock[0]) * ncomps); 356 } 357 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 358 if (hio_send_list_cond == NULL) { 359 primary_exitx(EX_TEMPFAIL, 360 "Unable to allocate %zu bytes of memory for send list condition variables.", 361 sizeof(hio_send_list_cond[0]) * ncomps); 362 } 363 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 364 if (hio_recv_list == NULL) { 365 primary_exitx(EX_TEMPFAIL, 366 "Unable to allocate %zu bytes of memory for recv lists.", 367 sizeof(hio_recv_list[0]) * ncomps); 368 } 369 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 370 if (hio_recv_list_lock == NULL) { 371 primary_exitx(EX_TEMPFAIL, 372 "Unable to allocate %zu bytes of memory for recv list locks.", 373 sizeof(hio_recv_list_lock[0]) * ncomps); 374 } 375 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 376 if (hio_recv_list_cond == NULL) { 377 primary_exitx(EX_TEMPFAIL, 378 "Unable to allocate %zu bytes of memory for recv list condition variables.", 379 sizeof(hio_recv_list_cond[0]) * ncomps); 380 } 381 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 382 if (hio_remote_lock == NULL) { 383 primary_exitx(EX_TEMPFAIL, 384 "Unable to allocate %zu bytes of memory for remote connections locks.", 385 sizeof(hio_remote_lock[0]) * ncomps); 386 } 387 388 /* 389 * Initialize lists, their locks and theirs condition variables. 390 */ 391 TAILQ_INIT(&hio_free_list); 392 mtx_init(&hio_free_list_lock); 393 cv_init(&hio_free_list_cond); 394 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 395 TAILQ_INIT(&hio_send_list[ii]); 396 mtx_init(&hio_send_list_lock[ii]); 397 cv_init(&hio_send_list_cond[ii]); 398 TAILQ_INIT(&hio_recv_list[ii]); 399 mtx_init(&hio_recv_list_lock[ii]); 400 cv_init(&hio_recv_list_cond[ii]); 401 rw_init(&hio_remote_lock[ii]); 402 } 403 TAILQ_INIT(&hio_done_list); 404 mtx_init(&hio_done_list_lock); 405 cv_init(&hio_done_list_cond); 406 mtx_init(&metadata_lock); 407 408 /* 409 * Allocate requests pool and initialize requests. 
410 */ 411 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 412 hio = malloc(sizeof(*hio)); 413 if (hio == NULL) { 414 primary_exitx(EX_TEMPFAIL, 415 "Unable to allocate %zu bytes of memory for hio request.", 416 sizeof(*hio)); 417 } 418 refcnt_init(&hio->hio_countdown, 0); 419 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 420 if (hio->hio_errors == NULL) { 421 primary_exitx(EX_TEMPFAIL, 422 "Unable allocate %zu bytes of memory for hio errors.", 423 sizeof(hio->hio_errors[0]) * ncomps); 424 } 425 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 426 if (hio->hio_next == NULL) { 427 primary_exitx(EX_TEMPFAIL, 428 "Unable allocate %zu bytes of memory for hio_next field.", 429 sizeof(hio->hio_next[0]) * ncomps); 430 } 431 hio->hio_ggio.gctl_version = G_GATE_VERSION; 432 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 433 if (hio->hio_ggio.gctl_data == NULL) { 434 primary_exitx(EX_TEMPFAIL, 435 "Unable to allocate %zu bytes of memory for gctl_data.", 436 MAXPHYS); 437 } 438 hio->hio_ggio.gctl_length = MAXPHYS; 439 hio->hio_ggio.gctl_error = 0; 440 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 441 } 442 } 443 444 static bool 445 init_resuid(struct hast_resource *res) 446 { 447 448 mtx_lock(&metadata_lock); 449 if (res->hr_resuid != 0) { 450 mtx_unlock(&metadata_lock); 451 return (false); 452 } else { 453 /* Initialize unique resource identifier. */ 454 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 455 mtx_unlock(&metadata_lock); 456 if (metadata_write(res) == -1) 457 exit(EX_NOINPUT); 458 return (true); 459 } 460 } 461 462 static void 463 init_local(struct hast_resource *res) 464 { 465 unsigned char *buf; 466 size_t mapsize; 467 468 if (metadata_read(res, true) == -1) 469 exit(EX_NOINPUT); 470 mtx_init(&res->hr_amp_lock); 471 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 472 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 473 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 474 } 475 mtx_init(&range_lock); 476 cv_init(&range_regular_cond); 477 if (rangelock_init(&range_regular) == -1) 478 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 479 cv_init(&range_sync_cond); 480 if (rangelock_init(&range_sync) == -1) 481 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 482 mapsize = activemap_ondisk_size(res->hr_amp); 483 buf = calloc(1, mapsize); 484 if (buf == NULL) { 485 primary_exitx(EX_TEMPFAIL, 486 "Unable to allocate buffer for activemap."); 487 } 488 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 489 (ssize_t)mapsize) { 490 primary_exit(EX_NOINPUT, "Unable to read activemap"); 491 } 492 activemap_copyin(res->hr_amp, buf, mapsize); 493 free(buf); 494 if (res->hr_resuid != 0) 495 return; 496 /* 497 * We're using provider for the first time. Initialize local and remote 498 * counters. We don't initialize resuid here, as we want to do it just 499 * in time. The reason for this is that we want to inform secondary 500 * that there were no writes yet, so there is no need to synchronize 501 * anything. 
502 */ 503 res->hr_primary_localcnt = 0; 504 res->hr_primary_remotecnt = 0; 505 if (metadata_write(res) == -1) 506 exit(EX_NOINPUT); 507 } 508 509 static int 510 primary_connect(struct hast_resource *res, struct proto_conn **connp) 511 { 512 struct proto_conn *conn; 513 int16_t val; 514 515 val = 1; 516 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 517 primary_exit(EX_TEMPFAIL, 518 "Unable to send connection request to parent"); 519 } 520 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 521 primary_exit(EX_TEMPFAIL, 522 "Unable to receive reply to connection request from parent"); 523 } 524 if (val != 0) { 525 errno = val; 526 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 527 res->hr_remoteaddr); 528 return (-1); 529 } 530 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 531 primary_exit(EX_TEMPFAIL, 532 "Unable to receive connection from parent"); 533 } 534 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 535 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 536 res->hr_remoteaddr); 537 proto_close(conn); 538 return (-1); 539 } 540 /* Error in setting timeout is not critical, but why should it fail? */ 541 if (proto_timeout(conn, res->hr_timeout) == -1) 542 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 543 544 *connp = conn; 545 546 return (0); 547 } 548 549 /* 550 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 551 */ 552 static void 553 enable_direct_reads(struct hast_resource *res) 554 { 555 struct g_gate_ctl_modify ggiomodify; 556 557 bzero(&ggiomodify, sizeof(ggiomodify)); 558 ggiomodify.gctl_version = G_GATE_VERSION; 559 ggiomodify.gctl_unit = res->hr_ggateunit; 560 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 561 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 562 sizeof(ggiomodify.gctl_readprov)); 563 ggiomodify.gctl_readoffset = res->hr_localoff; 564 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 565 pjdlog_debug(1, "Direct reads enabled."); 566 else 567 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 568 } 569 570 static int 571 init_remote(struct hast_resource *res, struct proto_conn **inp, 572 struct proto_conn **outp) 573 { 574 struct proto_conn *in, *out; 575 struct nv *nvout, *nvin; 576 const unsigned char *token; 577 unsigned char *map; 578 const char *errmsg; 579 int32_t extentsize; 580 int64_t datasize; 581 uint32_t mapsize; 582 uint8_t version; 583 size_t size; 584 int error; 585 586 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 587 PJDLOG_ASSERT(real_remote(res)); 588 589 in = out = NULL; 590 errmsg = NULL; 591 592 if (primary_connect(res, &out) == -1) 593 return (ECONNREFUSED); 594 595 error = ECONNABORTED; 596 597 /* 598 * First handshake step. 599 * Setup outgoing connection with remote node. 
600 */ 601 nvout = nv_alloc(); 602 nv_add_string(nvout, res->hr_name, "resource"); 603 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 604 if (nv_error(nvout) != 0) { 605 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 606 "Unable to allocate header for connection with %s", 607 res->hr_remoteaddr); 608 nv_free(nvout); 609 goto close; 610 } 611 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 612 pjdlog_errno(LOG_WARNING, 613 "Unable to send handshake header to %s", 614 res->hr_remoteaddr); 615 nv_free(nvout); 616 goto close; 617 } 618 nv_free(nvout); 619 if (hast_proto_recv_hdr(out, &nvin) == -1) { 620 pjdlog_errno(LOG_WARNING, 621 "Unable to receive handshake header from %s", 622 res->hr_remoteaddr); 623 goto close; 624 } 625 errmsg = nv_get_string(nvin, "errmsg"); 626 if (errmsg != NULL) { 627 pjdlog_warning("%s", errmsg); 628 if (nv_exists(nvin, "wait")) 629 error = EBUSY; 630 nv_free(nvin); 631 goto close; 632 } 633 version = nv_get_uint8(nvin, "version"); 634 if (version == 0) { 635 /* 636 * If no version is sent, it means this is protocol version 1. 637 */ 638 version = 1; 639 } 640 if (version > HAST_PROTO_VERSION) { 641 pjdlog_warning("Invalid version received (%hhu).", version); 642 nv_free(nvin); 643 goto close; 644 } 645 res->hr_version = version; 646 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 647 token = nv_get_uint8_array(nvin, &size, "token"); 648 if (token == NULL) { 649 pjdlog_warning("Handshake header from %s has no 'token' field.", 650 res->hr_remoteaddr); 651 nv_free(nvin); 652 goto close; 653 } 654 if (size != sizeof(res->hr_token)) { 655 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 656 res->hr_remoteaddr, size, sizeof(res->hr_token)); 657 nv_free(nvin); 658 goto close; 659 } 660 bcopy(token, res->hr_token, sizeof(res->hr_token)); 661 nv_free(nvin); 662 663 /* 664 * Second handshake step. 665 * Setup incoming connection with remote node. 666 */ 667 if (primary_connect(res, &in) == -1) 668 goto close; 669 670 nvout = nv_alloc(); 671 nv_add_string(nvout, res->hr_name, "resource"); 672 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 673 "token"); 674 if (res->hr_resuid == 0) { 675 /* 676 * The resuid field was not yet initialized. 677 * Because we do synchronization inside init_resuid(), it is 678 * possible that someone already initialized it, the function 679 * will return false then, but if we successfully initialized 680 * it, we will get true. True means that there were no writes 681 * to this resource yet and we want to inform secondary that 682 * synchronization is not needed by sending "virgin" argument. 
683 */ 684 if (init_resuid(res)) 685 nv_add_int8(nvout, 1, "virgin"); 686 } 687 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 688 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 689 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 690 if (nv_error(nvout) != 0) { 691 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 692 "Unable to allocate header for connection with %s", 693 res->hr_remoteaddr); 694 nv_free(nvout); 695 goto close; 696 } 697 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 698 pjdlog_errno(LOG_WARNING, 699 "Unable to send handshake header to %s", 700 res->hr_remoteaddr); 701 nv_free(nvout); 702 goto close; 703 } 704 nv_free(nvout); 705 if (hast_proto_recv_hdr(out, &nvin) == -1) { 706 pjdlog_errno(LOG_WARNING, 707 "Unable to receive handshake header from %s", 708 res->hr_remoteaddr); 709 goto close; 710 } 711 errmsg = nv_get_string(nvin, "errmsg"); 712 if (errmsg != NULL) { 713 pjdlog_warning("%s", errmsg); 714 nv_free(nvin); 715 goto close; 716 } 717 datasize = nv_get_int64(nvin, "datasize"); 718 if (datasize != res->hr_datasize) { 719 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 720 (intmax_t)res->hr_datasize, (intmax_t)datasize); 721 nv_free(nvin); 722 goto close; 723 } 724 extentsize = nv_get_int32(nvin, "extentsize"); 725 if (extentsize != res->hr_extentsize) { 726 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 727 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 728 nv_free(nvin); 729 goto close; 730 } 731 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 732 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 733 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 734 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 735 enable_direct_reads(res); 736 if (nv_exists(nvin, "virgin")) { 737 /* 738 * Secondary was reinitialized, bump localcnt if it is 0 as 739 * only we have the data. 740 */ 741 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 742 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 743 744 if (res->hr_primary_localcnt == 0) { 745 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 746 747 mtx_lock(&metadata_lock); 748 res->hr_primary_localcnt++; 749 pjdlog_debug(1, "Increasing localcnt to %ju.", 750 (uintmax_t)res->hr_primary_localcnt); 751 (void)metadata_write(res); 752 mtx_unlock(&metadata_lock); 753 } 754 } 755 map = NULL; 756 mapsize = nv_get_uint32(nvin, "mapsize"); 757 if (mapsize > 0) { 758 map = malloc(mapsize); 759 if (map == NULL) { 760 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 761 (uintmax_t)mapsize); 762 nv_free(nvin); 763 goto close; 764 } 765 /* 766 * Remote node have some dirty extents on its own, lets 767 * download its activemap. 768 */ 769 if (hast_proto_recv_data(res, out, nvin, map, 770 mapsize) == -1) { 771 pjdlog_errno(LOG_ERR, 772 "Unable to receive remote activemap"); 773 nv_free(nvin); 774 free(map); 775 goto close; 776 } 777 /* 778 * Merge local and remote bitmaps. 779 */ 780 activemap_merge(res->hr_amp, map, mapsize); 781 free(map); 782 /* 783 * Now that we merged bitmaps from both nodes, flush it to the 784 * disk before we start to synchronize. 785 */ 786 (void)hast_activemap_flush(res); 787 } 788 nv_free(nvin); 789 #ifdef notyet 790 /* Setup directions. 
	if (proto_send(out, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	if (proto_recv(in, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
#endif
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
	    res->hr_version < 2) {
		pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
		res->hr_replication = HAST_REPLICATION_FULLSYNC;
	} else if (res->hr_replication != res->hr_original_replication) {
		/*
		 * This handles the case where hastd disconnected and was
		 * upgraded.
		 */
		res->hr_replication = res->hr_original_replication;
	}
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (0);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (error);
}

static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd == -1)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = 0;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process that created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}
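/*
 * Added overview of the function below (a sketch of the setup, as read from
 * the code): the parent and the worker child communicate over three
 * socketpairs - hr_ctrl for control commands (parent sends), hr_event for
 * events (child sends) and hr_conn for connection requests (child sends).
 * After fork(), each side declares its direction on every channel with an
 * empty proto_send()/proto_recv() call.
 */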
887 */ 888 bzero(&ggiocancel, sizeof(ggiocancel)); 889 ggiocancel.gctl_version = G_GATE_VERSION; 890 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 891 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 892 res->hr_provname); 893 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 894 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 895 res->hr_ggateunit = ggiocancel.gctl_unit; 896 return; 897 } 898 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 899 res->hr_provname); 900 } 901 902 void 903 hastd_primary(struct hast_resource *res) 904 { 905 pthread_t td; 906 pid_t pid; 907 int error, mode, debuglevel; 908 909 /* 910 * Create communication channel for sending control commands from 911 * parent to child. 912 */ 913 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 914 /* TODO: There's no need for this to be fatal error. */ 915 KEEP_ERRNO((void)pidfile_remove(pfh)); 916 pjdlog_exit(EX_OSERR, 917 "Unable to create control sockets between parent and child"); 918 } 919 /* 920 * Create communication channel for sending events from child to parent. 921 */ 922 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 923 /* TODO: There's no need for this to be fatal error. */ 924 KEEP_ERRNO((void)pidfile_remove(pfh)); 925 pjdlog_exit(EX_OSERR, 926 "Unable to create event sockets between child and parent"); 927 } 928 /* 929 * Create communication channel for sending connection requests from 930 * child to parent. 931 */ 932 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 933 /* TODO: There's no need for this to be fatal error. */ 934 KEEP_ERRNO((void)pidfile_remove(pfh)); 935 pjdlog_exit(EX_OSERR, 936 "Unable to create connection sockets between child and parent"); 937 } 938 939 pid = fork(); 940 if (pid == -1) { 941 /* TODO: There's no need for this to be fatal error. */ 942 KEEP_ERRNO((void)pidfile_remove(pfh)); 943 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 944 } 945 946 if (pid > 0) { 947 /* This is parent. */ 948 /* Declare that we are receiver. */ 949 proto_recv(res->hr_event, NULL, 0); 950 proto_recv(res->hr_conn, NULL, 0); 951 /* Declare that we are sender. */ 952 proto_send(res->hr_ctrl, NULL, 0); 953 res->hr_workerpid = pid; 954 return; 955 } 956 957 gres = res; 958 mode = pjdlog_mode_get(); 959 debuglevel = pjdlog_debug_get(); 960 961 /* Declare that we are sender. */ 962 proto_send(res->hr_event, NULL, 0); 963 proto_send(res->hr_conn, NULL, 0); 964 /* Declare that we are receiver. */ 965 proto_recv(res->hr_ctrl, NULL, 0); 966 descriptors_cleanup(res); 967 968 descriptors_assert(res, mode); 969 970 pjdlog_init(mode); 971 pjdlog_debug_set(debuglevel); 972 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 973 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 974 975 init_local(res); 976 init_ggate(res); 977 init_environment(res); 978 979 if (drop_privs(res) != 0) { 980 cleanup(res); 981 exit(EX_CONFIG); 982 } 983 pjdlog_info("Privileges successfully dropped."); 984 985 /* 986 * Create the guard thread first, so we can handle signals from the 987 * very beginning. 988 */ 989 error = pthread_create(&td, NULL, guard_thread, res); 990 PJDLOG_ASSERT(error == 0); 991 /* 992 * Create the control thread before sending any event to the parent, 993 * as we can deadlock when parent sends control request to worker, 994 * but worker has no control thread started yet, so parent waits. 
995 * In the meantime worker sends an event to the parent, but parent 996 * is unable to handle the event, because it waits for control 997 * request response. 998 */ 999 error = pthread_create(&td, NULL, ctrl_thread, res); 1000 PJDLOG_ASSERT(error == 0); 1001 if (real_remote(res)) { 1002 error = init_remote(res, NULL, NULL); 1003 if (error == 0) { 1004 sync_start(); 1005 } else if (error == EBUSY) { 1006 time_t start = time(NULL); 1007 1008 pjdlog_warning("Waiting for remote node to become %s for %ds.", 1009 role2str(HAST_ROLE_SECONDARY), 1010 res->hr_timeout); 1011 for (;;) { 1012 sleep(1); 1013 error = init_remote(res, NULL, NULL); 1014 if (error != EBUSY) 1015 break; 1016 if (time(NULL) > start + res->hr_timeout) 1017 break; 1018 } 1019 if (error == EBUSY) { 1020 pjdlog_warning("Remote node is still %s, starting anyway.", 1021 role2str(HAST_ROLE_PRIMARY)); 1022 } 1023 } 1024 } 1025 error = pthread_create(&td, NULL, ggate_recv_thread, res); 1026 PJDLOG_ASSERT(error == 0); 1027 error = pthread_create(&td, NULL, local_send_thread, res); 1028 PJDLOG_ASSERT(error == 0); 1029 error = pthread_create(&td, NULL, remote_send_thread, res); 1030 PJDLOG_ASSERT(error == 0); 1031 error = pthread_create(&td, NULL, remote_recv_thread, res); 1032 PJDLOG_ASSERT(error == 0); 1033 error = pthread_create(&td, NULL, ggate_send_thread, res); 1034 PJDLOG_ASSERT(error == 0); 1035 fullystarted = true; 1036 (void)sync_thread(res); 1037 } 1038 1039 static void 1040 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1041 const char *fmt, ...) 1042 { 1043 char msg[1024]; 1044 va_list ap; 1045 1046 va_start(ap, fmt); 1047 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1048 va_end(ap); 1049 switch (ggio->gctl_cmd) { 1050 case BIO_READ: 1051 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1052 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1053 break; 1054 case BIO_DELETE: 1055 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1056 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1057 break; 1058 case BIO_FLUSH: 1059 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1060 break; 1061 case BIO_WRITE: 1062 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1063 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1064 break; 1065 default: 1066 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1067 (unsigned int)ggio->gctl_cmd); 1068 break; 1069 } 1070 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1071 } 1072 1073 static void 1074 remote_close(struct hast_resource *res, int ncomp) 1075 { 1076 1077 rw_wlock(&hio_remote_lock[ncomp]); 1078 /* 1079 * Check for a race between dropping rlock and acquiring wlock - 1080 * another thread can close connection in-between. 1081 */ 1082 if (!ISCONNECTED(res, ncomp)) { 1083 PJDLOG_ASSERT(res->hr_remotein == NULL); 1084 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1085 rw_unlock(&hio_remote_lock[ncomp]); 1086 return; 1087 } 1088 1089 PJDLOG_ASSERT(res->hr_remotein != NULL); 1090 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1091 1092 pjdlog_debug(2, "Closing incoming connection to %s.", 1093 res->hr_remoteaddr); 1094 proto_close(res->hr_remotein); 1095 res->hr_remotein = NULL; 1096 pjdlog_debug(2, "Closing outgoing connection to %s.", 1097 res->hr_remoteaddr); 1098 proto_close(res->hr_remoteout); 1099 res->hr_remoteout = NULL; 1100 1101 rw_unlock(&hio_remote_lock[ncomp]); 1102 1103 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1104 1105 /* 1106 * Stop synchronization if in-progress. 
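/*
 * Added note on hio_remote_lock (a summary of the discipline visible in this
 * file): threads that merely use the remote connection take the lock shared,
 * so many requests can be in flight at once, while remote_close() above takes
 * it exclusively, which both waits for in-flight users to drain and makes the
 * NULL assignments of hr_remotein/hr_remoteout safe. This is also why
 * ISCONNECTED() is re-checked after every re-acquisition of the lock.
 */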
1107 */ 1108 sync_stop(); 1109 1110 event_send(res, EVENT_DISCONNECT); 1111 } 1112 1113 /* 1114 * Acknowledge write completion to the kernel, but don't update activemap yet. 1115 */ 1116 static void 1117 write_complete(struct hast_resource *res, struct hio *hio) 1118 { 1119 struct g_gate_ctl_io *ggio; 1120 unsigned int ncomp; 1121 1122 PJDLOG_ASSERT(!hio->hio_done); 1123 1124 ggio = &hio->hio_ggio; 1125 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1126 1127 /* 1128 * Bump local count if this is first write after 1129 * connection failure with remote node. 1130 */ 1131 ncomp = 1; 1132 rw_rlock(&hio_remote_lock[ncomp]); 1133 if (!ISCONNECTED(res, ncomp)) { 1134 mtx_lock(&metadata_lock); 1135 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1136 res->hr_primary_localcnt++; 1137 pjdlog_debug(1, "Increasing localcnt to %ju.", 1138 (uintmax_t)res->hr_primary_localcnt); 1139 (void)metadata_write(res); 1140 } 1141 mtx_unlock(&metadata_lock); 1142 } 1143 rw_unlock(&hio_remote_lock[ncomp]); 1144 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1145 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1146 hio->hio_done = true; 1147 } 1148 1149 /* 1150 * Thread receives ggate I/O requests from the kernel and passes them to 1151 * appropriate threads: 1152 * WRITE - always goes to both local_send and remote_send threads 1153 * READ (when the block is up-to-date on local component) - 1154 * only local_send thread 1155 * READ (when the block isn't up-to-date on local component) - 1156 * only remote_send thread 1157 * DELETE - always goes to both local_send and remote_send threads 1158 * FLUSH - always goes to both local_send and remote_send threads 1159 */ 1160 static void * 1161 ggate_recv_thread(void *arg) 1162 { 1163 struct hast_resource *res = arg; 1164 struct g_gate_ctl_io *ggio; 1165 struct hio *hio; 1166 unsigned int ii, ncomp, ncomps; 1167 int error; 1168 1169 for (;;) { 1170 pjdlog_debug(2, "ggate_recv: Taking free request."); 1171 QUEUE_TAKE2(hio, free); 1172 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1173 ggio = &hio->hio_ggio; 1174 ggio->gctl_unit = res->hr_ggateunit; 1175 ggio->gctl_length = MAXPHYS; 1176 ggio->gctl_error = 0; 1177 hio->hio_done = false; 1178 hio->hio_replication = res->hr_replication; 1179 pjdlog_debug(2, 1180 "ggate_recv: (%p) Waiting for request from the kernel.", 1181 hio); 1182 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1183 if (sigexit_received) 1184 pthread_exit(NULL); 1185 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1186 } 1187 error = ggio->gctl_error; 1188 switch (error) { 1189 case 0: 1190 break; 1191 case ECANCELED: 1192 /* Exit gracefully. */ 1193 if (!sigexit_received) { 1194 pjdlog_debug(2, 1195 "ggate_recv: (%p) Received cancel from the kernel.", 1196 hio); 1197 pjdlog_info("Received cancel from the kernel, exiting."); 1198 } 1199 pthread_exit(NULL); 1200 case ENOMEM: 1201 /* 1202 * Buffer too small? Impossible, we allocate MAXPHYS 1203 * bytes - request can't be bigger than that. 1204 */ 1205 /* FALLTHROUGH */ 1206 case ENXIO: 1207 default: 1208 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1209 strerror(error)); 1210 } 1211 1212 ncomp = 0; 1213 ncomps = HAST_NCOMPONENTS; 1214 1215 for (ii = 0; ii < ncomps; ii++) 1216 hio->hio_errors[ii] = EINVAL; 1217 reqlog(LOG_DEBUG, 2, ggio, 1218 "ggate_recv: (%p) Request received from the kernel: ", 1219 hio); 1220 1221 /* 1222 * Inform all components about new write request. 
1223 * For read request prefer local component unless the given 1224 * range is out-of-date, then use remote component. 1225 */ 1226 switch (ggio->gctl_cmd) { 1227 case BIO_READ: 1228 res->hr_stat_read++; 1229 ncomps = 1; 1230 mtx_lock(&metadata_lock); 1231 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1232 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1233 /* 1234 * This range is up-to-date on local component, 1235 * so handle request locally. 1236 */ 1237 /* Local component is 0 for now. */ 1238 ncomp = 0; 1239 } else /* if (res->hr_syncsrc == 1240 HAST_SYNCSRC_SECONDARY) */ { 1241 PJDLOG_ASSERT(res->hr_syncsrc == 1242 HAST_SYNCSRC_SECONDARY); 1243 /* 1244 * This range is out-of-date on local component, 1245 * so send request to the remote node. 1246 */ 1247 /* Remote component is 1 for now. */ 1248 ncomp = 1; 1249 } 1250 mtx_unlock(&metadata_lock); 1251 break; 1252 case BIO_WRITE: 1253 res->hr_stat_write++; 1254 if (res->hr_resuid == 0 && 1255 res->hr_primary_localcnt == 0) { 1256 /* This is first write. */ 1257 res->hr_primary_localcnt = 1; 1258 } 1259 for (;;) { 1260 mtx_lock(&range_lock); 1261 if (rangelock_islocked(range_sync, 1262 ggio->gctl_offset, ggio->gctl_length)) { 1263 pjdlog_debug(2, 1264 "regular: Range offset=%jd length=%zu locked.", 1265 (intmax_t)ggio->gctl_offset, 1266 (size_t)ggio->gctl_length); 1267 range_regular_wait = true; 1268 cv_wait(&range_regular_cond, &range_lock); 1269 range_regular_wait = false; 1270 mtx_unlock(&range_lock); 1271 continue; 1272 } 1273 if (rangelock_add(range_regular, 1274 ggio->gctl_offset, ggio->gctl_length) == -1) { 1275 mtx_unlock(&range_lock); 1276 pjdlog_debug(2, 1277 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1278 (intmax_t)ggio->gctl_offset, 1279 (size_t)ggio->gctl_length); 1280 sleep(1); 1281 continue; 1282 } 1283 mtx_unlock(&range_lock); 1284 break; 1285 } 1286 mtx_lock(&res->hr_amp_lock); 1287 if (activemap_write_start(res->hr_amp, 1288 ggio->gctl_offset, ggio->gctl_length)) { 1289 res->hr_stat_activemap_update++; 1290 (void)hast_activemap_flush(res); 1291 } 1292 mtx_unlock(&res->hr_amp_lock); 1293 break; 1294 case BIO_DELETE: 1295 res->hr_stat_delete++; 1296 break; 1297 case BIO_FLUSH: 1298 res->hr_stat_flush++; 1299 break; 1300 } 1301 pjdlog_debug(2, 1302 "ggate_recv: (%p) Moving request to the send queues.", hio); 1303 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1304 ggio->gctl_cmd == BIO_WRITE) { 1305 /* Each remote request needs two responses in memsync. */ 1306 refcnt_init(&hio->hio_countdown, ncomps + 1); 1307 } else { 1308 refcnt_init(&hio->hio_countdown, ncomps); 1309 } 1310 for (ii = ncomp; ii < ncomps; ii++) 1311 QUEUE_INSERT1(hio, send, ii); 1312 } 1313 /* NOTREACHED */ 1314 return (NULL); 1315 } 1316 1317 /* 1318 * Thread reads from or writes to local component. 1319 * If local read fails, it redirects it to remote_send thread. 1320 */ 1321 static void * 1322 local_send_thread(void *arg) 1323 { 1324 struct hast_resource *res = arg; 1325 struct g_gate_ctl_io *ggio; 1326 struct hio *hio; 1327 unsigned int ncomp, rncomp; 1328 ssize_t ret; 1329 1330 /* Local component is 0 for now. */ 1331 ncomp = 0; 1332 /* Remote component is 1 for now. 
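/*
 * Added note on hio_countdown accounting (a summary of the scheme used above
 * and in the threads below): for ordinary requests the counter starts at the
 * number of components and each component releases it once. For memsync
 * writes it starts at ncomps + 1, because the remote side answers twice (an
 * early memsync acknowledgement and a final reply); the write is acknowledged
 * to the kernel once both the local write and the memsync acknowledgement are
 * in, and the request is recycled only after the final reply arrives.
 */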
/*
 * Thread reads from or writes to the local component.
 * If a local read fails, the request is redirected to the remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else if (!ISSYNCREQ(hio)) {
				/*
				 * If READ failed, try to read from remote node.
				 */
				if (ret == -1) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. ",
					    ret, (intmax_t)ggio->gctl_length);
				}
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == -1) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else if (ret != ggio->gctl_length) {
				hio->hio_errors[ncomp] = EIO;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%zd != %jd): ",
				    ret, (intmax_t)ggio->gctl_length);
			} else {
				hio->hio_errors[ncomp] = 0;
				if (hio->hio_replication ==
				    HAST_REPLICATION_ASYNC) {
					ggio->gctl_error = 0;
					write_complete(res, hio);
				}
			}
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret == -1) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_FLUSH:
			if (!res->hr_localflush) {
				ret = -1;
				errno = EOPNOTSUPP;
				break;
			}
			ret = g_flush(res->hr_localfd);
			if (ret == -1) {
				if (errno == EOPNOTSUPP)
					res->hr_localflush = false;
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		}

		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
		    ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
			if (refcnt_release(&hio->hio_countdown) > 0)
				continue;
		} else {
			/*
			 * Depending on the hio_countdown value, requests
			 * finished in the following order:
			 * 0: remote memsync, remote final, local write
			 * 1: remote memsync, local write, (remote final)
			 * 2: local write, (remote memsync), (remote final)
			 */
			switch (refcnt_release(&hio->hio_countdown)) {
			case 0:
				/*
				 * Local write finished last.
				 */
				break;
			case 1:
				/*
				 * Local write finished after the remote
				 * memsync reply arrived. We can complete the
				 * write now.
				 */
				if (hio->hio_errors[0] == 0)
					write_complete(res, hio);
				continue;
			case 2:
				/*
				 * Local write finished first.
				 */
				continue;
			default:
				PJDLOG_ABORT("Invalid hio_countdown.");
			}
		}
		if (ISSYNCREQ(hio)) {
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "local_send: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return (NULL);
}
1449 */ 1450 continue; 1451 default: 1452 PJDLOG_ABORT("Invalid hio_countdown."); 1453 } 1454 } 1455 if (ISSYNCREQ(hio)) { 1456 mtx_lock(&sync_lock); 1457 SYNCREQDONE(hio); 1458 mtx_unlock(&sync_lock); 1459 cv_signal(&sync_cond); 1460 } else { 1461 pjdlog_debug(2, 1462 "local_send: (%p) Moving request to the done queue.", 1463 hio); 1464 QUEUE_INSERT2(hio, done); 1465 } 1466 } 1467 /* NOTREACHED */ 1468 return (NULL); 1469 } 1470 1471 static void 1472 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1473 { 1474 struct nv *nv; 1475 1476 rw_rlock(&hio_remote_lock[ncomp]); 1477 1478 if (!ISCONNECTED(res, ncomp)) { 1479 rw_unlock(&hio_remote_lock[ncomp]); 1480 return; 1481 } 1482 1483 PJDLOG_ASSERT(res->hr_remotein != NULL); 1484 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1485 1486 nv = nv_alloc(); 1487 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1488 if (nv_error(nv) != 0) { 1489 rw_unlock(&hio_remote_lock[ncomp]); 1490 nv_free(nv); 1491 pjdlog_debug(1, 1492 "keepalive_send: Unable to prepare header to send."); 1493 return; 1494 } 1495 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1496 rw_unlock(&hio_remote_lock[ncomp]); 1497 pjdlog_common(LOG_DEBUG, 1, errno, 1498 "keepalive_send: Unable to send request"); 1499 nv_free(nv); 1500 remote_close(res, ncomp); 1501 return; 1502 } 1503 1504 rw_unlock(&hio_remote_lock[ncomp]); 1505 nv_free(nv); 1506 pjdlog_debug(2, "keepalive_send: Request sent."); 1507 } 1508 1509 /* 1510 * Thread sends request to secondary node. 1511 */ 1512 static void * 1513 remote_send_thread(void *arg) 1514 { 1515 struct hast_resource *res = arg; 1516 struct g_gate_ctl_io *ggio; 1517 time_t lastcheck, now; 1518 struct hio *hio; 1519 struct nv *nv; 1520 unsigned int ncomp; 1521 bool wakeup; 1522 uint64_t offset, length; 1523 uint8_t cmd; 1524 void *data; 1525 1526 /* Remote component is 1 for now. 
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
		    ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
			nv_add_uint8(nv, 1, "memsync");
		}
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to the recv queue before sending it,
		 * because otherwise we could receive the reply before the
		 * request is on the recv queue.
		 */
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) == -1) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (refcnt_release(&hio->hio_countdown) > 0)
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
				(void)refcnt_release(&hio->hio_countdown);
		}
		if (refcnt_release(&hio->hio_countdown) > 0)
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}
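/*
 * Added note on the done_queue error path above (a summary, not new
 * behaviour): for memsync writes hio_countdown was initialized expecting two
 * remote releases, so when the request fails before or during the send we
 * must also drop the reply that will never arrive - hence the extra
 * refcnt_release() for HAST_REPLICATION_MEMSYNC before the common release.
 */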
1625 */ 1626 mtx_lock(&hio_recv_list_lock[ncomp]); 1627 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1628 hio_next[ncomp]); 1629 mtx_unlock(&hio_recv_list_lock[ncomp]); 1630 goto done_queue; 1631 } 1632 rw_unlock(&hio_remote_lock[ncomp]); 1633 nv_free(nv); 1634 if (wakeup) 1635 cv_signal(&hio_recv_list_cond[ncomp]); 1636 continue; 1637 done_queue: 1638 nv_free(nv); 1639 if (ISSYNCREQ(hio)) { 1640 if (refcnt_release(&hio->hio_countdown) > 0) 1641 continue; 1642 mtx_lock(&sync_lock); 1643 SYNCREQDONE(hio); 1644 mtx_unlock(&sync_lock); 1645 cv_signal(&sync_cond); 1646 continue; 1647 } 1648 if (ggio->gctl_cmd == BIO_WRITE) { 1649 mtx_lock(&res->hr_amp_lock); 1650 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1651 ggio->gctl_length)) { 1652 (void)hast_activemap_flush(res); 1653 } 1654 mtx_unlock(&res->hr_amp_lock); 1655 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) 1656 (void)refcnt_release(&hio->hio_countdown); 1657 } 1658 if (refcnt_release(&hio->hio_countdown) > 0) 1659 continue; 1660 pjdlog_debug(2, 1661 "remote_send: (%p) Moving request to the done queue.", 1662 hio); 1663 QUEUE_INSERT2(hio, done); 1664 } 1665 /* NOTREACHED */ 1666 return (NULL); 1667 } 1668 1669 /* 1670 * Thread receives answer from secondary node and passes it to ggate_send 1671 * thread. 1672 */ 1673 static void * 1674 remote_recv_thread(void *arg) 1675 { 1676 struct hast_resource *res = arg; 1677 struct g_gate_ctl_io *ggio; 1678 struct hio *hio; 1679 struct nv *nv; 1680 unsigned int ncomp; 1681 uint64_t seq; 1682 bool memsyncack; 1683 int error; 1684 1685 /* Remote component is 1 for now. */ 1686 ncomp = 1; 1687 1688 for (;;) { 1689 /* Wait until there is anything to receive. */ 1690 mtx_lock(&hio_recv_list_lock[ncomp]); 1691 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1692 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1693 cv_wait(&hio_recv_list_cond[ncomp], 1694 &hio_recv_list_lock[ncomp]); 1695 } 1696 mtx_unlock(&hio_recv_list_lock[ncomp]); 1697 1698 memsyncack = false; 1699 1700 rw_rlock(&hio_remote_lock[ncomp]); 1701 if (!ISCONNECTED(res, ncomp)) { 1702 rw_unlock(&hio_remote_lock[ncomp]); 1703 /* 1704 * Connection is dead, so move all pending requests to 1705 * the done queue (one-by-one). 1706 */ 1707 mtx_lock(&hio_recv_list_lock[ncomp]); 1708 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1709 PJDLOG_ASSERT(hio != NULL); 1710 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1711 hio_next[ncomp]); 1712 mtx_unlock(&hio_recv_list_lock[ncomp]); 1713 goto done_queue; 1714 } 1715 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1716 pjdlog_errno(LOG_ERR, 1717 "Unable to receive reply header"); 1718 rw_unlock(&hio_remote_lock[ncomp]); 1719 remote_close(res, ncomp); 1720 continue; 1721 } 1722 rw_unlock(&hio_remote_lock[ncomp]); 1723 seq = nv_get_uint64(nv, "seq"); 1724 if (seq == 0) { 1725 pjdlog_error("Header contains no 'seq' field."); 1726 nv_free(nv); 1727 continue; 1728 } 1729 memsyncack = nv_exists(nv, "received"); 1730 mtx_lock(&hio_recv_list_lock[ncomp]); 1731 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1732 if (hio->hio_ggio.gctl_seq == seq) { 1733 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1734 hio_next[ncomp]); 1735 break; 1736 } 1737 } 1738 mtx_unlock(&hio_recv_list_lock[ncomp]); 1739 if (hio == NULL) { 1740 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1741 (uintmax_t)seq); 1742 nv_free(nv); 1743 continue; 1744 } 1745 ggio = &hio->hio_ggio; 1746 error = nv_get_int16(nv, "error"); 1747 if (error != 0) { 1748 /* Request failed on remote side. 
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) == -1) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
		    hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
			if (refcnt_release(&hio->hio_countdown) > 0)
				continue;
		} else {
			/*
			 * Depending on the hio_countdown value, requests
			 * finished in the following order:
			 *
			 * 0: local write, remote memsync, remote final
			 * or
			 * 0: remote memsync, local write, remote final
			 *
			 * 1: local write, remote memsync, (remote final)
			 * or
			 * 1: remote memsync, remote final, (local write)
			 *
			 * 2: remote memsync, (local write), (remote final)
			 * or
			 * 2: remote memsync, (remote final), (local write)
			 */
			switch (refcnt_release(&hio->hio_countdown)) {
			case 0:
				/*
				 * Remote final reply arrived.
				 */
				PJDLOG_ASSERT(!memsyncack);
				break;
			case 1:
				if (memsyncack) {
					/*
					 * Local request already finished, so
					 * we can complete the write.
					 */
					if (hio->hio_errors[0] == 0)
						write_complete(res, hio);
					/*
					 * We still need to wait for the final
					 * remote reply.
					 */
					pjdlog_debug(2,
					    "remote_recv: (%p) Moving request back to the recv queue.",
					    hio);
					mtx_lock(&hio_recv_list_lock[ncomp]);
					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
					    hio, hio_next[ncomp]);
					mtx_unlock(&hio_recv_list_lock[ncomp]);
				} else {
					/*
					 * Remote final reply arrived before
					 * local write finished.
					 * Nothing to do in that case.
					 */
				}
				continue;
			case 2:
				/*
				 * We received the remote memsync reply even
				 * before the local write finished.
				 */
				PJDLOG_ASSERT(memsyncack);

				pjdlog_debug(2,
				    "remote_recv: (%p) Moving request back to the recv queue.",
				    hio);
				mtx_lock(&hio_recv_list_lock[ncomp]);
				TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				mtx_unlock(&hio_recv_list_lock[ncomp]);
				continue;
			default:
				PJDLOG_ABORT("Invalid hio_countdown.");
			}
		}
		if (ISSYNCREQ(hio)) {
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "remote_recv: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return (NULL);
}
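/*
 * Added note (as read from the code above): the secondary marks its early
 * memsync acknowledgement by attaching a "received" field to the reply
 * header, which is what memsyncack reflects; the final reply for the same
 * sequence number carries no such field, so the same 'seq' is looked up on
 * the recv queue twice for memsync writes.
 */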

/*
 * Thread sends answers back to the kernel.
 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use the error from the local component, except
			 * when we did a remote request only.
			 */
			if (ggio->gctl_cmd == BIO_READ &&
			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				ggio->gctl_error = hio->hio_errors[1];
			else
				ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock the range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			if (!hio->hio_done)
				write_complete(res, hio);
		} else {
			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
				primary_exit(EX_OSERR,
				    "G_GATE_CMD_DONE failed");
			}
		}
		if (hio->hio_errors[0]) {
			switch (ggio->gctl_cmd) {
			case BIO_READ:
				res->hr_stat_read_error++;
				break;
			case BIO_WRITE:
				res->hr_stat_write_error++;
				break;
			case BIO_DELETE:
				res->hr_stat_delete_error++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush_error++;
				break;
			}
		}
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}
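
/*
 * Error aggregation in ggate_send_thread() follows one rule: a single
 * successful component makes the whole request a success; if every
 * component failed, report the local error, unless the request could only
 * have gone to the remote node (a read while the local component is still
 * being synchronized from the secondary).  The same rule in isolation;
 * "aggregate_errors" and its parameters are hypothetical names:
 */
#if 0
static int
aggregate_errors(const int *errors, unsigned int ncomps, bool remote_only)
{
	unsigned int ii;

	for (ii = 0; ii < ncomps; ii++) {
		if (errors[ii] == 0)
			return (0);	/* one success is enough */
	}
	/* Component 0 is local, component 1 is remote. */
	return (remote_only ? errors[1] : errors[0]);
}
#endif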

/*
 * Thread synchronizes the local and remote components.
 */
static void *
sync_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct hio *hio;
	struct g_gate_ctl_io *ggio;
	struct timeval tstart, tend, tdiff;
	unsigned int ii, ncomp, ncomps;
	off_t offset, length, synced;
	bool dorewind, directreads;
	int syncext;

	ncomps = HAST_NCOMPONENTS;
	dorewind = true;
	synced = 0;
	offset = -1;
	directreads = false;

	for (;;) {
		mtx_lock(&sync_lock);
		if (offset >= 0 && !sync_inprogress) {
			gettimeofday(&tend, NULL);
			timersub(&tend, &tstart, &tdiff);
			pjdlog_info("Synchronization interrupted after %#.0T. "
			    "%NB synchronized so far.", &tdiff,
			    (intmax_t)synced);
			event_send(res, EVENT_SYNCINTR);
		}
		while (!sync_inprogress) {
			dorewind = true;
			synced = 0;
			cv_wait(&sync_cond, &sync_lock);
		}
		mtx_unlock(&sync_lock);
		/*
		 * Obtain the offset at which we should synchronize.
		 * Rewind synchronization if needed.
		 */
		mtx_lock(&res->hr_amp_lock);
		if (dorewind)
			activemap_sync_rewind(res->hr_amp);
		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
		if (syncext != -1) {
			/*
			 * We have synchronized the entire syncext extent, so
			 * we can mark it as clean now.
			 */
			if (activemap_extent_complete(res->hr_amp, syncext))
				(void)hast_activemap_flush(res);
		}
		mtx_unlock(&res->hr_amp_lock);
		if (dorewind) {
			dorewind = false;
			if (offset == -1)
				pjdlog_info("Nodes are in sync.");
			else {
				pjdlog_info("Synchronization started. %NB to go.",
				    (intmax_t)(res->hr_extentsize *
				    activemap_ndirty(res->hr_amp)));
				event_send(res, EVENT_SYNCSTART);
				gettimeofday(&tstart, NULL);
			}
		}
		if (offset == -1) {
			sync_stop();
			pjdlog_debug(1, "Nothing to synchronize.");
			/*
			 * Synchronization is complete, so make both localcnt
			 * and remotecnt equal.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (ISCONNECTED(res, ncomp)) {
				if (synced > 0) {
					int64_t bps;

					gettimeofday(&tend, NULL);
					timersub(&tend, &tstart, &tdiff);
					bps = (int64_t)((double)synced /
					    ((double)tdiff.tv_sec +
					    (double)tdiff.tv_usec / 1000000));
					pjdlog_info("Synchronization complete. "
					    "%NB synchronized in %#.0lT (%NB/sec).",
					    (intmax_t)synced, &tdiff,
					    (intmax_t)bps);
					event_send(res, EVENT_SYNCDONE);
				}
				mtx_lock(&metadata_lock);
				if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
					directreads = true;
				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
				res->hr_primary_localcnt =
				    res->hr_secondary_remotecnt;
				res->hr_primary_remotecnt =
				    res->hr_secondary_localcnt;
				pjdlog_debug(1,
				    "Setting localcnt to %ju and remotecnt to %ju.",
				    (uintmax_t)res->hr_primary_localcnt,
				    (uintmax_t)res->hr_primary_remotecnt);
				(void)metadata_write(res);
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			if (directreads) {
				directreads = false;
				enable_direct_reads(res);
			}
			continue;
		}
		pjdlog_debug(2, "sync: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
		/*
		 * Lock the range we are going to synchronize. We don't want
		 * a race where someone writes between our read and write.
		 */
		for (;;) {
			mtx_lock(&range_lock);
			if (rangelock_islocked(range_regular, offset, length)) {
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd locked.",
				    (intmax_t)offset, (intmax_t)length);
				range_sync_wait = true;
				cv_wait(&range_sync_cond, &range_lock);
				range_sync_wait = false;
				mtx_unlock(&range_lock);
				continue;
			}
			if (rangelock_add(range_sync, offset, length) == -1) {
				mtx_unlock(&range_lock);
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
				    (intmax_t)offset, (intmax_t)length);
				sleep(1);
				continue;
			}
			mtx_unlock(&range_lock);
			break;
		}
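
		/*
		 * Note: regular writes take the mirror-image path.  Before
		 * locking a range in range_regular they check range_sync and
		 * wait on range_regular_cond (signalled at free_queue below),
		 * so a sync request and a regular write can never overlap on
		 * the same range.  A sketch of that counterpart logic, which
		 * lives in the ggate_recv thread and is not shown here:
		 */
#if 0
		for (;;) {
			mtx_lock(&range_lock);
			if (rangelock_islocked(range_sync, offset, length)) {
				range_regular_wait = true;
				cv_wait(&range_regular_cond, &range_lock);
				range_regular_wait = false;
				mtx_unlock(&range_lock);
				continue;
			}
			if (rangelock_add(range_regular, offset, length) == -1) {
				mtx_unlock(&range_lock);
				sleep(1);
				continue;
			}
			mtx_unlock(&range_lock);
			break;
		}
#endif
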
		/*
		 * First read the data from the synchronization source.
		 */
		SYNCREQ(hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_cmd = BIO_READ;
		ggio->gctl_offset = offset;
		ggio->gctl_length = length;
		ggio->gctl_error = 0;
		hio->hio_done = false;
		hio->hio_replication = res->hr_replication;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on the local component, so
			 * handle the request locally.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on the local component,
			 * so send the request to the remote node.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		}
		mtx_unlock(&metadata_lock);
		refcnt_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for the READ to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to read synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}
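
		/*
		 * The wait above is a small synchronous RPC built from the
		 * asynchronous queues: SYNCREQ() marks the hio as a sync
		 * request before it is queued, and whichever worker thread
		 * finishes it marks it with SYNCREQDONE() under sync_lock
		 * and signals sync_cond.  The completing side, as it appears
		 * in the done_queue handling of remote_send_thread() and
		 * remote_recv_thread() above, schematically:
		 */
#if 0
		if (ISSYNCREQ(hio)) {
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		}
#endif
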
		/*
		 * We have read the data from the synchronization source; now
		 * write it to the synchronization target.
		 */
		SYNCREQ(hio);
		ggio->gctl_cmd = BIO_WRITE;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on the local component, so
			 * we update the remote component.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on the local component,
			 * so update it.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		}
		mtx_unlock(&metadata_lock);

		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		refcnt_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for the WRITE to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to write synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		synced += length;
free_queue:
		mtx_lock(&range_lock);
		rangelock_del(range_sync, offset, length);
		if (range_regular_wait)
			cv_signal(&range_regular_cond);
		mtx_unlock(&range_lock);
		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
		    hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}
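
/*
 * primary_config_reload() below tracks which settings changed with a
 * bitmask, so that related changes can be tested together after all values
 * have been compared.  The idiom in isolation; the flag names, condition
 * variables and set_timeout() here are hypothetical, for illustration only:
 */
#if 0
#define CHANGED_ADDR	0x01
#define CHANGED_TIMEOUT	0x02

	int modified = 0;

	if (addr_differs)
		modified |= CHANGED_ADDR;
	if (timeout_differs)
		modified |= CHANGED_TIMEOUT;
	/* Only adjust the timeout when no reconnect will happen anyway. */
	if ((modified & CHANGED_TIMEOUT) != 0 &&
	    (modified & CHANGED_ADDR) == 0)
		set_timeout();
#endif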

void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");
	nv_assert(nv, "metaflush");

	ncomps = HAST_NCOMPONENTS;

#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
#define MODIFIED_METAFLUSH	0x80
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnects from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}
	vint = nv_get_int32(nv, "metaflush");
	if (gres->hr_metaflush != vint) {
		gres->hr_metaflush = vint;
		modified |= MODIFIED_METAFLUSH;
	}

	/*
	 * Change the timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_SOURCEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC
#undef	MODIFIED_METAFLUSH

	pjdlog_info("Configuration reloaded successfully.");
}

static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change the connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out) == 0) {
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}
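
/*
 * The lock "upgrade" in guard_one() above deliberately avoids an atomic
 * read-to-write upgrade: the read lock is dropped, the (slow) reconnect
 * runs with no lock held, and the write lock is taken only to install the
 * new connections.  This is safe because this is the only code path that
 * moves a component from disconnected to connected.  Schematically, with
 * a hypothetical install() helper standing in for the assignments:
 */
#if 0
	rw_unlock(&hio_remote_lock[ncomp]);	/* drop the read lock */
	/* ...slow reconnect with no lock held... */
	rw_wlock(&hio_remote_lock[ncomp]);	/* take the write lock */
	install(res, in, out);			/* still disconnected here */
	rw_unlock(&hio_remote_lock[ncomp]);
#endif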

/*
 * Thread guards the remote connections, reconnects when needed, and
 * handles signals.
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	unsigned int ii, ncomps;
	struct timespec timeout;
	time_t lastcheck, now;
	sigset_t mask;
	int signo;

	ncomps = HAST_NCOMPONENTS;
	lastcheck = time(NULL);

	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);

	timeout.tv_sec = HAST_KEEPALIVE;
	timeout.tv_nsec = 0;
	signo = -1;

	for (;;) {
		switch (signo) {
		case SIGINT:
		case SIGTERM:
			sigexit_received = true;
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
			break;
		default:
			break;
		}

		/*
		 * Don't check the connections until we have fully started,
		 * as we may still be looping, waiting for the remote node
		 * to switch from primary to secondary.
		 */
		if (fullystarted) {
			pjdlog_debug(2, "remote_guard: Checking connections.");
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				for (ii = 0; ii < ncomps; ii++)
					guard_one(res, ii);
				lastcheck = now;
			}
		}
		signo = sigtimedwait(&mask, NULL, &timeout);
	}
	/* NOTREACHED */
	return (NULL);
}
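
/*
 * guard_thread() above relies on sigtimedwait(2) so that one loop serves
 * both as the termination-signal handler and as a HAST_KEEPALIVE-second
 * periodic timer: the call returns a signal number when SIGINT or SIGTERM
 * arrives, and -1 with errno set to EAGAIN when the timeout expires.  A
 * minimal self-contained sketch of the pattern (sigtimedwait() requires
 * the signals to be blocked, so the sketch blocks them itself):
 */
#if 0
#include <signal.h>

int
main(void)
{
	struct timespec timeout = { .tv_sec = 10, .tv_nsec = 0 };
	sigset_t mask;
	int signo;

	sigemptyset(&mask);
	sigaddset(&mask, SIGINT);
	sigaddset(&mask, SIGTERM);
	sigprocmask(SIG_BLOCK, &mask, NULL);

	for (;;) {
		signo = sigtimedwait(&mask, NULL, &timeout);
		if (signo == SIGINT || signo == SIGTERM)
			break;		/* clean shutdown */
		/* Timeout: do the periodic (keepalive) work here. */
	}
	return (0);
}
#endif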