1 /*- 2 * Copyright (c) 2009-2010 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/param.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <err.h> 41 #include <errno.h> 42 #include <fcntl.h> 43 #include <libgeom.h> 44 #include <pthread.h> 45 #include <signal.h> 46 #include <stdint.h> 47 #include <stdio.h> 48 #include <string.h> 49 #include <sysexits.h> 50 #include <unistd.h> 51 52 #include <activemap.h> 53 #include <nv.h> 54 #include <pjdlog.h> 55 56 #include "control.h" 57 #include "event.h" 58 #include "hast.h" 59 #include "hast_proto.h" 60 #include "hastd.h" 61 #include "hooks.h" 62 #include "metadata.h" 63 #include "proto.h" 64 #include "subr.h" 65 #include "synch.h" 66 67 struct hio { 68 uint64_t hio_seq; 69 int hio_error; 70 void *hio_data; 71 uint8_t hio_cmd; 72 uint64_t hio_offset; 73 uint64_t hio_length; 74 bool hio_memsync; 75 TAILQ_ENTRY(hio) hio_next; 76 }; 77 78 static struct hast_resource *gres; 79 80 /* 81 * Free list holds unused structures. When free list is empty, we have to wait 82 * until some in-progress requests are freed. 83 */ 84 static TAILQ_HEAD(, hio) hio_free_list; 85 static pthread_mutex_t hio_free_list_lock; 86 static pthread_cond_t hio_free_list_cond; 87 /* 88 * Disk thread (the one that do I/O requests) takes requests from this list. 89 */ 90 static TAILQ_HEAD(, hio) hio_disk_list; 91 static pthread_mutex_t hio_disk_list_lock; 92 static pthread_cond_t hio_disk_list_cond; 93 /* 94 * There is one recv list for every component, although local components don't 95 * use recv lists as local requests are done synchronously. 96 */ 97 static TAILQ_HEAD(, hio) hio_send_list; 98 static pthread_mutex_t hio_send_list_lock; 99 static pthread_cond_t hio_send_list_cond; 100 101 /* 102 * Maximum number of outstanding I/O requests. 103 */ 104 #define HAST_HIO_MAX 256 105 106 static void *recv_thread(void *arg); 107 static void *disk_thread(void *arg); 108 static void *send_thread(void *arg); 109 110 #define QUEUE_INSERT(name, hio) do { \ 111 bool _wakeup; \ 112 \ 113 mtx_lock(&hio_##name##_list_lock); \ 114 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 115 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 116 mtx_unlock(&hio_##name##_list_lock); \ 117 if (_wakeup) \ 118 cv_signal(&hio_##name##_list_cond); \ 119 } while (0) 120 #define QUEUE_TAKE(name, hio) do { \ 121 mtx_lock(&hio_##name##_list_lock); \ 122 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 123 cv_wait(&hio_##name##_list_cond, \ 124 &hio_##name##_list_lock); \ 125 } \ 126 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 127 mtx_unlock(&hio_##name##_list_lock); \ 128 } while (0) 129 130 static void 131 hio_clear(struct hio *hio) 132 { 133 134 hio->hio_seq = 0; 135 hio->hio_error = 0; 136 hio->hio_cmd = HIO_UNDEF; 137 hio->hio_offset = 0; 138 hio->hio_length = 0; 139 hio->hio_memsync = false; 140 } 141 142 static void 143 hio_copy(const struct hio *srchio, struct hio *dsthio) 144 { 145 146 /* 147 * We don't copy hio_error, hio_data and hio_next fields. 148 */ 149 150 dsthio->hio_seq = srchio->hio_seq; 151 dsthio->hio_cmd = srchio->hio_cmd; 152 dsthio->hio_offset = srchio->hio_offset; 153 dsthio->hio_length = srchio->hio_length; 154 dsthio->hio_memsync = srchio->hio_memsync; 155 } 156 157 static void 158 init_environment(void) 159 { 160 struct hio *hio; 161 unsigned int ii; 162 163 /* 164 * Initialize lists, their locks and theirs condition variables. 165 */ 166 TAILQ_INIT(&hio_free_list); 167 mtx_init(&hio_free_list_lock); 168 cv_init(&hio_free_list_cond); 169 TAILQ_INIT(&hio_disk_list); 170 mtx_init(&hio_disk_list_lock); 171 cv_init(&hio_disk_list_cond); 172 TAILQ_INIT(&hio_send_list); 173 mtx_init(&hio_send_list_lock); 174 cv_init(&hio_send_list_cond); 175 176 /* 177 * Allocate requests pool and initialize requests. 178 */ 179 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 180 hio = malloc(sizeof(*hio)); 181 if (hio == NULL) { 182 pjdlog_exitx(EX_TEMPFAIL, 183 "Unable to allocate memory (%zu bytes) for hio request.", 184 sizeof(*hio)); 185 } 186 hio->hio_data = malloc(MAXPHYS); 187 if (hio->hio_data == NULL) { 188 pjdlog_exitx(EX_TEMPFAIL, 189 "Unable to allocate memory (%zu bytes) for gctl_data.", 190 (size_t)MAXPHYS); 191 } 192 hio_clear(hio); 193 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 194 } 195 } 196 197 static void 198 init_local(struct hast_resource *res) 199 { 200 201 if (metadata_read(res, true) == -1) 202 exit(EX_NOINPUT); 203 } 204 205 static void 206 init_remote(struct hast_resource *res, struct nv *nvin) 207 { 208 uint64_t resuid; 209 struct nv *nvout; 210 unsigned char *map; 211 size_t mapsize; 212 213 #ifdef notyet 214 /* Setup direction. */ 215 if (proto_send(res->hr_remoteout, NULL, 0) == -1) 216 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 217 #endif 218 219 nvout = nv_alloc(); 220 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 221 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 222 resuid = nv_get_uint64(nvin, "resuid"); 223 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 224 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 225 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 226 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 227 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 228 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 229 map = malloc(mapsize); 230 if (map == NULL) { 231 pjdlog_exitx(EX_TEMPFAIL, 232 "Unable to allocate memory (%zu bytes) for activemap.", 233 mapsize); 234 } 235 /* 236 * When we work as primary and secondary is missing we will increase 237 * localcnt in our metadata. When secondary is connected and synced 238 * we make localcnt be equal to remotecnt, which means nodes are more 239 * or less in sync. 240 * Split-brain condition is when both nodes are not able to communicate 241 * and are both configured as primary nodes. In turn, they can both 242 * make incompatible changes to the data and we have to detect that. 243 * Under split-brain condition we will increase our localcnt on first 244 * write and remote node will increase its localcnt on first write. 245 * When we connect we can see that primary's localcnt is greater than 246 * our remotecnt (primary was modified while we weren't watching) and 247 * our localcnt is greater than primary's remotecnt (we were modified 248 * while primary wasn't watching). 249 * There are many possible combinations which are all gathered below. 250 * Don't pay too much attention to exact numbers, the more important 251 * is to compare them. We compare secondary's local with primary's 252 * remote and secondary's remote with primary's local. 253 * Note that every case where primary's localcnt is smaller than 254 * secondary's remotecnt and where secondary's localcnt is smaller than 255 * primary's remotecnt should be impossible in practise. We will perform 256 * full synchronization then. Those cases are marked with an asterisk. 257 * Regular synchronization means that only extents marked as dirty are 258 * synchronized (regular synchronization). 259 * 260 * SECONDARY METADATA PRIMARY METADATA 261 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 262 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 263 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 264 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 265 * regular sync from secondary. 266 * local=3 remote=3 local=3 remote=3 Regular sync just in case. 267 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 268 * local=3 remote=3 local=4 remote=2 Split-brain condition. 269 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 270 * regular sync from primary. 271 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 272 */ 273 if (res->hr_resuid == 0) { 274 /* 275 * Provider is used for the first time. If primary node done no 276 * writes yet as well (we will find "virgin" argument) then 277 * there is no need to synchronize anything. If primary node 278 * done any writes already we have to synchronize everything. 279 */ 280 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 281 res->hr_resuid = resuid; 282 if (metadata_write(res) == -1) 283 exit(EX_NOINPUT); 284 if (nv_exists(nvin, "virgin")) { 285 free(map); 286 map = NULL; 287 mapsize = 0; 288 } else { 289 memset(map, 0xff, mapsize); 290 } 291 nv_add_int8(nvout, 1, "virgin"); 292 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 293 } else if (res->hr_resuid != resuid) { 294 char errmsg[256]; 295 296 free(map); 297 (void)snprintf(errmsg, sizeof(errmsg), 298 "Resource unique ID mismatch (primary=%ju, secondary=%ju).", 299 (uintmax_t)resuid, (uintmax_t)res->hr_resuid); 300 pjdlog_error("%s", errmsg); 301 nv_add_string(nvout, errmsg, "errmsg"); 302 if (hast_proto_send(res, res->hr_remotein, nvout, 303 NULL, 0) == -1) { 304 pjdlog_exit(EX_TEMPFAIL, 305 "Unable to send response to %s", 306 res->hr_remoteaddr); 307 } 308 nv_free(nvout); 309 exit(EX_CONFIG); 310 } else if ( 311 /* Is primary out-of-date? */ 312 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 313 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 314 /* Are the nodes more or less in sync? */ 315 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 316 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 317 /* Is secondary out-of-date? */ 318 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 319 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 320 /* 321 * Nodes are more or less in sync or one of the nodes is 322 * out-of-date. 323 * It doesn't matter at this point which one, we just have to 324 * send out local bitmap to the remote node. 325 */ 326 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 327 (ssize_t)mapsize) { 328 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 329 } 330 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 331 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 332 /* Primary is out-of-date, sync from secondary. */ 333 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 334 } else { 335 /* 336 * Secondary is out-of-date or counts match. 337 * Sync from primary. 338 */ 339 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 340 } 341 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 342 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 343 /* 344 * Not good, we have split-brain condition. 345 */ 346 free(map); 347 pjdlog_error("Split-brain detected, exiting."); 348 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 349 if (hast_proto_send(res, res->hr_remotein, nvout, 350 NULL, 0) == -1) { 351 pjdlog_exit(EX_TEMPFAIL, 352 "Unable to send response to %s", 353 res->hr_remoteaddr); 354 } 355 nv_free(nvout); 356 /* Exit on split-brain. */ 357 event_send(res, EVENT_SPLITBRAIN); 358 exit(EX_CONFIG); 359 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 360 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 361 /* 362 * This should never happen in practise, but we will perform 363 * full synchronization. 364 */ 365 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 366 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 367 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 368 METADATA_SIZE, res->hr_extentsize, 369 res->hr_local_sectorsize); 370 memset(map, 0xff, mapsize); 371 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 372 /* In this one of five cases sync from secondary. */ 373 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 374 } else { 375 /* For the rest four cases sync from primary. */ 376 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 377 } 378 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 379 (uintmax_t)res->hr_primary_localcnt, 380 (uintmax_t)res->hr_primary_remotecnt, 381 (uintmax_t)res->hr_secondary_localcnt, 382 (uintmax_t)res->hr_secondary_remotecnt); 383 } 384 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 385 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) { 386 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", 387 res->hr_remoteaddr); 388 } 389 if (map != NULL) 390 free(map); 391 nv_free(nvout); 392 #ifdef notyet 393 /* Setup direction. */ 394 if (proto_recv(res->hr_remotein, NULL, 0) == -1) 395 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 396 #endif 397 } 398 399 void 400 hastd_secondary(struct hast_resource *res, struct nv *nvin) 401 { 402 sigset_t mask; 403 pthread_t td; 404 pid_t pid; 405 int error, mode, debuglevel; 406 407 /* 408 * Create communication channel between parent and child. 409 */ 410 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 411 KEEP_ERRNO((void)pidfile_remove(pfh)); 412 pjdlog_exit(EX_OSERR, 413 "Unable to create control sockets between parent and child"); 414 } 415 /* 416 * Create communication channel between child and parent. 417 */ 418 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 419 KEEP_ERRNO((void)pidfile_remove(pfh)); 420 pjdlog_exit(EX_OSERR, 421 "Unable to create event sockets between child and parent"); 422 } 423 424 pid = fork(); 425 if (pid == -1) { 426 KEEP_ERRNO((void)pidfile_remove(pfh)); 427 pjdlog_exit(EX_OSERR, "Unable to fork"); 428 } 429 430 if (pid > 0) { 431 /* This is parent. */ 432 proto_close(res->hr_remotein); 433 res->hr_remotein = NULL; 434 proto_close(res->hr_remoteout); 435 res->hr_remoteout = NULL; 436 /* Declare that we are receiver. */ 437 proto_recv(res->hr_event, NULL, 0); 438 /* Declare that we are sender. */ 439 proto_send(res->hr_ctrl, NULL, 0); 440 res->hr_workerpid = pid; 441 return; 442 } 443 444 gres = res; 445 mode = pjdlog_mode_get(); 446 debuglevel = pjdlog_debug_get(); 447 448 /* Declare that we are sender. */ 449 proto_send(res->hr_event, NULL, 0); 450 /* Declare that we are receiver. */ 451 proto_recv(res->hr_ctrl, NULL, 0); 452 descriptors_cleanup(res); 453 454 descriptors_assert(res, mode); 455 456 pjdlog_init(mode); 457 pjdlog_debug_set(debuglevel); 458 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 459 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 460 461 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 462 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 463 464 /* Error in setting timeout is not critical, but why should it fail? */ 465 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1) 466 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 467 if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1) 468 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 469 470 init_local(res); 471 init_environment(); 472 473 if (drop_privs(res) != 0) 474 exit(EX_CONFIG); 475 pjdlog_info("Privileges successfully dropped."); 476 477 /* 478 * Create the control thread before sending any event to the parent, 479 * as we can deadlock when parent sends control request to worker, 480 * but worker has no control thread started yet, so parent waits. 481 * In the meantime worker sends an event to the parent, but parent 482 * is unable to handle the event, because it waits for control 483 * request response. 484 */ 485 error = pthread_create(&td, NULL, ctrl_thread, res); 486 PJDLOG_ASSERT(error == 0); 487 488 init_remote(res, nvin); 489 event_send(res, EVENT_CONNECT); 490 491 error = pthread_create(&td, NULL, recv_thread, res); 492 PJDLOG_ASSERT(error == 0); 493 error = pthread_create(&td, NULL, disk_thread, res); 494 PJDLOG_ASSERT(error == 0); 495 (void)send_thread(res); 496 } 497 498 static void 499 reqlog(int loglevel, int debuglevel, int error, struct hio *hio, 500 const char *fmt, ...) 501 { 502 char msg[1024]; 503 va_list ap; 504 int len; 505 506 va_start(ap, fmt); 507 len = vsnprintf(msg, sizeof(msg), fmt, ap); 508 va_end(ap); 509 if ((size_t)len < sizeof(msg)) { 510 switch (hio->hio_cmd) { 511 case HIO_READ: 512 (void)snprintf(msg + len, sizeof(msg) - len, 513 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 514 (uintmax_t)hio->hio_length); 515 break; 516 case HIO_DELETE: 517 (void)snprintf(msg + len, sizeof(msg) - len, 518 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 519 (uintmax_t)hio->hio_length); 520 break; 521 case HIO_FLUSH: 522 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 523 break; 524 case HIO_WRITE: 525 (void)snprintf(msg + len, sizeof(msg) - len, 526 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 527 (uintmax_t)hio->hio_length); 528 break; 529 case HIO_KEEPALIVE: 530 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 531 break; 532 default: 533 (void)snprintf(msg + len, sizeof(msg) - len, 534 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 535 break; 536 } 537 } 538 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 539 } 540 541 static int 542 requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv) 543 { 544 545 hio->hio_cmd = nv_get_uint8(nv, "cmd"); 546 if (hio->hio_cmd == 0) { 547 pjdlog_error("Header contains no 'cmd' field."); 548 hio->hio_error = EINVAL; 549 goto end; 550 } 551 if (hio->hio_cmd != HIO_KEEPALIVE) { 552 hio->hio_seq = nv_get_uint64(nv, "seq"); 553 if (hio->hio_seq == 0) { 554 pjdlog_error("Header contains no 'seq' field."); 555 hio->hio_error = EINVAL; 556 goto end; 557 } 558 } 559 switch (hio->hio_cmd) { 560 case HIO_FLUSH: 561 case HIO_KEEPALIVE: 562 break; 563 case HIO_WRITE: 564 hio->hio_memsync = nv_exists(nv, "memsync"); 565 /* FALLTHROUGH */ 566 case HIO_READ: 567 case HIO_DELETE: 568 hio->hio_offset = nv_get_uint64(nv, "offset"); 569 if (nv_error(nv) != 0) { 570 pjdlog_error("Header is missing 'offset' field."); 571 hio->hio_error = EINVAL; 572 goto end; 573 } 574 hio->hio_length = nv_get_uint64(nv, "length"); 575 if (nv_error(nv) != 0) { 576 pjdlog_error("Header is missing 'length' field."); 577 hio->hio_error = EINVAL; 578 goto end; 579 } 580 if (hio->hio_length == 0) { 581 pjdlog_error("Data length is zero."); 582 hio->hio_error = EINVAL; 583 goto end; 584 } 585 if (hio->hio_cmd != HIO_DELETE && hio->hio_length > MAXPHYS) { 586 pjdlog_error("Data length is too large (%ju > %ju).", 587 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 588 hio->hio_error = EINVAL; 589 goto end; 590 } 591 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 592 pjdlog_error("Offset %ju is not multiple of sector size.", 593 (uintmax_t)hio->hio_offset); 594 hio->hio_error = EINVAL; 595 goto end; 596 } 597 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 598 pjdlog_error("Length %ju is not multiple of sector size.", 599 (uintmax_t)hio->hio_length); 600 hio->hio_error = EINVAL; 601 goto end; 602 } 603 if (hio->hio_offset + hio->hio_length > 604 (uint64_t)res->hr_datasize) { 605 pjdlog_error("Data offset is too large (%ju > %ju).", 606 (uintmax_t)(hio->hio_offset + hio->hio_length), 607 (uintmax_t)res->hr_datasize); 608 hio->hio_error = EINVAL; 609 goto end; 610 } 611 break; 612 default: 613 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 614 hio->hio_cmd); 615 hio->hio_error = EINVAL; 616 goto end; 617 } 618 hio->hio_error = 0; 619 end: 620 return (hio->hio_error); 621 } 622 623 static __dead2 void 624 secondary_exit(int exitcode, const char *fmt, ...) 625 { 626 va_list ap; 627 628 PJDLOG_ASSERT(exitcode != EX_OK); 629 va_start(ap, fmt); 630 pjdlogv_errno(LOG_ERR, fmt, ap); 631 va_end(ap); 632 event_send(gres, EVENT_DISCONNECT); 633 exit(exitcode); 634 } 635 636 /* 637 * Thread receives requests from the primary node. 638 */ 639 static void * 640 recv_thread(void *arg) 641 { 642 struct hast_resource *res = arg; 643 struct hio *hio, *mshio; 644 struct nv *nv; 645 646 for (;;) { 647 pjdlog_debug(2, "recv: Taking free request."); 648 QUEUE_TAKE(free, hio); 649 pjdlog_debug(2, "recv: (%p) Got request.", hio); 650 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 651 secondary_exit(EX_TEMPFAIL, 652 "Unable to receive request header"); 653 } 654 if (requnpack(res, hio, nv) != 0) { 655 nv_free(nv); 656 pjdlog_debug(2, 657 "recv: (%p) Moving request to the send queue.", 658 hio); 659 QUEUE_INSERT(send, hio); 660 continue; 661 } 662 switch (hio->hio_cmd) { 663 case HIO_READ: 664 res->hr_stat_read++; 665 break; 666 case HIO_WRITE: 667 res->hr_stat_write++; 668 break; 669 case HIO_DELETE: 670 res->hr_stat_delete++; 671 break; 672 case HIO_FLUSH: 673 res->hr_stat_flush++; 674 break; 675 case HIO_KEEPALIVE: 676 break; 677 default: 678 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 679 hio->hio_cmd); 680 } 681 reqlog(LOG_DEBUG, 2, -1, hio, 682 "recv: (%p) Got request header: ", hio); 683 if (hio->hio_cmd == HIO_KEEPALIVE) { 684 nv_free(nv); 685 pjdlog_debug(2, 686 "recv: (%p) Moving request to the free queue.", 687 hio); 688 hio_clear(hio); 689 QUEUE_INSERT(free, hio); 690 continue; 691 } else if (hio->hio_cmd == HIO_WRITE) { 692 if (hast_proto_recv_data(res, res->hr_remotein, nv, 693 hio->hio_data, MAXPHYS) == -1) { 694 secondary_exit(EX_TEMPFAIL, 695 "Unable to receive request data"); 696 } 697 if (hio->hio_memsync) { 698 /* 699 * For memsync requests we expect two replies. 700 * Clone the hio so we can handle both of them. 701 */ 702 pjdlog_debug(2, "recv: Taking free request."); 703 QUEUE_TAKE(free, mshio); 704 pjdlog_debug(2, "recv: (%p) Got request.", 705 mshio); 706 hio_copy(hio, mshio); 707 mshio->hio_error = 0; 708 /* 709 * We want to keep 'memsync' tag only on the 710 * request going onto send queue (mshio). 711 */ 712 hio->hio_memsync = false; 713 pjdlog_debug(2, 714 "recv: (%p) Moving memsync request to the send queue.", 715 mshio); 716 QUEUE_INSERT(send, mshio); 717 } 718 } 719 nv_free(nv); 720 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 721 hio); 722 QUEUE_INSERT(disk, hio); 723 } 724 /* NOTREACHED */ 725 return (NULL); 726 } 727 728 /* 729 * Thread reads from or writes to local component and also handles DELETE and 730 * FLUSH requests. 731 */ 732 static void * 733 disk_thread(void *arg) 734 { 735 struct hast_resource *res = arg; 736 struct hio *hio; 737 ssize_t ret; 738 bool clear_activemap, logerror; 739 740 clear_activemap = true; 741 742 for (;;) { 743 pjdlog_debug(2, "disk: Taking request."); 744 QUEUE_TAKE(disk, hio); 745 while (clear_activemap) { 746 unsigned char *map; 747 size_t mapsize; 748 749 /* 750 * When first request is received, it means that primary 751 * already received our activemap, merged it and stored 752 * locally. We can now safely clear our activemap. 753 */ 754 mapsize = 755 activemap_calc_ondisk_size(res->hr_local_mediasize - 756 METADATA_SIZE, res->hr_extentsize, 757 res->hr_local_sectorsize); 758 map = calloc(1, mapsize); 759 if (map == NULL) { 760 pjdlog_warning("Unable to allocate memory to clear local activemap."); 761 break; 762 } 763 if (pwrite(res->hr_localfd, map, mapsize, 764 METADATA_SIZE) != (ssize_t)mapsize) { 765 pjdlog_errno(LOG_WARNING, 766 "Unable to store cleared activemap"); 767 free(map); 768 res->hr_stat_activemap_write_error++; 769 break; 770 } 771 free(map); 772 clear_activemap = false; 773 pjdlog_debug(1, "Local activemap cleared."); 774 break; 775 } 776 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 777 logerror = true; 778 /* Handle the actual request. */ 779 switch (hio->hio_cmd) { 780 case HIO_READ: 781 ret = pread(res->hr_localfd, hio->hio_data, 782 hio->hio_length, 783 hio->hio_offset + res->hr_localoff); 784 if (ret == -1) 785 hio->hio_error = errno; 786 else if (ret != (int64_t)hio->hio_length) 787 hio->hio_error = EIO; 788 else 789 hio->hio_error = 0; 790 break; 791 case HIO_WRITE: 792 ret = pwrite(res->hr_localfd, hio->hio_data, 793 hio->hio_length, 794 hio->hio_offset + res->hr_localoff); 795 if (ret == -1) 796 hio->hio_error = errno; 797 else if (ret != (int64_t)hio->hio_length) 798 hio->hio_error = EIO; 799 else 800 hio->hio_error = 0; 801 break; 802 case HIO_DELETE: 803 ret = g_delete(res->hr_localfd, 804 hio->hio_offset + res->hr_localoff, 805 hio->hio_length); 806 if (ret == -1) 807 hio->hio_error = errno; 808 else 809 hio->hio_error = 0; 810 break; 811 case HIO_FLUSH: 812 if (!res->hr_localflush) { 813 ret = -1; 814 hio->hio_error = EOPNOTSUPP; 815 logerror = false; 816 break; 817 } 818 ret = g_flush(res->hr_localfd); 819 if (ret == -1) { 820 if (errno == EOPNOTSUPP) 821 res->hr_localflush = false; 822 hio->hio_error = errno; 823 } else { 824 hio->hio_error = 0; 825 } 826 break; 827 default: 828 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 829 hio->hio_cmd); 830 } 831 if (logerror && hio->hio_error != 0) { 832 reqlog(LOG_ERR, 0, hio->hio_error, hio, 833 "Request failed: "); 834 } 835 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 836 hio); 837 QUEUE_INSERT(send, hio); 838 } 839 /* NOTREACHED */ 840 return (NULL); 841 } 842 843 /* 844 * Thread sends requests back to primary node. 845 */ 846 static void * 847 send_thread(void *arg) 848 { 849 struct hast_resource *res = arg; 850 struct nv *nvout; 851 struct hio *hio; 852 void *data; 853 size_t length; 854 855 for (;;) { 856 pjdlog_debug(2, "send: Taking request."); 857 QUEUE_TAKE(send, hio); 858 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 859 nvout = nv_alloc(); 860 /* Copy sequence number. */ 861 nv_add_uint64(nvout, hio->hio_seq, "seq"); 862 if (hio->hio_memsync) { 863 PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE); 864 nv_add_int8(nvout, 1, "received"); 865 } 866 switch (hio->hio_cmd) { 867 case HIO_READ: 868 if (hio->hio_error == 0) { 869 data = hio->hio_data; 870 length = hio->hio_length; 871 break; 872 } 873 /* 874 * We send no data in case of an error. 875 */ 876 /* FALLTHROUGH */ 877 case HIO_DELETE: 878 case HIO_FLUSH: 879 case HIO_WRITE: 880 data = NULL; 881 length = 0; 882 break; 883 default: 884 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 885 hio->hio_cmd); 886 } 887 if (hio->hio_error != 0) { 888 switch (hio->hio_cmd) { 889 case HIO_READ: 890 res->hr_stat_read_error++; 891 break; 892 case HIO_WRITE: 893 res->hr_stat_write_error++; 894 break; 895 case HIO_DELETE: 896 res->hr_stat_delete_error++; 897 break; 898 case HIO_FLUSH: 899 res->hr_stat_flush_error++; 900 break; 901 } 902 nv_add_int16(nvout, hio->hio_error, "error"); 903 } 904 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 905 length) == -1) { 906 secondary_exit(EX_TEMPFAIL, "Unable to send reply"); 907 } 908 nv_free(nvout); 909 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 910 hio); 911 hio_clear(hio); 912 QUEUE_INSERT(free, hio); 913 } 914 /* NOTREACHED */ 915 return (NULL); 916 } 917