1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/*
 * One in-flight I/O request as it travels between the kernel (GEOM Gate)
 * and the local/remote component threads.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/* Queue linkage; one entry per component (array allocated at init). */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free and done lists are global (not per-component), so they can
 * safely reuse the first linkage entry.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;
static pthread_mutex_t hio_guard_lock;
static pthread_cond_t hio_guard_cond;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
/*
 * Number of seconds to sleep before next reconnect try.
 */
#define	RECONNECT_SLEEP		5

/*
 * NOTE(review): the 'no' argument is currently unused; both connections
 * are checked regardless of component number.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append a request to the per-component queue 'name'; wake a waiter only
 * if the queue was empty (otherwise a consumer is already running).
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/* Same as QUEUE_INSERT1, but for the global (free/done) queues. */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Block until a request appears on the per-component queue 'name', then
 * remove it and return it via 'hio'.
 */
#define	QUEUE_TAKE1(hio, name, ncomp)	do {				\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \
		cv_wait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)]);			\
	}								\
	TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/* Same as QUEUE_TAKE1, but for the global (free/done) queues. */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests are not backed by a real GEOM Gate unit, so
 * gctl_unit is overloaded as a marker: -1 means "sync request", -2 means
 * "sync request completed".
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit ==
-2)

static struct hast_resource *gres;

/*
 * Range locks keep regular I/O and synchronization I/O from touching the
 * same extent at the same time; the *_wait flags and condvars let one side
 * wait for the other to drain.
 */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

static void sighandler(int sig);

/*
 * Tear down kernel-visible state (local descriptor, ggate provider) on the
 * way out.  errno is preserved for the caller.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/*
	 * Close descriptor to /dev/hast/<name>
	 * to work-around race in the kernel.
	 */
	close(res->hr_localfd);

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
			pjdlog_warning("Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log an errno-based error message, clean up and exit with 'exitcode'. */
static void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	assert(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Log the given message (without errno), clean up and exit with 'exitcode'. */
static void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Write the in-memory activemap bitmap to the on-disk metadata area at
 * offset METADATA_SIZE.  Returns 0 on success, -1 on write failure
 * (errno preserved by KEEP_ERRNO).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	assert(buf != NULL);
	assert((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
		    "Unable to flush activemap to disk"));
		return (-1);
	}
	return (0);
}

/*
 * Allocate and initialize all queues, locks, condition variables and the
 * fixed pool of HAST_HIO_MAX hio requests, then install signal handlers.
 * Any allocation failure is fatal (process exits).
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their locks and theirs condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&hio_guard_lock);
	cv_init(&hio_guard_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		hio->hio_countdown = 0;
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		/*
		 * NOTE(review): "%zu" expects size_t, but MAXPHYS is passed
		 * uncast below - presumably an int constant; consider
		 * (size_t)MAXPHYS.  TODO confirm MAXPHYS's type.
		 */
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
	}

	/*
	 * Turn on signals handling.
	 */
	signal(SIGINT, sighandler);
	signal(SIGTERM, sighandler);
}

/*
 * Read on-disk metadata, create the activemap and the two range locks,
 * and load the on-disk activemap into memory.  On first use (hr_resuid
 * still 0) also generate the resource unique identifier and write the
 * metadata back.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time, so we have to generate
	 * resource unique identifier and initialize local and remote counts.
	 */
	arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Establish both connections to the remote node ('out' for sending
 * requests, 'in' for receiving) and perform the two-step handshake.
 * On success the connections are returned via inp/outp when given,
 * otherwise stored directly in res->hr_remotein/hr_remoteout.
 * Returns true on success; false (not fatal) when the remote node is
 * unreachable or the handshake fails.
 */
