1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 28 */ 29 30 #include <sys/cdefs.h> 31 __FBSDID("$FreeBSD$"); 32 33 #include <sys/types.h> 34 #include <sys/time.h> 35 #include <sys/bio.h> 36 #include <sys/disk.h> 37 #include <sys/refcount.h> 38 #include <sys/stat.h> 39 40 #include <geom/gate/g_gate.h> 41 42 #include <assert.h> 43 #include <err.h> 44 #include <errno.h> 45 #include <fcntl.h> 46 #include <libgeom.h> 47 #include <pthread.h> 48 #include <stdint.h> 49 #include <stdio.h> 50 #include <string.h> 51 #include <sysexits.h> 52 #include <unistd.h> 53 54 #include <activemap.h> 55 #include <nv.h> 56 #include <rangelock.h> 57 58 #include "control.h" 59 #include "hast.h" 60 #include "hast_proto.h" 61 #include "hastd.h" 62 #include "metadata.h" 63 #include "proto.h" 64 #include "pjdlog.h" 65 #include "subr.h" 66 #include "synch.h" 67 68 struct hio { 69 /* 70 * Number of components we are still waiting for. 71 * When this field goes to 0, we can send the request back to the 72 * kernel. Each component has to decrease this counter by one 73 * even on failure. 74 */ 75 unsigned int hio_countdown; 76 /* 77 * Each component has a place to store its own error. 78 * Once the request is handled by all components we can decide if the 79 * request overall is successful or not. 80 */ 81 int *hio_errors; 82 /* 83 * Structure used to comunicate with GEOM Gate class. 84 */ 85 struct g_gate_ctl_io hio_ggio; 86 TAILQ_ENTRY(hio) *hio_next; 87 }; 88 #define hio_free_next hio_next[0] 89 #define hio_done_next hio_next[0] 90 91 /* 92 * Free list holds unused structures. When free list is empty, we have to wait 93 * until some in-progress requests are freed. 94 */ 95 static TAILQ_HEAD(, hio) hio_free_list; 96 static pthread_mutex_t hio_free_list_lock; 97 static pthread_cond_t hio_free_list_cond; 98 /* 99 * There is one send list for every component. One requests is placed on all 100 * send lists - each component gets the same request, but each component is 101 * responsible for managing his own send list. 102 */ 103 static TAILQ_HEAD(, hio) *hio_send_list; 104 static pthread_mutex_t *hio_send_list_lock; 105 static pthread_cond_t *hio_send_list_cond; 106 /* 107 * There is one recv list for every component, although local components don't 108 * use recv lists as local requests are done synchronously. 109 */ 110 static TAILQ_HEAD(, hio) *hio_recv_list; 111 static pthread_mutex_t *hio_recv_list_lock; 112 static pthread_cond_t *hio_recv_list_cond; 113 /* 114 * Request is placed on done list by the slowest component (the one that 115 * decreased hio_countdown from 1 to 0). 116 */ 117 static TAILQ_HEAD(, hio) hio_done_list; 118 static pthread_mutex_t hio_done_list_lock; 119 static pthread_cond_t hio_done_list_cond; 120 /* 121 * Structure below are for interaction with sync thread. 122 */ 123 static bool sync_inprogress; 124 static pthread_mutex_t sync_lock; 125 static pthread_cond_t sync_cond; 126 /* 127 * The lock below allows to synchornize access to remote connections. 128 */ 129 static pthread_rwlock_t *hio_remote_lock; 130 static pthread_mutex_t hio_guard_lock; 131 static pthread_cond_t hio_guard_cond; 132 133 /* 134 * Lock to synchronize metadata updates. Also synchronize access to 135 * hr_primary_localcnt and hr_primary_remotecnt fields. 136 */ 137 static pthread_mutex_t metadata_lock; 138 139 /* 140 * Maximum number of outstanding I/O requests. 141 */ 142 #define HAST_HIO_MAX 256 143 /* 144 * Number of components. At this point there are only two components: local 145 * and remote, but in the future it might be possible to use multiple local 146 * and remote components. 147 */ 148 #define HAST_NCOMPONENTS 2 149 /* 150 * Number of seconds to sleep before next reconnect try. 151 */ 152 #define RECONNECT_SLEEP 5 153 154 #define ISCONNECTED(res, no) \ 155 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156 157 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 158 bool _wakeup; \ 159 \ 160 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163 hio_next[(ncomp)]); \ 164 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165 if (_wakeup) \ 166 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167 } while (0) 168 #define QUEUE_INSERT2(hio, name) do { \ 169 bool _wakeup; \ 170 \ 171 mtx_lock(&hio_##name##_list_lock); \ 172 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174 mtx_unlock(&hio_##name##_list_lock); \ 175 if (_wakeup) \ 176 cv_signal(&hio_##name##_list_cond); \ 177 } while (0) 178 #define QUEUE_TAKE1(hio, name, ncomp) do { \ 179 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 180 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 181 cv_wait(&hio_##name##_list_cond[(ncomp)], \ 182 &hio_##name##_list_lock[(ncomp)]); \ 183 } \ 184 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 185 hio_next[(ncomp)]); \ 186 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 187 } while (0) 188 #define QUEUE_TAKE2(hio, name) do { \ 189 mtx_lock(&hio_##name##_list_lock); \ 190 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 191 cv_wait(&hio_##name##_list_cond, \ 192 &hio_##name##_list_lock); \ 193 } \ 194 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 195 mtx_unlock(&hio_##name##_list_lock); \ 196 } while (0) 197 198 #define SYNCREQ(hio) do { (hio)->hio_ggio.gctl_unit = -1; } while (0) 199 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 200 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 201 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 202 203 static struct hast_resource *gres; 204 205 static pthread_mutex_t range_lock; 206 static struct rangelocks *range_regular; 207 static bool range_regular_wait; 208 static pthread_cond_t range_regular_cond; 209 static struct rangelocks *range_sync; 210 static bool range_sync_wait; 211 static pthread_cond_t range_sync_cond; 212 213 static void *ggate_recv_thread(void *arg); 214 static void *local_send_thread(void *arg); 215 static void *remote_send_thread(void *arg); 216 static void *remote_recv_thread(void *arg); 217 static void *ggate_send_thread(void *arg); 218 static void *sync_thread(void *arg); 219 static void *guard_thread(void *arg); 220 221 static void sighandler(int sig); 222 223 static void 224 cleanup(struct hast_resource *res) 225 { 226 int rerrno; 227 228 /* Remember errno. */ 229 rerrno = errno; 230 231 /* 232 * Close descriptor to /dev/hast/<name> 233 * to work-around race in the kernel. 234 */ 235 close(res->hr_localfd); 236 237 /* Destroy ggate provider if we created one. */ 238 if (res->hr_ggateunit >= 0) { 239 struct g_gate_ctl_destroy ggiod; 240 241 ggiod.gctl_version = G_GATE_VERSION; 242 ggiod.gctl_unit = res->hr_ggateunit; 243 ggiod.gctl_force = 1; 244 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 245 pjdlog_warning("Unable to destroy hast/%s device", 246 res->hr_provname); 247 } 248 res->hr_ggateunit = -1; 249 } 250 251 /* Restore errno. */ 252 errno = rerrno; 253 } 254 255 static void 256 primary_exit(int exitcode, const char *fmt, ...) 257 { 258 va_list ap; 259 260 assert(exitcode != EX_OK); 261 va_start(ap, fmt); 262 pjdlogv_errno(LOG_ERR, fmt, ap); 263 va_end(ap); 264 cleanup(gres); 265 exit(exitcode); 266 } 267 268 static void 269 primary_exitx(int exitcode, const char *fmt, ...) 270 { 271 va_list ap; 272 273 va_start(ap, fmt); 274 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 275 va_end(ap); 276 cleanup(gres); 277 exit(exitcode); 278 } 279 280 static int 281 hast_activemap_flush(struct hast_resource *res) 282 { 283 const unsigned char *buf; 284 size_t size; 285 286 buf = activemap_bitmap(res->hr_amp, &size); 287 assert(buf != NULL); 288 assert((size % res->hr_local_sectorsize) == 0); 289 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 290 (ssize_t)size) { 291 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 292 "Unable to flush activemap to disk")); 293 return (-1); 294 } 295 return (0); 296 } 297 298 static void 299 init_environment(struct hast_resource *res __unused) 300 { 301 struct hio *hio; 302 unsigned int ii, ncomps; 303 304 /* 305 * In the future it might be per-resource value. 306 */ 307 ncomps = HAST_NCOMPONENTS; 308 309 /* 310 * Allocate memory needed by lists. 311 */ 312 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 313 if (hio_send_list == NULL) { 314 primary_exitx(EX_TEMPFAIL, 315 "Unable to allocate %zu bytes of memory for send lists.", 316 sizeof(hio_send_list[0]) * ncomps); 317 } 318 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 319 if (hio_send_list_lock == NULL) { 320 primary_exitx(EX_TEMPFAIL, 321 "Unable to allocate %zu bytes of memory for send list locks.", 322 sizeof(hio_send_list_lock[0]) * ncomps); 323 } 324 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 325 if (hio_send_list_cond == NULL) { 326 primary_exitx(EX_TEMPFAIL, 327 "Unable to allocate %zu bytes of memory for send list condition variables.", 328 sizeof(hio_send_list_cond[0]) * ncomps); 329 } 330 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 331 if (hio_recv_list == NULL) { 332 primary_exitx(EX_TEMPFAIL, 333 "Unable to allocate %zu bytes of memory for recv lists.", 334 sizeof(hio_recv_list[0]) * ncomps); 335 } 336 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 337 if (hio_recv_list_lock == NULL) { 338 primary_exitx(EX_TEMPFAIL, 339 "Unable to allocate %zu bytes of memory for recv list locks.", 340 sizeof(hio_recv_list_lock[0]) * ncomps); 341 } 342 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 343 if (hio_recv_list_cond == NULL) { 344 primary_exitx(EX_TEMPFAIL, 345 "Unable to allocate %zu bytes of memory for recv list condition variables.", 346 sizeof(hio_recv_list_cond[0]) * ncomps); 347 } 348 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 349 if (hio_remote_lock == NULL) { 350 primary_exitx(EX_TEMPFAIL, 351 "Unable to allocate %zu bytes of memory for remote connections locks.", 352 sizeof(hio_remote_lock[0]) * ncomps); 353 } 354 355 /* 356 * Initialize lists, their locks and theirs condition variables. 357 */ 358 TAILQ_INIT(&hio_free_list); 359 mtx_init(&hio_free_list_lock); 360 cv_init(&hio_free_list_cond); 361 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 362 TAILQ_INIT(&hio_send_list[ii]); 363 mtx_init(&hio_send_list_lock[ii]); 364 cv_init(&hio_send_list_cond[ii]); 365 TAILQ_INIT(&hio_recv_list[ii]); 366 mtx_init(&hio_recv_list_lock[ii]); 367 cv_init(&hio_recv_list_cond[ii]); 368 rw_init(&hio_remote_lock[ii]); 369 } 370 TAILQ_INIT(&hio_done_list); 371 mtx_init(&hio_done_list_lock); 372 cv_init(&hio_done_list_cond); 373 mtx_init(&hio_guard_lock); 374 cv_init(&hio_guard_cond); 375 mtx_init(&metadata_lock); 376 377 /* 378 * Allocate requests pool and initialize requests. 379 */ 380 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 381 hio = malloc(sizeof(*hio)); 382 if (hio == NULL) { 383 primary_exitx(EX_TEMPFAIL, 384 "Unable to allocate %zu bytes of memory for hio request.", 385 sizeof(*hio)); 386 } 387 hio->hio_countdown = 0; 388 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 389 if (hio->hio_errors == NULL) { 390 primary_exitx(EX_TEMPFAIL, 391 "Unable allocate %zu bytes of memory for hio errors.", 392 sizeof(hio->hio_errors[0]) * ncomps); 393 } 394 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 395 if (hio->hio_next == NULL) { 396 primary_exitx(EX_TEMPFAIL, 397 "Unable allocate %zu bytes of memory for hio_next field.", 398 sizeof(hio->hio_next[0]) * ncomps); 399 } 400 hio->hio_ggio.gctl_version = G_GATE_VERSION; 401 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 402 if (hio->hio_ggio.gctl_data == NULL) { 403 primary_exitx(EX_TEMPFAIL, 404 "Unable to allocate %zu bytes of memory for gctl_data.", 405 MAXPHYS); 406 } 407 hio->hio_ggio.gctl_length = MAXPHYS; 408 hio->hio_ggio.gctl_error = 0; 409 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 410 } 411 412 /* 413 * Turn on signals handling. 414 */ 415 signal(SIGINT, sighandler); 416 signal(SIGTERM, sighandler); 417 } 418 419 static void 420 init_local(struct hast_resource *res) 421 { 422 unsigned char *buf; 423 size_t mapsize; 424 425 if (metadata_read(res, true) < 0) 426 exit(EX_NOINPUT); 427 mtx_init(&res->hr_amp_lock); 428 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 429 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 430 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 431 } 432 mtx_init(&range_lock); 433 cv_init(&range_regular_cond); 434 if (rangelock_init(&range_regular) < 0) 435 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 436 cv_init(&range_sync_cond); 437 if (rangelock_init(&range_sync) < 0) 438 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 439 mapsize = activemap_ondisk_size(res->hr_amp); 440 buf = calloc(1, mapsize); 441 if (buf == NULL) { 442 primary_exitx(EX_TEMPFAIL, 443 "Unable to allocate buffer for activemap."); 444 } 445 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 446 (ssize_t)mapsize) { 447 primary_exit(EX_NOINPUT, "Unable to read activemap"); 448 } 449 activemap_copyin(res->hr_amp, buf, mapsize); 450 if (res->hr_resuid != 0) 451 return; 452 /* 453 * We're using provider for the first time, so we have to generate 454 * resource unique identifier and initialize local and remote counts. 455 */ 456 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 457 res->hr_primary_localcnt = 1; 458 res->hr_primary_remotecnt = 0; 459 if (metadata_write(res) < 0) 460 exit(EX_NOINPUT); 461 } 462 463 static void 464 init_remote(struct hast_resource *res) 465 { 466 struct nv *nvout, *nvin; 467 const unsigned char *token; 468 unsigned char *map; 469 const char *errmsg; 470 int32_t extentsize; 471 int64_t datasize; 472 uint32_t mapsize; 473 size_t size; 474 475 /* Prepare outgoing connection with remote node. */ 476 if (proto_client(res->hr_remoteaddr, &res->hr_remoteout) < 0) { 477 primary_exit(EX_OSERR, "Unable to create connection to %s", 478 res->hr_remoteaddr); 479 } 480 /* Try to connect, but accept failure. */ 481 if (proto_connect(res->hr_remoteout) < 0) { 482 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 483 res->hr_remoteaddr); 484 goto close; 485 } 486 /* 487 * First handshake step. 488 * Setup outgoing connection with remote node. 489 */ 490 nvout = nv_alloc(); 491 nv_add_string(nvout, res->hr_name, "resource"); 492 if (nv_error(nvout) != 0) { 493 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 494 "Unable to allocate header for connection with %s", 495 res->hr_remoteaddr); 496 nv_free(nvout); 497 goto close; 498 } 499 if (hast_proto_send(res, res->hr_remoteout, nvout, NULL, 0) < 0) { 500 pjdlog_errno(LOG_WARNING, 501 "Unable to send handshake header to %s", 502 res->hr_remoteaddr); 503 nv_free(nvout); 504 goto close; 505 } 506 nv_free(nvout); 507 if (hast_proto_recv_hdr(res->hr_remoteout, &nvin) < 0) { 508 pjdlog_errno(LOG_WARNING, 509 "Unable to receive handshake header from %s", 510 res->hr_remoteaddr); 511 goto close; 512 } 513 errmsg = nv_get_string(nvin, "errmsg"); 514 if (errmsg != NULL) { 515 pjdlog_warning("%s", errmsg); 516 nv_free(nvin); 517 goto close; 518 } 519 token = nv_get_uint8_array(nvin, &size, "token"); 520 if (token == NULL) { 521 pjdlog_warning("Handshake header from %s has no 'token' field.", 522 res->hr_remoteaddr); 523 nv_free(nvin); 524 goto close; 525 } 526 if (size != sizeof(res->hr_token)) { 527 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 528 res->hr_remoteaddr, size, sizeof(res->hr_token)); 529 nv_free(nvin); 530 goto close; 531 } 532 bcopy(token, res->hr_token, sizeof(res->hr_token)); 533 nv_free(nvin); 534 535 /* 536 * Second handshake step. 537 * Setup incoming connection with remote node. 538 */ 539 if (proto_client(res->hr_remoteaddr, &res->hr_remotein) < 0) { 540 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 541 res->hr_remoteaddr); 542 } 543 /* Try to connect, but accept failure. */ 544 if (proto_connect(res->hr_remotein) < 0) { 545 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 546 res->hr_remoteaddr); 547 goto close; 548 } 549 nvout = nv_alloc(); 550 nv_add_string(nvout, res->hr_name, "resource"); 551 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 552 "token"); 553 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 554 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 555 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 556 if (nv_error(nvout) != 0) { 557 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 558 "Unable to allocate header for connection with %s", 559 res->hr_remoteaddr); 560 nv_free(nvout); 561 goto close; 562 } 563 if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) { 564 pjdlog_errno(LOG_WARNING, 565 "Unable to send handshake header to %s", 566 res->hr_remoteaddr); 567 nv_free(nvout); 568 goto close; 569 } 570 nv_free(nvout); 571 if (hast_proto_recv_hdr(res->hr_remoteout, &nvin) < 0) { 572 pjdlog_errno(LOG_WARNING, 573 "Unable to receive handshake header from %s", 574 res->hr_remoteaddr); 575 goto close; 576 } 577 errmsg = nv_get_string(nvin, "errmsg"); 578 if (errmsg != NULL) { 579 pjdlog_warning("%s", errmsg); 580 nv_free(nvin); 581 goto close; 582 } 583 datasize = nv_get_int64(nvin, "datasize"); 584 if (datasize != res->hr_datasize) { 585 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 586 (intmax_t)res->hr_datasize, (intmax_t)datasize); 587 nv_free(nvin); 588 goto close; 589 } 590 extentsize = nv_get_int32(nvin, "extentsize"); 591 if (extentsize != res->hr_extentsize) { 592 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 593 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 594 nv_free(nvin); 595 goto close; 596 } 597 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 598 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 599 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 600 map = NULL; 601 mapsize = nv_get_uint32(nvin, "mapsize"); 602 if (mapsize > 0) { 603 map = malloc(mapsize); 604 if (map == NULL) { 605 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 606 (uintmax_t)mapsize); 607 nv_free(nvin); 608 goto close; 609 } 610 /* 611 * Remote node have some dirty extents on its own, lets 612 * download its activemap. 613 */ 614 if (hast_proto_recv_data(res, res->hr_remoteout, nvin, map, 615 mapsize) < 0) { 616 pjdlog_errno(LOG_ERR, 617 "Unable to receive remote activemap"); 618 nv_free(nvin); 619 free(map); 620 goto close; 621 } 622 /* 623 * Merge local and remote bitmaps. 624 */ 625 activemap_merge(res->hr_amp, map, mapsize); 626 free(map); 627 /* 628 * Now that we merged bitmaps from both nodes, flush it to the 629 * disk before we start to synchronize. 630 */ 631 (void)hast_activemap_flush(res); 632 } 633 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 634 mtx_lock(&sync_lock); 635 sync_inprogress = true; 636 mtx_unlock(&sync_lock); 637 cv_signal(&sync_cond); 638 return; 639 close: 640 proto_close(res->hr_remoteout); 641 res->hr_remoteout = NULL; 642 if (res->hr_remotein != NULL) { 643 proto_close(res->hr_remotein); 644 res->hr_remotein = NULL; 645 } 646 } 647 648 static void 649 init_ggate(struct hast_resource *res) 650 { 651 struct g_gate_ctl_create ggiocreate; 652 struct g_gate_ctl_cancel ggiocancel; 653 654 /* 655 * We communicate with ggate via /dev/ggctl. Open it. 656 */ 657 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 658 if (res->hr_ggatefd < 0) 659 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 660 /* 661 * Create provider before trying to connect, as connection failure 662 * is not critical, but may take some time. 663 */ 664 ggiocreate.gctl_version = G_GATE_VERSION; 665 ggiocreate.gctl_mediasize = res->hr_datasize; 666 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 667 ggiocreate.gctl_flags = 0; 668 ggiocreate.gctl_maxcount = 128; 669 ggiocreate.gctl_timeout = 0; 670 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 671 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 672 res->hr_provname); 673 bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 674 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 675 pjdlog_info("Device hast/%s created.", res->hr_provname); 676 res->hr_ggateunit = ggiocreate.gctl_unit; 677 return; 678 } 679 if (errno != EEXIST) { 680 primary_exit(EX_OSERR, "Unable to create hast/%s device", 681 res->hr_provname); 682 } 683 pjdlog_debug(1, 684 "Device hast/%s already exists, we will try to take it over.", 685 res->hr_provname); 686 /* 687 * If we received EEXIST, we assume that the process who created the 688 * provider died and didn't clean up. In that case we will start from 689 * where he left of. 690 */ 691 ggiocancel.gctl_version = G_GATE_VERSION; 692 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 693 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 694 res->hr_provname); 695 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 696 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 697 res->hr_ggateunit = ggiocancel.gctl_unit; 698 return; 699 } 700 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 701 res->hr_provname); 702 } 703 704 void 705 hastd_primary(struct hast_resource *res) 706 { 707 pthread_t td; 708 pid_t pid; 709 int error; 710 711 gres = res; 712 713 /* 714 * Create communication channel between parent and child. 715 */ 716 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 717 KEEP_ERRNO((void)pidfile_remove(pfh)); 718 primary_exit(EX_OSERR, 719 "Unable to create control sockets between parent and child"); 720 } 721 722 pid = fork(); 723 if (pid < 0) { 724 KEEP_ERRNO((void)pidfile_remove(pfh)); 725 primary_exit(EX_OSERR, "Unable to fork"); 726 } 727 728 if (pid > 0) { 729 /* This is parent. */ 730 res->hr_workerpid = pid; 731 return; 732 } 733 (void)pidfile_close(pfh); 734 735 setproctitle("%s (primary)", res->hr_name); 736 737 init_local(res); 738 init_remote(res); 739 init_ggate(res); 740 init_environment(res); 741 error = pthread_create(&td, NULL, ggate_recv_thread, res); 742 assert(error == 0); 743 error = pthread_create(&td, NULL, local_send_thread, res); 744 assert(error == 0); 745 error = pthread_create(&td, NULL, remote_send_thread, res); 746 assert(error == 0); 747 error = pthread_create(&td, NULL, remote_recv_thread, res); 748 assert(error == 0); 749 error = pthread_create(&td, NULL, ggate_send_thread, res); 750 assert(error == 0); 751 error = pthread_create(&td, NULL, sync_thread, res); 752 assert(error == 0); 753 error = pthread_create(&td, NULL, ctrl_thread, res); 754 assert(error == 0); 755 (void)guard_thread(res); 756 } 757 758 static void 759 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 760 { 761 char msg[1024]; 762 va_list ap; 763 int len; 764 765 va_start(ap, fmt); 766 len = vsnprintf(msg, sizeof(msg), fmt, ap); 767 va_end(ap); 768 if ((size_t)len < sizeof(msg)) { 769 switch (ggio->gctl_cmd) { 770 case BIO_READ: 771 (void)snprintf(msg + len, sizeof(msg) - len, 772 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 773 (uintmax_t)ggio->gctl_length); 774 break; 775 case BIO_DELETE: 776 (void)snprintf(msg + len, sizeof(msg) - len, 777 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 778 (uintmax_t)ggio->gctl_length); 779 break; 780 case BIO_FLUSH: 781 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 782 break; 783 case BIO_WRITE: 784 (void)snprintf(msg + len, sizeof(msg) - len, 785 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 786 (uintmax_t)ggio->gctl_length); 787 break; 788 default: 789 (void)snprintf(msg + len, sizeof(msg) - len, 790 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 791 break; 792 } 793 } 794 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 795 } 796 797 static void 798 remote_close(struct hast_resource *res, int ncomp) 799 { 800 801 rw_wlock(&hio_remote_lock[ncomp]); 802 /* 803 * A race is possible between dropping rlock and acquiring wlock - 804 * another thread can close connection in-between. 805 */ 806 if (!ISCONNECTED(res, ncomp)) { 807 assert(res->hr_remotein == NULL); 808 assert(res->hr_remoteout == NULL); 809 rw_unlock(&hio_remote_lock[ncomp]); 810 return; 811 } 812 813 assert(res->hr_remotein != NULL); 814 assert(res->hr_remoteout != NULL); 815 816 pjdlog_debug(2, "Closing old incoming connection to %s.", 817 res->hr_remoteaddr); 818 proto_close(res->hr_remotein); 819 res->hr_remotein = NULL; 820 pjdlog_debug(2, "Closing old outgoing connection to %s.", 821 res->hr_remoteaddr); 822 proto_close(res->hr_remoteout); 823 res->hr_remoteout = NULL; 824 825 rw_unlock(&hio_remote_lock[ncomp]); 826 827 /* 828 * Stop synchronization if in-progress. 829 */ 830 mtx_lock(&sync_lock); 831 if (sync_inprogress) 832 sync_inprogress = false; 833 mtx_unlock(&sync_lock); 834 835 /* 836 * Wake up guard thread, so it can immediately start reconnect. 837 */ 838 mtx_lock(&hio_guard_lock); 839 cv_signal(&hio_guard_cond); 840 mtx_unlock(&hio_guard_lock); 841 } 842 843 /* 844 * Thread receives ggate I/O requests from the kernel and passes them to 845 * appropriate threads: 846 * WRITE - always goes to both local_send and remote_send threads 847 * READ (when the block is up-to-date on local component) - 848 * only local_send thread 849 * READ (when the block isn't up-to-date on local component) - 850 * only remote_send thread 851 * DELETE - always goes to both local_send and remote_send threads 852 * FLUSH - always goes to both local_send and remote_send threads 853 */ 854 static void * 855 ggate_recv_thread(void *arg) 856 { 857 struct hast_resource *res = arg; 858 struct g_gate_ctl_io *ggio; 859 struct hio *hio; 860 unsigned int ii, ncomp, ncomps; 861 int error; 862 863 ncomps = HAST_NCOMPONENTS; 864 865 for (;;) { 866 pjdlog_debug(2, "ggate_recv: Taking free request."); 867 QUEUE_TAKE2(hio, free); 868 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 869 ggio = &hio->hio_ggio; 870 ggio->gctl_unit = res->hr_ggateunit; 871 ggio->gctl_length = MAXPHYS; 872 ggio->gctl_error = 0; 873 pjdlog_debug(2, 874 "ggate_recv: (%p) Waiting for request from the kernel.", 875 hio); 876 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 877 if (sigexit_received) 878 pthread_exit(NULL); 879 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 880 } 881 error = ggio->gctl_error; 882 switch (error) { 883 case 0: 884 break; 885 case ECANCELED: 886 /* Exit gracefully. */ 887 if (!sigexit_received) { 888 pjdlog_debug(2, 889 "ggate_recv: (%p) Received cancel from the kernel.", 890 hio); 891 pjdlog_info("Received cancel from the kernel, exiting."); 892 } 893 pthread_exit(NULL); 894 case ENOMEM: 895 /* 896 * Buffer too small? Impossible, we allocate MAXPHYS 897 * bytes - request can't be bigger than that. 898 */ 899 /* FALLTHROUGH */ 900 case ENXIO: 901 default: 902 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 903 strerror(error)); 904 } 905 for (ii = 0; ii < ncomps; ii++) 906 hio->hio_errors[ii] = EINVAL; 907 reqlog(LOG_DEBUG, 2, ggio, 908 "ggate_recv: (%p) Request received from the kernel: ", 909 hio); 910 /* 911 * Inform all components about new write request. 912 * For read request prefer local component unless the given 913 * range is out-of-date, then use remote component. 914 */ 915 switch (ggio->gctl_cmd) { 916 case BIO_READ: 917 pjdlog_debug(2, 918 "ggate_recv: (%p) Moving request to the send queue.", 919 hio); 920 refcount_init(&hio->hio_countdown, 1); 921 mtx_lock(&metadata_lock); 922 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 923 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 924 /* 925 * This range is up-to-date on local component, 926 * so handle request locally. 927 */ 928 /* Local component is 0 for now. */ 929 ncomp = 0; 930 } else /* if (res->hr_syncsrc == 931 HAST_SYNCSRC_SECONDARY) */ { 932 assert(res->hr_syncsrc == 933 HAST_SYNCSRC_SECONDARY); 934 /* 935 * This range is out-of-date on local component, 936 * so send request to the remote node. 937 */ 938 /* Remote component is 1 for now. */ 939 ncomp = 1; 940 } 941 mtx_unlock(&metadata_lock); 942 QUEUE_INSERT1(hio, send, ncomp); 943 break; 944 case BIO_WRITE: 945 for (;;) { 946 mtx_lock(&range_lock); 947 if (rangelock_islocked(range_sync, 948 ggio->gctl_offset, ggio->gctl_length)) { 949 pjdlog_debug(2, 950 "regular: Range offset=%jd length=%zu locked.", 951 (intmax_t)ggio->gctl_offset, 952 (size_t)ggio->gctl_length); 953 range_regular_wait = true; 954 cv_wait(&range_regular_cond, &range_lock); 955 range_regular_wait = false; 956 mtx_unlock(&range_lock); 957 continue; 958 } 959 if (rangelock_add(range_regular, 960 ggio->gctl_offset, ggio->gctl_length) < 0) { 961 mtx_unlock(&range_lock); 962 pjdlog_debug(2, 963 "regular: Range offset=%jd length=%zu is already locked, waiting.", 964 (intmax_t)ggio->gctl_offset, 965 (size_t)ggio->gctl_length); 966 sleep(1); 967 continue; 968 } 969 mtx_unlock(&range_lock); 970 break; 971 } 972 mtx_lock(&res->hr_amp_lock); 973 if (activemap_write_start(res->hr_amp, 974 ggio->gctl_offset, ggio->gctl_length)) { 975 (void)hast_activemap_flush(res); 976 } 977 mtx_unlock(&res->hr_amp_lock); 978 /* FALLTHROUGH */ 979 case BIO_DELETE: 980 case BIO_FLUSH: 981 pjdlog_debug(2, 982 "ggate_recv: (%p) Moving request to the send queues.", 983 hio); 984 refcount_init(&hio->hio_countdown, ncomps); 985 for (ii = 0; ii < ncomps; ii++) 986 QUEUE_INSERT1(hio, send, ii); 987 break; 988 } 989 } 990 /* NOTREACHED */ 991 return (NULL); 992 } 993 994 /* 995 * Thread reads from or writes to local component. 996 * If local read fails, it redirects it to remote_send thread. 997 */ 998 static void * 999 local_send_thread(void *arg) 1000 { 1001 struct hast_resource *res = arg; 1002 struct g_gate_ctl_io *ggio; 1003 struct hio *hio; 1004 unsigned int ncomp, rncomp; 1005 ssize_t ret; 1006 1007 /* Local component is 0 for now. */ 1008 ncomp = 0; 1009 /* Remote component is 1 for now. */ 1010 rncomp = 1; 1011 1012 for (;;) { 1013 pjdlog_debug(2, "local_send: Taking request."); 1014 QUEUE_TAKE1(hio, send, ncomp); 1015 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1016 ggio = &hio->hio_ggio; 1017 switch (ggio->gctl_cmd) { 1018 case BIO_READ: 1019 ret = pread(res->hr_localfd, ggio->gctl_data, 1020 ggio->gctl_length, 1021 ggio->gctl_offset + res->hr_localoff); 1022 if (ret == ggio->gctl_length) 1023 hio->hio_errors[ncomp] = 0; 1024 else { 1025 /* 1026 * If READ failed, try to read from remote node. 1027 */ 1028 QUEUE_INSERT1(hio, send, rncomp); 1029 continue; 1030 } 1031 break; 1032 case BIO_WRITE: 1033 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1034 ggio->gctl_length, 1035 ggio->gctl_offset + res->hr_localoff); 1036 if (ret < 0) 1037 hio->hio_errors[ncomp] = errno; 1038 else if (ret != ggio->gctl_length) 1039 hio->hio_errors[ncomp] = EIO; 1040 else 1041 hio->hio_errors[ncomp] = 0; 1042 break; 1043 case BIO_DELETE: 1044 ret = g_delete(res->hr_localfd, 1045 ggio->gctl_offset + res->hr_localoff, 1046 ggio->gctl_length); 1047 if (ret < 0) 1048 hio->hio_errors[ncomp] = errno; 1049 else 1050 hio->hio_errors[ncomp] = 0; 1051 break; 1052 case BIO_FLUSH: 1053 ret = g_flush(res->hr_localfd); 1054 if (ret < 0) 1055 hio->hio_errors[ncomp] = errno; 1056 else 1057 hio->hio_errors[ncomp] = 0; 1058 break; 1059 } 1060 if (refcount_release(&hio->hio_countdown)) { 1061 if (ISSYNCREQ(hio)) { 1062 mtx_lock(&sync_lock); 1063 SYNCREQDONE(hio); 1064 mtx_unlock(&sync_lock); 1065 cv_signal(&sync_cond); 1066 } else { 1067 pjdlog_debug(2, 1068 "local_send: (%p) Moving request to the done queue.", 1069 hio); 1070 QUEUE_INSERT2(hio, done); 1071 } 1072 } 1073 } 1074 /* NOTREACHED */ 1075 return (NULL); 1076 } 1077 1078 /* 1079 * Thread sends request to secondary node. 1080 */ 1081 static void * 1082 remote_send_thread(void *arg) 1083 { 1084 struct hast_resource *res = arg; 1085 struct g_gate_ctl_io *ggio; 1086 struct hio *hio; 1087 struct nv *nv; 1088 unsigned int ncomp; 1089 bool wakeup; 1090 uint64_t offset, length; 1091 uint8_t cmd; 1092 void *data; 1093 1094 /* Remote component is 1 for now. */ 1095 ncomp = 1; 1096 1097 for (;;) { 1098 pjdlog_debug(2, "remote_send: Taking request."); 1099 QUEUE_TAKE1(hio, send, ncomp); 1100 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1101 ggio = &hio->hio_ggio; 1102 switch (ggio->gctl_cmd) { 1103 case BIO_READ: 1104 cmd = HIO_READ; 1105 data = NULL; 1106 offset = ggio->gctl_offset; 1107 length = ggio->gctl_length; 1108 break; 1109 case BIO_WRITE: 1110 cmd = HIO_WRITE; 1111 data = ggio->gctl_data; 1112 offset = ggio->gctl_offset; 1113 length = ggio->gctl_length; 1114 break; 1115 case BIO_DELETE: 1116 cmd = HIO_DELETE; 1117 data = NULL; 1118 offset = ggio->gctl_offset; 1119 length = ggio->gctl_length; 1120 break; 1121 case BIO_FLUSH: 1122 cmd = HIO_FLUSH; 1123 data = NULL; 1124 offset = 0; 1125 length = 0; 1126 break; 1127 default: 1128 assert(!"invalid condition"); 1129 abort(); 1130 } 1131 nv = nv_alloc(); 1132 nv_add_uint8(nv, cmd, "cmd"); 1133 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1134 nv_add_uint64(nv, offset, "offset"); 1135 nv_add_uint64(nv, length, "length"); 1136 if (nv_error(nv) != 0) { 1137 hio->hio_errors[ncomp] = nv_error(nv); 1138 pjdlog_debug(2, 1139 "remote_send: (%p) Unable to prepare header to send.", 1140 hio); 1141 reqlog(LOG_ERR, 0, ggio, 1142 "Unable to prepare header to send (%s): ", 1143 strerror(nv_error(nv))); 1144 /* Move failed request immediately to the done queue. */ 1145 goto done_queue; 1146 } 1147 pjdlog_debug(2, 1148 "remote_send: (%p) Moving request to the recv queue.", 1149 hio); 1150 /* 1151 * Protect connection from disappearing. 1152 */ 1153 rw_rlock(&hio_remote_lock[ncomp]); 1154 if (!ISCONNECTED(res, ncomp)) { 1155 rw_unlock(&hio_remote_lock[ncomp]); 1156 hio->hio_errors[ncomp] = ENOTCONN; 1157 goto done_queue; 1158 } 1159 /* 1160 * Move the request to recv queue before sending it, because 1161 * in different order we can get reply before we move request 1162 * to recv queue. 1163 */ 1164 mtx_lock(&hio_recv_list_lock[ncomp]); 1165 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1166 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1167 mtx_unlock(&hio_recv_list_lock[ncomp]); 1168 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1169 data != NULL ? length : 0) < 0) { 1170 hio->hio_errors[ncomp] = errno; 1171 rw_unlock(&hio_remote_lock[ncomp]); 1172 remote_close(res, ncomp); 1173 pjdlog_debug(2, 1174 "remote_send: (%p) Unable to send request.", hio); 1175 reqlog(LOG_ERR, 0, ggio, 1176 "Unable to send request (%s): ", 1177 strerror(hio->hio_errors[ncomp])); 1178 /* 1179 * Take request back from the receive queue and move 1180 * it immediately to the done queue. 1181 */ 1182 mtx_lock(&hio_recv_list_lock[ncomp]); 1183 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1184 mtx_unlock(&hio_recv_list_lock[ncomp]); 1185 goto done_queue; 1186 } 1187 rw_unlock(&hio_remote_lock[ncomp]); 1188 nv_free(nv); 1189 if (wakeup) 1190 cv_signal(&hio_recv_list_cond[ncomp]); 1191 continue; 1192 done_queue: 1193 nv_free(nv); 1194 if (ISSYNCREQ(hio)) { 1195 if (!refcount_release(&hio->hio_countdown)) 1196 continue; 1197 mtx_lock(&sync_lock); 1198 SYNCREQDONE(hio); 1199 mtx_unlock(&sync_lock); 1200 cv_signal(&sync_cond); 1201 continue; 1202 } 1203 if (ggio->gctl_cmd == BIO_WRITE) { 1204 mtx_lock(&res->hr_amp_lock); 1205 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1206 ggio->gctl_length)) { 1207 (void)hast_activemap_flush(res); 1208 } 1209 mtx_unlock(&res->hr_amp_lock); 1210 } 1211 if (!refcount_release(&hio->hio_countdown)) 1212 continue; 1213 pjdlog_debug(2, 1214 "remote_send: (%p) Moving request to the done queue.", 1215 hio); 1216 QUEUE_INSERT2(hio, done); 1217 } 1218 /* NOTREACHED */ 1219 return (NULL); 1220 } 1221 1222 /* 1223 * Thread receives answer from secondary node and passes it to ggate_send 1224 * thread. 1225 */ 1226 static void * 1227 remote_recv_thread(void *arg) 1228 { 1229 struct hast_resource *res = arg; 1230 struct g_gate_ctl_io *ggio; 1231 struct hio *hio; 1232 struct nv *nv; 1233 unsigned int ncomp; 1234 uint64_t seq; 1235 int error; 1236 1237 /* Remote component is 1 for now. */ 1238 ncomp = 1; 1239 1240 for (;;) { 1241 /* Wait until there is anything to receive. */ 1242 mtx_lock(&hio_recv_list_lock[ncomp]); 1243 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1244 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1245 cv_wait(&hio_recv_list_cond[ncomp], 1246 &hio_recv_list_lock[ncomp]); 1247 } 1248 mtx_unlock(&hio_recv_list_lock[ncomp]); 1249 rw_rlock(&hio_remote_lock[ncomp]); 1250 if (!ISCONNECTED(res, ncomp)) { 1251 rw_unlock(&hio_remote_lock[ncomp]); 1252 /* 1253 * Connection is dead, so move all pending requests to 1254 * the done queue (one-by-one). 1255 */ 1256 mtx_lock(&hio_recv_list_lock[ncomp]); 1257 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1258 assert(hio != NULL); 1259 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1260 hio_next[ncomp]); 1261 mtx_unlock(&hio_recv_list_lock[ncomp]); 1262 goto done_queue; 1263 } 1264 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1265 pjdlog_errno(LOG_ERR, 1266 "Unable to receive reply header"); 1267 rw_unlock(&hio_remote_lock[ncomp]); 1268 remote_close(res, ncomp); 1269 continue; 1270 } 1271 rw_unlock(&hio_remote_lock[ncomp]); 1272 seq = nv_get_uint64(nv, "seq"); 1273 if (seq == 0) { 1274 pjdlog_error("Header contains no 'seq' field."); 1275 nv_free(nv); 1276 continue; 1277 } 1278 mtx_lock(&hio_recv_list_lock[ncomp]); 1279 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1280 if (hio->hio_ggio.gctl_seq == seq) { 1281 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1282 hio_next[ncomp]); 1283 break; 1284 } 1285 } 1286 mtx_unlock(&hio_recv_list_lock[ncomp]); 1287 if (hio == NULL) { 1288 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1289 (uintmax_t)seq); 1290 nv_free(nv); 1291 continue; 1292 } 1293 error = nv_get_int16(nv, "error"); 1294 if (error != 0) { 1295 /* Request failed on remote side. */ 1296 hio->hio_errors[ncomp] = 0; 1297 nv_free(nv); 1298 goto done_queue; 1299 } 1300 ggio = &hio->hio_ggio; 1301 switch (ggio->gctl_cmd) { 1302 case BIO_READ: 1303 rw_rlock(&hio_remote_lock[ncomp]); 1304 if (!ISCONNECTED(res, ncomp)) { 1305 rw_unlock(&hio_remote_lock[ncomp]); 1306 nv_free(nv); 1307 goto done_queue; 1308 } 1309 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1310 ggio->gctl_data, ggio->gctl_length) < 0) { 1311 hio->hio_errors[ncomp] = errno; 1312 pjdlog_errno(LOG_ERR, 1313 "Unable to receive reply data"); 1314 rw_unlock(&hio_remote_lock[ncomp]); 1315 nv_free(nv); 1316 remote_close(res, ncomp); 1317 goto done_queue; 1318 } 1319 rw_unlock(&hio_remote_lock[ncomp]); 1320 break; 1321 case BIO_WRITE: 1322 case BIO_DELETE: 1323 case BIO_FLUSH: 1324 break; 1325 default: 1326 assert(!"invalid condition"); 1327 abort(); 1328 } 1329 hio->hio_errors[ncomp] = 0; 1330 nv_free(nv); 1331 done_queue: 1332 if (refcount_release(&hio->hio_countdown)) { 1333 if (ISSYNCREQ(hio)) { 1334 mtx_lock(&sync_lock); 1335 SYNCREQDONE(hio); 1336 mtx_unlock(&sync_lock); 1337 cv_signal(&sync_cond); 1338 } else { 1339 pjdlog_debug(2, 1340 "remote_recv: (%p) Moving request to the done queue.", 1341 hio); 1342 QUEUE_INSERT2(hio, done); 1343 } 1344 } 1345 } 1346 /* NOTREACHED */ 1347 return (NULL); 1348 } 1349 1350 /* 1351 * Thread sends answer to the kernel. 1352 */ 1353 static void * 1354 ggate_send_thread(void *arg) 1355 { 1356 struct hast_resource *res = arg; 1357 struct g_gate_ctl_io *ggio; 1358 struct hio *hio; 1359 unsigned int ii, ncomp, ncomps; 1360 1361 ncomps = HAST_NCOMPONENTS; 1362 1363 for (;;) { 1364 pjdlog_debug(2, "ggate_send: Taking request."); 1365 QUEUE_TAKE2(hio, done); 1366 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1367 ggio = &hio->hio_ggio; 1368 for (ii = 0; ii < ncomps; ii++) { 1369 if (hio->hio_errors[ii] == 0) { 1370 /* 1371 * One successful request is enough to declare 1372 * success. 1373 */ 1374 ggio->gctl_error = 0; 1375 break; 1376 } 1377 } 1378 if (ii == ncomps) { 1379 /* 1380 * None of the requests were successful. 1381 * Use first error. 1382 */ 1383 ggio->gctl_error = hio->hio_errors[0]; 1384 } 1385 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1386 mtx_lock(&res->hr_amp_lock); 1387 activemap_write_complete(res->hr_amp, 1388 ggio->gctl_offset, ggio->gctl_length); 1389 mtx_unlock(&res->hr_amp_lock); 1390 } 1391 if (ggio->gctl_cmd == BIO_WRITE) { 1392 /* 1393 * Unlock range we locked. 1394 */ 1395 mtx_lock(&range_lock); 1396 rangelock_del(range_regular, ggio->gctl_offset, 1397 ggio->gctl_length); 1398 if (range_sync_wait) 1399 cv_signal(&range_sync_cond); 1400 mtx_unlock(&range_lock); 1401 /* 1402 * Bump local count if this is first write after 1403 * connection failure with remote node. 1404 */ 1405 ncomp = 1; 1406 rw_rlock(&hio_remote_lock[ncomp]); 1407 if (!ISCONNECTED(res, ncomp)) { 1408 mtx_lock(&metadata_lock); 1409 if (res->hr_primary_localcnt == 1410 res->hr_secondary_remotecnt) { 1411 res->hr_primary_localcnt++; 1412 pjdlog_debug(1, 1413 "Increasing localcnt to %ju.", 1414 (uintmax_t)res->hr_primary_localcnt); 1415 (void)metadata_write(res); 1416 } 1417 mtx_unlock(&metadata_lock); 1418 } 1419 rw_unlock(&hio_remote_lock[ncomp]); 1420 } 1421 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1422 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1423 pjdlog_debug(2, 1424 "ggate_send: (%p) Moving request to the free queue.", hio); 1425 QUEUE_INSERT2(hio, free); 1426 } 1427 /* NOTREACHED */ 1428 return (NULL); 1429 } 1430 1431 /* 1432 * Thread synchronize local and remote components. 1433 */ 1434 static void * 1435 sync_thread(void *arg __unused) 1436 { 1437 struct hast_resource *res = arg; 1438 struct hio *hio; 1439 struct g_gate_ctl_io *ggio; 1440 unsigned int ii, ncomp, ncomps; 1441 off_t offset, length, synced; 1442 bool dorewind; 1443 int syncext; 1444 1445 ncomps = HAST_NCOMPONENTS; 1446 dorewind = true; 1447 synced = 0; 1448 1449 for (;;) { 1450 mtx_lock(&sync_lock); 1451 while (!sync_inprogress) { 1452 dorewind = true; 1453 synced = 0; 1454 cv_wait(&sync_cond, &sync_lock); 1455 } 1456 mtx_unlock(&sync_lock); 1457 /* 1458 * Obtain offset at which we should synchronize. 1459 * Rewind synchronization if needed. 1460 */ 1461 mtx_lock(&res->hr_amp_lock); 1462 if (dorewind) 1463 activemap_sync_rewind(res->hr_amp); 1464 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1465 if (syncext != -1) { 1466 /* 1467 * We synchronized entire syncext extent, we can mark 1468 * it as clean now. 1469 */ 1470 if (activemap_extent_complete(res->hr_amp, syncext)) 1471 (void)hast_activemap_flush(res); 1472 } 1473 mtx_unlock(&res->hr_amp_lock); 1474 if (dorewind) { 1475 dorewind = false; 1476 if (offset < 0) 1477 pjdlog_info("Nodes are in sync."); 1478 else { 1479 pjdlog_info("Synchronization started. %ju bytes to go.", 1480 (uintmax_t)(res->hr_extentsize * 1481 activemap_ndirty(res->hr_amp))); 1482 } 1483 } 1484 if (offset < 0) { 1485 mtx_lock(&sync_lock); 1486 sync_inprogress = false; 1487 mtx_unlock(&sync_lock); 1488 pjdlog_debug(1, "Nothing to synchronize."); 1489 /* 1490 * Synchronization complete, make both localcnt and 1491 * remotecnt equal. 1492 */ 1493 ncomp = 1; 1494 rw_rlock(&hio_remote_lock[ncomp]); 1495 if (ISCONNECTED(res, ncomp)) { 1496 if (synced > 0) { 1497 pjdlog_info("Synchronization complete. " 1498 "%jd bytes synchronized.", 1499 (intmax_t)synced); 1500 } 1501 mtx_lock(&metadata_lock); 1502 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1503 res->hr_primary_localcnt = 1504 res->hr_secondary_localcnt; 1505 res->hr_primary_remotecnt = 1506 res->hr_secondary_remotecnt; 1507 pjdlog_debug(1, 1508 "Setting localcnt to %ju and remotecnt to %ju.", 1509 (uintmax_t)res->hr_primary_localcnt, 1510 (uintmax_t)res->hr_secondary_localcnt); 1511 (void)metadata_write(res); 1512 mtx_unlock(&metadata_lock); 1513 } else if (synced > 0) { 1514 pjdlog_info("Synchronization interrupted. " 1515 "%jd bytes synchronized so far.", 1516 (intmax_t)synced); 1517 } 1518 rw_unlock(&hio_remote_lock[ncomp]); 1519 continue; 1520 } 1521 pjdlog_debug(2, "sync: Taking free request."); 1522 QUEUE_TAKE2(hio, free); 1523 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1524 /* 1525 * Lock the range we are going to synchronize. We don't want 1526 * race where someone writes between our read and write. 1527 */ 1528 for (;;) { 1529 mtx_lock(&range_lock); 1530 if (rangelock_islocked(range_regular, offset, length)) { 1531 pjdlog_debug(2, 1532 "sync: Range offset=%jd length=%jd locked.", 1533 (intmax_t)offset, (intmax_t)length); 1534 range_sync_wait = true; 1535 cv_wait(&range_sync_cond, &range_lock); 1536 range_sync_wait = false; 1537 mtx_unlock(&range_lock); 1538 continue; 1539 } 1540 if (rangelock_add(range_sync, offset, length) < 0) { 1541 mtx_unlock(&range_lock); 1542 pjdlog_debug(2, 1543 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1544 (intmax_t)offset, (intmax_t)length); 1545 sleep(1); 1546 continue; 1547 } 1548 mtx_unlock(&range_lock); 1549 break; 1550 } 1551 /* 1552 * First read the data from synchronization source. 1553 */ 1554 SYNCREQ(hio); 1555 ggio = &hio->hio_ggio; 1556 ggio->gctl_cmd = BIO_READ; 1557 ggio->gctl_offset = offset; 1558 ggio->gctl_length = length; 1559 ggio->gctl_error = 0; 1560 for (ii = 0; ii < ncomps; ii++) 1561 hio->hio_errors[ii] = EINVAL; 1562 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1563 hio); 1564 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1565 hio); 1566 mtx_lock(&metadata_lock); 1567 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1568 /* 1569 * This range is up-to-date on local component, 1570 * so handle request locally. 1571 */ 1572 /* Local component is 0 for now. */ 1573 ncomp = 0; 1574 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1575 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1576 /* 1577 * This range is out-of-date on local component, 1578 * so send request to the remote node. 1579 */ 1580 /* Remote component is 1 for now. */ 1581 ncomp = 1; 1582 } 1583 mtx_unlock(&metadata_lock); 1584 refcount_init(&hio->hio_countdown, 1); 1585 QUEUE_INSERT1(hio, send, ncomp); 1586 1587 /* 1588 * Let's wait for READ to finish. 1589 */ 1590 mtx_lock(&sync_lock); 1591 while (!ISSYNCREQDONE(hio)) 1592 cv_wait(&sync_cond, &sync_lock); 1593 mtx_unlock(&sync_lock); 1594 1595 if (hio->hio_errors[ncomp] != 0) { 1596 pjdlog_error("Unable to read synchronization data: %s.", 1597 strerror(hio->hio_errors[ncomp])); 1598 goto free_queue; 1599 } 1600 1601 /* 1602 * We read the data from synchronization source, now write it 1603 * to synchronization target. 1604 */ 1605 SYNCREQ(hio); 1606 ggio->gctl_cmd = BIO_WRITE; 1607 for (ii = 0; ii < ncomps; ii++) 1608 hio->hio_errors[ii] = EINVAL; 1609 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1610 hio); 1611 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1612 hio); 1613 mtx_lock(&metadata_lock); 1614 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1615 /* 1616 * This range is up-to-date on local component, 1617 * so we update remote component. 1618 */ 1619 /* Remote component is 1 for now. */ 1620 ncomp = 1; 1621 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1622 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1623 /* 1624 * This range is out-of-date on local component, 1625 * so we update it. 1626 */ 1627 /* Local component is 0 for now. */ 1628 ncomp = 0; 1629 } 1630 mtx_unlock(&metadata_lock); 1631 1632 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1633 hio); 1634 refcount_init(&hio->hio_countdown, 1); 1635 QUEUE_INSERT1(hio, send, ncomp); 1636 1637 /* 1638 * Let's wait for WRITE to finish. 1639 */ 1640 mtx_lock(&sync_lock); 1641 while (!ISSYNCREQDONE(hio)) 1642 cv_wait(&sync_cond, &sync_lock); 1643 mtx_unlock(&sync_lock); 1644 1645 if (hio->hio_errors[ncomp] != 0) { 1646 pjdlog_error("Unable to write synchronization data: %s.", 1647 strerror(hio->hio_errors[ncomp])); 1648 goto free_queue; 1649 } 1650 free_queue: 1651 mtx_lock(&range_lock); 1652 rangelock_del(range_sync, offset, length); 1653 if (range_regular_wait) 1654 cv_signal(&range_regular_cond); 1655 mtx_unlock(&range_lock); 1656 1657 synced += length; 1658 1659 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1660 hio); 1661 QUEUE_INSERT2(hio, free); 1662 } 1663 /* NOTREACHED */ 1664 return (NULL); 1665 } 1666 1667 static void 1668 sighandler(int sig) 1669 { 1670 bool unlock; 1671 1672 switch (sig) { 1673 case SIGINT: 1674 case SIGTERM: 1675 sigexit_received = true; 1676 break; 1677 default: 1678 assert(!"invalid condition"); 1679 } 1680 /* 1681 * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't 1682 * want to risk deadlock. 1683 */ 1684 unlock = mtx_trylock(&hio_guard_lock); 1685 cv_signal(&hio_guard_cond); 1686 if (unlock) 1687 mtx_unlock(&hio_guard_lock); 1688 } 1689 1690 /* 1691 * Thread guards remote connections and reconnects when needed, handles 1692 * signals, etc. 1693 */ 1694 static void * 1695 guard_thread(void *arg) 1696 { 1697 struct hast_resource *res = arg; 1698 unsigned int ii, ncomps; 1699 int timeout; 1700 1701 ncomps = HAST_NCOMPONENTS; 1702 /* The is only one remote component for now. */ 1703 #define ISREMOTE(no) ((no) == 1) 1704 1705 for (;;) { 1706 if (sigexit_received) { 1707 primary_exitx(EX_OK, 1708 "Termination signal received, exiting."); 1709 } 1710 /* 1711 * If all the connection will be fine, we will sleep until 1712 * someone wakes us up. 1713 * If any of the connections will be broken and we won't be 1714 * able to connect, we will sleep only for RECONNECT_SLEEP 1715 * seconds so we can retry soon. 1716 */ 1717 timeout = 0; 1718 pjdlog_debug(2, "remote_guard: Checking connections."); 1719 mtx_lock(&hio_guard_lock); 1720 for (ii = 0; ii < ncomps; ii++) { 1721 if (!ISREMOTE(ii)) 1722 continue; 1723 rw_rlock(&hio_remote_lock[ii]); 1724 if (ISCONNECTED(res, ii)) { 1725 assert(res->hr_remotein != NULL); 1726 assert(res->hr_remoteout != NULL); 1727 rw_unlock(&hio_remote_lock[ii]); 1728 pjdlog_debug(2, 1729 "remote_guard: Connection to %s is ok.", 1730 res->hr_remoteaddr); 1731 } else { 1732 assert(res->hr_remotein == NULL); 1733 assert(res->hr_remoteout == NULL); 1734 /* 1735 * Upgrade the lock. It doesn't have to be 1736 * atomic as no other thread can change 1737 * connection status from disconnected to 1738 * connected. 1739 */ 1740 rw_unlock(&hio_remote_lock[ii]); 1741 rw_wlock(&hio_remote_lock[ii]); 1742 assert(res->hr_remotein == NULL); 1743 assert(res->hr_remoteout == NULL); 1744 pjdlog_debug(2, 1745 "remote_guard: Reconnecting to %s.", 1746 res->hr_remoteaddr); 1747 init_remote(res); 1748 if (ISCONNECTED(res, ii)) { 1749 pjdlog_info("Successfully reconnected to %s.", 1750 res->hr_remoteaddr); 1751 } else { 1752 /* Both connections should be NULL. */ 1753 assert(res->hr_remotein == NULL); 1754 assert(res->hr_remoteout == NULL); 1755 pjdlog_debug(2, 1756 "remote_guard: Reconnect to %s failed.", 1757 res->hr_remoteaddr); 1758 timeout = RECONNECT_SLEEP; 1759 } 1760 rw_unlock(&hio_remote_lock[ii]); 1761 } 1762 } 1763 (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); 1764 mtx_unlock(&hio_guard_lock); 1765 } 1766 #undef ISREMOTE 1767 /* NOTREACHED */ 1768 return (NULL); 1769 } 1770