1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/*
 * A single I/O request as it travels between the kernel (via GEOM Gate),
 * the local disk component and the remote (secondary node) component.
 * One hio structure is shared by all components handling the request.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int *hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io hio_ggio;
	/*
	 * Per-component queue linkage: hio_next is an array with one
	 * TAILQ_ENTRY per component, so the same request can sit on every
	 * component's send/recv list at once.
	 */
	TAILQ_ENTRY(hio) *hio_next;
};
/*
 * The free and done lists are global (not per-component), so they reuse the
 * first component's linkage slot.
 */
#define hio_free_next hio_next[0]
#define hio_done_next hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;
static pthread_mutex_t hio_guard_lock;
static pthread_cond_t hio_guard_cond;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define HAST_HIO_MAX 256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define HAST_NCOMPONENTS 2
/*
 * Number of seconds to sleep before next reconnect try.
 */
#define RECONNECT_SLEEP 5

/*
 * True when both directions of the remote connection are up.
 * NOTE(review): the 'no' (component number) argument is currently unused -
 * there is only one remote connection pair; confirm before relying on it
 * for multiple remote components.
 */
#define ISCONNECTED(res, no) \
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Insert a request onto a per-component list (send/recv), waking a waiter
 * only when the list transitions from empty to non-empty.
 */
#define QUEUE_INSERT1(hio, name, ncomp) do { \
	bool _wakeup; \
 \
	mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \
	    hio_next[(ncomp)]); \
	mtx_unlock(&hio_##name##_list_lock[ncomp]); \
	if (_wakeup) \
		cv_signal(&hio_##name##_list_cond[(ncomp)]); \
} while (0)
/*
 * Insert a request onto a global list (free/done); same wakeup strategy as
 * QUEUE_INSERT1.
 */
#define QUEUE_INSERT2(hio, name) do { \
	bool _wakeup; \
 \
	mtx_lock(&hio_##name##_list_lock); \
	_wakeup = TAILQ_EMPTY(&hio_##name##_list); \
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock); \
	if (_wakeup) \
		cv_signal(&hio_##name##_list_cond); \
} while (0)
/* Block until a request is available on a per-component list, then pop it. */
#define QUEUE_TAKE1(hio, name, ncomp) do { \
	mtx_lock(&hio_##name##_list_lock[(ncomp)]); \
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \
		cv_wait(&hio_##name##_list_cond[(ncomp)], \
		    &hio_##name##_list_lock[(ncomp)]); \
	} \
	TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \
	    hio_next[(ncomp)]); \
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \
} while (0)
/* Block until a request is available on a global list, then pop it. */
#define QUEUE_TAKE2(hio, name) do { \
	mtx_lock(&hio_##name##_list_lock); \
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \
		cv_wait(&hio_##name##_list_cond, \
		    &hio_##name##_list_lock); \
	} \
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \
	mtx_unlock(&hio_##name##_list_lock); \
} while (0)

/*
 * Synchronization requests are distinguished from kernel requests by a
 * negative gctl_unit: -1 means "sync request in flight", -2 "sync request
 * completed".
 */
#define SYNCREQ(hio)		do { (hio)->hio_ggio.gctl_unit = -1; } while (0)
#define ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

/* The resource this worker process serves; set once in hastd_primary(). */
static struct hast_resource *gres;
/* Lock and state protecting the regular/sync range locks below. */
static pthread_mutex_t range_lock;
/* Ranges locked by regular (kernel-originated) requests. */
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
/* Ranges locked by the synchronization thread. */
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

static void sighandler(int sig);

/*
 * Best-effort teardown before exiting: close the local provider descriptor
 * and destroy the ggate device if we created one. Preserves errno so it can
 * be called from error paths.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/*
	 * Close descriptor to /dev/hast/<name>
	 * to work-around race in the kernel.
	 */
	close(res->hr_localfd);

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
			pjdlog_warning("Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/*
 * Log an errno-style error, clean up and exit with the given sysexits(3)
 * code. Must not be called with EX_OK.
 */
static void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	assert(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Like primary_exit(), but without errno decoration; EX_OK logs at INFO
 * level, anything else at ERR.
 */
static void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Write the in-memory activemap bitmap to its on-disk location (right after
 * the metadata block). Returns 0 on success, -1 on write failure (errno is
 * preserved for the caller).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	assert(buf != NULL);
	assert((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
		    "Unable to flush activemap to disk"));
		return (-1);
	}
	return (0);
}

/*
 * Allocate and initialize all queues, locks, condition variables and the
 * fixed pool of HAST_HIO_MAX request structures, then install signal
 * handlers. Any allocation failure is fatal (EX_TEMPFAIL).
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their locks and theirs condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&hio_guard_lock);
	cv_init(&hio_guard_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		hio->hio_countdown = 0;
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		/* Data buffer is MAXPHYS; the kernel never sends more. */
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
	}

	/*
	 * Turn on signals handling.
	 */
	signal(SIGINT, sighandler);
	signal(SIGTERM, sighandler);
}