static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;

	/* Either both output pointers are provided or neither. */
	assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));

	in = out = NULL;

	/* Prepare outgoing connection with remote node. */
	if (proto_client(res->hr_remoteaddr, &out) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(out) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(out, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
502 */ 503 nvout = nv_alloc(); 504 nv_add_string(nvout, res->hr_name, "resource"); 505 if (nv_error(nvout) != 0) { 506 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 507 "Unable to allocate header for connection with %s", 508 res->hr_remoteaddr); 509 nv_free(nvout); 510 goto close; 511 } 512 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 513 pjdlog_errno(LOG_WARNING, 514 "Unable to send handshake header to %s", 515 res->hr_remoteaddr); 516 nv_free(nvout); 517 goto close; 518 } 519 nv_free(nvout); 520 if (hast_proto_recv_hdr(out, &nvin) < 0) { 521 pjdlog_errno(LOG_WARNING, 522 "Unable to receive handshake header from %s", 523 res->hr_remoteaddr); 524 goto close; 525 } 526 errmsg = nv_get_string(nvin, "errmsg"); 527 if (errmsg != NULL) { 528 pjdlog_warning("%s", errmsg); 529 nv_free(nvin); 530 goto close; 531 } 532 token = nv_get_uint8_array(nvin, &size, "token"); 533 if (token == NULL) { 534 pjdlog_warning("Handshake header from %s has no 'token' field.", 535 res->hr_remoteaddr); 536 nv_free(nvin); 537 goto close; 538 } 539 if (size != sizeof(res->hr_token)) { 540 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 541 res->hr_remoteaddr, size, sizeof(res->hr_token)); 542 nv_free(nvin); 543 goto close; 544 } 545 bcopy(token, res->hr_token, sizeof(res->hr_token)); 546 nv_free(nvin); 547 548 /* 549 * Second handshake step. 550 * Setup incoming connection with remote node. 551 */ 552 if (proto_client(res->hr_remoteaddr, &in) < 0) { 553 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 554 res->hr_remoteaddr); 555 } 556 /* Try to connect, but accept failure. */ 557 if (proto_connect(in) < 0) { 558 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 559 res->hr_remoteaddr); 560 goto close; 561 } 562 /* Error in setting timeout is not critical, but why should it fail? 
*/ 563 if (proto_timeout(in, res->hr_timeout) < 0) 564 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 565 nvout = nv_alloc(); 566 nv_add_string(nvout, res->hr_name, "resource"); 567 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 568 "token"); 569 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 570 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 571 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 572 if (nv_error(nvout) != 0) { 573 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 574 "Unable to allocate header for connection with %s", 575 res->hr_remoteaddr); 576 nv_free(nvout); 577 goto close; 578 } 579 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 580 pjdlog_errno(LOG_WARNING, 581 "Unable to send handshake header to %s", 582 res->hr_remoteaddr); 583 nv_free(nvout); 584 goto close; 585 } 586 nv_free(nvout); 587 if (hast_proto_recv_hdr(out, &nvin) < 0) { 588 pjdlog_errno(LOG_WARNING, 589 "Unable to receive handshake header from %s", 590 res->hr_remoteaddr); 591 goto close; 592 } 593 errmsg = nv_get_string(nvin, "errmsg"); 594 if (errmsg != NULL) { 595 pjdlog_warning("%s", errmsg); 596 nv_free(nvin); 597 goto close; 598 } 599 datasize = nv_get_int64(nvin, "datasize"); 600 if (datasize != res->hr_datasize) { 601 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 602 (intmax_t)res->hr_datasize, (intmax_t)datasize); 603 nv_free(nvin); 604 goto close; 605 } 606 extentsize = nv_get_int32(nvin, "extentsize"); 607 if (extentsize != res->hr_extentsize) { 608 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 609 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 610 nv_free(nvin); 611 goto close; 612 } 613 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 614 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 615 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 616 map = NULL; 617 mapsize = nv_get_uint32(nvin, "mapsize"); 618 if (mapsize > 
0) { 619 map = malloc(mapsize); 620 if (map == NULL) { 621 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 622 (uintmax_t)mapsize); 623 nv_free(nvin); 624 goto close; 625 } 626 /* 627 * Remote node have some dirty extents on its own, lets 628 * download its activemap. 629 */ 630 if (hast_proto_recv_data(res, out, nvin, map, 631 mapsize) < 0) { 632 pjdlog_errno(LOG_ERR, 633 "Unable to receive remote activemap"); 634 nv_free(nvin); 635 free(map); 636 goto close; 637 } 638 /* 639 * Merge local and remote bitmaps. 640 */ 641 activemap_merge(res->hr_amp, map, mapsize); 642 free(map); 643 /* 644 * Now that we merged bitmaps from both nodes, flush it to the 645 * disk before we start to synchronize. 646 */ 647 (void)hast_activemap_flush(res); 648 } 649 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 650 if (inp != NULL && outp != NULL) { 651 *inp = in; 652 *outp = out; 653 } else { 654 res->hr_remotein = in; 655 res->hr_remoteout = out; 656 } 657 return (true); 658 close: 659 proto_close(out); 660 if (in != NULL) 661 proto_close(in); 662 return (false); 663 } 664 665 static void 666 sync_start(void) 667 { 668 669 mtx_lock(&sync_lock); 670 sync_inprogress = true; 671 mtx_unlock(&sync_lock); 672 cv_signal(&sync_cond); 673 } 674 675 static void 676 init_ggate(struct hast_resource *res) 677 { 678 struct g_gate_ctl_create ggiocreate; 679 struct g_gate_ctl_cancel ggiocancel; 680 681 /* 682 * We communicate with ggate via /dev/ggctl. Open it. 683 */ 684 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 685 if (res->hr_ggatefd < 0) 686 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 687 /* 688 * Create provider before trying to connect, as connection failure 689 * is not critical, but may take some time. 
	 */
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info));
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: fork a worker child which sets up
 * local/remote/ggate state and runs all worker threads.  The parent only
 * records the child PID and returns.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error;

	gres = res;

	/*
	 * Create communication channel between parent and child.
	 */
	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}

	pid = fork();
	if (pid < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		res->hr_workerpid = pid;
		return;
	}
	(void)pidfile_close(pfh);

	setproctitle("%s (primary)", res->hr_name);

	init_local(res);
	if (init_remote(res, NULL, NULL))
		sync_start();
	init_ggate(res);
	init_environment(res);
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, sync_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ctrl_thread, res);
	assert(error == 0);
	/* The guard thread runs in the worker's main thread. */
	(void)guard_thread(res);
}

