1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/types.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <geom/gate/g_gate.h> 41 42 #include <err.h> 43 #include <errno.h> 44 #include <fcntl.h> 45 #include <libgeom.h> 46 #include <pthread.h> 47 #include <signal.h> 48 #include <stdint.h> 49 #include <stdio.h> 50 #include <string.h> 51 #include <sysexits.h> 52 #include <unistd.h> 53 54 #include <activemap.h> 55 #include <nv.h> 56 #include <rangelock.h> 57 58 #include "control.h" 59 #include "event.h" 60 #include "hast.h" 61 #include "hast_proto.h" 62 #include "hastd.h" 63 #include "hooks.h" 64 #include "metadata.h" 65 #include "proto.h" 66 #include "pjdlog.h" 67 #include "refcnt.h" 68 #include "subr.h" 69 #include "synch.h" 70 71 /* The is only one remote component for now. */ 72 #define ISREMOTE(no) ((no) == 1) 73 74 struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 refcnt_t hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Remember replication from the time the request was initiated, 98 * so we won't get confused when replication changes on reload. 99 */ 100 int hio_replication; 101 TAILQ_ENTRY(hio) *hio_next; 102 }; 103 #define hio_free_next hio_next[0] 104 #define hio_done_next hio_next[0] 105 106 /* 107 * Free list holds unused structures. When free list is empty, we have to wait 108 * until some in-progress requests are freed. 109 */ 110 static TAILQ_HEAD(, hio) hio_free_list; 111 static pthread_mutex_t hio_free_list_lock; 112 static pthread_cond_t hio_free_list_cond; 113 /* 114 * There is one send list for every component. One requests is placed on all 115 * send lists - each component gets the same request, but each component is 116 * responsible for managing his own send list. 117 */ 118 static TAILQ_HEAD(, hio) *hio_send_list; 119 static pthread_mutex_t *hio_send_list_lock; 120 static pthread_cond_t *hio_send_list_cond; 121 /* 122 * There is one recv list for every component, although local components don't 123 * use recv lists as local requests are done synchronously. 124 */ 125 static TAILQ_HEAD(, hio) *hio_recv_list; 126 static pthread_mutex_t *hio_recv_list_lock; 127 static pthread_cond_t *hio_recv_list_cond; 128 /* 129 * Request is placed on done list by the slowest component (the one that 130 * decreased hio_countdown from 1 to 0). 131 */ 132 static TAILQ_HEAD(, hio) hio_done_list; 133 static pthread_mutex_t hio_done_list_lock; 134 static pthread_cond_t hio_done_list_cond; 135 /* 136 * Structure below are for interaction with sync thread. 137 */ 138 static bool sync_inprogress; 139 static pthread_mutex_t sync_lock; 140 static pthread_cond_t sync_cond; 141 /* 142 * The lock below allows to synchornize access to remote connections. 143 */ 144 static pthread_rwlock_t *hio_remote_lock; 145 146 /* 147 * Lock to synchronize metadata updates. Also synchronize access to 148 * hr_primary_localcnt and hr_primary_remotecnt fields. 149 */ 150 static pthread_mutex_t metadata_lock; 151 152 /* 153 * Maximum number of outstanding I/O requests. 154 */ 155 #define HAST_HIO_MAX 256 156 /* 157 * Number of components. At this point there are only two components: local 158 * and remote, but in the future it might be possible to use multiple local 159 * and remote components. 160 */ 161 #define HAST_NCOMPONENTS 2 162 163 #define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \ 176 } while (0) 177 #define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_broadcast(&hio_##name##_list_cond); \ 186 } while (0) 187 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203 } while (0) 204 #define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212 } while (0) 213 214 #define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217 } while (0) 218 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 219 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221 222 static struct hast_resource *gres; 223 224 static pthread_mutex_t range_lock; 225 static struct rangelocks *range_regular; 226 static bool range_regular_wait; 227 static pthread_cond_t range_regular_cond; 228 static struct rangelocks *range_sync; 229 static bool range_sync_wait; 230 static pthread_cond_t range_sync_cond; 231 static bool fullystarted; 232 233 static void *ggate_recv_thread(void *arg); 234 static void *local_send_thread(void *arg); 235 static void *remote_send_thread(void *arg); 236 static void *remote_recv_thread(void *arg); 237 static void *ggate_send_thread(void *arg); 238 static void *sync_thread(void *arg); 239 static void *guard_thread(void *arg); 240 241 static void 242 cleanup(struct hast_resource *res) 243 { 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. */ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267 } 268 269 static __dead2 void 270 primary_exit(int exitcode, const char *fmt, ...) 271 { 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280 } 281 282 static __dead2 void 283 primary_exitx(int exitcode, const char *fmt, ...) 284 { 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292 } 293 294 /* Expects res->hr_amp locked, returns unlocked. */ 295 static int 296 hast_activemap_flush(struct hast_resource *res) 297 { 298 const unsigned char *buf; 299 size_t size; 300 int ret; 301 302 mtx_lock(&res->hr_amp_diskmap_lock); 303 buf = activemap_bitmap(res->hr_amp, &size); 304 mtx_unlock(&res->hr_amp_lock); 305 PJDLOG_ASSERT(buf != NULL); 306 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 307 ret = 0; 308 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 309 (ssize_t)size) { 310 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 311 res->hr_stat_activemap_write_error++; 312 ret = -1; 313 } 314 if (ret == 0 && res->hr_metaflush == 1 && 315 g_flush(res->hr_localfd) == -1) { 316 if (errno == EOPNOTSUPP) { 317 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 318 res->hr_localpath); 319 res->hr_metaflush = 0; 320 } else { 321 pjdlog_errno(LOG_ERR, 322 "Unable to flush disk cache on activemap update"); 323 res->hr_stat_activemap_flush_error++; 324 ret = -1; 325 } 326 } 327 mtx_unlock(&res->hr_amp_diskmap_lock); 328 return (ret); 329 } 330 331 static bool 332 real_remote(const struct hast_resource *res) 333 { 334 335 return (strcmp(res->hr_remoteaddr, "none") != 0); 336 } 337 338 static void 339 init_environment(struct hast_resource *res __unused) 340 { 341 struct hio *hio; 342 unsigned int ii, ncomps; 343 344 /* 345 * In the future it might be per-resource value. 346 */ 347 ncomps = HAST_NCOMPONENTS; 348 349 /* 350 * Allocate memory needed by lists. 351 */ 352 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 353 if (hio_send_list == NULL) { 354 primary_exitx(EX_TEMPFAIL, 355 "Unable to allocate %zu bytes of memory for send lists.", 356 sizeof(hio_send_list[0]) * ncomps); 357 } 358 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 359 if (hio_send_list_lock == NULL) { 360 primary_exitx(EX_TEMPFAIL, 361 "Unable to allocate %zu bytes of memory for send list locks.", 362 sizeof(hio_send_list_lock[0]) * ncomps); 363 } 364 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 365 if (hio_send_list_cond == NULL) { 366 primary_exitx(EX_TEMPFAIL, 367 "Unable to allocate %zu bytes of memory for send list condition variables.", 368 sizeof(hio_send_list_cond[0]) * ncomps); 369 } 370 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 371 if (hio_recv_list == NULL) { 372 primary_exitx(EX_TEMPFAIL, 373 "Unable to allocate %zu bytes of memory for recv lists.", 374 sizeof(hio_recv_list[0]) * ncomps); 375 } 376 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 377 if (hio_recv_list_lock == NULL) { 378 primary_exitx(EX_TEMPFAIL, 379 "Unable to allocate %zu bytes of memory for recv list locks.", 380 sizeof(hio_recv_list_lock[0]) * ncomps); 381 } 382 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 383 if (hio_recv_list_cond == NULL) { 384 primary_exitx(EX_TEMPFAIL, 385 "Unable to allocate %zu bytes of memory for recv list condition variables.", 386 sizeof(hio_recv_list_cond[0]) * ncomps); 387 } 388 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 389 if (hio_remote_lock == NULL) { 390 primary_exitx(EX_TEMPFAIL, 391 "Unable to allocate %zu bytes of memory for remote connections locks.", 392 sizeof(hio_remote_lock[0]) * ncomps); 393 } 394 395 /* 396 * Initialize lists, their locks and theirs condition variables. 397 */ 398 TAILQ_INIT(&hio_free_list); 399 mtx_init(&hio_free_list_lock); 400 cv_init(&hio_free_list_cond); 401 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 402 TAILQ_INIT(&hio_send_list[ii]); 403 mtx_init(&hio_send_list_lock[ii]); 404 cv_init(&hio_send_list_cond[ii]); 405 TAILQ_INIT(&hio_recv_list[ii]); 406 mtx_init(&hio_recv_list_lock[ii]); 407 cv_init(&hio_recv_list_cond[ii]); 408 rw_init(&hio_remote_lock[ii]); 409 } 410 TAILQ_INIT(&hio_done_list); 411 mtx_init(&hio_done_list_lock); 412 cv_init(&hio_done_list_cond); 413 mtx_init(&metadata_lock); 414 415 /* 416 * Allocate requests pool and initialize requests. 417 */ 418 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 419 hio = malloc(sizeof(*hio)); 420 if (hio == NULL) { 421 primary_exitx(EX_TEMPFAIL, 422 "Unable to allocate %zu bytes of memory for hio request.", 423 sizeof(*hio)); 424 } 425 refcnt_init(&hio->hio_countdown, 0); 426 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 427 if (hio->hio_errors == NULL) { 428 primary_exitx(EX_TEMPFAIL, 429 "Unable allocate %zu bytes of memory for hio errors.", 430 sizeof(hio->hio_errors[0]) * ncomps); 431 } 432 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 433 if (hio->hio_next == NULL) { 434 primary_exitx(EX_TEMPFAIL, 435 "Unable allocate %zu bytes of memory for hio_next field.", 436 sizeof(hio->hio_next[0]) * ncomps); 437 } 438 hio->hio_ggio.gctl_version = G_GATE_VERSION; 439 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 440 if (hio->hio_ggio.gctl_data == NULL) { 441 primary_exitx(EX_TEMPFAIL, 442 "Unable to allocate %zu bytes of memory for gctl_data.", 443 MAXPHYS); 444 } 445 hio->hio_ggio.gctl_length = MAXPHYS; 446 hio->hio_ggio.gctl_error = 0; 447 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 448 } 449 } 450 451 static bool 452 init_resuid(struct hast_resource *res) 453 { 454 455 mtx_lock(&metadata_lock); 456 if (res->hr_resuid != 0) { 457 mtx_unlock(&metadata_lock); 458 return (false); 459 } else { 460 /* Initialize unique resource identifier. */ 461 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 462 mtx_unlock(&metadata_lock); 463 if (metadata_write(res) == -1) 464 exit(EX_NOINPUT); 465 return (true); 466 } 467 } 468 469 static void 470 init_local(struct hast_resource *res) 471 { 472 unsigned char *buf; 473 size_t mapsize; 474 475 if (metadata_read(res, true) == -1) 476 exit(EX_NOINPUT); 477 mtx_init(&res->hr_amp_lock); 478 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 479 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 480 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 481 } 482 mtx_init(&range_lock); 483 cv_init(&range_regular_cond); 484 if (rangelock_init(&range_regular) == -1) 485 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 486 cv_init(&range_sync_cond); 487 if (rangelock_init(&range_sync) == -1) 488 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 489 mapsize = activemap_ondisk_size(res->hr_amp); 490 buf = calloc(1, mapsize); 491 if (buf == NULL) { 492 primary_exitx(EX_TEMPFAIL, 493 "Unable to allocate buffer for activemap."); 494 } 495 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 496 (ssize_t)mapsize) { 497 primary_exit(EX_NOINPUT, "Unable to read activemap"); 498 } 499 activemap_copyin(res->hr_amp, buf, mapsize); 500 free(buf); 501 if (res->hr_resuid != 0) 502 return; 503 /* 504 * We're using provider for the first time. Initialize local and remote 505 * counters. We don't initialize resuid here, as we want to do it just 506 * in time. The reason for this is that we want to inform secondary 507 * that there were no writes yet, so there is no need to synchronize 508 * anything. 509 */ 510 res->hr_primary_localcnt = 0; 511 res->hr_primary_remotecnt = 0; 512 if (metadata_write(res) == -1) 513 exit(EX_NOINPUT); 514 } 515 516 static int 517 primary_connect(struct hast_resource *res, struct proto_conn **connp) 518 { 519 struct proto_conn *conn; 520 int16_t val; 521 522 val = 1; 523 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 524 primary_exit(EX_TEMPFAIL, 525 "Unable to send connection request to parent"); 526 } 527 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 528 primary_exit(EX_TEMPFAIL, 529 "Unable to receive reply to connection request from parent"); 530 } 531 if (val != 0) { 532 errno = val; 533 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534 res->hr_remoteaddr); 535 return (-1); 536 } 537 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 538 primary_exit(EX_TEMPFAIL, 539 "Unable to receive connection from parent"); 540 } 541 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 542 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 543 res->hr_remoteaddr); 544 proto_close(conn); 545 return (-1); 546 } 547 /* Error in setting timeout is not critical, but why should it fail? */ 548 if (proto_timeout(conn, res->hr_timeout) == -1) 549 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 550 551 *connp = conn; 552 553 return (0); 554 } 555 556 /* 557 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 558 */ 559 static void 560 enable_direct_reads(struct hast_resource *res) 561 { 562 struct g_gate_ctl_modify ggiomodify; 563 564 bzero(&ggiomodify, sizeof(ggiomodify)); 565 ggiomodify.gctl_version = G_GATE_VERSION; 566 ggiomodify.gctl_unit = res->hr_ggateunit; 567 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 568 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 569 sizeof(ggiomodify.gctl_readprov)); 570 ggiomodify.gctl_readoffset = res->hr_localoff; 571 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 572 pjdlog_debug(1, "Direct reads enabled."); 573 else 574 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 575 } 576 577 static int 578 init_remote(struct hast_resource *res, struct proto_conn **inp, 579 struct proto_conn **outp) 580 { 581 struct proto_conn *in, *out; 582 struct nv *nvout, *nvin; 583 const unsigned char *token; 584 unsigned char *map; 585 const char *errmsg; 586 int32_t extentsize; 587 int64_t datasize; 588 uint32_t mapsize; 589 uint8_t version; 590 size_t size; 591 int error; 592 593 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 594 PJDLOG_ASSERT(real_remote(res)); 595 596 in = out = NULL; 597 errmsg = NULL; 598 599 if (primary_connect(res, &out) == -1) 600 return (ECONNREFUSED); 601 602 error = ECONNABORTED; 603 604 /* 605 * First handshake step. 606 * Setup outgoing connection with remote node. 607 */ 608 nvout = nv_alloc(); 609 nv_add_string(nvout, res->hr_name, "resource"); 610 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 611 if (nv_error(nvout) != 0) { 612 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 613 "Unable to allocate header for connection with %s", 614 res->hr_remoteaddr); 615 nv_free(nvout); 616 goto close; 617 } 618 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 619 pjdlog_errno(LOG_WARNING, 620 "Unable to send handshake header to %s", 621 res->hr_remoteaddr); 622 nv_free(nvout); 623 goto close; 624 } 625 nv_free(nvout); 626 if (hast_proto_recv_hdr(out, &nvin) == -1) { 627 pjdlog_errno(LOG_WARNING, 628 "Unable to receive handshake header from %s", 629 res->hr_remoteaddr); 630 goto close; 631 } 632 errmsg = nv_get_string(nvin, "errmsg"); 633 if (errmsg != NULL) { 634 pjdlog_warning("%s", errmsg); 635 if (nv_exists(nvin, "wait")) 636 error = EBUSY; 637 nv_free(nvin); 638 goto close; 639 } 640 version = nv_get_uint8(nvin, "version"); 641 if (version == 0) { 642 /* 643 * If no version is sent, it means this is protocol version 1. 644 */ 645 version = 1; 646 } 647 if (version > HAST_PROTO_VERSION) { 648 pjdlog_warning("Invalid version received (%hhu).", version); 649 nv_free(nvin); 650 goto close; 651 } 652 res->hr_version = version; 653 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 654 token = nv_get_uint8_array(nvin, &size, "token"); 655 if (token == NULL) { 656 pjdlog_warning("Handshake header from %s has no 'token' field.", 657 res->hr_remoteaddr); 658 nv_free(nvin); 659 goto close; 660 } 661 if (size != sizeof(res->hr_token)) { 662 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 663 res->hr_remoteaddr, size, sizeof(res->hr_token)); 664 nv_free(nvin); 665 goto close; 666 } 667 bcopy(token, res->hr_token, sizeof(res->hr_token)); 668 nv_free(nvin); 669 670 /* 671 * Second handshake step. 672 * Setup incoming connection with remote node. 673 */ 674 if (primary_connect(res, &in) == -1) 675 goto close; 676 677 nvout = nv_alloc(); 678 nv_add_string(nvout, res->hr_name, "resource"); 679 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 680 "token"); 681 if (res->hr_resuid == 0) { 682 /* 683 * The resuid field was not yet initialized. 684 * Because we do synchronization inside init_resuid(), it is 685 * possible that someone already initialized it, the function 686 * will return false then, but if we successfully initialized 687 * it, we will get true. True means that there were no writes 688 * to this resource yet and we want to inform secondary that 689 * synchronization is not needed by sending "virgin" argument. 690 */ 691 if (init_resuid(res)) 692 nv_add_int8(nvout, 1, "virgin"); 693 } 694 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 695 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 696 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 697 if (nv_error(nvout) != 0) { 698 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 699 "Unable to allocate header for connection with %s", 700 res->hr_remoteaddr); 701 nv_free(nvout); 702 goto close; 703 } 704 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 705 pjdlog_errno(LOG_WARNING, 706 "Unable to send handshake header to %s", 707 res->hr_remoteaddr); 708 nv_free(nvout); 709 goto close; 710 } 711 nv_free(nvout); 712 if (hast_proto_recv_hdr(out, &nvin) == -1) { 713 pjdlog_errno(LOG_WARNING, 714 "Unable to receive handshake header from %s", 715 res->hr_remoteaddr); 716 goto close; 717 } 718 errmsg = nv_get_string(nvin, "errmsg"); 719 if (errmsg != NULL) { 720 pjdlog_warning("%s", errmsg); 721 nv_free(nvin); 722 goto close; 723 } 724 datasize = nv_get_int64(nvin, "datasize"); 725 if (datasize != res->hr_datasize) { 726 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 727 (intmax_t)res->hr_datasize, (intmax_t)datasize); 728 nv_free(nvin); 729 goto close; 730 } 731 extentsize = nv_get_int32(nvin, "extentsize"); 732 if (extentsize != res->hr_extentsize) { 733 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 734 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 735 nv_free(nvin); 736 goto close; 737 } 738 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 739 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 740 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 741 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 742 enable_direct_reads(res); 743 if (nv_exists(nvin, "virgin")) { 744 /* 745 * Secondary was reinitialized, bump localcnt if it is 0 as 746 * only we have the data. 747 */ 748 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 749 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 750 751 if (res->hr_primary_localcnt == 0) { 752 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 753 754 mtx_lock(&metadata_lock); 755 res->hr_primary_localcnt++; 756 pjdlog_debug(1, "Increasing localcnt to %ju.", 757 (uintmax_t)res->hr_primary_localcnt); 758 (void)metadata_write(res); 759 mtx_unlock(&metadata_lock); 760 } 761 } 762 map = NULL; 763 mapsize = nv_get_uint32(nvin, "mapsize"); 764 if (mapsize > 0) { 765 map = malloc(mapsize); 766 if (map == NULL) { 767 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 768 (uintmax_t)mapsize); 769 nv_free(nvin); 770 goto close; 771 } 772 /* 773 * Remote node have some dirty extents on its own, lets 774 * download its activemap. 775 */ 776 if (hast_proto_recv_data(res, out, nvin, map, 777 mapsize) == -1) { 778 pjdlog_errno(LOG_ERR, 779 "Unable to receive remote activemap"); 780 nv_free(nvin); 781 free(map); 782 goto close; 783 } 784 /* 785 * Merge local and remote bitmaps. 786 */ 787 activemap_merge(res->hr_amp, map, mapsize); 788 free(map); 789 /* 790 * Now that we merged bitmaps from both nodes, flush it to the 791 * disk before we start to synchronize. 792 */ 793 mtx_lock(&res->hr_amp_lock); 794 (void)hast_activemap_flush(res); 795 } 796 nv_free(nvin); 797 #ifdef notyet 798 /* Setup directions. */ 799 if (proto_send(out, NULL, 0) == -1) 800 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 801 if (proto_recv(in, NULL, 0) == -1) 802 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 803 #endif 804 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 805 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 806 res->hr_version < 2) { 807 pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 808 res->hr_replication = HAST_REPLICATION_FULLSYNC; 809 } else if (res->hr_replication != res->hr_original_replication) { 810 /* 811 * This is in case hastd disconnected and was upgraded. 812 */ 813 res->hr_replication = res->hr_original_replication; 814 } 815 if (inp != NULL && outp != NULL) { 816 *inp = in; 817 *outp = out; 818 } else { 819 res->hr_remotein = in; 820 res->hr_remoteout = out; 821 } 822 event_send(res, EVENT_CONNECT); 823 return (0); 824 close: 825 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 826 event_send(res, EVENT_SPLITBRAIN); 827 proto_close(out); 828 if (in != NULL) 829 proto_close(in); 830 return (error); 831 } 832 833 static void 834 sync_start(void) 835 { 836 837 mtx_lock(&sync_lock); 838 sync_inprogress = true; 839 mtx_unlock(&sync_lock); 840 cv_signal(&sync_cond); 841 } 842 843 static void 844 sync_stop(void) 845 { 846 847 mtx_lock(&sync_lock); 848 if (sync_inprogress) 849 sync_inprogress = false; 850 mtx_unlock(&sync_lock); 851 } 852 853 static void 854 init_ggate(struct hast_resource *res) 855 { 856 struct g_gate_ctl_create ggiocreate; 857 struct g_gate_ctl_cancel ggiocancel; 858 859 /* 860 * We communicate with ggate via /dev/ggctl. Open it. 861 */ 862 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 863 if (res->hr_ggatefd == -1) 864 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 865 /* 866 * Create provider before trying to connect, as connection failure 867 * is not critical, but may take some time. 868 */ 869 bzero(&ggiocreate, sizeof(ggiocreate)); 870 ggiocreate.gctl_version = G_GATE_VERSION; 871 ggiocreate.gctl_mediasize = res->hr_datasize; 872 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 873 ggiocreate.gctl_flags = 0; 874 ggiocreate.gctl_maxcount = 0; 875 ggiocreate.gctl_timeout = 0; 876 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 877 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 878 res->hr_provname); 879 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 880 pjdlog_info("Device hast/%s created.", res->hr_provname); 881 res->hr_ggateunit = ggiocreate.gctl_unit; 882 return; 883 } 884 if (errno != EEXIST) { 885 primary_exit(EX_OSERR, "Unable to create hast/%s device", 886 res->hr_provname); 887 } 888 pjdlog_debug(1, 889 "Device hast/%s already exists, we will try to take it over.", 890 res->hr_provname); 891 /* 892 * If we received EEXIST, we assume that the process who created the 893 * provider died and didn't clean up. In that case we will start from 894 * where he left of. 895 */ 896 bzero(&ggiocancel, sizeof(ggiocancel)); 897 ggiocancel.gctl_version = G_GATE_VERSION; 898 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 899 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 900 res->hr_provname); 901 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 902 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 903 res->hr_ggateunit = ggiocancel.gctl_unit; 904 return; 905 } 906 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 907 res->hr_provname); 908 } 909 910 void 911 hastd_primary(struct hast_resource *res) 912 { 913 pthread_t td; 914 pid_t pid; 915 int error, mode, debuglevel; 916 917 /* 918 * Create communication channel for sending control commands from 919 * parent to child. 920 */ 921 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 922 /* TODO: There's no need for this to be fatal error. */ 923 KEEP_ERRNO((void)pidfile_remove(pfh)); 924 pjdlog_exit(EX_OSERR, 925 "Unable to create control sockets between parent and child"); 926 } 927 /* 928 * Create communication channel for sending events from child to parent. 929 */ 930 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 931 /* TODO: There's no need for this to be fatal error. */ 932 KEEP_ERRNO((void)pidfile_remove(pfh)); 933 pjdlog_exit(EX_OSERR, 934 "Unable to create event sockets between child and parent"); 935 } 936 /* 937 * Create communication channel for sending connection requests from 938 * child to parent. 939 */ 940 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 941 /* TODO: There's no need for this to be fatal error. */ 942 KEEP_ERRNO((void)pidfile_remove(pfh)); 943 pjdlog_exit(EX_OSERR, 944 "Unable to create connection sockets between child and parent"); 945 } 946 947 pid = fork(); 948 if (pid == -1) { 949 /* TODO: There's no need for this to be fatal error. */ 950 KEEP_ERRNO((void)pidfile_remove(pfh)); 951 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 952 } 953 954 if (pid > 0) { 955 /* This is parent. */ 956 /* Declare that we are receiver. */ 957 proto_recv(res->hr_event, NULL, 0); 958 proto_recv(res->hr_conn, NULL, 0); 959 /* Declare that we are sender. */ 960 proto_send(res->hr_ctrl, NULL, 0); 961 res->hr_workerpid = pid; 962 return; 963 } 964 965 gres = res; 966 mode = pjdlog_mode_get(); 967 debuglevel = pjdlog_debug_get(); 968 969 /* Declare that we are sender. */ 970 proto_send(res->hr_event, NULL, 0); 971 proto_send(res->hr_conn, NULL, 0); 972 /* Declare that we are receiver. */ 973 proto_recv(res->hr_ctrl, NULL, 0); 974 descriptors_cleanup(res); 975 976 descriptors_assert(res, mode); 977 978 pjdlog_init(mode); 979 pjdlog_debug_set(debuglevel); 980 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 981 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 982 983 init_local(res); 984 init_ggate(res); 985 init_environment(res); 986 987 if (drop_privs(res) != 0) { 988 cleanup(res); 989 exit(EX_CONFIG); 990 } 991 pjdlog_info("Privileges successfully dropped."); 992 993 /* 994 * Create the guard thread first, so we can handle signals from the 995 * very beginning. 996 */ 997 error = pthread_create(&td, NULL, guard_thread, res); 998 PJDLOG_ASSERT(error == 0); 999 /* 1000 * Create the control thread before sending any event to the parent, 1001 * as we can deadlock when parent sends control request to worker, 1002 * but worker has no control thread started yet, so parent waits. 1003 * In the meantime worker sends an event to the parent, but parent 1004 * is unable to handle the event, because it waits for control 1005 * request response. 1006 */ 1007 error = pthread_create(&td, NULL, ctrl_thread, res); 1008 PJDLOG_ASSERT(error == 0); 1009 if (real_remote(res)) { 1010 error = init_remote(res, NULL, NULL); 1011 if (error == 0) { 1012 sync_start(); 1013 } else if (error == EBUSY) { 1014 time_t start = time(NULL); 1015 1016 pjdlog_warning("Waiting for remote node to become %s for %ds.", 1017 role2str(HAST_ROLE_SECONDARY), 1018 res->hr_timeout); 1019 for (;;) { 1020 sleep(1); 1021 error = init_remote(res, NULL, NULL); 1022 if (error != EBUSY) 1023 break; 1024 if (time(NULL) > start + res->hr_timeout) 1025 break; 1026 } 1027 if (error == EBUSY) { 1028 pjdlog_warning("Remote node is still %s, starting anyway.", 1029 role2str(HAST_ROLE_PRIMARY)); 1030 } 1031 } 1032 } 1033 error = pthread_create(&td, NULL, ggate_recv_thread, res); 1034 PJDLOG_ASSERT(error == 0); 1035 error = pthread_create(&td, NULL, local_send_thread, res); 1036 PJDLOG_ASSERT(error == 0); 1037 error = pthread_create(&td, NULL, remote_send_thread, res); 1038 PJDLOG_ASSERT(error == 0); 1039 error = pthread_create(&td, NULL, remote_recv_thread, res); 1040 PJDLOG_ASSERT(error == 0); 1041 error = pthread_create(&td, NULL, ggate_send_thread, res); 1042 PJDLOG_ASSERT(error == 0); 1043 fullystarted = true; 1044 (void)sync_thread(res); 1045 } 1046 1047 static void 1048 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1049 const char *fmt, ...) 1050 { 1051 char msg[1024]; 1052 va_list ap; 1053 1054 va_start(ap, fmt); 1055 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1056 va_end(ap); 1057 switch (ggio->gctl_cmd) { 1058 case BIO_READ: 1059 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1060 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1061 break; 1062 case BIO_DELETE: 1063 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1064 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1065 break; 1066 case BIO_FLUSH: 1067 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1068 break; 1069 case BIO_WRITE: 1070 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1071 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1072 break; 1073 default: 1074 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1075 (unsigned int)ggio->gctl_cmd); 1076 break; 1077 } 1078 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1079 } 1080 1081 static void 1082 remote_close(struct hast_resource *res, int ncomp) 1083 { 1084 1085 rw_wlock(&hio_remote_lock[ncomp]); 1086 /* 1087 * Check for a race between dropping rlock and acquiring wlock - 1088 * another thread can close connection in-between. 1089 */ 1090 if (!ISCONNECTED(res, ncomp)) { 1091 PJDLOG_ASSERT(res->hr_remotein == NULL); 1092 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1093 rw_unlock(&hio_remote_lock[ncomp]); 1094 return; 1095 } 1096 1097 PJDLOG_ASSERT(res->hr_remotein != NULL); 1098 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1099 1100 pjdlog_debug(2, "Closing incoming connection to %s.", 1101 res->hr_remoteaddr); 1102 proto_close(res->hr_remotein); 1103 res->hr_remotein = NULL; 1104 pjdlog_debug(2, "Closing outgoing connection to %s.", 1105 res->hr_remoteaddr); 1106 proto_close(res->hr_remoteout); 1107 res->hr_remoteout = NULL; 1108 1109 rw_unlock(&hio_remote_lock[ncomp]); 1110 1111 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1112 1113 /* 1114 * Stop synchronization if in-progress. 1115 */ 1116 sync_stop(); 1117 1118 event_send(res, EVENT_DISCONNECT); 1119 } 1120 1121 /* 1122 * Acknowledge write completion to the kernel, but don't update activemap yet. 1123 */ 1124 static void 1125 write_complete(struct hast_resource *res, struct hio *hio) 1126 { 1127 struct g_gate_ctl_io *ggio; 1128 unsigned int ncomp; 1129 1130 PJDLOG_ASSERT(!hio->hio_done); 1131 1132 ggio = &hio->hio_ggio; 1133 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1134 1135 /* 1136 * Bump local count if this is first write after 1137 * connection failure with remote node. 1138 */ 1139 ncomp = 1; 1140 rw_rlock(&hio_remote_lock[ncomp]); 1141 if (!ISCONNECTED(res, ncomp)) { 1142 mtx_lock(&metadata_lock); 1143 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1144 res->hr_primary_localcnt++; 1145 pjdlog_debug(1, "Increasing localcnt to %ju.", 1146 (uintmax_t)res->hr_primary_localcnt); 1147 (void)metadata_write(res); 1148 } 1149 mtx_unlock(&metadata_lock); 1150 } 1151 rw_unlock(&hio_remote_lock[ncomp]); 1152 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1153 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1154 hio->hio_done = true; 1155 } 1156 1157 /* 1158 * Thread receives ggate I/O requests from the kernel and passes them to 1159 * appropriate threads: 1160 * WRITE - always goes to both local_send and remote_send threads 1161 * READ (when the block is up-to-date on local component) - 1162 * only local_send thread 1163 * READ (when the block isn't up-to-date on local component) - 1164 * only remote_send thread 1165 * DELETE - always goes to both local_send and remote_send threads 1166 * FLUSH - always goes to both local_send and remote_send threads 1167 */ 1168 static void * 1169 ggate_recv_thread(void *arg) 1170 { 1171 struct hast_resource *res = arg; 1172 struct g_gate_ctl_io *ggio; 1173 struct hio *hio; 1174 unsigned int ii, ncomp, ncomps; 1175 int error; 1176 1177 for (;;) { 1178 pjdlog_debug(2, "ggate_recv: Taking free request."); 1179 QUEUE_TAKE2(hio, free); 1180 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1181 ggio = &hio->hio_ggio; 1182 ggio->gctl_unit = res->hr_ggateunit; 1183 ggio->gctl_length = MAXPHYS; 1184 ggio->gctl_error = 0; 1185 hio->hio_done = false; 1186 hio->hio_replication = res->hr_replication; 1187 pjdlog_debug(2, 1188 "ggate_recv: (%p) Waiting for request from the kernel.", 1189 hio); 1190 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1191 if (sigexit_received) 1192 pthread_exit(NULL); 1193 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1194 } 1195 error = ggio->gctl_error; 1196 switch (error) { 1197 case 0: 1198 break; 1199 case ECANCELED: 1200 /* Exit gracefully. */ 1201 if (!sigexit_received) { 1202 pjdlog_debug(2, 1203 "ggate_recv: (%p) Received cancel from the kernel.", 1204 hio); 1205 pjdlog_info("Received cancel from the kernel, exiting."); 1206 } 1207 pthread_exit(NULL); 1208 case ENOMEM: 1209 /* 1210 * Buffer too small? Impossible, we allocate MAXPHYS 1211 * bytes - request can't be bigger than that. 1212 */ 1213 /* FALLTHROUGH */ 1214 case ENXIO: 1215 default: 1216 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1217 strerror(error)); 1218 } 1219 1220 ncomp = 0; 1221 ncomps = HAST_NCOMPONENTS; 1222 1223 for (ii = 0; ii < ncomps; ii++) 1224 hio->hio_errors[ii] = EINVAL; 1225 reqlog(LOG_DEBUG, 2, ggio, 1226 "ggate_recv: (%p) Request received from the kernel: ", 1227 hio); 1228 1229 /* 1230 * Inform all components about new write request. 1231 * For read request prefer local component unless the given 1232 * range is out-of-date, then use remote component. 1233 */ 1234 switch (ggio->gctl_cmd) { 1235 case BIO_READ: 1236 res->hr_stat_read++; 1237 ncomps = 1; 1238 mtx_lock(&metadata_lock); 1239 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1240 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1241 /* 1242 * This range is up-to-date on local component, 1243 * so handle request locally. 1244 */ 1245 /* Local component is 0 for now. */ 1246 ncomp = 0; 1247 } else /* if (res->hr_syncsrc == 1248 HAST_SYNCSRC_SECONDARY) */ { 1249 PJDLOG_ASSERT(res->hr_syncsrc == 1250 HAST_SYNCSRC_SECONDARY); 1251 /* 1252 * This range is out-of-date on local component, 1253 * so send request to the remote node. 1254 */ 1255 /* Remote component is 1 for now. */ 1256 ncomp = 1; 1257 } 1258 mtx_unlock(&metadata_lock); 1259 break; 1260 case BIO_WRITE: 1261 res->hr_stat_write++; 1262 if (res->hr_resuid == 0 && 1263 res->hr_primary_localcnt == 0) { 1264 /* This is first write. */ 1265 res->hr_primary_localcnt = 1; 1266 } 1267 for (;;) { 1268 mtx_lock(&range_lock); 1269 if (rangelock_islocked(range_sync, 1270 ggio->gctl_offset, ggio->gctl_length)) { 1271 pjdlog_debug(2, 1272 "regular: Range offset=%jd length=%zu locked.", 1273 (intmax_t)ggio->gctl_offset, 1274 (size_t)ggio->gctl_length); 1275 range_regular_wait = true; 1276 cv_wait(&range_regular_cond, &range_lock); 1277 range_regular_wait = false; 1278 mtx_unlock(&range_lock); 1279 continue; 1280 } 1281 if (rangelock_add(range_regular, 1282 ggio->gctl_offset, ggio->gctl_length) == -1) { 1283 mtx_unlock(&range_lock); 1284 pjdlog_debug(2, 1285 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1286 (intmax_t)ggio->gctl_offset, 1287 (size_t)ggio->gctl_length); 1288 sleep(1); 1289 continue; 1290 } 1291 mtx_unlock(&range_lock); 1292 break; 1293 } 1294 mtx_lock(&res->hr_amp_lock); 1295 if (activemap_write_start(res->hr_amp, 1296 ggio->gctl_offset, ggio->gctl_length)) { 1297 res->hr_stat_activemap_update++; 1298 (void)hast_activemap_flush(res); 1299 } else { 1300 mtx_unlock(&res->hr_amp_lock); 1301 } 1302 break; 1303 case BIO_DELETE: 1304 res->hr_stat_delete++; 1305 break; 1306 case BIO_FLUSH: 1307 res->hr_stat_flush++; 1308 break; 1309 } 1310 pjdlog_debug(2, 1311 "ggate_recv: (%p) Moving request to the send queues.", hio); 1312 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1313 ggio->gctl_cmd == BIO_WRITE) { 1314 /* Each remote request needs two responses in memsync. */ 1315 refcnt_init(&hio->hio_countdown, ncomps + 1); 1316 } else { 1317 refcnt_init(&hio->hio_countdown, ncomps); 1318 } 1319 for (ii = ncomp; ii < ncomps; ii++) 1320 QUEUE_INSERT1(hio, send, ii); 1321 } 1322 /* NOTREACHED */ 1323 return (NULL); 1324 } 1325 1326 /* 1327 * Thread reads from or writes to local component. 1328 * If local read fails, it redirects it to remote_send thread. 1329 */ 1330 static void * 1331 local_send_thread(void *arg) 1332 { 1333 struct hast_resource *res = arg; 1334 struct g_gate_ctl_io *ggio; 1335 struct hio *hio; 1336 unsigned int ncomp, rncomp; 1337 ssize_t ret; 1338 1339 /* Local component is 0 for now. */ 1340 ncomp = 0; 1341 /* Remote component is 1 for now. */ 1342 rncomp = 1; 1343 1344 for (;;) { 1345 pjdlog_debug(2, "local_send: Taking request."); 1346 QUEUE_TAKE1(hio, send, ncomp, 0); 1347 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1348 ggio = &hio->hio_ggio; 1349 switch (ggio->gctl_cmd) { 1350 case BIO_READ: 1351 ret = pread(res->hr_localfd, ggio->gctl_data, 1352 ggio->gctl_length, 1353 ggio->gctl_offset + res->hr_localoff); 1354 if (ret == ggio->gctl_length) 1355 hio->hio_errors[ncomp] = 0; 1356 else if (!ISSYNCREQ(hio)) { 1357 /* 1358 * If READ failed, try to read from remote node. 1359 */ 1360 if (ret == -1) { 1361 reqlog(LOG_WARNING, 0, ggio, 1362 "Local request failed (%s), trying remote node. ", 1363 strerror(errno)); 1364 } else if (ret != ggio->gctl_length) { 1365 reqlog(LOG_WARNING, 0, ggio, 1366 "Local request failed (%zd != %jd), trying remote node. ", 1367 ret, (intmax_t)ggio->gctl_length); 1368 } 1369 QUEUE_INSERT1(hio, send, rncomp); 1370 continue; 1371 } 1372 break; 1373 case BIO_WRITE: 1374 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1375 ggio->gctl_length, 1376 ggio->gctl_offset + res->hr_localoff); 1377 if (ret == -1) { 1378 hio->hio_errors[ncomp] = errno; 1379 reqlog(LOG_WARNING, 0, ggio, 1380 "Local request failed (%s): ", 1381 strerror(errno)); 1382 } else if (ret != ggio->gctl_length) { 1383 hio->hio_errors[ncomp] = EIO; 1384 reqlog(LOG_WARNING, 0, ggio, 1385 "Local request failed (%zd != %jd): ", 1386 ret, (intmax_t)ggio->gctl_length); 1387 } else { 1388 hio->hio_errors[ncomp] = 0; 1389 if (hio->hio_replication == 1390 HAST_REPLICATION_ASYNC) { 1391 ggio->gctl_error = 0; 1392 write_complete(res, hio); 1393 } 1394 } 1395 break; 1396 case BIO_DELETE: 1397 ret = g_delete(res->hr_localfd, 1398 ggio->gctl_offset + res->hr_localoff, 1399 ggio->gctl_length); 1400 if (ret == -1) { 1401 hio->hio_errors[ncomp] = errno; 1402 reqlog(LOG_WARNING, 0, ggio, 1403 "Local request failed (%s): ", 1404 strerror(errno)); 1405 } else { 1406 hio->hio_errors[ncomp] = 0; 1407 } 1408 break; 1409 case BIO_FLUSH: 1410 if (!res->hr_localflush) { 1411 ret = -1; 1412 errno = EOPNOTSUPP; 1413 break; 1414 } 1415 ret = g_flush(res->hr_localfd); 1416 if (ret == -1) { 1417 if (errno == EOPNOTSUPP) 1418 res->hr_localflush = false; 1419 hio->hio_errors[ncomp] = errno; 1420 reqlog(LOG_WARNING, 0, ggio, 1421 "Local request failed (%s): ", 1422 strerror(errno)); 1423 } else { 1424 hio->hio_errors[ncomp] = 0; 1425 } 1426 break; 1427 } 1428 1429 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1430 ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1431 if (refcnt_release(&hio->hio_countdown) > 0) 1432 continue; 1433 } else { 1434 /* 1435 * Depending on hio_countdown value, requests finished 1436 * in the following order: 1437 * 0: remote memsync, remote final, local write 1438 * 1: remote memsync, local write, (remote final) 1439 * 2: local write, (remote memsync), (remote final) 1440 */ 1441 switch (refcnt_release(&hio->hio_countdown)) { 1442 case 0: 1443 /* 1444 * Local write finished as last. 1445 */ 1446 break; 1447 case 1: 1448 /* 1449 * Local write finished after remote memsync 1450 * reply arrvied. We can complete the write now. 1451 */ 1452 if (hio->hio_errors[0] == 0) 1453 write_complete(res, hio); 1454 continue; 1455 case 2: 1456 /* 1457 * Local write finished as first. 1458 */ 1459 continue; 1460 default: 1461 PJDLOG_ABORT("Invalid hio_countdown."); 1462 } 1463 } 1464 if (ISSYNCREQ(hio)) { 1465 mtx_lock(&sync_lock); 1466 SYNCREQDONE(hio); 1467 mtx_unlock(&sync_lock); 1468 cv_signal(&sync_cond); 1469 } else { 1470 pjdlog_debug(2, 1471 "local_send: (%p) Moving request to the done queue.", 1472 hio); 1473 QUEUE_INSERT2(hio, done); 1474 } 1475 } 1476 /* NOTREACHED */ 1477 return (NULL); 1478 } 1479 1480 static void 1481 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1482 { 1483 struct nv *nv; 1484 1485 rw_rlock(&hio_remote_lock[ncomp]); 1486 1487 if (!ISCONNECTED(res, ncomp)) { 1488 rw_unlock(&hio_remote_lock[ncomp]); 1489 return; 1490 } 1491 1492 PJDLOG_ASSERT(res->hr_remotein != NULL); 1493 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1494 1495 nv = nv_alloc(); 1496 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1497 if (nv_error(nv) != 0) { 1498 rw_unlock(&hio_remote_lock[ncomp]); 1499 nv_free(nv); 1500 pjdlog_debug(1, 1501 "keepalive_send: Unable to prepare header to send."); 1502 return; 1503 } 1504 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1505 rw_unlock(&hio_remote_lock[ncomp]); 1506 pjdlog_common(LOG_DEBUG, 1, errno, 1507 "keepalive_send: Unable to send request"); 1508 nv_free(nv); 1509 remote_close(res, ncomp); 1510 return; 1511 } 1512 1513 rw_unlock(&hio_remote_lock[ncomp]); 1514 nv_free(nv); 1515 pjdlog_debug(2, "keepalive_send: Request sent."); 1516 } 1517 1518 /* 1519 * Thread sends request to secondary node. 1520 */ 1521 static void * 1522 remote_send_thread(void *arg) 1523 { 1524 struct hast_resource *res = arg; 1525 struct g_gate_ctl_io *ggio; 1526 time_t lastcheck, now; 1527 struct hio *hio; 1528 struct nv *nv; 1529 unsigned int ncomp; 1530 bool wakeup; 1531 uint64_t offset, length; 1532 uint8_t cmd; 1533 void *data; 1534 1535 /* Remote component is 1 for now. */ 1536 ncomp = 1; 1537 lastcheck = time(NULL); 1538 1539 for (;;) { 1540 pjdlog_debug(2, "remote_send: Taking request."); 1541 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1542 if (hio == NULL) { 1543 now = time(NULL); 1544 if (lastcheck + HAST_KEEPALIVE <= now) { 1545 keepalive_send(res, ncomp); 1546 lastcheck = now; 1547 } 1548 continue; 1549 } 1550 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1551 ggio = &hio->hio_ggio; 1552 switch (ggio->gctl_cmd) { 1553 case BIO_READ: 1554 cmd = HIO_READ; 1555 data = NULL; 1556 offset = ggio->gctl_offset; 1557 length = ggio->gctl_length; 1558 break; 1559 case BIO_WRITE: 1560 cmd = HIO_WRITE; 1561 data = ggio->gctl_data; 1562 offset = ggio->gctl_offset; 1563 length = ggio->gctl_length; 1564 break; 1565 case BIO_DELETE: 1566 cmd = HIO_DELETE; 1567 data = NULL; 1568 offset = ggio->gctl_offset; 1569 length = ggio->gctl_length; 1570 break; 1571 case BIO_FLUSH: 1572 cmd = HIO_FLUSH; 1573 data = NULL; 1574 offset = 0; 1575 length = 0; 1576 break; 1577 default: 1578 PJDLOG_ABORT("invalid condition"); 1579 } 1580 nv = nv_alloc(); 1581 nv_add_uint8(nv, cmd, "cmd"); 1582 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1583 nv_add_uint64(nv, offset, "offset"); 1584 nv_add_uint64(nv, length, "length"); 1585 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1586 ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { 1587 nv_add_uint8(nv, 1, "memsync"); 1588 } 1589 if (nv_error(nv) != 0) { 1590 hio->hio_errors[ncomp] = nv_error(nv); 1591 pjdlog_debug(2, 1592 "remote_send: (%p) Unable to prepare header to send.", 1593 hio); 1594 reqlog(LOG_ERR, 0, ggio, 1595 "Unable to prepare header to send (%s): ", 1596 strerror(nv_error(nv))); 1597 /* Move failed request immediately to the done queue. */ 1598 goto done_queue; 1599 } 1600 /* 1601 * Protect connection from disappearing. 1602 */ 1603 rw_rlock(&hio_remote_lock[ncomp]); 1604 if (!ISCONNECTED(res, ncomp)) { 1605 rw_unlock(&hio_remote_lock[ncomp]); 1606 hio->hio_errors[ncomp] = ENOTCONN; 1607 goto done_queue; 1608 } 1609 /* 1610 * Move the request to recv queue before sending it, because 1611 * in different order we can get reply before we move request 1612 * to recv queue. 1613 */ 1614 pjdlog_debug(2, 1615 "remote_send: (%p) Moving request to the recv queue.", 1616 hio); 1617 mtx_lock(&hio_recv_list_lock[ncomp]); 1618 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1619 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1620 mtx_unlock(&hio_recv_list_lock[ncomp]); 1621 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1622 data != NULL ? length : 0) == -1) { 1623 hio->hio_errors[ncomp] = errno; 1624 rw_unlock(&hio_remote_lock[ncomp]); 1625 pjdlog_debug(2, 1626 "remote_send: (%p) Unable to send request.", hio); 1627 reqlog(LOG_ERR, 0, ggio, 1628 "Unable to send request (%s): ", 1629 strerror(hio->hio_errors[ncomp])); 1630 remote_close(res, ncomp); 1631 /* 1632 * Take request back from the receive queue and move 1633 * it immediately to the done queue. 1634 */ 1635 mtx_lock(&hio_recv_list_lock[ncomp]); 1636 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1637 hio_next[ncomp]); 1638 mtx_unlock(&hio_recv_list_lock[ncomp]); 1639 goto done_queue; 1640 } 1641 rw_unlock(&hio_remote_lock[ncomp]); 1642 nv_free(nv); 1643 if (wakeup) 1644 cv_signal(&hio_recv_list_cond[ncomp]); 1645 continue; 1646 done_queue: 1647 nv_free(nv); 1648 if (ISSYNCREQ(hio)) { 1649 if (refcnt_release(&hio->hio_countdown) > 0) 1650 continue; 1651 mtx_lock(&sync_lock); 1652 SYNCREQDONE(hio); 1653 mtx_unlock(&sync_lock); 1654 cv_signal(&sync_cond); 1655 continue; 1656 } 1657 if (ggio->gctl_cmd == BIO_WRITE) { 1658 mtx_lock(&res->hr_amp_lock); 1659 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1660 ggio->gctl_length)) { 1661 (void)hast_activemap_flush(res); 1662 } else { 1663 mtx_unlock(&res->hr_amp_lock); 1664 } 1665 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) 1666 (void)refcnt_release(&hio->hio_countdown); 1667 } 1668 if (refcnt_release(&hio->hio_countdown) > 0) 1669 continue; 1670 pjdlog_debug(2, 1671 "remote_send: (%p) Moving request to the done queue.", 1672 hio); 1673 QUEUE_INSERT2(hio, done); 1674 } 1675 /* NOTREACHED */ 1676 return (NULL); 1677 } 1678 1679 /* 1680 * Thread receives answer from secondary node and passes it to ggate_send 1681 * thread. 1682 */ 1683 static void * 1684 remote_recv_thread(void *arg) 1685 { 1686 struct hast_resource *res = arg; 1687 struct g_gate_ctl_io *ggio; 1688 struct hio *hio; 1689 struct nv *nv; 1690 unsigned int ncomp; 1691 uint64_t seq; 1692 bool memsyncack; 1693 int error; 1694 1695 /* Remote component is 1 for now. */ 1696 ncomp = 1; 1697 1698 for (;;) { 1699 /* Wait until there is anything to receive. */ 1700 mtx_lock(&hio_recv_list_lock[ncomp]); 1701 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1702 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1703 cv_wait(&hio_recv_list_cond[ncomp], 1704 &hio_recv_list_lock[ncomp]); 1705 } 1706 mtx_unlock(&hio_recv_list_lock[ncomp]); 1707 1708 memsyncack = false; 1709 1710 rw_rlock(&hio_remote_lock[ncomp]); 1711 if (!ISCONNECTED(res, ncomp)) { 1712 rw_unlock(&hio_remote_lock[ncomp]); 1713 /* 1714 * Connection is dead, so move all pending requests to 1715 * the done queue (one-by-one). 1716 */ 1717 mtx_lock(&hio_recv_list_lock[ncomp]); 1718 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1719 PJDLOG_ASSERT(hio != NULL); 1720 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1721 hio_next[ncomp]); 1722 mtx_unlock(&hio_recv_list_lock[ncomp]); 1723 goto done_queue; 1724 } 1725 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1726 pjdlog_errno(LOG_ERR, 1727 "Unable to receive reply header"); 1728 rw_unlock(&hio_remote_lock[ncomp]); 1729 remote_close(res, ncomp); 1730 continue; 1731 } 1732 rw_unlock(&hio_remote_lock[ncomp]); 1733 seq = nv_get_uint64(nv, "seq"); 1734 if (seq == 0) { 1735 pjdlog_error("Header contains no 'seq' field."); 1736 nv_free(nv); 1737 continue; 1738 } 1739 memsyncack = nv_exists(nv, "received"); 1740 mtx_lock(&hio_recv_list_lock[ncomp]); 1741 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1742 if (hio->hio_ggio.gctl_seq == seq) { 1743 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1744 hio_next[ncomp]); 1745 break; 1746 } 1747 } 1748 mtx_unlock(&hio_recv_list_lock[ncomp]); 1749 if (hio == NULL) { 1750 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1751 (uintmax_t)seq); 1752 nv_free(nv); 1753 continue; 1754 } 1755 ggio = &hio->hio_ggio; 1756 error = nv_get_int16(nv, "error"); 1757 if (error != 0) { 1758 /* Request failed on remote side. */ 1759 hio->hio_errors[ncomp] = error; 1760 reqlog(LOG_WARNING, 0, ggio, 1761 "Remote request failed (%s): ", strerror(error)); 1762 nv_free(nv); 1763 goto done_queue; 1764 } 1765 switch (ggio->gctl_cmd) { 1766 case BIO_READ: 1767 rw_rlock(&hio_remote_lock[ncomp]); 1768 if (!ISCONNECTED(res, ncomp)) { 1769 rw_unlock(&hio_remote_lock[ncomp]); 1770 nv_free(nv); 1771 goto done_queue; 1772 } 1773 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1774 ggio->gctl_data, ggio->gctl_length) == -1) { 1775 hio->hio_errors[ncomp] = errno; 1776 pjdlog_errno(LOG_ERR, 1777 "Unable to receive reply data"); 1778 rw_unlock(&hio_remote_lock[ncomp]); 1779 nv_free(nv); 1780 remote_close(res, ncomp); 1781 goto done_queue; 1782 } 1783 rw_unlock(&hio_remote_lock[ncomp]); 1784 break; 1785 case BIO_WRITE: 1786 case BIO_DELETE: 1787 case BIO_FLUSH: 1788 break; 1789 default: 1790 PJDLOG_ABORT("invalid condition"); 1791 } 1792 hio->hio_errors[ncomp] = 0; 1793 nv_free(nv); 1794 done_queue: 1795 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1796 hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1797 if (refcnt_release(&hio->hio_countdown) > 0) 1798 continue; 1799 } else { 1800 /* 1801 * Depending on hio_countdown value, requests finished 1802 * in the following order: 1803 * 1804 * 0: local write, remote memsync, remote final 1805 * or 1806 * 0: remote memsync, local write, remote final 1807 * 1808 * 1: local write, remote memsync, (remote final) 1809 * or 1810 * 1: remote memsync, remote final, (local write) 1811 * 1812 * 2: remote memsync, (local write), (remote final) 1813 * or 1814 * 2: remote memsync, (remote final), (local write) 1815 */ 1816 switch (refcnt_release(&hio->hio_countdown)) { 1817 case 0: 1818 /* 1819 * Remote final reply arrived. 1820 */ 1821 PJDLOG_ASSERT(!memsyncack); 1822 break; 1823 case 1: 1824 if (memsyncack) { 1825 /* 1826 * Local request already finished, so we 1827 * can complete the write. 1828 */ 1829 if (hio->hio_errors[0] == 0) 1830 write_complete(res, hio); 1831 /* 1832 * We still need to wait for final 1833 * remote reply. 1834 */ 1835 pjdlog_debug(2, 1836 "remote_recv: (%p) Moving request back to the recv queue.", 1837 hio); 1838 mtx_lock(&hio_recv_list_lock[ncomp]); 1839 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1840 hio, hio_next[ncomp]); 1841 mtx_unlock(&hio_recv_list_lock[ncomp]); 1842 } else { 1843 /* 1844 * Remote final reply arrived before 1845 * local write finished. 1846 * Nothing to do in such case. 1847 */ 1848 } 1849 continue; 1850 case 2: 1851 /* 1852 * We received remote memsync reply even before 1853 * local write finished. 1854 */ 1855 PJDLOG_ASSERT(memsyncack); 1856 1857 pjdlog_debug(2, 1858 "remote_recv: (%p) Moving request back to the recv queue.", 1859 hio); 1860 mtx_lock(&hio_recv_list_lock[ncomp]); 1861 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, 1862 hio_next[ncomp]); 1863 mtx_unlock(&hio_recv_list_lock[ncomp]); 1864 continue; 1865 default: 1866 PJDLOG_ABORT("Invalid hio_countdown."); 1867 } 1868 } 1869 if (ISSYNCREQ(hio)) { 1870 mtx_lock(&sync_lock); 1871 SYNCREQDONE(hio); 1872 mtx_unlock(&sync_lock); 1873 cv_signal(&sync_cond); 1874 } else { 1875 pjdlog_debug(2, 1876 "remote_recv: (%p) Moving request to the done queue.", 1877 hio); 1878 QUEUE_INSERT2(hio, done); 1879 } 1880 } 1881 /* NOTREACHED */ 1882 return (NULL); 1883 } 1884 1885 /* 1886 * Thread sends answer to the kernel. 1887 */ 1888 static void * 1889 ggate_send_thread(void *arg) 1890 { 1891 struct hast_resource *res = arg; 1892 struct g_gate_ctl_io *ggio; 1893 struct hio *hio; 1894 unsigned int ii, ncomps; 1895 1896 ncomps = HAST_NCOMPONENTS; 1897 1898 for (;;) { 1899 pjdlog_debug(2, "ggate_send: Taking request."); 1900 QUEUE_TAKE2(hio, done); 1901 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1902 ggio = &hio->hio_ggio; 1903 for (ii = 0; ii < ncomps; ii++) { 1904 if (hio->hio_errors[ii] == 0) { 1905 /* 1906 * One successful request is enough to declare 1907 * success. 1908 */ 1909 ggio->gctl_error = 0; 1910 break; 1911 } 1912 } 1913 if (ii == ncomps) { 1914 /* 1915 * None of the requests were successful. 1916 * Use the error from local component except the 1917 * case when we did only remote request. 1918 */ 1919 if (ggio->gctl_cmd == BIO_READ && 1920 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1921 ggio->gctl_error = hio->hio_errors[1]; 1922 else 1923 ggio->gctl_error = hio->hio_errors[0]; 1924 } 1925 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1926 mtx_lock(&res->hr_amp_lock); 1927 if (activemap_write_complete(res->hr_amp, 1928 ggio->gctl_offset, ggio->gctl_length)) { 1929 res->hr_stat_activemap_update++; 1930 (void)hast_activemap_flush(res); 1931 } else { 1932 mtx_unlock(&res->hr_amp_lock); 1933 } 1934 } 1935 if (ggio->gctl_cmd == BIO_WRITE) { 1936 /* 1937 * Unlock range we locked. 1938 */ 1939 mtx_lock(&range_lock); 1940 rangelock_del(range_regular, ggio->gctl_offset, 1941 ggio->gctl_length); 1942 if (range_sync_wait) 1943 cv_signal(&range_sync_cond); 1944 mtx_unlock(&range_lock); 1945 if (!hio->hio_done) 1946 write_complete(res, hio); 1947 } else { 1948 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1949 primary_exit(EX_OSERR, 1950 "G_GATE_CMD_DONE failed"); 1951 } 1952 } 1953 if (hio->hio_errors[0]) { 1954 switch (ggio->gctl_cmd) { 1955 case BIO_READ: 1956 res->hr_stat_read_error++; 1957 break; 1958 case BIO_WRITE: 1959 res->hr_stat_write_error++; 1960 break; 1961 case BIO_DELETE: 1962 res->hr_stat_delete_error++; 1963 break; 1964 case BIO_FLUSH: 1965 res->hr_stat_flush_error++; 1966 break; 1967 } 1968 } 1969 pjdlog_debug(2, 1970 "ggate_send: (%p) Moving request to the free queue.", hio); 1971 QUEUE_INSERT2(hio, free); 1972 } 1973 /* NOTREACHED */ 1974 return (NULL); 1975 } 1976 1977 /* 1978 * Thread synchronize local and remote components. 1979 */ 1980 static void * 1981 sync_thread(void *arg __unused) 1982 { 1983 struct hast_resource *res = arg; 1984 struct hio *hio; 1985 struct g_gate_ctl_io *ggio; 1986 struct timeval tstart, tend, tdiff; 1987 unsigned int ii, ncomp, ncomps; 1988 off_t offset, length, synced; 1989 bool dorewind, directreads; 1990 int syncext; 1991 1992 ncomps = HAST_NCOMPONENTS; 1993 dorewind = true; 1994 synced = 0; 1995 offset = -1; 1996 directreads = false; 1997 1998 for (;;) { 1999 mtx_lock(&sync_lock); 2000 if (offset >= 0 && !sync_inprogress) { 2001 gettimeofday(&tend, NULL); 2002 timersub(&tend, &tstart, &tdiff); 2003 pjdlog_info("Synchronization interrupted after %#.0T. " 2004 "%NB synchronized so far.", &tdiff, 2005 (intmax_t)synced); 2006 event_send(res, EVENT_SYNCINTR); 2007 } 2008 while (!sync_inprogress) { 2009 dorewind = true; 2010 synced = 0; 2011 cv_wait(&sync_cond, &sync_lock); 2012 } 2013 mtx_unlock(&sync_lock); 2014 /* 2015 * Obtain offset at which we should synchronize. 2016 * Rewind synchronization if needed. 2017 */ 2018 mtx_lock(&res->hr_amp_lock); 2019 if (dorewind) 2020 activemap_sync_rewind(res->hr_amp); 2021 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 2022 if (syncext != -1) { 2023 /* 2024 * We synchronized entire syncext extent, we can mark 2025 * it as clean now. 2026 */ 2027 if (activemap_extent_complete(res->hr_amp, syncext)) 2028 (void)hast_activemap_flush(res); 2029 else 2030 mtx_unlock(&res->hr_amp_lock); 2031 } else { 2032 mtx_unlock(&res->hr_amp_lock); 2033 } 2034 if (dorewind) { 2035 dorewind = false; 2036 if (offset == -1) 2037 pjdlog_info("Nodes are in sync."); 2038 else { 2039 pjdlog_info("Synchronization started. %NB to go.", 2040 (intmax_t)(res->hr_extentsize * 2041 activemap_ndirty(res->hr_amp))); 2042 event_send(res, EVENT_SYNCSTART); 2043 gettimeofday(&tstart, NULL); 2044 } 2045 } 2046 if (offset == -1) { 2047 sync_stop(); 2048 pjdlog_debug(1, "Nothing to synchronize."); 2049 /* 2050 * Synchronization complete, make both localcnt and 2051 * remotecnt equal. 2052 */ 2053 ncomp = 1; 2054 rw_rlock(&hio_remote_lock[ncomp]); 2055 if (ISCONNECTED(res, ncomp)) { 2056 if (synced > 0) { 2057 int64_t bps; 2058 2059 gettimeofday(&tend, NULL); 2060 timersub(&tend, &tstart, &tdiff); 2061 bps = (int64_t)((double)synced / 2062 ((double)tdiff.tv_sec + 2063 (double)tdiff.tv_usec / 1000000)); 2064 pjdlog_info("Synchronization complete. " 2065 "%NB synchronized in %#.0lT (%NB/sec).", 2066 (intmax_t)synced, &tdiff, 2067 (intmax_t)bps); 2068 event_send(res, EVENT_SYNCDONE); 2069 } 2070 mtx_lock(&metadata_lock); 2071 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 2072 directreads = true; 2073 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 2074 res->hr_primary_localcnt = 2075 res->hr_secondary_remotecnt; 2076 res->hr_primary_remotecnt = 2077 res->hr_secondary_localcnt; 2078 pjdlog_debug(1, 2079 "Setting localcnt to %ju and remotecnt to %ju.", 2080 (uintmax_t)res->hr_primary_localcnt, 2081 (uintmax_t)res->hr_primary_remotecnt); 2082 (void)metadata_write(res); 2083 mtx_unlock(&metadata_lock); 2084 } 2085 rw_unlock(&hio_remote_lock[ncomp]); 2086 if (directreads) { 2087 directreads = false; 2088 enable_direct_reads(res); 2089 } 2090 continue; 2091 } 2092 pjdlog_debug(2, "sync: Taking free request."); 2093 QUEUE_TAKE2(hio, free); 2094 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 2095 /* 2096 * Lock the range we are going to synchronize. We don't want 2097 * race where someone writes between our read and write. 2098 */ 2099 for (;;) { 2100 mtx_lock(&range_lock); 2101 if (rangelock_islocked(range_regular, offset, length)) { 2102 pjdlog_debug(2, 2103 "sync: Range offset=%jd length=%jd locked.", 2104 (intmax_t)offset, (intmax_t)length); 2105 range_sync_wait = true; 2106 cv_wait(&range_sync_cond, &range_lock); 2107 range_sync_wait = false; 2108 mtx_unlock(&range_lock); 2109 continue; 2110 } 2111 if (rangelock_add(range_sync, offset, length) == -1) { 2112 mtx_unlock(&range_lock); 2113 pjdlog_debug(2, 2114 "sync: Range offset=%jd length=%jd is already locked, waiting.", 2115 (intmax_t)offset, (intmax_t)length); 2116 sleep(1); 2117 continue; 2118 } 2119 mtx_unlock(&range_lock); 2120 break; 2121 } 2122 /* 2123 * First read the data from synchronization source. 2124 */ 2125 SYNCREQ(hio); 2126 ggio = &hio->hio_ggio; 2127 ggio->gctl_cmd = BIO_READ; 2128 ggio->gctl_offset = offset; 2129 ggio->gctl_length = length; 2130 ggio->gctl_error = 0; 2131 hio->hio_done = false; 2132 hio->hio_replication = res->hr_replication; 2133 for (ii = 0; ii < ncomps; ii++) 2134 hio->hio_errors[ii] = EINVAL; 2135 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2136 hio); 2137 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2138 hio); 2139 mtx_lock(&metadata_lock); 2140 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2141 /* 2142 * This range is up-to-date on local component, 2143 * so handle request locally. 2144 */ 2145 /* Local component is 0 for now. */ 2146 ncomp = 0; 2147 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2148 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2149 /* 2150 * This range is out-of-date on local component, 2151 * so send request to the remote node. 2152 */ 2153 /* Remote component is 1 for now. */ 2154 ncomp = 1; 2155 } 2156 mtx_unlock(&metadata_lock); 2157 refcnt_init(&hio->hio_countdown, 1); 2158 QUEUE_INSERT1(hio, send, ncomp); 2159 2160 /* 2161 * Let's wait for READ to finish. 2162 */ 2163 mtx_lock(&sync_lock); 2164 while (!ISSYNCREQDONE(hio)) 2165 cv_wait(&sync_cond, &sync_lock); 2166 mtx_unlock(&sync_lock); 2167 2168 if (hio->hio_errors[ncomp] != 0) { 2169 pjdlog_error("Unable to read synchronization data: %s.", 2170 strerror(hio->hio_errors[ncomp])); 2171 goto free_queue; 2172 } 2173 2174 /* 2175 * We read the data from synchronization source, now write it 2176 * to synchronization target. 2177 */ 2178 SYNCREQ(hio); 2179 ggio->gctl_cmd = BIO_WRITE; 2180 for (ii = 0; ii < ncomps; ii++) 2181 hio->hio_errors[ii] = EINVAL; 2182 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2183 hio); 2184 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2185 hio); 2186 mtx_lock(&metadata_lock); 2187 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2188 /* 2189 * This range is up-to-date on local component, 2190 * so we update remote component. 2191 */ 2192 /* Remote component is 1 for now. */ 2193 ncomp = 1; 2194 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2195 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2196 /* 2197 * This range is out-of-date on local component, 2198 * so we update it. 2199 */ 2200 /* Local component is 0 for now. */ 2201 ncomp = 0; 2202 } 2203 mtx_unlock(&metadata_lock); 2204 2205 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2206 hio); 2207 refcnt_init(&hio->hio_countdown, 1); 2208 QUEUE_INSERT1(hio, send, ncomp); 2209 2210 /* 2211 * Let's wait for WRITE to finish. 2212 */ 2213 mtx_lock(&sync_lock); 2214 while (!ISSYNCREQDONE(hio)) 2215 cv_wait(&sync_cond, &sync_lock); 2216 mtx_unlock(&sync_lock); 2217 2218 if (hio->hio_errors[ncomp] != 0) { 2219 pjdlog_error("Unable to write synchronization data: %s.", 2220 strerror(hio->hio_errors[ncomp])); 2221 goto free_queue; 2222 } 2223 2224 synced += length; 2225 free_queue: 2226 mtx_lock(&range_lock); 2227 rangelock_del(range_sync, offset, length); 2228 if (range_regular_wait) 2229 cv_signal(&range_regular_cond); 2230 mtx_unlock(&range_lock); 2231 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2232 hio); 2233 QUEUE_INSERT2(hio, free); 2234 } 2235 /* NOTREACHED */ 2236 return (NULL); 2237 } 2238 2239 void 2240 primary_config_reload(struct hast_resource *res, struct nv *nv) 2241 { 2242 unsigned int ii, ncomps; 2243 int modified, vint; 2244 const char *vstr; 2245 2246 pjdlog_info("Reloading configuration..."); 2247 2248 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2249 PJDLOG_ASSERT(gres == res); 2250 nv_assert(nv, "remoteaddr"); 2251 nv_assert(nv, "sourceaddr"); 2252 nv_assert(nv, "replication"); 2253 nv_assert(nv, "checksum"); 2254 nv_assert(nv, "compression"); 2255 nv_assert(nv, "timeout"); 2256 nv_assert(nv, "exec"); 2257 nv_assert(nv, "metaflush"); 2258 2259 ncomps = HAST_NCOMPONENTS; 2260 2261 #define MODIFIED_REMOTEADDR 0x01 2262 #define MODIFIED_SOURCEADDR 0x02 2263 #define MODIFIED_REPLICATION 0x04 2264 #define MODIFIED_CHECKSUM 0x08 2265 #define MODIFIED_COMPRESSION 0x10 2266 #define MODIFIED_TIMEOUT 0x20 2267 #define MODIFIED_EXEC 0x40 2268 #define MODIFIED_METAFLUSH 0x80 2269 modified = 0; 2270 2271 vstr = nv_get_string(nv, "remoteaddr"); 2272 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2273 /* 2274 * Don't copy res->hr_remoteaddr to gres just yet. 2275 * We want remote_close() to log disconnect from the old 2276 * addresses, not from the new ones. 2277 */ 2278 modified |= MODIFIED_REMOTEADDR; 2279 } 2280 vstr = nv_get_string(nv, "sourceaddr"); 2281 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2282 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2283 modified |= MODIFIED_SOURCEADDR; 2284 } 2285 vint = nv_get_int32(nv, "replication"); 2286 if (gres->hr_replication != vint) { 2287 gres->hr_replication = vint; 2288 modified |= MODIFIED_REPLICATION; 2289 } 2290 vint = nv_get_int32(nv, "checksum"); 2291 if (gres->hr_checksum != vint) { 2292 gres->hr_checksum = vint; 2293 modified |= MODIFIED_CHECKSUM; 2294 } 2295 vint = nv_get_int32(nv, "compression"); 2296 if (gres->hr_compression != vint) { 2297 gres->hr_compression = vint; 2298 modified |= MODIFIED_COMPRESSION; 2299 } 2300 vint = nv_get_int32(nv, "timeout"); 2301 if (gres->hr_timeout != vint) { 2302 gres->hr_timeout = vint; 2303 modified |= MODIFIED_TIMEOUT; 2304 } 2305 vstr = nv_get_string(nv, "exec"); 2306 if (strcmp(gres->hr_exec, vstr) != 0) { 2307 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2308 modified |= MODIFIED_EXEC; 2309 } 2310 vint = nv_get_int32(nv, "metaflush"); 2311 if (gres->hr_metaflush != vint) { 2312 gres->hr_metaflush = vint; 2313 modified |= MODIFIED_METAFLUSH; 2314 } 2315 2316 /* 2317 * Change timeout for connected sockets. 2318 * Don't bother if we need to reconnect. 2319 */ 2320 if ((modified & MODIFIED_TIMEOUT) != 0 && 2321 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2322 for (ii = 0; ii < ncomps; ii++) { 2323 if (!ISREMOTE(ii)) 2324 continue; 2325 rw_rlock(&hio_remote_lock[ii]); 2326 if (!ISCONNECTED(gres, ii)) { 2327 rw_unlock(&hio_remote_lock[ii]); 2328 continue; 2329 } 2330 rw_unlock(&hio_remote_lock[ii]); 2331 if (proto_timeout(gres->hr_remotein, 2332 gres->hr_timeout) == -1) { 2333 pjdlog_errno(LOG_WARNING, 2334 "Unable to set connection timeout"); 2335 } 2336 if (proto_timeout(gres->hr_remoteout, 2337 gres->hr_timeout) == -1) { 2338 pjdlog_errno(LOG_WARNING, 2339 "Unable to set connection timeout"); 2340 } 2341 } 2342 } 2343 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2344 for (ii = 0; ii < ncomps; ii++) { 2345 if (!ISREMOTE(ii)) 2346 continue; 2347 remote_close(gres, ii); 2348 } 2349 if (modified & MODIFIED_REMOTEADDR) { 2350 vstr = nv_get_string(nv, "remoteaddr"); 2351 strlcpy(gres->hr_remoteaddr, vstr, 2352 sizeof(gres->hr_remoteaddr)); 2353 } 2354 } 2355 #undef MODIFIED_REMOTEADDR 2356 #undef MODIFIED_SOURCEADDR 2357 #undef MODIFIED_REPLICATION 2358 #undef MODIFIED_CHECKSUM 2359 #undef MODIFIED_COMPRESSION 2360 #undef MODIFIED_TIMEOUT 2361 #undef MODIFIED_EXEC 2362 #undef MODIFIED_METAFLUSH 2363 2364 pjdlog_info("Configuration reloaded successfully."); 2365 } 2366 2367 static void 2368 guard_one(struct hast_resource *res, unsigned int ncomp) 2369 { 2370 struct proto_conn *in, *out; 2371 2372 if (!ISREMOTE(ncomp)) 2373 return; 2374 2375 rw_rlock(&hio_remote_lock[ncomp]); 2376 2377 if (!real_remote(res)) { 2378 rw_unlock(&hio_remote_lock[ncomp]); 2379 return; 2380 } 2381 2382 if (ISCONNECTED(res, ncomp)) { 2383 PJDLOG_ASSERT(res->hr_remotein != NULL); 2384 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2385 rw_unlock(&hio_remote_lock[ncomp]); 2386 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2387 res->hr_remoteaddr); 2388 return; 2389 } 2390 2391 PJDLOG_ASSERT(res->hr_remotein == NULL); 2392 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2393 /* 2394 * Upgrade the lock. It doesn't have to be atomic as no other thread 2395 * can change connection status from disconnected to connected. 2396 */ 2397 rw_unlock(&hio_remote_lock[ncomp]); 2398 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2399 res->hr_remoteaddr); 2400 in = out = NULL; 2401 if (init_remote(res, &in, &out) == 0) { 2402 rw_wlock(&hio_remote_lock[ncomp]); 2403 PJDLOG_ASSERT(res->hr_remotein == NULL); 2404 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2405 PJDLOG_ASSERT(in != NULL && out != NULL); 2406 res->hr_remotein = in; 2407 res->hr_remoteout = out; 2408 rw_unlock(&hio_remote_lock[ncomp]); 2409 pjdlog_info("Successfully reconnected to %s.", 2410 res->hr_remoteaddr); 2411 sync_start(); 2412 } else { 2413 /* Both connections should be NULL. */ 2414 PJDLOG_ASSERT(res->hr_remotein == NULL); 2415 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2416 PJDLOG_ASSERT(in == NULL && out == NULL); 2417 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2418 res->hr_remoteaddr); 2419 } 2420 } 2421 2422 /* 2423 * Thread guards remote connections and reconnects when needed, handles 2424 * signals, etc. 2425 */ 2426 static void * 2427 guard_thread(void *arg) 2428 { 2429 struct hast_resource *res = arg; 2430 unsigned int ii, ncomps; 2431 struct timespec timeout; 2432 time_t lastcheck, now; 2433 sigset_t mask; 2434 int signo; 2435 2436 ncomps = HAST_NCOMPONENTS; 2437 lastcheck = time(NULL); 2438 2439 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2440 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2441 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2442 2443 timeout.tv_sec = HAST_KEEPALIVE; 2444 timeout.tv_nsec = 0; 2445 signo = -1; 2446 2447 for (;;) { 2448 switch (signo) { 2449 case SIGINT: 2450 case SIGTERM: 2451 sigexit_received = true; 2452 primary_exitx(EX_OK, 2453 "Termination signal received, exiting."); 2454 break; 2455 default: 2456 break; 2457 } 2458 2459 /* 2460 * Don't check connections until we fully started, 2461 * as we may still be looping, waiting for remote node 2462 * to switch from primary to secondary. 2463 */ 2464 if (fullystarted) { 2465 pjdlog_debug(2, "remote_guard: Checking connections."); 2466 now = time(NULL); 2467 if (lastcheck + HAST_KEEPALIVE <= now) { 2468 for (ii = 0; ii < ncomps; ii++) 2469 guard_one(res, ii); 2470 lastcheck = now; 2471 } 2472 } 2473 signo = sigtimedwait(&mask, NULL, &timeout); 2474 } 2475 /* NOTREACHED */ 2476 return (NULL); 2477 } 2478