/*
 * Initialize the local component: read metadata, build the activemap and
 * range locks, load the on-disk activemap and, on very first use of the
 * provider, generate the resource unique identifier and seed the counters.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	/* NOTE(review): 'buf' is not freed here - activemap_copyin()
	 * presumably takes ownership; confirm against activemap(3). */
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time, so we have to generate
	 * resource unique identifier and initialize local and remote counts.
	 */
	arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Establish both connections (outgoing and incoming) to the secondary node
 * and perform the two-step handshake: exchange the resource name and token,
 * then exchange counters, sizes and the remote activemap, which is merged
 * into ours and flushed to disk.
 *
 * On success either stores the connections through inp/outp (when both are
 * non-NULL) or directly into the resource, and returns true. On any failure
 * closes whatever was opened and returns false - connection failure is not
 * fatal; the guard thread retries later.
 */
static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;

	assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));

	in = out = NULL;

	/* Prepare outgoing connection with remote node. */
	if (proto_client(res->hr_remoteaddr, &out) < 0) {
		primary_exit(EX_OSERR, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(out) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (proto_client(res->hr_remoteaddr, &in) < 0) {
		/*
		 * NOTE(review): unlike the 'out' case above this only warns
		 * and falls through with in == NULL; proto_connect(in) below
		 * would then dereference NULL - looks like this should
		 * 'goto close'. Confirm against proto_client() semantics.
		 */
		pjdlog_errno(LOG_WARNING, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(in) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/* The reply to the second step still arrives on 'out'. */
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node has some dirty extents on its own, let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	return (true);
close:
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (false);
}

/*
 * Mark synchronization as in-progress and wake the sync thread.
 */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/*
 * Create (or take over) the hast/<name> GEOM Gate provider. If a provider
 * with our name already exists we assume a previous worker died without
 * cleaning up and recover its unit via G_GATE_CMD_CANCEL. Any other failure
 * is fatal.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info));
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: fork a worker child. The parent records
 * the worker pid and returns; the child initializes all components, spawns
 * the worker threads and finally runs the guard thread on its own thread.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error;

	gres = res;

	/*
	 * Create communication channel between parent and child.
	 */
	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}

	pid = fork();
	if (pid < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_OSERR, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		res->hr_workerpid = pid;
		return;
	}
	(void)pidfile_close(pfh);

	setproctitle("%s (primary)", res->hr_name);

	init_local(res);
	if (init_remote(res, NULL, NULL))
		sync_start();
	init_ggate(res);
	init_environment(res);
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, sync_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ctrl_thread, res);
	assert(error == 0);
	/* The main thread becomes the guard thread; never returns. */
	(void)guard_thread(res);
}

