1 /*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/types.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <geom/gate/g_gate.h> 41 42 #include <err.h> 43 #include <errno.h> 44 #include <fcntl.h> 45 #include <libgeom.h> 46 #include <pthread.h> 47 #include <signal.h> 48 #include <stdint.h> 49 #include <stdio.h> 50 #include <string.h> 51 #include <sysexits.h> 52 #include <unistd.h> 53 54 #include <activemap.h> 55 #include <nv.h> 56 #include <rangelock.h> 57 58 #include "control.h" 59 #include "event.h" 60 #include "hast.h" 61 #include "hast_proto.h" 62 #include "hastd.h" 63 #include "hooks.h" 64 #include "metadata.h" 65 #include "proto.h" 66 #include "pjdlog.h" 67 #include "refcnt.h" 68 #include "subr.h" 69 #include "synch.h" 70 71 /* The is only one remote component for now. */ 72 #define ISREMOTE(no) ((no) == 1) 73 74 struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 refcnt_t hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Number of components we are still waiting before sending write 98 * completion ack to GEOM Gate. Used for memsync. 99 */ 100 refcnt_t hio_writecount; 101 /* 102 * Memsync request was acknowleged by remote. 103 */ 104 bool hio_memsyncacked; 105 /* 106 * Remember replication from the time the request was initiated, 107 * so we won't get confused when replication changes on reload. 108 */ 109 int hio_replication; 110 TAILQ_ENTRY(hio) *hio_next; 111 }; 112 #define hio_free_next hio_next[0] 113 #define hio_done_next hio_next[0] 114 115 /* 116 * Free list holds unused structures. When free list is empty, we have to wait 117 * until some in-progress requests are freed. 118 */ 119 static TAILQ_HEAD(, hio) hio_free_list; 120 static size_t hio_free_list_size; 121 static pthread_mutex_t hio_free_list_lock; 122 static pthread_cond_t hio_free_list_cond; 123 /* 124 * There is one send list for every component. One requests is placed on all 125 * send lists - each component gets the same request, but each component is 126 * responsible for managing his own send list. 127 */ 128 static TAILQ_HEAD(, hio) *hio_send_list; 129 static size_t *hio_send_list_size; 130 static pthread_mutex_t *hio_send_list_lock; 131 static pthread_cond_t *hio_send_list_cond; 132 #define hio_send_local_list_size hio_send_list_size[0] 133 #define hio_send_remote_list_size hio_send_list_size[1] 134 /* 135 * There is one recv list for every component, although local components don't 136 * use recv lists as local requests are done synchronously. 137 */ 138 static TAILQ_HEAD(, hio) *hio_recv_list; 139 static size_t *hio_recv_list_size; 140 static pthread_mutex_t *hio_recv_list_lock; 141 static pthread_cond_t *hio_recv_list_cond; 142 #define hio_recv_remote_list_size hio_recv_list_size[1] 143 /* 144 * Request is placed on done list by the slowest component (the one that 145 * decreased hio_countdown from 1 to 0). 146 */ 147 static TAILQ_HEAD(, hio) hio_done_list; 148 static size_t hio_done_list_size; 149 static pthread_mutex_t hio_done_list_lock; 150 static pthread_cond_t hio_done_list_cond; 151 /* 152 * Structure below are for interaction with sync thread. 153 */ 154 static bool sync_inprogress; 155 static pthread_mutex_t sync_lock; 156 static pthread_cond_t sync_cond; 157 /* 158 * The lock below allows to synchornize access to remote connections. 159 */ 160 static pthread_rwlock_t *hio_remote_lock; 161 162 /* 163 * Lock to synchronize metadata updates. Also synchronize access to 164 * hr_primary_localcnt and hr_primary_remotecnt fields. 165 */ 166 static pthread_mutex_t metadata_lock; 167 168 /* 169 * Maximum number of outstanding I/O requests. 170 */ 171 #define HAST_HIO_MAX 256 172 /* 173 * Number of components. At this point there are only two components: local 174 * and remote, but in the future it might be possible to use multiple local 175 * and remote components. 176 */ 177 #define HAST_NCOMPONENTS 2 178 179 #define ISCONNECTED(res, no) \ 180 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 181 182 #define QUEUE_INSERT1(hio, name, ncomp) do { \ 183 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 184 if (TAILQ_EMPTY(&hio_##name##_list[(ncomp)])) \ 185 cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \ 186 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 187 hio_next[(ncomp)]); \ 188 hio_##name##_list_size[(ncomp)]++; \ 189 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 190 } while (0) 191 #define QUEUE_INSERT2(hio, name) do { \ 192 mtx_lock(&hio_##name##_list_lock); \ 193 if (TAILQ_EMPTY(&hio_##name##_list)) \ 194 cv_broadcast(&hio_##name##_list_cond); \ 195 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 196 hio_##name##_list_size++; \ 197 mtx_unlock(&hio_##name##_list_lock); \ 198 } while (0) 199 #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 200 bool _last; \ 201 \ 202 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 203 _last = false; \ 204 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 205 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 206 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 207 if ((timeout) != 0) \ 208 _last = true; \ 209 } \ 210 if (hio != NULL) { \ 211 PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0); \ 212 hio_##name##_list_size[(ncomp)]--; \ 213 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 214 hio_next[(ncomp)]); \ 215 } \ 216 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 217 } while (0) 218 #define QUEUE_TAKE2(hio, name) do { \ 219 mtx_lock(&hio_##name##_list_lock); \ 220 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 221 cv_wait(&hio_##name##_list_cond, \ 222 &hio_##name##_list_lock); \ 223 } \ 224 PJDLOG_ASSERT(hio_##name##_list_size != 0); \ 225 hio_##name##_list_size--; \ 226 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 227 mtx_unlock(&hio_##name##_list_lock); \ 228 } while (0) 229 230 #define ISFULLSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_FULLSYNC) 231 #define ISMEMSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_MEMSYNC) 232 #define ISASYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_ASYNC) 233 234 #define SYNCREQ(hio) do { \ 235 (hio)->hio_ggio.gctl_unit = -1; \ 236 (hio)->hio_ggio.gctl_seq = 1; \ 237 } while (0) 238 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 239 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 240 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 241 242 #define ISMEMSYNCWRITE(hio) (ISMEMSYNC(hio) && \ 243 (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) 244 245 static struct hast_resource *gres; 246 247 static pthread_mutex_t range_lock; 248 static struct rangelocks *range_regular; 249 static bool range_regular_wait; 250 static pthread_cond_t range_regular_cond; 251 static struct rangelocks *range_sync; 252 static bool range_sync_wait; 253 static pthread_cond_t range_sync_cond; 254 static bool fullystarted; 255 256 static void *ggate_recv_thread(void *arg); 257 static void *local_send_thread(void *arg); 258 static void *remote_send_thread(void *arg); 259 static void *remote_recv_thread(void *arg); 260 static void *ggate_send_thread(void *arg); 261 static void *sync_thread(void *arg); 262 static void *guard_thread(void *arg); 263 264 static void 265 output_status_aux(struct nv *nvout) 266 { 267 268 nv_add_uint64(nvout, (uint64_t)hio_free_list_size, 269 "idle_queue_size"); 270 nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size, 271 "local_queue_size"); 272 nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size, 273 "send_queue_size"); 274 nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size, 275 "recv_queue_size"); 276 nv_add_uint64(nvout, (uint64_t)hio_done_list_size, 277 "done_queue_size"); 278 } 279 280 static void 281 cleanup(struct hast_resource *res) 282 { 283 int rerrno; 284 285 /* Remember errno. */ 286 rerrno = errno; 287 288 /* Destroy ggate provider if we created one. */ 289 if (res->hr_ggateunit >= 0) { 290 struct g_gate_ctl_destroy ggiod; 291 292 bzero(&ggiod, sizeof(ggiod)); 293 ggiod.gctl_version = G_GATE_VERSION; 294 ggiod.gctl_unit = res->hr_ggateunit; 295 ggiod.gctl_force = 1; 296 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 297 pjdlog_errno(LOG_WARNING, 298 "Unable to destroy hast/%s device", 299 res->hr_provname); 300 } 301 res->hr_ggateunit = -1; 302 } 303 304 /* Restore errno. */ 305 errno = rerrno; 306 } 307 308 static __dead2 void 309 primary_exit(int exitcode, const char *fmt, ...) 310 { 311 va_list ap; 312 313 PJDLOG_ASSERT(exitcode != EX_OK); 314 va_start(ap, fmt); 315 pjdlogv_errno(LOG_ERR, fmt, ap); 316 va_end(ap); 317 cleanup(gres); 318 exit(exitcode); 319 } 320 321 static __dead2 void 322 primary_exitx(int exitcode, const char *fmt, ...) 323 { 324 va_list ap; 325 326 va_start(ap, fmt); 327 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 328 va_end(ap); 329 cleanup(gres); 330 exit(exitcode); 331 } 332 333 /* Expects res->hr_amp locked, returns unlocked. */ 334 static int 335 hast_activemap_flush(struct hast_resource *res) 336 { 337 const unsigned char *buf; 338 size_t size; 339 int ret; 340 341 mtx_lock(&res->hr_amp_diskmap_lock); 342 buf = activemap_bitmap(res->hr_amp, &size); 343 mtx_unlock(&res->hr_amp_lock); 344 PJDLOG_ASSERT(buf != NULL); 345 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 346 ret = 0; 347 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 348 (ssize_t)size) { 349 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 350 res->hr_stat_activemap_write_error++; 351 ret = -1; 352 } 353 if (ret == 0 && res->hr_metaflush == 1 && 354 g_flush(res->hr_localfd) == -1) { 355 if (errno == EOPNOTSUPP) { 356 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 357 res->hr_localpath); 358 res->hr_metaflush = 0; 359 } else { 360 pjdlog_errno(LOG_ERR, 361 "Unable to flush disk cache on activemap update"); 362 res->hr_stat_activemap_flush_error++; 363 ret = -1; 364 } 365 } 366 mtx_unlock(&res->hr_amp_diskmap_lock); 367 return (ret); 368 } 369 370 static bool 371 real_remote(const struct hast_resource *res) 372 { 373 374 return (strcmp(res->hr_remoteaddr, "none") != 0); 375 } 376 377 static void 378 init_environment(struct hast_resource *res __unused) 379 { 380 struct hio *hio; 381 unsigned int ii, ncomps; 382 383 /* 384 * In the future it might be per-resource value. 385 */ 386 ncomps = HAST_NCOMPONENTS; 387 388 /* 389 * Allocate memory needed by lists. 390 */ 391 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 392 if (hio_send_list == NULL) { 393 primary_exitx(EX_TEMPFAIL, 394 "Unable to allocate %zu bytes of memory for send lists.", 395 sizeof(hio_send_list[0]) * ncomps); 396 } 397 hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps); 398 if (hio_send_list_size == NULL) { 399 primary_exitx(EX_TEMPFAIL, 400 "Unable to allocate %zu bytes of memory for send list counters.", 401 sizeof(hio_send_list_size[0]) * ncomps); 402 } 403 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 404 if (hio_send_list_lock == NULL) { 405 primary_exitx(EX_TEMPFAIL, 406 "Unable to allocate %zu bytes of memory for send list locks.", 407 sizeof(hio_send_list_lock[0]) * ncomps); 408 } 409 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 410 if (hio_send_list_cond == NULL) { 411 primary_exitx(EX_TEMPFAIL, 412 "Unable to allocate %zu bytes of memory for send list condition variables.", 413 sizeof(hio_send_list_cond[0]) * ncomps); 414 } 415 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 416 if (hio_recv_list == NULL) { 417 primary_exitx(EX_TEMPFAIL, 418 "Unable to allocate %zu bytes of memory for recv lists.", 419 sizeof(hio_recv_list[0]) * ncomps); 420 } 421 hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps); 422 if (hio_recv_list_size == NULL) { 423 primary_exitx(EX_TEMPFAIL, 424 "Unable to allocate %zu bytes of memory for recv list counters.", 425 sizeof(hio_recv_list_size[0]) * ncomps); 426 } 427 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 428 if (hio_recv_list_lock == NULL) { 429 primary_exitx(EX_TEMPFAIL, 430 "Unable to allocate %zu bytes of memory for recv list locks.", 431 sizeof(hio_recv_list_lock[0]) * ncomps); 432 } 433 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 434 if (hio_recv_list_cond == NULL) { 435 primary_exitx(EX_TEMPFAIL, 436 "Unable to allocate %zu bytes of memory for recv list condition variables.", 437 sizeof(hio_recv_list_cond[0]) * ncomps); 438 } 439 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 440 if (hio_remote_lock == NULL) { 441 primary_exitx(EX_TEMPFAIL, 442 "Unable to allocate %zu bytes of memory for remote connections locks.", 443 sizeof(hio_remote_lock[0]) * ncomps); 444 } 445 446 /* 447 * Initialize lists, their counters, locks and condition variables. 448 */ 449 TAILQ_INIT(&hio_free_list); 450 mtx_init(&hio_free_list_lock); 451 cv_init(&hio_free_list_cond); 452 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 453 TAILQ_INIT(&hio_send_list[ii]); 454 hio_send_list_size[ii] = 0; 455 mtx_init(&hio_send_list_lock[ii]); 456 cv_init(&hio_send_list_cond[ii]); 457 TAILQ_INIT(&hio_recv_list[ii]); 458 hio_recv_list_size[ii] = 0; 459 mtx_init(&hio_recv_list_lock[ii]); 460 cv_init(&hio_recv_list_cond[ii]); 461 rw_init(&hio_remote_lock[ii]); 462 } 463 TAILQ_INIT(&hio_done_list); 464 mtx_init(&hio_done_list_lock); 465 cv_init(&hio_done_list_cond); 466 mtx_init(&metadata_lock); 467 468 /* 469 * Allocate requests pool and initialize requests. 470 */ 471 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 472 hio = malloc(sizeof(*hio)); 473 if (hio == NULL) { 474 primary_exitx(EX_TEMPFAIL, 475 "Unable to allocate %zu bytes of memory for hio request.", 476 sizeof(*hio)); 477 } 478 refcnt_init(&hio->hio_countdown, 0); 479 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 480 if (hio->hio_errors == NULL) { 481 primary_exitx(EX_TEMPFAIL, 482 "Unable allocate %zu bytes of memory for hio errors.", 483 sizeof(hio->hio_errors[0]) * ncomps); 484 } 485 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 486 if (hio->hio_next == NULL) { 487 primary_exitx(EX_TEMPFAIL, 488 "Unable allocate %zu bytes of memory for hio_next field.", 489 sizeof(hio->hio_next[0]) * ncomps); 490 } 491 hio->hio_ggio.gctl_version = G_GATE_VERSION; 492 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 493 if (hio->hio_ggio.gctl_data == NULL) { 494 primary_exitx(EX_TEMPFAIL, 495 "Unable to allocate %zu bytes of memory for gctl_data.", 496 MAXPHYS); 497 } 498 hio->hio_ggio.gctl_length = MAXPHYS; 499 hio->hio_ggio.gctl_error = 0; 500 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 501 hio_free_list_size++; 502 } 503 } 504 505 static bool 506 init_resuid(struct hast_resource *res) 507 { 508 509 mtx_lock(&metadata_lock); 510 if (res->hr_resuid != 0) { 511 mtx_unlock(&metadata_lock); 512 return (false); 513 } else { 514 /* Initialize unique resource identifier. */ 515 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 516 mtx_unlock(&metadata_lock); 517 if (metadata_write(res) == -1) 518 exit(EX_NOINPUT); 519 return (true); 520 } 521 } 522 523 static void 524 init_local(struct hast_resource *res) 525 { 526 unsigned char *buf; 527 size_t mapsize; 528 529 if (metadata_read(res, true) == -1) 530 exit(EX_NOINPUT); 531 mtx_init(&res->hr_amp_lock); 532 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 533 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 534 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 535 } 536 mtx_init(&range_lock); 537 cv_init(&range_regular_cond); 538 if (rangelock_init(&range_regular) == -1) 539 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 540 cv_init(&range_sync_cond); 541 if (rangelock_init(&range_sync) == -1) 542 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 543 mapsize = activemap_ondisk_size(res->hr_amp); 544 buf = calloc(1, mapsize); 545 if (buf == NULL) { 546 primary_exitx(EX_TEMPFAIL, 547 "Unable to allocate buffer for activemap."); 548 } 549 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 550 (ssize_t)mapsize) { 551 primary_exit(EX_NOINPUT, "Unable to read activemap"); 552 } 553 activemap_copyin(res->hr_amp, buf, mapsize); 554 free(buf); 555 if (res->hr_resuid != 0) 556 return; 557 /* 558 * We're using provider for the first time. Initialize local and remote 559 * counters. We don't initialize resuid here, as we want to do it just 560 * in time. The reason for this is that we want to inform secondary 561 * that there were no writes yet, so there is no need to synchronize 562 * anything. 563 */ 564 res->hr_primary_localcnt = 0; 565 res->hr_primary_remotecnt = 0; 566 if (metadata_write(res) == -1) 567 exit(EX_NOINPUT); 568 } 569 570 static int 571 primary_connect(struct hast_resource *res, struct proto_conn **connp) 572 { 573 struct proto_conn *conn; 574 int16_t val; 575 576 val = 1; 577 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 578 primary_exit(EX_TEMPFAIL, 579 "Unable to send connection request to parent"); 580 } 581 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 582 primary_exit(EX_TEMPFAIL, 583 "Unable to receive reply to connection request from parent"); 584 } 585 if (val != 0) { 586 errno = val; 587 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 588 res->hr_remoteaddr); 589 return (-1); 590 } 591 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 592 primary_exit(EX_TEMPFAIL, 593 "Unable to receive connection from parent"); 594 } 595 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 596 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 597 res->hr_remoteaddr); 598 proto_close(conn); 599 return (-1); 600 } 601 /* Error in setting timeout is not critical, but why should it fail? */ 602 if (proto_timeout(conn, res->hr_timeout) == -1) 603 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 604 605 *connp = conn; 606 607 return (0); 608 } 609 610 /* 611 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 612 */ 613 static void 614 enable_direct_reads(struct hast_resource *res) 615 { 616 struct g_gate_ctl_modify ggiomodify; 617 618 bzero(&ggiomodify, sizeof(ggiomodify)); 619 ggiomodify.gctl_version = G_GATE_VERSION; 620 ggiomodify.gctl_unit = res->hr_ggateunit; 621 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 622 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 623 sizeof(ggiomodify.gctl_readprov)); 624 ggiomodify.gctl_readoffset = res->hr_localoff; 625 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 626 pjdlog_debug(1, "Direct reads enabled."); 627 else 628 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 629 } 630 631 static int 632 init_remote(struct hast_resource *res, struct proto_conn **inp, 633 struct proto_conn **outp) 634 { 635 struct proto_conn *in, *out; 636 struct nv *nvout, *nvin; 637 const unsigned char *token; 638 unsigned char *map; 639 const char *errmsg; 640 int32_t extentsize; 641 int64_t datasize; 642 uint32_t mapsize; 643 uint8_t version; 644 size_t size; 645 int error; 646 647 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 648 PJDLOG_ASSERT(real_remote(res)); 649 650 in = out = NULL; 651 errmsg = NULL; 652 653 if (primary_connect(res, &out) == -1) 654 return (ECONNREFUSED); 655 656 error = ECONNABORTED; 657 658 /* 659 * First handshake step. 660 * Setup outgoing connection with remote node. 661 */ 662 nvout = nv_alloc(); 663 nv_add_string(nvout, res->hr_name, "resource"); 664 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 665 if (nv_error(nvout) != 0) { 666 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 667 "Unable to allocate header for connection with %s", 668 res->hr_remoteaddr); 669 nv_free(nvout); 670 goto close; 671 } 672 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 673 pjdlog_errno(LOG_WARNING, 674 "Unable to send handshake header to %s", 675 res->hr_remoteaddr); 676 nv_free(nvout); 677 goto close; 678 } 679 nv_free(nvout); 680 if (hast_proto_recv_hdr(out, &nvin) == -1) { 681 pjdlog_errno(LOG_WARNING, 682 "Unable to receive handshake header from %s", 683 res->hr_remoteaddr); 684 goto close; 685 } 686 errmsg = nv_get_string(nvin, "errmsg"); 687 if (errmsg != NULL) { 688 pjdlog_warning("%s", errmsg); 689 if (nv_exists(nvin, "wait")) 690 error = EBUSY; 691 nv_free(nvin); 692 goto close; 693 } 694 version = nv_get_uint8(nvin, "version"); 695 if (version == 0) { 696 /* 697 * If no version is sent, it means this is protocol version 1. 698 */ 699 version = 1; 700 } 701 if (version > HAST_PROTO_VERSION) { 702 pjdlog_warning("Invalid version received (%hhu).", version); 703 nv_free(nvin); 704 goto close; 705 } 706 res->hr_version = version; 707 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 708 token = nv_get_uint8_array(nvin, &size, "token"); 709 if (token == NULL) { 710 pjdlog_warning("Handshake header from %s has no 'token' field.", 711 res->hr_remoteaddr); 712 nv_free(nvin); 713 goto close; 714 } 715 if (size != sizeof(res->hr_token)) { 716 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 717 res->hr_remoteaddr, size, sizeof(res->hr_token)); 718 nv_free(nvin); 719 goto close; 720 } 721 bcopy(token, res->hr_token, sizeof(res->hr_token)); 722 nv_free(nvin); 723 724 /* 725 * Second handshake step. 726 * Setup incoming connection with remote node. 727 */ 728 if (primary_connect(res, &in) == -1) 729 goto close; 730 731 nvout = nv_alloc(); 732 nv_add_string(nvout, res->hr_name, "resource"); 733 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 734 "token"); 735 if (res->hr_resuid == 0) { 736 /* 737 * The resuid field was not yet initialized. 738 * Because we do synchronization inside init_resuid(), it is 739 * possible that someone already initialized it, the function 740 * will return false then, but if we successfully initialized 741 * it, we will get true. True means that there were no writes 742 * to this resource yet and we want to inform secondary that 743 * synchronization is not needed by sending "virgin" argument. 744 */ 745 if (init_resuid(res)) 746 nv_add_int8(nvout, 1, "virgin"); 747 } 748 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 749 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 750 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 751 if (nv_error(nvout) != 0) { 752 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 753 "Unable to allocate header for connection with %s", 754 res->hr_remoteaddr); 755 nv_free(nvout); 756 goto close; 757 } 758 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 759 pjdlog_errno(LOG_WARNING, 760 "Unable to send handshake header to %s", 761 res->hr_remoteaddr); 762 nv_free(nvout); 763 goto close; 764 } 765 nv_free(nvout); 766 if (hast_proto_recv_hdr(out, &nvin) == -1) { 767 pjdlog_errno(LOG_WARNING, 768 "Unable to receive handshake header from %s", 769 res->hr_remoteaddr); 770 goto close; 771 } 772 errmsg = nv_get_string(nvin, "errmsg"); 773 if (errmsg != NULL) { 774 pjdlog_warning("%s", errmsg); 775 nv_free(nvin); 776 goto close; 777 } 778 datasize = nv_get_int64(nvin, "datasize"); 779 if (datasize != res->hr_datasize) { 780 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 781 (intmax_t)res->hr_datasize, (intmax_t)datasize); 782 nv_free(nvin); 783 goto close; 784 } 785 extentsize = nv_get_int32(nvin, "extentsize"); 786 if (extentsize != res->hr_extentsize) { 787 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 788 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 789 nv_free(nvin); 790 goto close; 791 } 792 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 793 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 794 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 795 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 796 enable_direct_reads(res); 797 if (nv_exists(nvin, "virgin")) { 798 /* 799 * Secondary was reinitialized, bump localcnt if it is 0 as 800 * only we have the data. 801 */ 802 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 803 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 804 805 if (res->hr_primary_localcnt == 0) { 806 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 807 808 mtx_lock(&metadata_lock); 809 res->hr_primary_localcnt++; 810 pjdlog_debug(1, "Increasing localcnt to %ju.", 811 (uintmax_t)res->hr_primary_localcnt); 812 (void)metadata_write(res); 813 mtx_unlock(&metadata_lock); 814 } 815 } 816 map = NULL; 817 mapsize = nv_get_uint32(nvin, "mapsize"); 818 if (mapsize > 0) { 819 map = malloc(mapsize); 820 if (map == NULL) { 821 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 822 (uintmax_t)mapsize); 823 nv_free(nvin); 824 goto close; 825 } 826 /* 827 * Remote node have some dirty extents on its own, lets 828 * download its activemap. 829 */ 830 if (hast_proto_recv_data(res, out, nvin, map, 831 mapsize) == -1) { 832 pjdlog_errno(LOG_ERR, 833 "Unable to receive remote activemap"); 834 nv_free(nvin); 835 free(map); 836 goto close; 837 } 838 mtx_lock(&res->hr_amp_lock); 839 /* 840 * Merge local and remote bitmaps. 841 */ 842 activemap_merge(res->hr_amp, map, mapsize); 843 free(map); 844 /* 845 * Now that we merged bitmaps from both nodes, flush it to the 846 * disk before we start to synchronize. 847 */ 848 (void)hast_activemap_flush(res); 849 } 850 nv_free(nvin); 851 #ifdef notyet 852 /* Setup directions. */ 853 if (proto_send(out, NULL, 0) == -1) 854 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 855 if (proto_recv(in, NULL, 0) == -1) 856 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 857 #endif 858 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 859 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 860 res->hr_version < 2) { 861 pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 862 res->hr_replication = HAST_REPLICATION_FULLSYNC; 863 } else if (res->hr_replication != res->hr_original_replication) { 864 /* 865 * This is in case hastd disconnected and was upgraded. 866 */ 867 res->hr_replication = res->hr_original_replication; 868 } 869 if (inp != NULL && outp != NULL) { 870 *inp = in; 871 *outp = out; 872 } else { 873 res->hr_remotein = in; 874 res->hr_remoteout = out; 875 } 876 event_send(res, EVENT_CONNECT); 877 return (0); 878 close: 879 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 880 event_send(res, EVENT_SPLITBRAIN); 881 proto_close(out); 882 if (in != NULL) 883 proto_close(in); 884 return (error); 885 } 886 887 static void 888 sync_start(void) 889 { 890 891 mtx_lock(&sync_lock); 892 sync_inprogress = true; 893 mtx_unlock(&sync_lock); 894 cv_signal(&sync_cond); 895 } 896 897 static void 898 sync_stop(void) 899 { 900 901 mtx_lock(&sync_lock); 902 if (sync_inprogress) 903 sync_inprogress = false; 904 mtx_unlock(&sync_lock); 905 } 906 907 static void 908 init_ggate(struct hast_resource *res) 909 { 910 struct g_gate_ctl_create ggiocreate; 911 struct g_gate_ctl_cancel ggiocancel; 912 913 /* 914 * We communicate with ggate via /dev/ggctl. Open it. 915 */ 916 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 917 if (res->hr_ggatefd == -1) 918 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 919 /* 920 * Create provider before trying to connect, as connection failure 921 * is not critical, but may take some time. 922 */ 923 bzero(&ggiocreate, sizeof(ggiocreate)); 924 ggiocreate.gctl_version = G_GATE_VERSION; 925 ggiocreate.gctl_mediasize = res->hr_datasize; 926 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 927 ggiocreate.gctl_flags = 0; 928 ggiocreate.gctl_maxcount = 0; 929 ggiocreate.gctl_timeout = 0; 930 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 931 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 932 res->hr_provname); 933 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 934 pjdlog_info("Device hast/%s created.", res->hr_provname); 935 res->hr_ggateunit = ggiocreate.gctl_unit; 936 return; 937 } 938 if (errno != EEXIST) { 939 primary_exit(EX_OSERR, "Unable to create hast/%s device", 940 res->hr_provname); 941 } 942 pjdlog_debug(1, 943 "Device hast/%s already exists, we will try to take it over.", 944 res->hr_provname); 945 /* 946 * If we received EEXIST, we assume that the process who created the 947 * provider died and didn't clean up. In that case we will start from 948 * where he left of. 949 */ 950 bzero(&ggiocancel, sizeof(ggiocancel)); 951 ggiocancel.gctl_version = G_GATE_VERSION; 952 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 953 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 954 res->hr_provname); 955 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 956 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 957 res->hr_ggateunit = ggiocancel.gctl_unit; 958 return; 959 } 960 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 961 res->hr_provname); 962 } 963 964 void 965 hastd_primary(struct hast_resource *res) 966 { 967 pthread_t td; 968 pid_t pid; 969 int error, mode, debuglevel; 970 971 /* 972 * Create communication channel for sending control commands from 973 * parent to child. 974 */ 975 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 976 /* TODO: There's no need for this to be fatal error. */ 977 KEEP_ERRNO((void)pidfile_remove(pfh)); 978 pjdlog_exit(EX_OSERR, 979 "Unable to create control sockets between parent and child"); 980 } 981 /* 982 * Create communication channel for sending events from child to parent. 983 */ 984 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 985 /* TODO: There's no need for this to be fatal error. */ 986 KEEP_ERRNO((void)pidfile_remove(pfh)); 987 pjdlog_exit(EX_OSERR, 988 "Unable to create event sockets between child and parent"); 989 } 990 /* 991 * Create communication channel for sending connection requests from 992 * child to parent. 993 */ 994 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 995 /* TODO: There's no need for this to be fatal error. */ 996 KEEP_ERRNO((void)pidfile_remove(pfh)); 997 pjdlog_exit(EX_OSERR, 998 "Unable to create connection sockets between child and parent"); 999 } 1000 1001 pid = fork(); 1002 if (pid == -1) { 1003 /* TODO: There's no need for this to be fatal error. */ 1004 KEEP_ERRNO((void)pidfile_remove(pfh)); 1005 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 1006 } 1007 1008 if (pid > 0) { 1009 /* This is parent. */ 1010 /* Declare that we are receiver. */ 1011 proto_recv(res->hr_event, NULL, 0); 1012 proto_recv(res->hr_conn, NULL, 0); 1013 /* Declare that we are sender. */ 1014 proto_send(res->hr_ctrl, NULL, 0); 1015 res->hr_workerpid = pid; 1016 return; 1017 } 1018 1019 gres = res; 1020 res->output_status_aux = output_status_aux; 1021 mode = pjdlog_mode_get(); 1022 debuglevel = pjdlog_debug_get(); 1023 1024 /* Declare that we are sender. */ 1025 proto_send(res->hr_event, NULL, 0); 1026 proto_send(res->hr_conn, NULL, 0); 1027 /* Declare that we are receiver. */ 1028 proto_recv(res->hr_ctrl, NULL, 0); 1029 descriptors_cleanup(res); 1030 1031 descriptors_assert(res, mode); 1032 1033 pjdlog_init(mode); 1034 pjdlog_debug_set(debuglevel); 1035 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 1036 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 1037 1038 init_local(res); 1039 init_ggate(res); 1040 init_environment(res); 1041 1042 if (drop_privs(res) != 0) { 1043 cleanup(res); 1044 exit(EX_CONFIG); 1045 } 1046 pjdlog_info("Privileges successfully dropped."); 1047 1048 /* 1049 * Create the guard thread first, so we can handle signals from the 1050 * very beginning. 1051 */ 1052 error = pthread_create(&td, NULL, guard_thread, res); 1053 PJDLOG_ASSERT(error == 0); 1054 /* 1055 * Create the control thread before sending any event to the parent, 1056 * as we can deadlock when parent sends control request to worker, 1057 * but worker has no control thread started yet, so parent waits. 1058 * In the meantime worker sends an event to the parent, but parent 1059 * is unable to handle the event, because it waits for control 1060 * request response. 1061 */ 1062 error = pthread_create(&td, NULL, ctrl_thread, res); 1063 PJDLOG_ASSERT(error == 0); 1064 if (real_remote(res)) { 1065 error = init_remote(res, NULL, NULL); 1066 if (error == 0) { 1067 sync_start(); 1068 } else if (error == EBUSY) { 1069 time_t start = time(NULL); 1070 1071 pjdlog_warning("Waiting for remote node to become %s for %ds.", 1072 role2str(HAST_ROLE_SECONDARY), 1073 res->hr_timeout); 1074 for (;;) { 1075 sleep(1); 1076 error = init_remote(res, NULL, NULL); 1077 if (error != EBUSY) 1078 break; 1079 if (time(NULL) > start + res->hr_timeout) 1080 break; 1081 } 1082 if (error == EBUSY) { 1083 pjdlog_warning("Remote node is still %s, starting anyway.", 1084 role2str(HAST_ROLE_PRIMARY)); 1085 } 1086 } 1087 } 1088 error = pthread_create(&td, NULL, ggate_recv_thread, res); 1089 PJDLOG_ASSERT(error == 0); 1090 error = pthread_create(&td, NULL, local_send_thread, res); 1091 PJDLOG_ASSERT(error == 0); 1092 error = pthread_create(&td, NULL, remote_send_thread, res); 1093 PJDLOG_ASSERT(error == 0); 1094 error = pthread_create(&td, NULL, remote_recv_thread, res); 1095 PJDLOG_ASSERT(error == 0); 1096 error = pthread_create(&td, NULL, ggate_send_thread, res); 1097 PJDLOG_ASSERT(error == 0); 1098 fullystarted = true; 1099 (void)sync_thread(res); 1100 } 1101 1102 static void 1103 reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1104 const char *fmt, ...) 1105 { 1106 char msg[1024]; 1107 va_list ap; 1108 1109 va_start(ap, fmt); 1110 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1111 va_end(ap); 1112 switch (ggio->gctl_cmd) { 1113 case BIO_READ: 1114 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1115 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1116 break; 1117 case BIO_DELETE: 1118 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1119 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1120 break; 1121 case BIO_FLUSH: 1122 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1123 break; 1124 case BIO_WRITE: 1125 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1126 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1127 break; 1128 default: 1129 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1130 (unsigned int)ggio->gctl_cmd); 1131 break; 1132 } 1133 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1134 } 1135 1136 static void 1137 remote_close(struct hast_resource *res, int ncomp) 1138 { 1139 1140 rw_wlock(&hio_remote_lock[ncomp]); 1141 /* 1142 * Check for a race between dropping rlock and acquiring wlock - 1143 * another thread can close connection in-between. 1144 */ 1145 if (!ISCONNECTED(res, ncomp)) { 1146 PJDLOG_ASSERT(res->hr_remotein == NULL); 1147 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1148 rw_unlock(&hio_remote_lock[ncomp]); 1149 return; 1150 } 1151 1152 PJDLOG_ASSERT(res->hr_remotein != NULL); 1153 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1154 1155 pjdlog_debug(2, "Closing incoming connection to %s.", 1156 res->hr_remoteaddr); 1157 proto_close(res->hr_remotein); 1158 res->hr_remotein = NULL; 1159 pjdlog_debug(2, "Closing outgoing connection to %s.", 1160 res->hr_remoteaddr); 1161 proto_close(res->hr_remoteout); 1162 res->hr_remoteout = NULL; 1163 1164 rw_unlock(&hio_remote_lock[ncomp]); 1165 1166 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1167 1168 /* 1169 * Stop synchronization if in-progress. 1170 */ 1171 sync_stop(); 1172 1173 event_send(res, EVENT_DISCONNECT); 1174 } 1175 1176 /* 1177 * Acknowledge write completion to the kernel, but don't update activemap yet. 1178 */ 1179 static void 1180 write_complete(struct hast_resource *res, struct hio *hio) 1181 { 1182 struct g_gate_ctl_io *ggio; 1183 unsigned int ncomp; 1184 1185 PJDLOG_ASSERT(!hio->hio_done); 1186 1187 ggio = &hio->hio_ggio; 1188 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1189 1190 /* 1191 * Bump local count if this is first write after 1192 * connection failure with remote node. 1193 */ 1194 ncomp = 1; 1195 rw_rlock(&hio_remote_lock[ncomp]); 1196 if (!ISCONNECTED(res, ncomp)) { 1197 mtx_lock(&metadata_lock); 1198 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1199 res->hr_primary_localcnt++; 1200 pjdlog_debug(1, "Increasing localcnt to %ju.", 1201 (uintmax_t)res->hr_primary_localcnt); 1202 (void)metadata_write(res); 1203 } 1204 mtx_unlock(&metadata_lock); 1205 } 1206 rw_unlock(&hio_remote_lock[ncomp]); 1207 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1208 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1209 hio->hio_done = true; 1210 } 1211 1212 /* 1213 * Thread receives ggate I/O requests from the kernel and passes them to 1214 * appropriate threads: 1215 * WRITE - always goes to both local_send and remote_send threads 1216 * READ (when the block is up-to-date on local component) - 1217 * only local_send thread 1218 * READ (when the block isn't up-to-date on local component) - 1219 * only remote_send thread 1220 * DELETE - always goes to both local_send and remote_send threads 1221 * FLUSH - always goes to both local_send and remote_send threads 1222 */ 1223 static void * 1224 ggate_recv_thread(void *arg) 1225 { 1226 struct hast_resource *res = arg; 1227 struct g_gate_ctl_io *ggio; 1228 struct hio *hio; 1229 unsigned int ii, ncomp, ncomps; 1230 int error; 1231 1232 for (;;) { 1233 pjdlog_debug(2, "ggate_recv: Taking free request."); 1234 QUEUE_TAKE2(hio, free); 1235 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1236 ggio = &hio->hio_ggio; 1237 ggio->gctl_unit = res->hr_ggateunit; 1238 ggio->gctl_length = MAXPHYS; 1239 ggio->gctl_error = 0; 1240 hio->hio_done = false; 1241 hio->hio_replication = res->hr_replication; 1242 pjdlog_debug(2, 1243 "ggate_recv: (%p) Waiting for request from the kernel.", 1244 hio); 1245 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1246 if (sigexit_received) 1247 pthread_exit(NULL); 1248 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1249 } 1250 error = ggio->gctl_error; 1251 switch (error) { 1252 case 0: 1253 break; 1254 case ECANCELED: 1255 /* Exit gracefully. */ 1256 if (!sigexit_received) { 1257 pjdlog_debug(2, 1258 "ggate_recv: (%p) Received cancel from the kernel.", 1259 hio); 1260 pjdlog_info("Received cancel from the kernel, exiting."); 1261 } 1262 pthread_exit(NULL); 1263 case ENOMEM: 1264 /* 1265 * Buffer too small? Impossible, we allocate MAXPHYS 1266 * bytes - request can't be bigger than that. 1267 */ 1268 /* FALLTHROUGH */ 1269 case ENXIO: 1270 default: 1271 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1272 strerror(error)); 1273 } 1274 1275 ncomp = 0; 1276 ncomps = HAST_NCOMPONENTS; 1277 1278 for (ii = 0; ii < ncomps; ii++) 1279 hio->hio_errors[ii] = EINVAL; 1280 reqlog(LOG_DEBUG, 2, ggio, 1281 "ggate_recv: (%p) Request received from the kernel: ", 1282 hio); 1283 1284 /* 1285 * Inform all components about new write request. 1286 * For read request prefer local component unless the given 1287 * range is out-of-date, then use remote component. 1288 */ 1289 switch (ggio->gctl_cmd) { 1290 case BIO_READ: 1291 res->hr_stat_read++; 1292 ncomps = 1; 1293 mtx_lock(&metadata_lock); 1294 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1295 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1296 /* 1297 * This range is up-to-date on local component, 1298 * so handle request locally. 1299 */ 1300 /* Local component is 0 for now. */ 1301 ncomp = 0; 1302 } else /* if (res->hr_syncsrc == 1303 HAST_SYNCSRC_SECONDARY) */ { 1304 PJDLOG_ASSERT(res->hr_syncsrc == 1305 HAST_SYNCSRC_SECONDARY); 1306 /* 1307 * This range is out-of-date on local component, 1308 * so send request to the remote node. 1309 */ 1310 /* Remote component is 1 for now. */ 1311 ncomp = 1; 1312 } 1313 mtx_unlock(&metadata_lock); 1314 break; 1315 case BIO_WRITE: 1316 res->hr_stat_write++; 1317 if (res->hr_resuid == 0 && 1318 res->hr_primary_localcnt == 0) { 1319 /* This is first write. */ 1320 res->hr_primary_localcnt = 1; 1321 } 1322 for (;;) { 1323 mtx_lock(&range_lock); 1324 if (rangelock_islocked(range_sync, 1325 ggio->gctl_offset, ggio->gctl_length)) { 1326 pjdlog_debug(2, 1327 "regular: Range offset=%jd length=%zu locked.", 1328 (intmax_t)ggio->gctl_offset, 1329 (size_t)ggio->gctl_length); 1330 range_regular_wait = true; 1331 cv_wait(&range_regular_cond, &range_lock); 1332 range_regular_wait = false; 1333 mtx_unlock(&range_lock); 1334 continue; 1335 } 1336 if (rangelock_add(range_regular, 1337 ggio->gctl_offset, ggio->gctl_length) == -1) { 1338 mtx_unlock(&range_lock); 1339 pjdlog_debug(2, 1340 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1341 (intmax_t)ggio->gctl_offset, 1342 (size_t)ggio->gctl_length); 1343 sleep(1); 1344 continue; 1345 } 1346 mtx_unlock(&range_lock); 1347 break; 1348 } 1349 mtx_lock(&res->hr_amp_lock); 1350 if (activemap_write_start(res->hr_amp, 1351 ggio->gctl_offset, ggio->gctl_length)) { 1352 res->hr_stat_activemap_update++; 1353 (void)hast_activemap_flush(res); 1354 } else { 1355 mtx_unlock(&res->hr_amp_lock); 1356 } 1357 if (ISMEMSYNC(hio)) { 1358 hio->hio_memsyncacked = false; 1359 refcnt_init(&hio->hio_writecount, ncomps); 1360 } 1361 break; 1362 case BIO_DELETE: 1363 res->hr_stat_delete++; 1364 break; 1365 case BIO_FLUSH: 1366 res->hr_stat_flush++; 1367 break; 1368 } 1369 pjdlog_debug(2, 1370 "ggate_recv: (%p) Moving request to the send queues.", hio); 1371 refcnt_init(&hio->hio_countdown, ncomps); 1372 for (ii = ncomp; ii < ncomps; ii++) 1373 QUEUE_INSERT1(hio, send, ii); 1374 } 1375 /* NOTREACHED */ 1376 return (NULL); 1377 } 1378 1379 /* 1380 * Thread reads from or writes to local component. 1381 * If local read fails, it redirects it to remote_send thread. 1382 */ 1383 static void * 1384 local_send_thread(void *arg) 1385 { 1386 struct hast_resource *res = arg; 1387 struct g_gate_ctl_io *ggio; 1388 struct hio *hio; 1389 unsigned int ncomp, rncomp; 1390 ssize_t ret; 1391 1392 /* Local component is 0 for now. */ 1393 ncomp = 0; 1394 /* Remote component is 1 for now. */ 1395 rncomp = 1; 1396 1397 for (;;) { 1398 pjdlog_debug(2, "local_send: Taking request."); 1399 QUEUE_TAKE1(hio, send, ncomp, 0); 1400 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1401 ggio = &hio->hio_ggio; 1402 switch (ggio->gctl_cmd) { 1403 case BIO_READ: 1404 ret = pread(res->hr_localfd, ggio->gctl_data, 1405 ggio->gctl_length, 1406 ggio->gctl_offset + res->hr_localoff); 1407 if (ret == ggio->gctl_length) 1408 hio->hio_errors[ncomp] = 0; 1409 else if (!ISSYNCREQ(hio)) { 1410 /* 1411 * If READ failed, try to read from remote node. 1412 */ 1413 if (ret == -1) { 1414 reqlog(LOG_WARNING, 0, ggio, 1415 "Local request failed (%s), trying remote node. ", 1416 strerror(errno)); 1417 } else if (ret != ggio->gctl_length) { 1418 reqlog(LOG_WARNING, 0, ggio, 1419 "Local request failed (%zd != %jd), trying remote node. ", 1420 ret, (intmax_t)ggio->gctl_length); 1421 } 1422 QUEUE_INSERT1(hio, send, rncomp); 1423 continue; 1424 } 1425 break; 1426 case BIO_WRITE: 1427 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1428 ggio->gctl_length, 1429 ggio->gctl_offset + res->hr_localoff); 1430 if (ret == -1) { 1431 hio->hio_errors[ncomp] = errno; 1432 reqlog(LOG_WARNING, 0, ggio, 1433 "Local request failed (%s): ", 1434 strerror(errno)); 1435 } else if (ret != ggio->gctl_length) { 1436 hio->hio_errors[ncomp] = EIO; 1437 reqlog(LOG_WARNING, 0, ggio, 1438 "Local request failed (%zd != %jd): ", 1439 ret, (intmax_t)ggio->gctl_length); 1440 } else { 1441 hio->hio_errors[ncomp] = 0; 1442 if (ISASYNC(hio)) { 1443 ggio->gctl_error = 0; 1444 write_complete(res, hio); 1445 } 1446 } 1447 break; 1448 case BIO_DELETE: 1449 ret = g_delete(res->hr_localfd, 1450 ggio->gctl_offset + res->hr_localoff, 1451 ggio->gctl_length); 1452 if (ret == -1) { 1453 hio->hio_errors[ncomp] = errno; 1454 reqlog(LOG_WARNING, 0, ggio, 1455 "Local request failed (%s): ", 1456 strerror(errno)); 1457 } else { 1458 hio->hio_errors[ncomp] = 0; 1459 } 1460 break; 1461 case BIO_FLUSH: 1462 if (!res->hr_localflush) { 1463 ret = -1; 1464 errno = EOPNOTSUPP; 1465 break; 1466 } 1467 ret = g_flush(res->hr_localfd); 1468 if (ret == -1) { 1469 if (errno == EOPNOTSUPP) 1470 res->hr_localflush = false; 1471 hio->hio_errors[ncomp] = errno; 1472 reqlog(LOG_WARNING, 0, ggio, 1473 "Local request failed (%s): ", 1474 strerror(errno)); 1475 } else { 1476 hio->hio_errors[ncomp] = 0; 1477 } 1478 break; 1479 } 1480 if (ISMEMSYNCWRITE(hio)) { 1481 if (refcnt_release(&hio->hio_writecount) == 0) { 1482 write_complete(res, hio); 1483 } 1484 } 1485 if (refcnt_release(&hio->hio_countdown) > 0) 1486 continue; 1487 if (ISSYNCREQ(hio)) { 1488 mtx_lock(&sync_lock); 1489 SYNCREQDONE(hio); 1490 mtx_unlock(&sync_lock); 1491 cv_signal(&sync_cond); 1492 } else { 1493 pjdlog_debug(2, 1494 "local_send: (%p) Moving request to the done queue.", 1495 hio); 1496 QUEUE_INSERT2(hio, done); 1497 } 1498 } 1499 /* NOTREACHED */ 1500 return (NULL); 1501 } 1502 1503 static void 1504 keepalive_send(struct hast_resource *res, unsigned int ncomp) 1505 { 1506 struct nv *nv; 1507 1508 rw_rlock(&hio_remote_lock[ncomp]); 1509 1510 if (!ISCONNECTED(res, ncomp)) { 1511 rw_unlock(&hio_remote_lock[ncomp]); 1512 return; 1513 } 1514 1515 PJDLOG_ASSERT(res->hr_remotein != NULL); 1516 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1517 1518 nv = nv_alloc(); 1519 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1520 if (nv_error(nv) != 0) { 1521 rw_unlock(&hio_remote_lock[ncomp]); 1522 nv_free(nv); 1523 pjdlog_debug(1, 1524 "keepalive_send: Unable to prepare header to send."); 1525 return; 1526 } 1527 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1528 rw_unlock(&hio_remote_lock[ncomp]); 1529 pjdlog_common(LOG_DEBUG, 1, errno, 1530 "keepalive_send: Unable to send request"); 1531 nv_free(nv); 1532 remote_close(res, ncomp); 1533 return; 1534 } 1535 1536 rw_unlock(&hio_remote_lock[ncomp]); 1537 nv_free(nv); 1538 pjdlog_debug(2, "keepalive_send: Request sent."); 1539 } 1540 1541 /* 1542 * Thread sends request to secondary node. 1543 */ 1544 static void * 1545 remote_send_thread(void *arg) 1546 { 1547 struct hast_resource *res = arg; 1548 struct g_gate_ctl_io *ggio; 1549 time_t lastcheck, now; 1550 struct hio *hio; 1551 struct nv *nv; 1552 unsigned int ncomp; 1553 bool wakeup; 1554 uint64_t offset, length; 1555 uint8_t cmd; 1556 void *data; 1557 1558 /* Remote component is 1 for now. */ 1559 ncomp = 1; 1560 lastcheck = time(NULL); 1561 1562 for (;;) { 1563 pjdlog_debug(2, "remote_send: Taking request."); 1564 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1565 if (hio == NULL) { 1566 now = time(NULL); 1567 if (lastcheck + HAST_KEEPALIVE <= now) { 1568 keepalive_send(res, ncomp); 1569 lastcheck = now; 1570 } 1571 continue; 1572 } 1573 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1574 ggio = &hio->hio_ggio; 1575 switch (ggio->gctl_cmd) { 1576 case BIO_READ: 1577 cmd = HIO_READ; 1578 data = NULL; 1579 offset = ggio->gctl_offset; 1580 length = ggio->gctl_length; 1581 break; 1582 case BIO_WRITE: 1583 cmd = HIO_WRITE; 1584 data = ggio->gctl_data; 1585 offset = ggio->gctl_offset; 1586 length = ggio->gctl_length; 1587 break; 1588 case BIO_DELETE: 1589 cmd = HIO_DELETE; 1590 data = NULL; 1591 offset = ggio->gctl_offset; 1592 length = ggio->gctl_length; 1593 break; 1594 case BIO_FLUSH: 1595 cmd = HIO_FLUSH; 1596 data = NULL; 1597 offset = 0; 1598 length = 0; 1599 break; 1600 default: 1601 PJDLOG_ABORT("invalid condition"); 1602 } 1603 nv = nv_alloc(); 1604 nv_add_uint8(nv, cmd, "cmd"); 1605 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1606 nv_add_uint64(nv, offset, "offset"); 1607 nv_add_uint64(nv, length, "length"); 1608 if (ISMEMSYNCWRITE(hio)) 1609 nv_add_uint8(nv, 1, "memsync"); 1610 if (nv_error(nv) != 0) { 1611 hio->hio_errors[ncomp] = nv_error(nv); 1612 pjdlog_debug(2, 1613 "remote_send: (%p) Unable to prepare header to send.", 1614 hio); 1615 reqlog(LOG_ERR, 0, ggio, 1616 "Unable to prepare header to send (%s): ", 1617 strerror(nv_error(nv))); 1618 /* Move failed request immediately to the done queue. */ 1619 goto done_queue; 1620 } 1621 /* 1622 * Protect connection from disappearing. 1623 */ 1624 rw_rlock(&hio_remote_lock[ncomp]); 1625 if (!ISCONNECTED(res, ncomp)) { 1626 rw_unlock(&hio_remote_lock[ncomp]); 1627 hio->hio_errors[ncomp] = ENOTCONN; 1628 goto done_queue; 1629 } 1630 /* 1631 * Move the request to recv queue before sending it, because 1632 * in different order we can get reply before we move request 1633 * to recv queue. 1634 */ 1635 pjdlog_debug(2, 1636 "remote_send: (%p) Moving request to the recv queue.", 1637 hio); 1638 mtx_lock(&hio_recv_list_lock[ncomp]); 1639 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1640 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1641 hio_recv_list_size[ncomp]++; 1642 mtx_unlock(&hio_recv_list_lock[ncomp]); 1643 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1644 data != NULL ? length : 0) == -1) { 1645 hio->hio_errors[ncomp] = errno; 1646 rw_unlock(&hio_remote_lock[ncomp]); 1647 pjdlog_debug(2, 1648 "remote_send: (%p) Unable to send request.", hio); 1649 reqlog(LOG_ERR, 0, ggio, 1650 "Unable to send request (%s): ", 1651 strerror(hio->hio_errors[ncomp])); 1652 remote_close(res, ncomp); 1653 } else { 1654 rw_unlock(&hio_remote_lock[ncomp]); 1655 } 1656 nv_free(nv); 1657 if (wakeup) 1658 cv_signal(&hio_recv_list_cond[ncomp]); 1659 continue; 1660 done_queue: 1661 nv_free(nv); 1662 if (ISSYNCREQ(hio)) { 1663 if (refcnt_release(&hio->hio_countdown) > 0) 1664 continue; 1665 mtx_lock(&sync_lock); 1666 SYNCREQDONE(hio); 1667 mtx_unlock(&sync_lock); 1668 cv_signal(&sync_cond); 1669 continue; 1670 } 1671 if (ggio->gctl_cmd == BIO_WRITE) { 1672 mtx_lock(&res->hr_amp_lock); 1673 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1674 ggio->gctl_length)) { 1675 (void)hast_activemap_flush(res); 1676 } else { 1677 mtx_unlock(&res->hr_amp_lock); 1678 } 1679 if (ISMEMSYNCWRITE(hio)) { 1680 if (refcnt_release(&hio->hio_writecount) == 0) { 1681 if (hio->hio_errors[0] == 0) 1682 write_complete(res, hio); 1683 } 1684 } 1685 } 1686 if (refcnt_release(&hio->hio_countdown) > 0) 1687 continue; 1688 pjdlog_debug(2, 1689 "remote_send: (%p) Moving request to the done queue.", 1690 hio); 1691 QUEUE_INSERT2(hio, done); 1692 } 1693 /* NOTREACHED */ 1694 return (NULL); 1695 } 1696 1697 /* 1698 * Thread receives answer from secondary node and passes it to ggate_send 1699 * thread. 1700 */ 1701 static void * 1702 remote_recv_thread(void *arg) 1703 { 1704 struct hast_resource *res = arg; 1705 struct g_gate_ctl_io *ggio; 1706 struct hio *hio; 1707 struct nv *nv; 1708 unsigned int ncomp; 1709 uint64_t seq; 1710 bool memsyncack; 1711 int error; 1712 1713 /* Remote component is 1 for now. */ 1714 ncomp = 1; 1715 1716 for (;;) { 1717 /* Wait until there is anything to receive. */ 1718 mtx_lock(&hio_recv_list_lock[ncomp]); 1719 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1720 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1721 cv_wait(&hio_recv_list_cond[ncomp], 1722 &hio_recv_list_lock[ncomp]); 1723 } 1724 mtx_unlock(&hio_recv_list_lock[ncomp]); 1725 1726 memsyncack = false; 1727 1728 rw_rlock(&hio_remote_lock[ncomp]); 1729 if (!ISCONNECTED(res, ncomp)) { 1730 rw_unlock(&hio_remote_lock[ncomp]); 1731 /* 1732 * Connection is dead, so move all pending requests to 1733 * the done queue (one-by-one). 1734 */ 1735 mtx_lock(&hio_recv_list_lock[ncomp]); 1736 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1737 PJDLOG_ASSERT(hio != NULL); 1738 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1739 hio_next[ncomp]); 1740 hio_recv_list_size[ncomp]--; 1741 mtx_unlock(&hio_recv_list_lock[ncomp]); 1742 hio->hio_errors[ncomp] = ENOTCONN; 1743 goto done_queue; 1744 } 1745 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1746 pjdlog_errno(LOG_ERR, 1747 "Unable to receive reply header"); 1748 rw_unlock(&hio_remote_lock[ncomp]); 1749 remote_close(res, ncomp); 1750 continue; 1751 } 1752 rw_unlock(&hio_remote_lock[ncomp]); 1753 seq = nv_get_uint64(nv, "seq"); 1754 if (seq == 0) { 1755 pjdlog_error("Header contains no 'seq' field."); 1756 nv_free(nv); 1757 continue; 1758 } 1759 memsyncack = nv_exists(nv, "received"); 1760 mtx_lock(&hio_recv_list_lock[ncomp]); 1761 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1762 if (hio->hio_ggio.gctl_seq == seq) { 1763 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1764 hio_next[ncomp]); 1765 hio_recv_list_size[ncomp]--; 1766 break; 1767 } 1768 } 1769 mtx_unlock(&hio_recv_list_lock[ncomp]); 1770 if (hio == NULL) { 1771 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1772 (uintmax_t)seq); 1773 nv_free(nv); 1774 continue; 1775 } 1776 ggio = &hio->hio_ggio; 1777 error = nv_get_int16(nv, "error"); 1778 if (error != 0) { 1779 /* Request failed on remote side. */ 1780 hio->hio_errors[ncomp] = error; 1781 reqlog(LOG_WARNING, 0, ggio, 1782 "Remote request failed (%s): ", strerror(error)); 1783 nv_free(nv); 1784 goto done_queue; 1785 } 1786 switch (ggio->gctl_cmd) { 1787 case BIO_READ: 1788 rw_rlock(&hio_remote_lock[ncomp]); 1789 if (!ISCONNECTED(res, ncomp)) { 1790 rw_unlock(&hio_remote_lock[ncomp]); 1791 nv_free(nv); 1792 goto done_queue; 1793 } 1794 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1795 ggio->gctl_data, ggio->gctl_length) == -1) { 1796 hio->hio_errors[ncomp] = errno; 1797 pjdlog_errno(LOG_ERR, 1798 "Unable to receive reply data"); 1799 rw_unlock(&hio_remote_lock[ncomp]); 1800 nv_free(nv); 1801 remote_close(res, ncomp); 1802 goto done_queue; 1803 } 1804 rw_unlock(&hio_remote_lock[ncomp]); 1805 break; 1806 case BIO_WRITE: 1807 case BIO_DELETE: 1808 case BIO_FLUSH: 1809 break; 1810 default: 1811 PJDLOG_ABORT("invalid condition"); 1812 } 1813 hio->hio_errors[ncomp] = 0; 1814 nv_free(nv); 1815 done_queue: 1816 if (ISMEMSYNCWRITE(hio)) { 1817 if (!hio->hio_memsyncacked) { 1818 PJDLOG_ASSERT(memsyncack || 1819 hio->hio_errors[ncomp] != 0); 1820 /* Remote ack arrived. */ 1821 if (refcnt_release(&hio->hio_writecount) == 0) { 1822 if (hio->hio_errors[0] == 0) 1823 write_complete(res, hio); 1824 } 1825 hio->hio_memsyncacked = true; 1826 if (hio->hio_errors[ncomp] == 0) { 1827 pjdlog_debug(2, 1828 "remote_recv: (%p) Moving request " 1829 "back to the recv queue.", hio); 1830 mtx_lock(&hio_recv_list_lock[ncomp]); 1831 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1832 hio, hio_next[ncomp]); 1833 hio_recv_list_size[ncomp]++; 1834 mtx_unlock(&hio_recv_list_lock[ncomp]); 1835 continue; 1836 } 1837 } else { 1838 PJDLOG_ASSERT(!memsyncack); 1839 /* Remote final reply arrived. */ 1840 } 1841 } 1842 if (refcnt_release(&hio->hio_countdown) > 0) 1843 continue; 1844 if (ISSYNCREQ(hio)) { 1845 mtx_lock(&sync_lock); 1846 SYNCREQDONE(hio); 1847 mtx_unlock(&sync_lock); 1848 cv_signal(&sync_cond); 1849 } else { 1850 pjdlog_debug(2, 1851 "remote_recv: (%p) Moving request to the done queue.", 1852 hio); 1853 QUEUE_INSERT2(hio, done); 1854 } 1855 } 1856 /* NOTREACHED */ 1857 return (NULL); 1858 } 1859 1860 /* 1861 * Thread sends answer to the kernel. 1862 */ 1863 static void * 1864 ggate_send_thread(void *arg) 1865 { 1866 struct hast_resource *res = arg; 1867 struct g_gate_ctl_io *ggio; 1868 struct hio *hio; 1869 unsigned int ii, ncomps; 1870 1871 ncomps = HAST_NCOMPONENTS; 1872 1873 for (;;) { 1874 pjdlog_debug(2, "ggate_send: Taking request."); 1875 QUEUE_TAKE2(hio, done); 1876 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1877 ggio = &hio->hio_ggio; 1878 for (ii = 0; ii < ncomps; ii++) { 1879 if (hio->hio_errors[ii] == 0) { 1880 /* 1881 * One successful request is enough to declare 1882 * success. 1883 */ 1884 ggio->gctl_error = 0; 1885 break; 1886 } 1887 } 1888 if (ii == ncomps) { 1889 /* 1890 * None of the requests were successful. 1891 * Use the error from local component except the 1892 * case when we did only remote request. 1893 */ 1894 if (ggio->gctl_cmd == BIO_READ && 1895 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1896 ggio->gctl_error = hio->hio_errors[1]; 1897 else 1898 ggio->gctl_error = hio->hio_errors[0]; 1899 } 1900 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1901 mtx_lock(&res->hr_amp_lock); 1902 if (activemap_write_complete(res->hr_amp, 1903 ggio->gctl_offset, ggio->gctl_length)) { 1904 res->hr_stat_activemap_update++; 1905 (void)hast_activemap_flush(res); 1906 } else { 1907 mtx_unlock(&res->hr_amp_lock); 1908 } 1909 } 1910 if (ggio->gctl_cmd == BIO_WRITE) { 1911 /* 1912 * Unlock range we locked. 1913 */ 1914 mtx_lock(&range_lock); 1915 rangelock_del(range_regular, ggio->gctl_offset, 1916 ggio->gctl_length); 1917 if (range_sync_wait) 1918 cv_signal(&range_sync_cond); 1919 mtx_unlock(&range_lock); 1920 if (!hio->hio_done) 1921 write_complete(res, hio); 1922 } else { 1923 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1924 primary_exit(EX_OSERR, 1925 "G_GATE_CMD_DONE failed"); 1926 } 1927 } 1928 if (hio->hio_errors[0]) { 1929 switch (ggio->gctl_cmd) { 1930 case BIO_READ: 1931 res->hr_stat_read_error++; 1932 break; 1933 case BIO_WRITE: 1934 res->hr_stat_write_error++; 1935 break; 1936 case BIO_DELETE: 1937 res->hr_stat_delete_error++; 1938 break; 1939 case BIO_FLUSH: 1940 res->hr_stat_flush_error++; 1941 break; 1942 } 1943 } 1944 pjdlog_debug(2, 1945 "ggate_send: (%p) Moving request to the free queue.", hio); 1946 QUEUE_INSERT2(hio, free); 1947 } 1948 /* NOTREACHED */ 1949 return (NULL); 1950 } 1951 1952 /* 1953 * Thread synchronize local and remote components. 1954 */ 1955 static void * 1956 sync_thread(void *arg __unused) 1957 { 1958 struct hast_resource *res = arg; 1959 struct hio *hio; 1960 struct g_gate_ctl_io *ggio; 1961 struct timeval tstart, tend, tdiff; 1962 unsigned int ii, ncomp, ncomps; 1963 off_t offset, length, synced; 1964 bool dorewind, directreads; 1965 int syncext; 1966 1967 ncomps = HAST_NCOMPONENTS; 1968 dorewind = true; 1969 synced = 0; 1970 offset = -1; 1971 directreads = false; 1972 1973 for (;;) { 1974 mtx_lock(&sync_lock); 1975 if (offset >= 0 && !sync_inprogress) { 1976 gettimeofday(&tend, NULL); 1977 timersub(&tend, &tstart, &tdiff); 1978 pjdlog_info("Synchronization interrupted after %#.0T. " 1979 "%NB synchronized so far.", &tdiff, 1980 (intmax_t)synced); 1981 event_send(res, EVENT_SYNCINTR); 1982 } 1983 while (!sync_inprogress) { 1984 dorewind = true; 1985 synced = 0; 1986 cv_wait(&sync_cond, &sync_lock); 1987 } 1988 mtx_unlock(&sync_lock); 1989 /* 1990 * Obtain offset at which we should synchronize. 1991 * Rewind synchronization if needed. 1992 */ 1993 mtx_lock(&res->hr_amp_lock); 1994 if (dorewind) 1995 activemap_sync_rewind(res->hr_amp); 1996 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1997 if (syncext != -1) { 1998 /* 1999 * We synchronized entire syncext extent, we can mark 2000 * it as clean now. 2001 */ 2002 if (activemap_extent_complete(res->hr_amp, syncext)) 2003 (void)hast_activemap_flush(res); 2004 else 2005 mtx_unlock(&res->hr_amp_lock); 2006 } else { 2007 mtx_unlock(&res->hr_amp_lock); 2008 } 2009 if (dorewind) { 2010 dorewind = false; 2011 if (offset == -1) 2012 pjdlog_info("Nodes are in sync."); 2013 else { 2014 pjdlog_info("Synchronization started. %NB to go.", 2015 (intmax_t)(res->hr_extentsize * 2016 activemap_ndirty(res->hr_amp))); 2017 event_send(res, EVENT_SYNCSTART); 2018 gettimeofday(&tstart, NULL); 2019 } 2020 } 2021 if (offset == -1) { 2022 sync_stop(); 2023 pjdlog_debug(1, "Nothing to synchronize."); 2024 /* 2025 * Synchronization complete, make both localcnt and 2026 * remotecnt equal. 2027 */ 2028 ncomp = 1; 2029 rw_rlock(&hio_remote_lock[ncomp]); 2030 if (ISCONNECTED(res, ncomp)) { 2031 if (synced > 0) { 2032 int64_t bps; 2033 2034 gettimeofday(&tend, NULL); 2035 timersub(&tend, &tstart, &tdiff); 2036 bps = (int64_t)((double)synced / 2037 ((double)tdiff.tv_sec + 2038 (double)tdiff.tv_usec / 1000000)); 2039 pjdlog_info("Synchronization complete. " 2040 "%NB synchronized in %#.0lT (%NB/sec).", 2041 (intmax_t)synced, &tdiff, 2042 (intmax_t)bps); 2043 event_send(res, EVENT_SYNCDONE); 2044 } 2045 mtx_lock(&metadata_lock); 2046 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 2047 directreads = true; 2048 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 2049 res->hr_primary_localcnt = 2050 res->hr_secondary_remotecnt; 2051 res->hr_primary_remotecnt = 2052 res->hr_secondary_localcnt; 2053 pjdlog_debug(1, 2054 "Setting localcnt to %ju and remotecnt to %ju.", 2055 (uintmax_t)res->hr_primary_localcnt, 2056 (uintmax_t)res->hr_primary_remotecnt); 2057 (void)metadata_write(res); 2058 mtx_unlock(&metadata_lock); 2059 } 2060 rw_unlock(&hio_remote_lock[ncomp]); 2061 if (directreads) { 2062 directreads = false; 2063 enable_direct_reads(res); 2064 } 2065 continue; 2066 } 2067 pjdlog_debug(2, "sync: Taking free request."); 2068 QUEUE_TAKE2(hio, free); 2069 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 2070 /* 2071 * Lock the range we are going to synchronize. We don't want 2072 * race where someone writes between our read and write. 2073 */ 2074 for (;;) { 2075 mtx_lock(&range_lock); 2076 if (rangelock_islocked(range_regular, offset, length)) { 2077 pjdlog_debug(2, 2078 "sync: Range offset=%jd length=%jd locked.", 2079 (intmax_t)offset, (intmax_t)length); 2080 range_sync_wait = true; 2081 cv_wait(&range_sync_cond, &range_lock); 2082 range_sync_wait = false; 2083 mtx_unlock(&range_lock); 2084 continue; 2085 } 2086 if (rangelock_add(range_sync, offset, length) == -1) { 2087 mtx_unlock(&range_lock); 2088 pjdlog_debug(2, 2089 "sync: Range offset=%jd length=%jd is already locked, waiting.", 2090 (intmax_t)offset, (intmax_t)length); 2091 sleep(1); 2092 continue; 2093 } 2094 mtx_unlock(&range_lock); 2095 break; 2096 } 2097 /* 2098 * First read the data from synchronization source. 2099 */ 2100 SYNCREQ(hio); 2101 ggio = &hio->hio_ggio; 2102 ggio->gctl_cmd = BIO_READ; 2103 ggio->gctl_offset = offset; 2104 ggio->gctl_length = length; 2105 ggio->gctl_error = 0; 2106 hio->hio_done = false; 2107 hio->hio_replication = res->hr_replication; 2108 for (ii = 0; ii < ncomps; ii++) 2109 hio->hio_errors[ii] = EINVAL; 2110 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2111 hio); 2112 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2113 hio); 2114 mtx_lock(&metadata_lock); 2115 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2116 /* 2117 * This range is up-to-date on local component, 2118 * so handle request locally. 2119 */ 2120 /* Local component is 0 for now. */ 2121 ncomp = 0; 2122 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2123 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2124 /* 2125 * This range is out-of-date on local component, 2126 * so send request to the remote node. 2127 */ 2128 /* Remote component is 1 for now. */ 2129 ncomp = 1; 2130 } 2131 mtx_unlock(&metadata_lock); 2132 refcnt_init(&hio->hio_countdown, 1); 2133 QUEUE_INSERT1(hio, send, ncomp); 2134 2135 /* 2136 * Let's wait for READ to finish. 2137 */ 2138 mtx_lock(&sync_lock); 2139 while (!ISSYNCREQDONE(hio)) 2140 cv_wait(&sync_cond, &sync_lock); 2141 mtx_unlock(&sync_lock); 2142 2143 if (hio->hio_errors[ncomp] != 0) { 2144 pjdlog_error("Unable to read synchronization data: %s.", 2145 strerror(hio->hio_errors[ncomp])); 2146 goto free_queue; 2147 } 2148 2149 /* 2150 * We read the data from synchronization source, now write it 2151 * to synchronization target. 2152 */ 2153 SYNCREQ(hio); 2154 ggio->gctl_cmd = BIO_WRITE; 2155 for (ii = 0; ii < ncomps; ii++) 2156 hio->hio_errors[ii] = EINVAL; 2157 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2158 hio); 2159 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2160 hio); 2161 mtx_lock(&metadata_lock); 2162 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2163 /* 2164 * This range is up-to-date on local component, 2165 * so we update remote component. 2166 */ 2167 /* Remote component is 1 for now. */ 2168 ncomp = 1; 2169 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2170 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2171 /* 2172 * This range is out-of-date on local component, 2173 * so we update it. 2174 */ 2175 /* Local component is 0 for now. */ 2176 ncomp = 0; 2177 } 2178 mtx_unlock(&metadata_lock); 2179 2180 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2181 hio); 2182 refcnt_init(&hio->hio_countdown, 1); 2183 QUEUE_INSERT1(hio, send, ncomp); 2184 2185 /* 2186 * Let's wait for WRITE to finish. 2187 */ 2188 mtx_lock(&sync_lock); 2189 while (!ISSYNCREQDONE(hio)) 2190 cv_wait(&sync_cond, &sync_lock); 2191 mtx_unlock(&sync_lock); 2192 2193 if (hio->hio_errors[ncomp] != 0) { 2194 pjdlog_error("Unable to write synchronization data: %s.", 2195 strerror(hio->hio_errors[ncomp])); 2196 goto free_queue; 2197 } 2198 2199 synced += length; 2200 free_queue: 2201 mtx_lock(&range_lock); 2202 rangelock_del(range_sync, offset, length); 2203 if (range_regular_wait) 2204 cv_signal(&range_regular_cond); 2205 mtx_unlock(&range_lock); 2206 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2207 hio); 2208 QUEUE_INSERT2(hio, free); 2209 } 2210 /* NOTREACHED */ 2211 return (NULL); 2212 } 2213 2214 void 2215 primary_config_reload(struct hast_resource *res, struct nv *nv) 2216 { 2217 unsigned int ii, ncomps; 2218 int modified, vint; 2219 const char *vstr; 2220 2221 pjdlog_info("Reloading configuration..."); 2222 2223 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2224 PJDLOG_ASSERT(gres == res); 2225 nv_assert(nv, "remoteaddr"); 2226 nv_assert(nv, "sourceaddr"); 2227 nv_assert(nv, "replication"); 2228 nv_assert(nv, "checksum"); 2229 nv_assert(nv, "compression"); 2230 nv_assert(nv, "timeout"); 2231 nv_assert(nv, "exec"); 2232 nv_assert(nv, "metaflush"); 2233 2234 ncomps = HAST_NCOMPONENTS; 2235 2236 #define MODIFIED_REMOTEADDR 0x01 2237 #define MODIFIED_SOURCEADDR 0x02 2238 #define MODIFIED_REPLICATION 0x04 2239 #define MODIFIED_CHECKSUM 0x08 2240 #define MODIFIED_COMPRESSION 0x10 2241 #define MODIFIED_TIMEOUT 0x20 2242 #define MODIFIED_EXEC 0x40 2243 #define MODIFIED_METAFLUSH 0x80 2244 modified = 0; 2245 2246 vstr = nv_get_string(nv, "remoteaddr"); 2247 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2248 /* 2249 * Don't copy res->hr_remoteaddr to gres just yet. 2250 * We want remote_close() to log disconnect from the old 2251 * addresses, not from the new ones. 2252 */ 2253 modified |= MODIFIED_REMOTEADDR; 2254 } 2255 vstr = nv_get_string(nv, "sourceaddr"); 2256 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2257 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2258 modified |= MODIFIED_SOURCEADDR; 2259 } 2260 vint = nv_get_int32(nv, "replication"); 2261 if (gres->hr_replication != vint) { 2262 gres->hr_replication = vint; 2263 modified |= MODIFIED_REPLICATION; 2264 } 2265 vint = nv_get_int32(nv, "checksum"); 2266 if (gres->hr_checksum != vint) { 2267 gres->hr_checksum = vint; 2268 modified |= MODIFIED_CHECKSUM; 2269 } 2270 vint = nv_get_int32(nv, "compression"); 2271 if (gres->hr_compression != vint) { 2272 gres->hr_compression = vint; 2273 modified |= MODIFIED_COMPRESSION; 2274 } 2275 vint = nv_get_int32(nv, "timeout"); 2276 if (gres->hr_timeout != vint) { 2277 gres->hr_timeout = vint; 2278 modified |= MODIFIED_TIMEOUT; 2279 } 2280 vstr = nv_get_string(nv, "exec"); 2281 if (strcmp(gres->hr_exec, vstr) != 0) { 2282 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2283 modified |= MODIFIED_EXEC; 2284 } 2285 vint = nv_get_int32(nv, "metaflush"); 2286 if (gres->hr_metaflush != vint) { 2287 gres->hr_metaflush = vint; 2288 modified |= MODIFIED_METAFLUSH; 2289 } 2290 2291 /* 2292 * Change timeout for connected sockets. 2293 * Don't bother if we need to reconnect. 2294 */ 2295 if ((modified & MODIFIED_TIMEOUT) != 0 && 2296 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2297 for (ii = 0; ii < ncomps; ii++) { 2298 if (!ISREMOTE(ii)) 2299 continue; 2300 rw_rlock(&hio_remote_lock[ii]); 2301 if (!ISCONNECTED(gres, ii)) { 2302 rw_unlock(&hio_remote_lock[ii]); 2303 continue; 2304 } 2305 rw_unlock(&hio_remote_lock[ii]); 2306 if (proto_timeout(gres->hr_remotein, 2307 gres->hr_timeout) == -1) { 2308 pjdlog_errno(LOG_WARNING, 2309 "Unable to set connection timeout"); 2310 } 2311 if (proto_timeout(gres->hr_remoteout, 2312 gres->hr_timeout) == -1) { 2313 pjdlog_errno(LOG_WARNING, 2314 "Unable to set connection timeout"); 2315 } 2316 } 2317 } 2318 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2319 for (ii = 0; ii < ncomps; ii++) { 2320 if (!ISREMOTE(ii)) 2321 continue; 2322 remote_close(gres, ii); 2323 } 2324 if (modified & MODIFIED_REMOTEADDR) { 2325 vstr = nv_get_string(nv, "remoteaddr"); 2326 strlcpy(gres->hr_remoteaddr, vstr, 2327 sizeof(gres->hr_remoteaddr)); 2328 } 2329 } 2330 #undef MODIFIED_REMOTEADDR 2331 #undef MODIFIED_SOURCEADDR 2332 #undef MODIFIED_REPLICATION 2333 #undef MODIFIED_CHECKSUM 2334 #undef MODIFIED_COMPRESSION 2335 #undef MODIFIED_TIMEOUT 2336 #undef MODIFIED_EXEC 2337 #undef MODIFIED_METAFLUSH 2338 2339 pjdlog_info("Configuration reloaded successfully."); 2340 } 2341 2342 static void 2343 guard_one(struct hast_resource *res, unsigned int ncomp) 2344 { 2345 struct proto_conn *in, *out; 2346 2347 if (!ISREMOTE(ncomp)) 2348 return; 2349 2350 rw_rlock(&hio_remote_lock[ncomp]); 2351 2352 if (!real_remote(res)) { 2353 rw_unlock(&hio_remote_lock[ncomp]); 2354 return; 2355 } 2356 2357 if (ISCONNECTED(res, ncomp)) { 2358 PJDLOG_ASSERT(res->hr_remotein != NULL); 2359 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2360 rw_unlock(&hio_remote_lock[ncomp]); 2361 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2362 res->hr_remoteaddr); 2363 return; 2364 } 2365 2366 PJDLOG_ASSERT(res->hr_remotein == NULL); 2367 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2368 /* 2369 * Upgrade the lock. It doesn't have to be atomic as no other thread 2370 * can change connection status from disconnected to connected. 2371 */ 2372 rw_unlock(&hio_remote_lock[ncomp]); 2373 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2374 res->hr_remoteaddr); 2375 in = out = NULL; 2376 if (init_remote(res, &in, &out) == 0) { 2377 rw_wlock(&hio_remote_lock[ncomp]); 2378 PJDLOG_ASSERT(res->hr_remotein == NULL); 2379 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2380 PJDLOG_ASSERT(in != NULL && out != NULL); 2381 res->hr_remotein = in; 2382 res->hr_remoteout = out; 2383 rw_unlock(&hio_remote_lock[ncomp]); 2384 pjdlog_info("Successfully reconnected to %s.", 2385 res->hr_remoteaddr); 2386 sync_start(); 2387 } else { 2388 /* Both connections should be NULL. */ 2389 PJDLOG_ASSERT(res->hr_remotein == NULL); 2390 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2391 PJDLOG_ASSERT(in == NULL && out == NULL); 2392 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2393 res->hr_remoteaddr); 2394 } 2395 } 2396 2397 /* 2398 * Thread guards remote connections and reconnects when needed, handles 2399 * signals, etc. 2400 */ 2401 static void * 2402 guard_thread(void *arg) 2403 { 2404 struct hast_resource *res = arg; 2405 unsigned int ii, ncomps; 2406 struct timespec timeout; 2407 time_t lastcheck, now; 2408 sigset_t mask; 2409 int signo; 2410 2411 ncomps = HAST_NCOMPONENTS; 2412 lastcheck = time(NULL); 2413 2414 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2415 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2416 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2417 2418 timeout.tv_sec = HAST_KEEPALIVE; 2419 timeout.tv_nsec = 0; 2420 signo = -1; 2421 2422 for (;;) { 2423 switch (signo) { 2424 case SIGINT: 2425 case SIGTERM: 2426 sigexit_received = true; 2427 primary_exitx(EX_OK, 2428 "Termination signal received, exiting."); 2429 break; 2430 default: 2431 break; 2432 } 2433 2434 /* 2435 * Don't check connections until we fully started, 2436 * as we may still be looping, waiting for remote node 2437 * to switch from primary to secondary. 2438 */ 2439 if (fullystarted) { 2440 pjdlog_debug(2, "remote_guard: Checking connections."); 2441 now = time(NULL); 2442 if (lastcheck + HAST_KEEPALIVE <= now) { 2443 for (ii = 0; ii < ncomps; ii++) 2444 guard_one(res, ii); 2445 lastcheck = now; 2446 } 2447 } 2448 signo = sigtimedwait(&mask, NULL, &timeout); 2449 } 2450 /* NOTREACHED */ 2451 return (NULL); 2452 } 2453