1 /*- 2 * Copyright (c) 2009-2010 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/param.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <err.h> 41 #include <errno.h> 42 #include <fcntl.h> 43 #include <libgeom.h> 44 #include <pthread.h> 45 #include <signal.h> 46 #include <stdint.h> 47 #include <stdio.h> 48 #include <string.h> 49 #include <sysexits.h> 50 #include <unistd.h> 51 52 #include <activemap.h> 53 #include <nv.h> 54 #include <pjdlog.h> 55 56 #include "control.h" 57 #include "event.h" 58 #include "hast.h" 59 #include "hast_proto.h" 60 #include "hastd.h" 61 #include "hooks.h" 62 #include "metadata.h" 63 #include "proto.h" 64 #include "subr.h" 65 #include "synch.h" 66 67 struct hio { 68 uint64_t hio_seq; 69 int hio_error; 70 void *hio_data; 71 uint8_t hio_cmd; 72 uint64_t hio_offset; 73 uint64_t hio_length; 74 TAILQ_ENTRY(hio) hio_next; 75 }; 76 77 static struct hast_resource *gres; 78 79 /* 80 * Free list holds unused structures. When free list is empty, we have to wait 81 * until some in-progress requests are freed. 82 */ 83 static TAILQ_HEAD(, hio) hio_free_list; 84 static pthread_mutex_t hio_free_list_lock; 85 static pthread_cond_t hio_free_list_cond; 86 /* 87 * Disk thread (the one that do I/O requests) takes requests from this list. 88 */ 89 static TAILQ_HEAD(, hio) hio_disk_list; 90 static pthread_mutex_t hio_disk_list_lock; 91 static pthread_cond_t hio_disk_list_cond; 92 /* 93 * There is one recv list for every component, although local components don't 94 * use recv lists as local requests are done synchronously. 95 */ 96 static TAILQ_HEAD(, hio) hio_send_list; 97 static pthread_mutex_t hio_send_list_lock; 98 static pthread_cond_t hio_send_list_cond; 99 100 /* 101 * Maximum number of outstanding I/O requests. 102 */ 103 #define HAST_HIO_MAX 256 104 105 static void *recv_thread(void *arg); 106 static void *disk_thread(void *arg); 107 static void *send_thread(void *arg); 108 109 #define QUEUE_INSERT(name, hio) do { \ 110 bool _wakeup; \ 111 \ 112 mtx_lock(&hio_##name##_list_lock); \ 113 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 114 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 115 mtx_unlock(&hio_##name##_list_lock); \ 116 if (_wakeup) \ 117 cv_signal(&hio_##name##_list_cond); \ 118 } while (0) 119 #define QUEUE_TAKE(name, hio) do { \ 120 mtx_lock(&hio_##name##_list_lock); \ 121 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 122 cv_wait(&hio_##name##_list_cond, \ 123 &hio_##name##_list_lock); \ 124 } \ 125 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 126 mtx_unlock(&hio_##name##_list_lock); \ 127 } while (0) 128 129 static void 130 hio_clear(struct hio *hio) 131 { 132 133 hio->hio_seq = 0; 134 hio->hio_error = 0; 135 hio->hio_cmd = HIO_UNDEF; 136 hio->hio_offset = 0; 137 hio->hio_length = 0; 138 } 139 140 static void 141 init_environment(void) 142 { 143 struct hio *hio; 144 unsigned int ii; 145 146 /* 147 * Initialize lists, their locks and theirs condition variables. 148 */ 149 TAILQ_INIT(&hio_free_list); 150 mtx_init(&hio_free_list_lock); 151 cv_init(&hio_free_list_cond); 152 TAILQ_INIT(&hio_disk_list); 153 mtx_init(&hio_disk_list_lock); 154 cv_init(&hio_disk_list_cond); 155 TAILQ_INIT(&hio_send_list); 156 mtx_init(&hio_send_list_lock); 157 cv_init(&hio_send_list_cond); 158 159 /* 160 * Allocate requests pool and initialize requests. 161 */ 162 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 163 hio = malloc(sizeof(*hio)); 164 if (hio == NULL) { 165 pjdlog_exitx(EX_TEMPFAIL, 166 "Unable to allocate memory (%zu bytes) for hio request.", 167 sizeof(*hio)); 168 } 169 hio->hio_data = malloc(MAXPHYS); 170 if (hio->hio_data == NULL) { 171 pjdlog_exitx(EX_TEMPFAIL, 172 "Unable to allocate memory (%zu bytes) for gctl_data.", 173 (size_t)MAXPHYS); 174 } 175 hio_clear(hio); 176 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 177 } 178 } 179 180 static void 181 init_local(struct hast_resource *res) 182 { 183 184 if (metadata_read(res, true) == -1) 185 exit(EX_NOINPUT); 186 } 187 188 static void 189 init_remote(struct hast_resource *res, struct nv *nvin) 190 { 191 uint64_t resuid; 192 struct nv *nvout; 193 unsigned char *map; 194 size_t mapsize; 195 196 #ifdef notyet 197 /* Setup direction. */ 198 if (proto_send(res->hr_remoteout, NULL, 0) == -1) 199 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 200 #endif 201 202 nvout = nv_alloc(); 203 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 204 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 205 resuid = nv_get_uint64(nvin, "resuid"); 206 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 207 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 208 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 209 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 210 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 211 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 212 map = malloc(mapsize); 213 if (map == NULL) { 214 pjdlog_exitx(EX_TEMPFAIL, 215 "Unable to allocate memory (%zu bytes) for activemap.", 216 mapsize); 217 } 218 /* 219 * When we work as primary and secondary is missing we will increase 220 * localcnt in our metadata. When secondary is connected and synced 221 * we make localcnt be equal to remotecnt, which means nodes are more 222 * or less in sync. 223 * Split-brain condition is when both nodes are not able to communicate 224 * and are both configured as primary nodes. In turn, they can both 225 * make incompatible changes to the data and we have to detect that. 226 * Under split-brain condition we will increase our localcnt on first 227 * write and remote node will increase its localcnt on first write. 228 * When we connect we can see that primary's localcnt is greater than 229 * our remotecnt (primary was modified while we weren't watching) and 230 * our localcnt is greater than primary's remotecnt (we were modified 231 * while primary wasn't watching). 232 * There are many possible combinations which are all gathered below. 233 * Don't pay too much attention to exact numbers, the more important 234 * is to compare them. We compare secondary's local with primary's 235 * remote and secondary's remote with primary's local. 236 * Note that every case where primary's localcnt is smaller than 237 * secondary's remotecnt and where secondary's localcnt is smaller than 238 * primary's remotecnt should be impossible in practise. We will perform 239 * full synchronization then. Those cases are marked with an asterisk. 240 * Regular synchronization means that only extents marked as dirty are 241 * synchronized (regular synchronization). 242 * 243 * SECONDARY METADATA PRIMARY METADATA 244 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 245 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 246 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 247 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 248 * regular sync from secondary. 249 * local=3 remote=3 local=3 remote=3 Regular sync just in case. 250 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 251 * local=3 remote=3 local=4 remote=2 Split-brain condition. 252 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 253 * regular sync from primary. 254 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 255 */ 256 if (res->hr_resuid == 0) { 257 /* 258 * Provider is used for the first time. If primary node done no 259 * writes yet as well (we will find "virgin" argument) then 260 * there is no need to synchronize anything. If primary node 261 * done any writes already we have to synchronize everything. 262 */ 263 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 264 res->hr_resuid = resuid; 265 if (metadata_write(res) == -1) 266 exit(EX_NOINPUT); 267 if (nv_exists(nvin, "virgin")) { 268 free(map); 269 map = NULL; 270 mapsize = 0; 271 } else { 272 memset(map, 0xff, mapsize); 273 } 274 nv_add_int8(nvout, 1, "virgin"); 275 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 276 } else if (res->hr_resuid != resuid) { 277 char errmsg[256]; 278 279 free(map); 280 (void)snprintf(errmsg, sizeof(errmsg), 281 "Resource unique ID mismatch (primary=%ju, secondary=%ju).", 282 (uintmax_t)resuid, (uintmax_t)res->hr_resuid); 283 pjdlog_error("%s", errmsg); 284 nv_add_string(nvout, errmsg, "errmsg"); 285 if (hast_proto_send(res, res->hr_remotein, nvout, 286 NULL, 0) == -1) { 287 pjdlog_exit(EX_TEMPFAIL, 288 "Unable to send response to %s", 289 res->hr_remoteaddr); 290 } 291 nv_free(nvout); 292 exit(EX_CONFIG); 293 } else if ( 294 /* Is primary out-of-date? */ 295 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 296 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 297 /* Are the nodes more or less in sync? */ 298 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 299 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 300 /* Is secondary out-of-date? */ 301 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 302 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 303 /* 304 * Nodes are more or less in sync or one of the nodes is 305 * out-of-date. 306 * It doesn't matter at this point which one, we just have to 307 * send out local bitmap to the remote node. 308 */ 309 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 310 (ssize_t)mapsize) { 311 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 312 } 313 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 314 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 315 /* Primary is out-of-date, sync from secondary. */ 316 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 317 } else { 318 /* 319 * Secondary is out-of-date or counts match. 320 * Sync from primary. 321 */ 322 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 323 } 324 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 325 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 326 /* 327 * Not good, we have split-brain condition. 328 */ 329 free(map); 330 pjdlog_error("Split-brain detected, exiting."); 331 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 332 if (hast_proto_send(res, res->hr_remotein, nvout, 333 NULL, 0) == -1) { 334 pjdlog_exit(EX_TEMPFAIL, 335 "Unable to send response to %s", 336 res->hr_remoteaddr); 337 } 338 nv_free(nvout); 339 /* Exit on split-brain. */ 340 event_send(res, EVENT_SPLITBRAIN); 341 exit(EX_CONFIG); 342 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 343 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 344 /* 345 * This should never happen in practise, but we will perform 346 * full synchronization. 347 */ 348 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 349 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 350 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 351 METADATA_SIZE, res->hr_extentsize, 352 res->hr_local_sectorsize); 353 memset(map, 0xff, mapsize); 354 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 355 /* In this one of five cases sync from secondary. */ 356 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 357 } else { 358 /* For the rest four cases sync from primary. */ 359 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 360 } 361 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 362 (uintmax_t)res->hr_primary_localcnt, 363 (uintmax_t)res->hr_primary_remotecnt, 364 (uintmax_t)res->hr_secondary_localcnt, 365 (uintmax_t)res->hr_secondary_remotecnt); 366 } 367 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 368 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) { 369 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", 370 res->hr_remoteaddr); 371 } 372 if (map != NULL) 373 free(map); 374 nv_free(nvout); 375 #ifdef notyet 376 /* Setup direction. */ 377 if (proto_recv(res->hr_remotein, NULL, 0) == -1) 378 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 379 #endif 380 } 381 382 void 383 hastd_secondary(struct hast_resource *res, struct nv *nvin) 384 { 385 sigset_t mask; 386 pthread_t td; 387 pid_t pid; 388 int error, mode, debuglevel; 389 390 /* 391 * Create communication channel between parent and child. 392 */ 393 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 394 KEEP_ERRNO((void)pidfile_remove(pfh)); 395 pjdlog_exit(EX_OSERR, 396 "Unable to create control sockets between parent and child"); 397 } 398 /* 399 * Create communication channel between child and parent. 400 */ 401 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 402 KEEP_ERRNO((void)pidfile_remove(pfh)); 403 pjdlog_exit(EX_OSERR, 404 "Unable to create event sockets between child and parent"); 405 } 406 407 pid = fork(); 408 if (pid == -1) { 409 KEEP_ERRNO((void)pidfile_remove(pfh)); 410 pjdlog_exit(EX_OSERR, "Unable to fork"); 411 } 412 413 if (pid > 0) { 414 /* This is parent. */ 415 proto_close(res->hr_remotein); 416 res->hr_remotein = NULL; 417 proto_close(res->hr_remoteout); 418 res->hr_remoteout = NULL; 419 /* Declare that we are receiver. */ 420 proto_recv(res->hr_event, NULL, 0); 421 /* Declare that we are sender. */ 422 proto_send(res->hr_ctrl, NULL, 0); 423 res->hr_workerpid = pid; 424 return; 425 } 426 427 gres = res; 428 mode = pjdlog_mode_get(); 429 debuglevel = pjdlog_debug_get(); 430 431 /* Declare that we are sender. */ 432 proto_send(res->hr_event, NULL, 0); 433 /* Declare that we are receiver. */ 434 proto_recv(res->hr_ctrl, NULL, 0); 435 descriptors_cleanup(res); 436 437 descriptors_assert(res, mode); 438 439 pjdlog_init(mode); 440 pjdlog_debug_set(debuglevel); 441 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 442 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 443 444 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 445 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 446 447 /* Error in setting timeout is not critical, but why should it fail? */ 448 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1) 449 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 450 if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1) 451 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 452 453 init_local(res); 454 init_environment(); 455 456 if (drop_privs(res) != 0) 457 exit(EX_CONFIG); 458 pjdlog_info("Privileges successfully dropped."); 459 460 /* 461 * Create the control thread before sending any event to the parent, 462 * as we can deadlock when parent sends control request to worker, 463 * but worker has no control thread started yet, so parent waits. 464 * In the meantime worker sends an event to the parent, but parent 465 * is unable to handle the event, because it waits for control 466 * request response. 467 */ 468 error = pthread_create(&td, NULL, ctrl_thread, res); 469 PJDLOG_ASSERT(error == 0); 470 471 init_remote(res, nvin); 472 event_send(res, EVENT_CONNECT); 473 474 error = pthread_create(&td, NULL, recv_thread, res); 475 PJDLOG_ASSERT(error == 0); 476 error = pthread_create(&td, NULL, disk_thread, res); 477 PJDLOG_ASSERT(error == 0); 478 (void)send_thread(res); 479 } 480 481 static void 482 reqlog(int loglevel, int debuglevel, int error, struct hio *hio, 483 const char *fmt, ...) 484 { 485 char msg[1024]; 486 va_list ap; 487 int len; 488 489 va_start(ap, fmt); 490 len = vsnprintf(msg, sizeof(msg), fmt, ap); 491 va_end(ap); 492 if ((size_t)len < sizeof(msg)) { 493 switch (hio->hio_cmd) { 494 case HIO_READ: 495 (void)snprintf(msg + len, sizeof(msg) - len, 496 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 497 (uintmax_t)hio->hio_length); 498 break; 499 case HIO_DELETE: 500 (void)snprintf(msg + len, sizeof(msg) - len, 501 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 502 (uintmax_t)hio->hio_length); 503 break; 504 case HIO_FLUSH: 505 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 506 break; 507 case HIO_WRITE: 508 (void)snprintf(msg + len, sizeof(msg) - len, 509 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 510 (uintmax_t)hio->hio_length); 511 break; 512 case HIO_KEEPALIVE: 513 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 514 break; 515 default: 516 (void)snprintf(msg + len, sizeof(msg) - len, 517 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 518 break; 519 } 520 } 521 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 522 } 523 524 static int 525 requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv) 526 { 527 528 hio->hio_cmd = nv_get_uint8(nv, "cmd"); 529 if (hio->hio_cmd == 0) { 530 pjdlog_error("Header contains no 'cmd' field."); 531 hio->hio_error = EINVAL; 532 goto end; 533 } 534 if (hio->hio_cmd != HIO_KEEPALIVE) { 535 hio->hio_seq = nv_get_uint64(nv, "seq"); 536 if (hio->hio_seq == 0) { 537 pjdlog_error("Header contains no 'seq' field."); 538 hio->hio_error = EINVAL; 539 goto end; 540 } 541 } 542 switch (hio->hio_cmd) { 543 case HIO_FLUSH: 544 case HIO_KEEPALIVE: 545 break; 546 case HIO_READ: 547 case HIO_WRITE: 548 case HIO_DELETE: 549 hio->hio_offset = nv_get_uint64(nv, "offset"); 550 if (nv_error(nv) != 0) { 551 pjdlog_error("Header is missing 'offset' field."); 552 hio->hio_error = EINVAL; 553 goto end; 554 } 555 hio->hio_length = nv_get_uint64(nv, "length"); 556 if (nv_error(nv) != 0) { 557 pjdlog_error("Header is missing 'length' field."); 558 hio->hio_error = EINVAL; 559 goto end; 560 } 561 if (hio->hio_length == 0) { 562 pjdlog_error("Data length is zero."); 563 hio->hio_error = EINVAL; 564 goto end; 565 } 566 if (hio->hio_length > MAXPHYS) { 567 pjdlog_error("Data length is too large (%ju > %ju).", 568 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 569 hio->hio_error = EINVAL; 570 goto end; 571 } 572 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 573 pjdlog_error("Offset %ju is not multiple of sector size.", 574 (uintmax_t)hio->hio_offset); 575 hio->hio_error = EINVAL; 576 goto end; 577 } 578 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 579 pjdlog_error("Length %ju is not multiple of sector size.", 580 (uintmax_t)hio->hio_length); 581 hio->hio_error = EINVAL; 582 goto end; 583 } 584 if (hio->hio_offset + hio->hio_length > 585 (uint64_t)res->hr_datasize) { 586 pjdlog_error("Data offset is too large (%ju > %ju).", 587 (uintmax_t)(hio->hio_offset + hio->hio_length), 588 (uintmax_t)res->hr_datasize); 589 hio->hio_error = EINVAL; 590 goto end; 591 } 592 break; 593 default: 594 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 595 hio->hio_cmd); 596 hio->hio_error = EINVAL; 597 goto end; 598 } 599 hio->hio_error = 0; 600 end: 601 return (hio->hio_error); 602 } 603 604 static __dead2 void 605 secondary_exit(int exitcode, const char *fmt, ...) 606 { 607 va_list ap; 608 609 PJDLOG_ASSERT(exitcode != EX_OK); 610 va_start(ap, fmt); 611 pjdlogv_errno(LOG_ERR, fmt, ap); 612 va_end(ap); 613 event_send(gres, EVENT_DISCONNECT); 614 exit(exitcode); 615 } 616 617 /* 618 * Thread receives requests from the primary node. 619 */ 620 static void * 621 recv_thread(void *arg) 622 { 623 struct hast_resource *res = arg; 624 struct hio *hio; 625 struct nv *nv; 626 627 for (;;) { 628 pjdlog_debug(2, "recv: Taking free request."); 629 QUEUE_TAKE(free, hio); 630 pjdlog_debug(2, "recv: (%p) Got request.", hio); 631 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 632 secondary_exit(EX_TEMPFAIL, 633 "Unable to receive request header"); 634 } 635 if (requnpack(res, hio, nv) != 0) { 636 nv_free(nv); 637 pjdlog_debug(2, 638 "recv: (%p) Moving request to the send queue.", 639 hio); 640 QUEUE_INSERT(send, hio); 641 continue; 642 } 643 switch (hio->hio_cmd) { 644 case HIO_READ: 645 res->hr_stat_read++; 646 break; 647 case HIO_WRITE: 648 res->hr_stat_write++; 649 break; 650 case HIO_DELETE: 651 res->hr_stat_delete++; 652 break; 653 case HIO_FLUSH: 654 res->hr_stat_flush++; 655 break; 656 case HIO_KEEPALIVE: 657 break; 658 default: 659 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 660 hio->hio_cmd); 661 } 662 reqlog(LOG_DEBUG, 2, -1, hio, 663 "recv: (%p) Got request header: ", hio); 664 if (hio->hio_cmd == HIO_KEEPALIVE) { 665 nv_free(nv); 666 pjdlog_debug(2, 667 "recv: (%p) Moving request to the free queue.", 668 hio); 669 hio_clear(hio); 670 QUEUE_INSERT(free, hio); 671 continue; 672 } else if (hio->hio_cmd == HIO_WRITE) { 673 if (hast_proto_recv_data(res, res->hr_remotein, nv, 674 hio->hio_data, MAXPHYS) == -1) { 675 secondary_exit(EX_TEMPFAIL, 676 "Unable to receive request data"); 677 } 678 } 679 nv_free(nv); 680 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 681 hio); 682 QUEUE_INSERT(disk, hio); 683 } 684 /* NOTREACHED */ 685 return (NULL); 686 } 687 688 /* 689 * Thread reads from or writes to local component and also handles DELETE and 690 * FLUSH requests. 691 */ 692 static void * 693 disk_thread(void *arg) 694 { 695 struct hast_resource *res = arg; 696 struct hio *hio; 697 ssize_t ret; 698 bool clear_activemap, logerror; 699 700 clear_activemap = true; 701 702 for (;;) { 703 pjdlog_debug(2, "disk: Taking request."); 704 QUEUE_TAKE(disk, hio); 705 while (clear_activemap) { 706 unsigned char *map; 707 size_t mapsize; 708 709 /* 710 * When first request is received, it means that primary 711 * already received our activemap, merged it and stored 712 * locally. We can now safely clear our activemap. 713 */ 714 mapsize = 715 activemap_calc_ondisk_size(res->hr_local_mediasize - 716 METADATA_SIZE, res->hr_extentsize, 717 res->hr_local_sectorsize); 718 map = calloc(1, mapsize); 719 if (map == NULL) { 720 pjdlog_warning("Unable to allocate memory to clear local activemap."); 721 break; 722 } 723 if (pwrite(res->hr_localfd, map, mapsize, 724 METADATA_SIZE) != (ssize_t)mapsize) { 725 pjdlog_errno(LOG_WARNING, 726 "Unable to store cleared activemap"); 727 free(map); 728 break; 729 } 730 free(map); 731 clear_activemap = false; 732 pjdlog_debug(1, "Local activemap cleared."); 733 break; 734 } 735 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 736 logerror = true; 737 /* Handle the actual request. */ 738 switch (hio->hio_cmd) { 739 case HIO_READ: 740 ret = pread(res->hr_localfd, hio->hio_data, 741 hio->hio_length, 742 hio->hio_offset + res->hr_localoff); 743 if (ret == -1) 744 hio->hio_error = errno; 745 else if (ret != (int64_t)hio->hio_length) 746 hio->hio_error = EIO; 747 else 748 hio->hio_error = 0; 749 break; 750 case HIO_WRITE: 751 ret = pwrite(res->hr_localfd, hio->hio_data, 752 hio->hio_length, 753 hio->hio_offset + res->hr_localoff); 754 if (ret == -1) 755 hio->hio_error = errno; 756 else if (ret != (int64_t)hio->hio_length) 757 hio->hio_error = EIO; 758 else 759 hio->hio_error = 0; 760 break; 761 case HIO_DELETE: 762 ret = g_delete(res->hr_localfd, 763 hio->hio_offset + res->hr_localoff, 764 hio->hio_length); 765 if (ret == -1) 766 hio->hio_error = errno; 767 else 768 hio->hio_error = 0; 769 break; 770 case HIO_FLUSH: 771 if (!res->hr_localflush) { 772 ret = -1; 773 hio->hio_error = EOPNOTSUPP; 774 logerror = false; 775 break; 776 } 777 ret = g_flush(res->hr_localfd); 778 if (ret == -1) { 779 if (errno == EOPNOTSUPP) 780 res->hr_localflush = false; 781 hio->hio_error = errno; 782 } else { 783 hio->hio_error = 0; 784 } 785 break; 786 default: 787 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 788 hio->hio_cmd); 789 } 790 if (logerror && hio->hio_error != 0) { 791 reqlog(LOG_ERR, 0, hio->hio_error, hio, 792 "Request failed: "); 793 } 794 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 795 hio); 796 QUEUE_INSERT(send, hio); 797 } 798 /* NOTREACHED */ 799 return (NULL); 800 } 801 802 /* 803 * Thread sends requests back to primary node. 804 */ 805 static void * 806 send_thread(void *arg) 807 { 808 struct hast_resource *res = arg; 809 struct nv *nvout; 810 struct hio *hio; 811 void *data; 812 size_t length; 813 814 for (;;) { 815 pjdlog_debug(2, "send: Taking request."); 816 QUEUE_TAKE(send, hio); 817 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 818 nvout = nv_alloc(); 819 /* Copy sequence number. */ 820 nv_add_uint64(nvout, hio->hio_seq, "seq"); 821 switch (hio->hio_cmd) { 822 case HIO_READ: 823 if (hio->hio_error == 0) { 824 data = hio->hio_data; 825 length = hio->hio_length; 826 break; 827 } 828 /* 829 * We send no data in case of an error. 830 */ 831 /* FALLTHROUGH */ 832 case HIO_DELETE: 833 case HIO_FLUSH: 834 case HIO_WRITE: 835 data = NULL; 836 length = 0; 837 break; 838 default: 839 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 840 hio->hio_cmd); 841 } 842 if (hio->hio_error != 0) 843 nv_add_int16(nvout, hio->hio_error, "error"); 844 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 845 length) == -1) { 846 secondary_exit(EX_TEMPFAIL, "Unable to send reply"); 847 } 848 nv_free(nvout); 849 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 850 hio); 851 hio_clear(hio); 852 QUEUE_INSERT(free, hio); 853 } 854 /* NOTREACHED */ 855 return (NULL); 856 } 857