/*
 * Log the formatted prefix followed by a human-readable summary of the
 * given ggate request (command, offset, length) at the requested level.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	/* Append the request summary only if the prefix wasn't truncated. */
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both connections to the remote node (if still open), stop any
 * in-progress synchronization and wake the guard thread so it can start
 * reconnecting.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		assert(res->hr_remotein == NULL);
		assert(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	assert(res->hr_remotein != NULL);
	assert(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing old incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing old outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	/*
	 * Stop synchronization if in-progress.
	 */
	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);

	/*
	 * Wake up guard thread, so it can immediately start reconnect.
	 */
	mtx_lock(&hio_guard_lock);
	cv_signal(&hio_guard_cond);
	mtx_unlock(&hio_guard_lock);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		/* Blocks in the kernel until a request arrives. */
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/* Preset errors; each component overwrites its own slot. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* Reads are served by exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				assert(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			/*
			 * Take a regular-I/O range lock; wait if the sync
			 * thread currently holds the range.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the extent dirty before issuing the write. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* These go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now.
*/ 1038 rncomp = 1; 1039 1040 for (;;) { 1041 pjdlog_debug(2, "local_send: Taking request."); 1042 QUEUE_TAKE1(hio, send, ncomp); 1043 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1044 ggio = &hio->hio_ggio; 1045 switch (ggio->gctl_cmd) { 1046 case BIO_READ: 1047 ret = pread(res->hr_localfd, ggio->gctl_data, 1048 ggio->gctl_length, 1049 ggio->gctl_offset + res->hr_localoff); 1050 if (ret == ggio->gctl_length) 1051 hio->hio_errors[ncomp] = 0; 1052 else { 1053 /* 1054 * If READ failed, try to read from remote node. 1055 */ 1056 QUEUE_INSERT1(hio, send, rncomp); 1057 continue; 1058 } 1059 break; 1060 case BIO_WRITE: 1061 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1062 ggio->gctl_length, 1063 ggio->gctl_offset + res->hr_localoff); 1064 if (ret < 0) 1065 hio->hio_errors[ncomp] = errno; 1066 else if (ret != ggio->gctl_length) 1067 hio->hio_errors[ncomp] = EIO; 1068 else 1069 hio->hio_errors[ncomp] = 0; 1070 break; 1071 case BIO_DELETE: 1072 ret = g_delete(res->hr_localfd, 1073 ggio->gctl_offset + res->hr_localoff, 1074 ggio->gctl_length); 1075 if (ret < 0) 1076 hio->hio_errors[ncomp] = errno; 1077 else 1078 hio->hio_errors[ncomp] = 0; 1079 break; 1080 case BIO_FLUSH: 1081 ret = g_flush(res->hr_localfd); 1082 if (ret < 0) 1083 hio->hio_errors[ncomp] = errno; 1084 else 1085 hio->hio_errors[ncomp] = 0; 1086 break; 1087 } 1088 if (refcount_release(&hio->hio_countdown)) { 1089 if (ISSYNCREQ(hio)) { 1090 mtx_lock(&sync_lock); 1091 SYNCREQDONE(hio); 1092 mtx_unlock(&sync_lock); 1093 cv_signal(&sync_cond); 1094 } else { 1095 pjdlog_debug(2, 1096 "local_send: (%p) Moving request to the done queue.", 1097 hio); 1098 QUEUE_INSERT2(hio, done); 1099 } 1100 } 1101 } 1102 /* NOTREACHED */ 1103 return (NULL); 1104 } 1105 1106 /* 1107 * Thread sends request to secondary node. 
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the GEOM Gate request into a HAST wire command. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			assert(!"invalid condition");
			abort();
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		/*
		 * Failure path: settle this component's accounting locally
		 * since no reply will ever arrive from the remote node.
		 */
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * The write never reached the remote node; record in
			 * the activemap that this range must be synchronized
			 * later.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/*
		 * Wait until there is anything to receive.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			assert(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/* Match the reply to its pending request by sequence number. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side.
*/ 1324 hio->hio_errors[ncomp] = 0; 1325 nv_free(nv); 1326 goto done_queue; 1327 } 1328 ggio = &hio->hio_ggio; 1329 switch (ggio->gctl_cmd) { 1330 case BIO_READ: 1331 rw_rlock(&hio_remote_lock[ncomp]); 1332 if (!ISCONNECTED(res, ncomp)) { 1333 rw_unlock(&hio_remote_lock[ncomp]); 1334 nv_free(nv); 1335 goto done_queue; 1336 } 1337 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1338 ggio->gctl_data, ggio->gctl_length) < 0) { 1339 hio->hio_errors[ncomp] = errno; 1340 pjdlog_errno(LOG_ERR, 1341 "Unable to receive reply data"); 1342 rw_unlock(&hio_remote_lock[ncomp]); 1343 nv_free(nv); 1344 remote_close(res, ncomp); 1345 goto done_queue; 1346 } 1347 rw_unlock(&hio_remote_lock[ncomp]); 1348 break; 1349 case BIO_WRITE: 1350 case BIO_DELETE: 1351 case BIO_FLUSH: 1352 break; 1353 default: 1354 assert(!"invalid condition"); 1355 abort(); 1356 } 1357 hio->hio_errors[ncomp] = 0; 1358 nv_free(nv); 1359 done_queue: 1360 if (refcount_release(&hio->hio_countdown)) { 1361 if (ISSYNCREQ(hio)) { 1362 mtx_lock(&sync_lock); 1363 SYNCREQDONE(hio); 1364 mtx_unlock(&sync_lock); 1365 cv_signal(&sync_cond); 1366 } else { 1367 pjdlog_debug(2, 1368 "remote_recv: (%p) Moving request to the done queue.", 1369 hio); 1370 QUEUE_INSERT2(hio, done); 1371 } 1372 } 1373 } 1374 /* NOTREACHED */ 1375 return (NULL); 1376 } 1377 1378 /* 1379 * Thread sends answer to the kernel. 1380 */ 1381 static void * 1382 ggate_send_thread(void *arg) 1383 { 1384 struct hast_resource *res = arg; 1385 struct g_gate_ctl_io *ggio; 1386 struct hio *hio; 1387 unsigned int ii, ncomp, ncomps; 1388 1389 ncomps = HAST_NCOMPONENTS; 1390 1391 for (;;) { 1392 pjdlog_debug(2, "ggate_send: Taking request."); 1393 QUEUE_TAKE2(hio, done); 1394 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1395 ggio = &hio->hio_ggio; 1396 for (ii = 0; ii < ncomps; ii++) { 1397 if (hio->hio_errors[ii] == 0) { 1398 /* 1399 * One successful request is enough to declare 1400 * success. 
1401 */ 1402 ggio->gctl_error = 0; 1403 break; 1404 } 1405 } 1406 if (ii == ncomps) { 1407 /* 1408 * None of the requests were successful. 1409 * Use first error. 1410 */ 1411 ggio->gctl_error = hio->hio_errors[0]; 1412 } 1413 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1414 mtx_lock(&res->hr_amp_lock); 1415 activemap_write_complete(res->hr_amp, 1416 ggio->gctl_offset, ggio->gctl_length); 1417 mtx_unlock(&res->hr_amp_lock); 1418 } 1419 if (ggio->gctl_cmd == BIO_WRITE) { 1420 /* 1421 * Unlock range we locked. 1422 */ 1423 mtx_lock(&range_lock); 1424 rangelock_del(range_regular, ggio->gctl_offset, 1425 ggio->gctl_length); 1426 if (range_sync_wait) 1427 cv_signal(&range_sync_cond); 1428 mtx_unlock(&range_lock); 1429 /* 1430 * Bump local count if this is first write after 1431 * connection failure with remote node. 1432 */ 1433 ncomp = 1; 1434 rw_rlock(&hio_remote_lock[ncomp]); 1435 if (!ISCONNECTED(res, ncomp)) { 1436 mtx_lock(&metadata_lock); 1437 if (res->hr_primary_localcnt == 1438 res->hr_secondary_remotecnt) { 1439 res->hr_primary_localcnt++; 1440 pjdlog_debug(1, 1441 "Increasing localcnt to %ju.", 1442 (uintmax_t)res->hr_primary_localcnt); 1443 (void)metadata_write(res); 1444 } 1445 mtx_unlock(&metadata_lock); 1446 } 1447 rw_unlock(&hio_remote_lock[ncomp]); 1448 } 1449 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1450 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1451 pjdlog_debug(2, 1452 "ggate_send: (%p) Moving request to the free queue.", hio); 1453 QUEUE_INSERT2(hio, free); 1454 } 1455 /* NOTREACHED */ 1456 return (NULL); 1457 } 1458 1459 /* 1460 * Thread synchronize local and remote components. 
1461 */ 1462 static void * 1463 sync_thread(void *arg __unused) 1464 { 1465 struct hast_resource *res = arg; 1466 struct hio *hio; 1467 struct g_gate_ctl_io *ggio; 1468 unsigned int ii, ncomp, ncomps; 1469 off_t offset, length, synced; 1470 bool dorewind; 1471 int syncext; 1472 1473 ncomps = HAST_NCOMPONENTS; 1474 dorewind = true; 1475 synced = 0; 1476 1477 for (;;) { 1478 mtx_lock(&sync_lock); 1479 while (!sync_inprogress) { 1480 dorewind = true; 1481 synced = 0; 1482 cv_wait(&sync_cond, &sync_lock); 1483 } 1484 mtx_unlock(&sync_lock); 1485 /* 1486 * Obtain offset at which we should synchronize. 1487 * Rewind synchronization if needed. 1488 */ 1489 mtx_lock(&res->hr_amp_lock); 1490 if (dorewind) 1491 activemap_sync_rewind(res->hr_amp); 1492 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1493 if (syncext != -1) { 1494 /* 1495 * We synchronized entire syncext extent, we can mark 1496 * it as clean now. 1497 */ 1498 if (activemap_extent_complete(res->hr_amp, syncext)) 1499 (void)hast_activemap_flush(res); 1500 } 1501 mtx_unlock(&res->hr_amp_lock); 1502 if (dorewind) { 1503 dorewind = false; 1504 if (offset < 0) 1505 pjdlog_info("Nodes are in sync."); 1506 else { 1507 pjdlog_info("Synchronization started. %ju bytes to go.", 1508 (uintmax_t)(res->hr_extentsize * 1509 activemap_ndirty(res->hr_amp))); 1510 } 1511 } 1512 if (offset < 0) { 1513 mtx_lock(&sync_lock); 1514 sync_inprogress = false; 1515 mtx_unlock(&sync_lock); 1516 pjdlog_debug(1, "Nothing to synchronize."); 1517 /* 1518 * Synchronization complete, make both localcnt and 1519 * remotecnt equal. 1520 */ 1521 ncomp = 1; 1522 rw_rlock(&hio_remote_lock[ncomp]); 1523 if (ISCONNECTED(res, ncomp)) { 1524 if (synced > 0) { 1525 pjdlog_info("Synchronization complete. 
" 1526 "%jd bytes synchronized.", 1527 (intmax_t)synced); 1528 } 1529 mtx_lock(&metadata_lock); 1530 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1531 res->hr_primary_localcnt = 1532 res->hr_secondary_localcnt; 1533 res->hr_primary_remotecnt = 1534 res->hr_secondary_remotecnt; 1535 pjdlog_debug(1, 1536 "Setting localcnt to %ju and remotecnt to %ju.", 1537 (uintmax_t)res->hr_primary_localcnt, 1538 (uintmax_t)res->hr_secondary_localcnt); 1539 (void)metadata_write(res); 1540 mtx_unlock(&metadata_lock); 1541 } else if (synced > 0) { 1542 pjdlog_info("Synchronization interrupted. " 1543 "%jd bytes synchronized so far.", 1544 (intmax_t)synced); 1545 } 1546 rw_unlock(&hio_remote_lock[ncomp]); 1547 continue; 1548 } 1549 pjdlog_debug(2, "sync: Taking free request."); 1550 QUEUE_TAKE2(hio, free); 1551 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1552 /* 1553 * Lock the range we are going to synchronize. We don't want 1554 * race where someone writes between our read and write. 1555 */ 1556 for (;;) { 1557 mtx_lock(&range_lock); 1558 if (rangelock_islocked(range_regular, offset, length)) { 1559 pjdlog_debug(2, 1560 "sync: Range offset=%jd length=%jd locked.", 1561 (intmax_t)offset, (intmax_t)length); 1562 range_sync_wait = true; 1563 cv_wait(&range_sync_cond, &range_lock); 1564 range_sync_wait = false; 1565 mtx_unlock(&range_lock); 1566 continue; 1567 } 1568 if (rangelock_add(range_sync, offset, length) < 0) { 1569 mtx_unlock(&range_lock); 1570 pjdlog_debug(2, 1571 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1572 (intmax_t)offset, (intmax_t)length); 1573 sleep(1); 1574 continue; 1575 } 1576 mtx_unlock(&range_lock); 1577 break; 1578 } 1579 /* 1580 * First read the data from synchronization source. 
1581 */ 1582 SYNCREQ(hio); 1583 ggio = &hio->hio_ggio; 1584 ggio->gctl_cmd = BIO_READ; 1585 ggio->gctl_offset = offset; 1586 ggio->gctl_length = length; 1587 ggio->gctl_error = 0; 1588 for (ii = 0; ii < ncomps; ii++) 1589 hio->hio_errors[ii] = EINVAL; 1590 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1591 hio); 1592 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1593 hio); 1594 mtx_lock(&metadata_lock); 1595 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1596 /* 1597 * This range is up-to-date on local component, 1598 * so handle request locally. 1599 */ 1600 /* Local component is 0 for now. */ 1601 ncomp = 0; 1602 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1603 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1604 /* 1605 * This range is out-of-date on local component, 1606 * so send request to the remote node. 1607 */ 1608 /* Remote component is 1 for now. */ 1609 ncomp = 1; 1610 } 1611 mtx_unlock(&metadata_lock); 1612 refcount_init(&hio->hio_countdown, 1); 1613 QUEUE_INSERT1(hio, send, ncomp); 1614 1615 /* 1616 * Let's wait for READ to finish. 1617 */ 1618 mtx_lock(&sync_lock); 1619 while (!ISSYNCREQDONE(hio)) 1620 cv_wait(&sync_cond, &sync_lock); 1621 mtx_unlock(&sync_lock); 1622 1623 if (hio->hio_errors[ncomp] != 0) { 1624 pjdlog_error("Unable to read synchronization data: %s.", 1625 strerror(hio->hio_errors[ncomp])); 1626 goto free_queue; 1627 } 1628 1629 /* 1630 * We read the data from synchronization source, now write it 1631 * to synchronization target. 
1632 */ 1633 SYNCREQ(hio); 1634 ggio->gctl_cmd = BIO_WRITE; 1635 for (ii = 0; ii < ncomps; ii++) 1636 hio->hio_errors[ii] = EINVAL; 1637 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1638 hio); 1639 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1640 hio); 1641 mtx_lock(&metadata_lock); 1642 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1643 /* 1644 * This range is up-to-date on local component, 1645 * so we update remote component. 1646 */ 1647 /* Remote component is 1 for now. */ 1648 ncomp = 1; 1649 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1650 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1651 /* 1652 * This range is out-of-date on local component, 1653 * so we update it. 1654 */ 1655 /* Local component is 0 for now. */ 1656 ncomp = 0; 1657 } 1658 mtx_unlock(&metadata_lock); 1659 1660 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1661 hio); 1662 refcount_init(&hio->hio_countdown, 1); 1663 QUEUE_INSERT1(hio, send, ncomp); 1664 1665 /* 1666 * Let's wait for WRITE to finish. 
1667 */ 1668 mtx_lock(&sync_lock); 1669 while (!ISSYNCREQDONE(hio)) 1670 cv_wait(&sync_cond, &sync_lock); 1671 mtx_unlock(&sync_lock); 1672 1673 if (hio->hio_errors[ncomp] != 0) { 1674 pjdlog_error("Unable to write synchronization data: %s.", 1675 strerror(hio->hio_errors[ncomp])); 1676 goto free_queue; 1677 } 1678 free_queue: 1679 mtx_lock(&range_lock); 1680 rangelock_del(range_sync, offset, length); 1681 if (range_regular_wait) 1682 cv_signal(&range_regular_cond); 1683 mtx_unlock(&range_lock); 1684 1685 synced += length; 1686 1687 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1688 hio); 1689 QUEUE_INSERT2(hio, free); 1690 } 1691 /* NOTREACHED */ 1692 return (NULL); 1693 } 1694 1695 static void 1696 sighandler(int sig) 1697 { 1698 bool unlock; 1699 1700 switch (sig) { 1701 case SIGINT: 1702 case SIGTERM: 1703 sigexit_received = true; 1704 break; 1705 default: 1706 assert(!"invalid condition"); 1707 } 1708 /* 1709 * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't 1710 * want to risk deadlock. 1711 */ 1712 unlock = mtx_trylock(&hio_guard_lock); 1713 cv_signal(&hio_guard_cond); 1714 if (unlock) 1715 mtx_unlock(&hio_guard_lock); 1716 } 1717 1718 /* 1719 * Thread guards remote connections and reconnects when needed, handles 1720 * signals, etc. 1721 */ 1722 static void * 1723 guard_thread(void *arg) 1724 { 1725 struct hast_resource *res = arg; 1726 struct proto_conn *in, *out; 1727 unsigned int ii, ncomps; 1728 int timeout; 1729 1730 ncomps = HAST_NCOMPONENTS; 1731 /* The is only one remote component for now. */ 1732 #define ISREMOTE(no) ((no) == 1) 1733 1734 for (;;) { 1735 if (sigexit_received) { 1736 primary_exitx(EX_OK, 1737 "Termination signal received, exiting."); 1738 } 1739 /* 1740 * If all the connection will be fine, we will sleep until 1741 * someone wakes us up. 1742 * If any of the connections will be broken and we won't be 1743 * able to connect, we will sleep only for RECONNECT_SLEEP 1744 * seconds so we can retry soon. 
1745 */ 1746 timeout = 0; 1747 pjdlog_debug(2, "remote_guard: Checking connections."); 1748 mtx_lock(&hio_guard_lock); 1749 for (ii = 0; ii < ncomps; ii++) { 1750 if (!ISREMOTE(ii)) 1751 continue; 1752 rw_rlock(&hio_remote_lock[ii]); 1753 if (ISCONNECTED(res, ii)) { 1754 assert(res->hr_remotein != NULL); 1755 assert(res->hr_remoteout != NULL); 1756 rw_unlock(&hio_remote_lock[ii]); 1757 pjdlog_debug(2, 1758 "remote_guard: Connection to %s is ok.", 1759 res->hr_remoteaddr); 1760 } else { 1761 assert(res->hr_remotein == NULL); 1762 assert(res->hr_remoteout == NULL); 1763 /* 1764 * Upgrade the lock. It doesn't have to be 1765 * atomic as no other thread can change 1766 * connection status from disconnected to 1767 * connected. 1768 */ 1769 rw_unlock(&hio_remote_lock[ii]); 1770 pjdlog_debug(2, 1771 "remote_guard: Reconnecting to %s.", 1772 res->hr_remoteaddr); 1773 in = out = NULL; 1774 if (init_remote(res, &in, &out)) { 1775 rw_wlock(&hio_remote_lock[ii]); 1776 assert(res->hr_remotein == NULL); 1777 assert(res->hr_remoteout == NULL); 1778 assert(in != NULL && out != NULL); 1779 res->hr_remotein = in; 1780 res->hr_remoteout = out; 1781 rw_unlock(&hio_remote_lock[ii]); 1782 pjdlog_info("Successfully reconnected to %s.", 1783 res->hr_remoteaddr); 1784 sync_start(); 1785 } else { 1786 /* Both connections should be NULL. */ 1787 assert(res->hr_remotein == NULL); 1788 assert(res->hr_remoteout == NULL); 1789 assert(in == NULL && out == NULL); 1790 pjdlog_debug(2, 1791 "remote_guard: Reconnect to %s failed.", 1792 res->hr_remoteaddr); 1793 timeout = RECONNECT_SLEEP; 1794 } 1795 } 1796 } 1797 (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); 1798 mtx_unlock(&hio_guard_lock); 1799 } 1800 #undef ISREMOTE 1801 /* NOTREACHED */ 1802 return (NULL); 1803 } 1804