/*
 * Log a message about the given ggate request, appending a human-readable
 * description of the request (command, offset, length). The description is
 * appended only if the caller-supplied prefix fit into the buffer.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both remote connections (under the component's write lock), stop any
 * in-progress synchronization and kick the guard thread so it can start
 * reconnecting immediately.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		assert(res->hr_remotein == NULL);
		assert(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	assert(res->hr_remotein != NULL);
	assert(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing old incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing old outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	/*
	 * Stop synchronization if in-progress.
	 */
	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);

	/*
	 * Wake up guard thread, so it can immediately start reconnect.
	 */
	mtx_lock(&hio_guard_lock);
	cv_signal(&hio_guard_cond);
	mtx_unlock(&hio_guard_lock);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/* Preset errors so a component that never runs counts as failed. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* Reads are served by exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				assert(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			/*
			 * Writes must not overlap a range the sync thread is
			 * working on: wait for the sync lock, then take the
			 * regular range lock for this request.
			 * NOTE(review): the debug messages below say
			 * "regular:" even in the sync-wait branch - verify
			 * the intended wording before changing them.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			mtx_lock(&res->hr_amp_lock);
			/* Flush activemap first if this write dirties a new extent. */
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* Writes/deletes/flushes go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 * Countdown is not released here - the remote
				 * component now owns the request.
				 */
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else if (ret != ggio->gctl_length)
				hio->hio_errors[ncomp] = EIO;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		}
		/* Last component to finish moves the request onward. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the GEOM Gate request into a HAST wire request. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			/* Only WRITE carries payload data. */
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			assert(!"invalid condition");
			abort();
		}
		/* Build the request header as an nv list. */
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		/* Only wake remote_recv_thread if the queue was empty. */
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			/* Send failed: tear down the remote connection. */
			remote_close(res, ncomp);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			/* Failed sync request: wake sync_thread directly. */
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Remote write failed; make sure the activemap
			 * records that this extent needs synchronization.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			assert(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		/* Match the reply to a pending request via its 'seq' field. */
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side.
*/ 1314 hio->hio_errors[ncomp] = 0; 1315 nv_free(nv); 1316 goto done_queue; 1317 } 1318 ggio = &hio->hio_ggio; 1319 switch (ggio->gctl_cmd) { 1320 case BIO_READ: 1321 rw_rlock(&hio_remote_lock[ncomp]); 1322 if (!ISCONNECTED(res, ncomp)) { 1323 rw_unlock(&hio_remote_lock[ncomp]); 1324 nv_free(nv); 1325 goto done_queue; 1326 } 1327 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1328 ggio->gctl_data, ggio->gctl_length) < 0) { 1329 hio->hio_errors[ncomp] = errno; 1330 pjdlog_errno(LOG_ERR, 1331 "Unable to receive reply data"); 1332 rw_unlock(&hio_remote_lock[ncomp]); 1333 nv_free(nv); 1334 remote_close(res, ncomp); 1335 goto done_queue; 1336 } 1337 rw_unlock(&hio_remote_lock[ncomp]); 1338 break; 1339 case BIO_WRITE: 1340 case BIO_DELETE: 1341 case BIO_FLUSH: 1342 break; 1343 default: 1344 assert(!"invalid condition"); 1345 abort(); 1346 } 1347 hio->hio_errors[ncomp] = 0; 1348 nv_free(nv); 1349 done_queue: 1350 if (refcount_release(&hio->hio_countdown)) { 1351 if (ISSYNCREQ(hio)) { 1352 mtx_lock(&sync_lock); 1353 SYNCREQDONE(hio); 1354 mtx_unlock(&sync_lock); 1355 cv_signal(&sync_cond); 1356 } else { 1357 pjdlog_debug(2, 1358 "remote_recv: (%p) Moving request to the done queue.", 1359 hio); 1360 QUEUE_INSERT2(hio, done); 1361 } 1362 } 1363 } 1364 /* NOTREACHED */ 1365 return (NULL); 1366 } 1367 1368 /* 1369 * Thread sends answer to the kernel. 1370 */ 1371 static void * 1372 ggate_send_thread(void *arg) 1373 { 1374 struct hast_resource *res = arg; 1375 struct g_gate_ctl_io *ggio; 1376 struct hio *hio; 1377 unsigned int ii, ncomp, ncomps; 1378 1379 ncomps = HAST_NCOMPONENTS; 1380 1381 for (;;) { 1382 pjdlog_debug(2, "ggate_send: Taking request."); 1383 QUEUE_TAKE2(hio, done); 1384 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1385 ggio = &hio->hio_ggio; 1386 for (ii = 0; ii < ncomps; ii++) { 1387 if (hio->hio_errors[ii] == 0) { 1388 /* 1389 * One successful request is enough to declare 1390 * success. 
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use first error.
			 */
			ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			/* Successful write: let the activemap clean extents. */
			mtx_lock(&res->hr_amp_lock);
			activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length);
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/*
			 * Bump local count if this is first write after
			 * connection failure with remote node.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				mtx_lock(&metadata_lock);
				if (res->hr_primary_localcnt ==
				    res->hr_secondary_remotecnt) {
					res->hr_primary_localcnt++;
					pjdlog_debug(1,
					    "Increasing localcnt to %ju.",
					    (uintmax_t)res->hr_primary_localcnt);
					(void)metadata_write(res);
				}
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
		}
		/* Hand the completed request back to the kernel. */
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread synchronizes local and remote components.
 */
static void *
sync_thread(void *arg __unused)
{
	/*
	 * NOTE(review): 'arg' is marked __unused but is in fact used below;
	 * the attribute merely suppresses warnings, so this is harmless.
	 */
	struct hast_resource *res = arg;
	struct hio *hio;
	struct g_gate_ctl_io *ggio;
	unsigned int ii, ncomp, ncomps;
	off_t offset, length, synced;
	bool dorewind;
	int syncext;

	ncomps = HAST_NCOMPONENTS;
	dorewind = true;
	synced = 0;

	for (;;) {
		/* Sleep until synchronization is (re)started. */
		mtx_lock(&sync_lock);
		while (!sync_inprogress) {
			dorewind = true;
			synced = 0;
			cv_wait(&sync_cond, &sync_lock);
		}
		mtx_unlock(&sync_lock);
		/*
		 * Obtain offset at which we should synchronize.
		 * Rewind synchronization if needed.
		 */
		mtx_lock(&res->hr_amp_lock);
		if (dorewind)
			activemap_sync_rewind(res->hr_amp);
		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
		if (syncext != -1) {
			/*
			 * We synchronized entire syncext extent, we can mark
			 * it as clean now.
			 */
			if (activemap_extent_complete(res->hr_amp, syncext))
				(void)hast_activemap_flush(res);
		}
		mtx_unlock(&res->hr_amp_lock);
		if (dorewind) {
			dorewind = false;
			if (offset < 0)
				pjdlog_info("Nodes are in sync.");
			else {
				pjdlog_info("Synchronization started. %ju bytes to go.",
				    (uintmax_t)(res->hr_extentsize *
				    activemap_ndirty(res->hr_amp)));
			}
		}
		if (offset < 0) {
			/* Negative offset means there is nothing dirty left. */
			mtx_lock(&sync_lock);
			sync_inprogress = false;
			mtx_unlock(&sync_lock);
			pjdlog_debug(1, "Nothing to synchronize.");
			/*
			 * Synchronization complete, make both localcnt and
			 * remotecnt equal.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (ISCONNECTED(res, ncomp)) {
				if (synced > 0) {
					pjdlog_info("Synchronization complete. "
					    "%jd bytes synchronized.",
					    (intmax_t)synced);
				}
				mtx_lock(&metadata_lock);
				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
				res->hr_primary_localcnt =
				    res->hr_secondary_localcnt;
				res->hr_primary_remotecnt =
				    res->hr_secondary_remotecnt;
				/*
				 * NOTE(review): the message below prints
				 * hr_secondary_localcnt as the "remotecnt"
				 * value - presumably it should print
				 * hr_primary_remotecnt; verify before
				 * changing (debug output only).
				 */
				pjdlog_debug(1,
				    "Setting localcnt to %ju and remotecnt to %ju.",
				    (uintmax_t)res->hr_primary_localcnt,
				    (uintmax_t)res->hr_secondary_localcnt);
				(void)metadata_write(res);
				mtx_unlock(&metadata_lock);
			} else if (synced > 0) {
				pjdlog_info("Synchronization interrupted. "
				    "%jd bytes synchronized so far.",
				    (intmax_t)synced);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			continue;
		}
		pjdlog_debug(2, "sync: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
		/*
		 * Lock the range we are going to synchronize. We don't want
		 * race where someone writes between our read and write.
		 */
		for (;;) {
			mtx_lock(&range_lock);
			if (rangelock_islocked(range_regular, offset, length)) {
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd locked.",
				    (intmax_t)offset, (intmax_t)length);
				range_sync_wait = true;
				cv_wait(&range_sync_cond, &range_lock);
				range_sync_wait = false;
				mtx_unlock(&range_lock);
				continue;
			}
			if (rangelock_add(range_sync, offset, length) < 0) {
				mtx_unlock(&range_lock);
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
				    (intmax_t)offset, (intmax_t)length);
				sleep(1);
				continue;
			}
			mtx_unlock(&range_lock);
			break;
		}
		/*
		 * First read the data from synchronization source.
		 */
		SYNCREQ(hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_cmd = BIO_READ;
		ggio->gctl_offset = offset;
		ggio->gctl_length = length;
		ggio->gctl_error = 0;
		/* Preload errors so an unserved component reads as EINVAL. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so handle request locally.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so send request to the remote node.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		}
		mtx_unlock(&metadata_lock);
		/* Sync requests go to exactly one component. */
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for READ to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to read synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		/*
		 * We read the data from synchronization source, now write it
		 * to synchronization target.
		 */
		SYNCREQ(hio);
		/* Reuse the same hio/buffer, only flip the command to WRITE. */
		ggio->gctl_cmd = BIO_WRITE;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so we update remote component.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so we update it.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		}
		mtx_unlock(&metadata_lock);

		pjdlog_debug(2, "sync: (%p) Moving request to the send queues.",
		    hio);
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for WRITE to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to write synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			/* Falls through to the label just below. */
			goto free_queue;
		}
free_queue:
		/* Drop the sync range lock and wake waiting writers. */
		mtx_lock(&range_lock);
		rangelock_del(range_sync, offset, length);
		if (range_regular_wait)
			cv_signal(&range_regular_cond);
		mtx_unlock(&range_lock);

		synced += length;

		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
		    hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Handle SIGINT/SIGTERM: set the exit flag and poke guard_thread.
 */
static void
sighandler(int sig)
{
	bool unlock;

	switch (sig) {
	case SIGINT:
	case SIGTERM:
		sigexit_received = true;
		break;
	default:
		assert(!"invalid condition");
	}
	/*
	 * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't
	 * want to risk deadlock.
	 */
	unlock = mtx_trylock(&hio_guard_lock);
	cv_signal(&hio_guard_cond);
	if (unlock)
		mtx_unlock(&hio_guard_lock);
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct proto_conn *in, *out;
	unsigned int ii, ncomps;
	int timeout;

	ncomps = HAST_NCOMPONENTS;
	/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

	for (;;) {
		if (sigexit_received) {
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
		}
		/*
		 * If all the connections are fine, we will sleep until
		 * someone wakes us up.
		 * If any of the connections is broken and we cannot
		 * reconnect, we will sleep only for RECONNECT_SLEEP
		 * seconds so we can retry soon.
		 */
		timeout = 0;
		pjdlog_debug(2, "remote_guard: Checking connections.");
		mtx_lock(&hio_guard_lock);
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (ISCONNECTED(res, ii)) {
				assert(res->hr_remotein != NULL);
				assert(res->hr_remoteout != NULL);
				rw_unlock(&hio_remote_lock[ii]);
				pjdlog_debug(2,
				    "remote_guard: Connection to %s is ok.",
				    res->hr_remoteaddr);
			} else {
				assert(res->hr_remotein == NULL);
				assert(res->hr_remoteout == NULL);
				/*
				 * Upgrade the lock. It doesn't have to be
				 * atomic as no other thread can change
				 * connection status from disconnected to
				 * connected.
				 */
				rw_unlock(&hio_remote_lock[ii]);
				pjdlog_debug(2,
				    "remote_guard: Reconnecting to %s.",
				    res->hr_remoteaddr);
				in = out = NULL;
				if (init_remote(res, &in, &out)) {
					rw_wlock(&hio_remote_lock[ii]);
					assert(res->hr_remotein == NULL);
					assert(res->hr_remoteout == NULL);
					assert(in != NULL && out != NULL);
					res->hr_remotein = in;
					res->hr_remoteout = out;
					rw_unlock(&hio_remote_lock[ii]);
					pjdlog_info("Successfully reconnected to %s.",
					    res->hr_remoteaddr);
					/* Reconnected: kick off resync. */
					sync_start();
				} else {
					/* Both connections should be NULL. */
					assert(res->hr_remotein == NULL);
					assert(res->hr_remoteout == NULL);
					assert(in == NULL && out == NULL);
					pjdlog_debug(2,
					    "remote_guard: Reconnect to %s failed.",
					    res->hr_remoteaddr);
					timeout = RECONNECT_SLEEP;
				}
			}
		}
		/* timeout == 0 means "sleep until signalled". */
		(void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout);
		mtx_unlock(&hio_guard_lock);
	}
#undef	ISREMOTE
	/* NOTREACHED */
	return (NULL);
}