1 /*- 2 * Copyright (c) 2009-2010 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/param.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <err.h> 41 #include <errno.h> 42 #include <fcntl.h> 43 #include <libgeom.h> 44 #include <pthread.h> 45 #include <signal.h> 46 #include <stdint.h> 47 #include <stdio.h> 48 #include <string.h> 49 #include <sysexits.h> 50 #include <unistd.h> 51 52 #include <activemap.h> 53 #include <nv.h> 54 #include <pjdlog.h> 55 56 #include "control.h" 57 #include "event.h" 58 #include "hast.h" 59 #include "hast_proto.h" 60 #include "hastd.h" 61 #include "hooks.h" 62 #include "metadata.h" 63 #include "proto.h" 64 #include "subr.h" 65 #include "synch.h" 66 67 struct hio { 68 uint64_t hio_seq; 69 int hio_error; 70 void *hio_data; 71 uint8_t hio_cmd; 72 uint64_t hio_offset; 73 uint64_t hio_length; 74 TAILQ_ENTRY(hio) hio_next; 75 }; 76 77 static struct hast_resource *gres; 78 79 /* 80 * Free list holds unused structures. When free list is empty, we have to wait 81 * until some in-progress requests are freed. 82 */ 83 static TAILQ_HEAD(, hio) hio_free_list; 84 static pthread_mutex_t hio_free_list_lock; 85 static pthread_cond_t hio_free_list_cond; 86 /* 87 * Disk thread (the one that do I/O requests) takes requests from this list. 88 */ 89 static TAILQ_HEAD(, hio) hio_disk_list; 90 static pthread_mutex_t hio_disk_list_lock; 91 static pthread_cond_t hio_disk_list_cond; 92 /* 93 * There is one recv list for every component, although local components don't 94 * use recv lists as local requests are done synchronously. 95 */ 96 static TAILQ_HEAD(, hio) hio_send_list; 97 static pthread_mutex_t hio_send_list_lock; 98 static pthread_cond_t hio_send_list_cond; 99 100 /* 101 * Maximum number of outstanding I/O requests. 102 */ 103 #define HAST_HIO_MAX 256 104 105 static void *recv_thread(void *arg); 106 static void *disk_thread(void *arg); 107 static void *send_thread(void *arg); 108 109 #define QUEUE_INSERT(name, hio) do { \ 110 bool _wakeup; \ 111 \ 112 mtx_lock(&hio_##name##_list_lock); \ 113 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 114 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 115 mtx_unlock(&hio_##name##_list_lock); \ 116 if (_wakeup) \ 117 cv_signal(&hio_##name##_list_cond); \ 118 } while (0) 119 #define QUEUE_TAKE(name, hio) do { \ 120 mtx_lock(&hio_##name##_list_lock); \ 121 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 122 cv_wait(&hio_##name##_list_cond, \ 123 &hio_##name##_list_lock); \ 124 } \ 125 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 126 mtx_unlock(&hio_##name##_list_lock); \ 127 } while (0) 128 129 static void 130 hio_clear(struct hio *hio) 131 { 132 133 hio->hio_seq = 0; 134 hio->hio_error = 0; 135 hio->hio_cmd = HIO_UNDEF; 136 hio->hio_offset = 0; 137 hio->hio_length = 0; 138 } 139 140 static void 141 init_environment(void) 142 { 143 struct hio *hio; 144 unsigned int ii; 145 146 /* 147 * Initialize lists, their locks and theirs condition variables. 148 */ 149 TAILQ_INIT(&hio_free_list); 150 mtx_init(&hio_free_list_lock); 151 cv_init(&hio_free_list_cond); 152 TAILQ_INIT(&hio_disk_list); 153 mtx_init(&hio_disk_list_lock); 154 cv_init(&hio_disk_list_cond); 155 TAILQ_INIT(&hio_send_list); 156 mtx_init(&hio_send_list_lock); 157 cv_init(&hio_send_list_cond); 158 159 /* 160 * Allocate requests pool and initialize requests. 161 */ 162 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 163 hio = malloc(sizeof(*hio)); 164 if (hio == NULL) { 165 pjdlog_exitx(EX_TEMPFAIL, 166 "Unable to allocate memory (%zu bytes) for hio request.", 167 sizeof(*hio)); 168 } 169 hio->hio_data = malloc(MAXPHYS); 170 if (hio->hio_data == NULL) { 171 pjdlog_exitx(EX_TEMPFAIL, 172 "Unable to allocate memory (%zu bytes) for gctl_data.", 173 (size_t)MAXPHYS); 174 } 175 hio_clear(hio); 176 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 177 } 178 } 179 180 static void 181 init_local(struct hast_resource *res) 182 { 183 184 if (metadata_read(res, true) == -1) 185 exit(EX_NOINPUT); 186 } 187 188 static void 189 init_remote(struct hast_resource *res, struct nv *nvin) 190 { 191 uint64_t resuid; 192 struct nv *nvout; 193 unsigned char *map; 194 size_t mapsize; 195 196 #ifdef notyet 197 /* Setup direction. */ 198 if (proto_send(res->hr_remoteout, NULL, 0) == -1) 199 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 200 #endif 201 202 nvout = nv_alloc(); 203 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 204 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 205 resuid = nv_get_uint64(nvin, "resuid"); 206 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 207 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 208 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 209 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 210 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 211 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 212 map = malloc(mapsize); 213 if (map == NULL) { 214 pjdlog_exitx(EX_TEMPFAIL, 215 "Unable to allocate memory (%zu bytes) for activemap.", 216 mapsize); 217 } 218 /* 219 * When we work as primary and secondary is missing we will increase 220 * localcnt in our metadata. When secondary is connected and synced 221 * we make localcnt be equal to remotecnt, which means nodes are more 222 * or less in sync. 223 * Split-brain condition is when both nodes are not able to communicate 224 * and are both configured as primary nodes. In turn, they can both 225 * make incompatible changes to the data and we have to detect that. 226 * Under split-brain condition we will increase our localcnt on first 227 * write and remote node will increase its localcnt on first write. 228 * When we connect we can see that primary's localcnt is greater than 229 * our remotecnt (primary was modified while we weren't watching) and 230 * our localcnt is greater than primary's remotecnt (we were modified 231 * while primary wasn't watching). 232 * There are many possible combinations which are all gathered below. 233 * Don't pay too much attention to exact numbers, the more important 234 * is to compare them. We compare secondary's local with primary's 235 * remote and secondary's remote with primary's local. 236 * Note that every case where primary's localcnt is smaller than 237 * secondary's remotecnt and where secondary's localcnt is smaller than 238 * primary's remotecnt should be impossible in practise. We will perform 239 * full synchronization then. Those cases are marked with an asterisk. 240 * Regular synchronization means that only extents marked as dirty are 241 * synchronized (regular synchronization). 242 * 243 * SECONDARY METADATA PRIMARY METADATA 244 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 245 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 246 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 247 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 248 * regular sync from secondary. 249 * local=3 remote=3 local=3 remote=3 Regular sync just in case. 250 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 251 * local=3 remote=3 local=4 remote=2 Split-brain condition. 252 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 253 * regular sync from primary. 254 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 255 */ 256 if (res->hr_resuid == 0) { 257 /* 258 * Provider is used for the first time. If primary node done no 259 * writes yet as well (we will find "virgin" argument) then 260 * there is no need to synchronize anything. If primary node 261 * done any writes already we have to synchronize everything. 262 */ 263 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 264 res->hr_resuid = resuid; 265 if (metadata_write(res) == -1) 266 exit(EX_NOINPUT); 267 if (nv_exists(nvin, "virgin")) { 268 free(map); 269 map = NULL; 270 mapsize = 0; 271 } else { 272 memset(map, 0xff, mapsize); 273 } 274 nv_add_int8(nvout, 1, "virgin"); 275 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 276 } else if (res->hr_resuid != resuid) { 277 char errmsg[256]; 278 279 free(map); 280 (void)snprintf(errmsg, sizeof(errmsg), 281 "Resource unique ID mismatch (primary=%ju, secondary=%ju).", 282 (uintmax_t)resuid, (uintmax_t)res->hr_resuid); 283 pjdlog_error("%s", errmsg); 284 nv_add_string(nvout, errmsg, "errmsg"); 285 if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) == -1) { 286 pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", 287 res->hr_remoteaddr); 288 } 289 nv_free(nvout); 290 exit(EX_CONFIG); 291 } else if ( 292 /* Is primary out-of-date? */ 293 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 294 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 295 /* Are the nodes more or less in sync? */ 296 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 297 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 298 /* Is secondary out-of-date? */ 299 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 300 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 301 /* 302 * Nodes are more or less in sync or one of the nodes is 303 * out-of-date. 304 * It doesn't matter at this point which one, we just have to 305 * send out local bitmap to the remote node. 306 */ 307 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 308 (ssize_t)mapsize) { 309 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 310 } 311 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 312 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 313 /* Primary is out-of-date, sync from secondary. */ 314 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 315 } else { 316 /* 317 * Secondary is out-of-date or counts match. 318 * Sync from primary. 319 */ 320 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 321 } 322 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 323 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 324 /* 325 * Not good, we have split-brain condition. 326 */ 327 free(map); 328 pjdlog_error("Split-brain detected, exiting."); 329 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 330 if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) == -1) { 331 pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", 332 res->hr_remoteaddr); 333 } 334 nv_free(nvout); 335 /* Exit on split-brain. */ 336 event_send(res, EVENT_SPLITBRAIN); 337 exit(EX_CONFIG); 338 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 339 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 340 /* 341 * This should never happen in practise, but we will perform 342 * full synchronization. 343 */ 344 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 345 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 346 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 347 METADATA_SIZE, res->hr_extentsize, 348 res->hr_local_sectorsize); 349 memset(map, 0xff, mapsize); 350 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 351 /* In this one of five cases sync from secondary. */ 352 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 353 } else { 354 /* For the rest four cases sync from primary. */ 355 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 356 } 357 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 358 (uintmax_t)res->hr_primary_localcnt, 359 (uintmax_t)res->hr_primary_remotecnt, 360 (uintmax_t)res->hr_secondary_localcnt, 361 (uintmax_t)res->hr_secondary_remotecnt); 362 } 363 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 364 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) { 365 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", 366 res->hr_remoteaddr); 367 } 368 if (map != NULL) 369 free(map); 370 nv_free(nvout); 371 #ifdef notyet 372 /* Setup direction. */ 373 if (proto_recv(res->hr_remotein, NULL, 0) == -1) 374 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 375 #endif 376 } 377 378 void 379 hastd_secondary(struct hast_resource *res, struct nv *nvin) 380 { 381 sigset_t mask; 382 pthread_t td; 383 pid_t pid; 384 int error, mode, debuglevel; 385 386 /* 387 * Create communication channel between parent and child. 388 */ 389 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 390 KEEP_ERRNO((void)pidfile_remove(pfh)); 391 pjdlog_exit(EX_OSERR, 392 "Unable to create control sockets between parent and child"); 393 } 394 /* 395 * Create communication channel between child and parent. 396 */ 397 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 398 KEEP_ERRNO((void)pidfile_remove(pfh)); 399 pjdlog_exit(EX_OSERR, 400 "Unable to create event sockets between child and parent"); 401 } 402 403 pid = fork(); 404 if (pid == -1) { 405 KEEP_ERRNO((void)pidfile_remove(pfh)); 406 pjdlog_exit(EX_OSERR, "Unable to fork"); 407 } 408 409 if (pid > 0) { 410 /* This is parent. */ 411 proto_close(res->hr_remotein); 412 res->hr_remotein = NULL; 413 proto_close(res->hr_remoteout); 414 res->hr_remoteout = NULL; 415 /* Declare that we are receiver. */ 416 proto_recv(res->hr_event, NULL, 0); 417 /* Declare that we are sender. */ 418 proto_send(res->hr_ctrl, NULL, 0); 419 res->hr_workerpid = pid; 420 return; 421 } 422 423 gres = res; 424 mode = pjdlog_mode_get(); 425 debuglevel = pjdlog_debug_get(); 426 427 /* Declare that we are sender. */ 428 proto_send(res->hr_event, NULL, 0); 429 /* Declare that we are receiver. */ 430 proto_recv(res->hr_ctrl, NULL, 0); 431 descriptors_cleanup(res); 432 433 descriptors_assert(res, mode); 434 435 pjdlog_init(mode); 436 pjdlog_debug_set(debuglevel); 437 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 438 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 439 440 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 441 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 442 443 /* Error in setting timeout is not critical, but why should it fail? */ 444 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1) 445 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 446 if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1) 447 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 448 449 init_local(res); 450 init_environment(); 451 452 if (drop_privs(res) != 0) 453 exit(EX_CONFIG); 454 pjdlog_info("Privileges successfully dropped."); 455 456 /* 457 * Create the control thread before sending any event to the parent, 458 * as we can deadlock when parent sends control request to worker, 459 * but worker has no control thread started yet, so parent waits. 460 * In the meantime worker sends an event to the parent, but parent 461 * is unable to handle the event, because it waits for control 462 * request response. 463 */ 464 error = pthread_create(&td, NULL, ctrl_thread, res); 465 PJDLOG_ASSERT(error == 0); 466 467 init_remote(res, nvin); 468 event_send(res, EVENT_CONNECT); 469 470 error = pthread_create(&td, NULL, recv_thread, res); 471 PJDLOG_ASSERT(error == 0); 472 error = pthread_create(&td, NULL, disk_thread, res); 473 PJDLOG_ASSERT(error == 0); 474 (void)send_thread(res); 475 } 476 477 static void 478 reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 479 { 480 char msg[1024]; 481 va_list ap; 482 int len; 483 484 va_start(ap, fmt); 485 len = vsnprintf(msg, sizeof(msg), fmt, ap); 486 va_end(ap); 487 if ((size_t)len < sizeof(msg)) { 488 switch (hio->hio_cmd) { 489 case HIO_READ: 490 (void)snprintf(msg + len, sizeof(msg) - len, 491 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 492 (uintmax_t)hio->hio_length); 493 break; 494 case HIO_DELETE: 495 (void)snprintf(msg + len, sizeof(msg) - len, 496 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 497 (uintmax_t)hio->hio_length); 498 break; 499 case HIO_FLUSH: 500 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 501 break; 502 case HIO_WRITE: 503 (void)snprintf(msg + len, sizeof(msg) - len, 504 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 505 (uintmax_t)hio->hio_length); 506 break; 507 case HIO_KEEPALIVE: 508 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 509 break; 510 default: 511 (void)snprintf(msg + len, sizeof(msg) - len, 512 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 513 break; 514 } 515 } 516 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 517 } 518 519 static int 520 requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv) 521 { 522 523 hio->hio_cmd = nv_get_uint8(nv, "cmd"); 524 if (hio->hio_cmd == 0) { 525 pjdlog_error("Header contains no 'cmd' field."); 526 hio->hio_error = EINVAL; 527 goto end; 528 } 529 if (hio->hio_cmd != HIO_KEEPALIVE) { 530 hio->hio_seq = nv_get_uint64(nv, "seq"); 531 if (hio->hio_seq == 0) { 532 pjdlog_error("Header contains no 'seq' field."); 533 hio->hio_error = EINVAL; 534 goto end; 535 } 536 } 537 switch (hio->hio_cmd) { 538 case HIO_FLUSH: 539 case HIO_KEEPALIVE: 540 break; 541 case HIO_READ: 542 case HIO_WRITE: 543 case HIO_DELETE: 544 hio->hio_offset = nv_get_uint64(nv, "offset"); 545 if (nv_error(nv) != 0) { 546 pjdlog_error("Header is missing 'offset' field."); 547 hio->hio_error = EINVAL; 548 goto end; 549 } 550 hio->hio_length = nv_get_uint64(nv, "length"); 551 if (nv_error(nv) != 0) { 552 pjdlog_error("Header is missing 'length' field."); 553 hio->hio_error = EINVAL; 554 goto end; 555 } 556 if (hio->hio_length == 0) { 557 pjdlog_error("Data length is zero."); 558 hio->hio_error = EINVAL; 559 goto end; 560 } 561 if (hio->hio_length > MAXPHYS) { 562 pjdlog_error("Data length is too large (%ju > %ju).", 563 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 564 hio->hio_error = EINVAL; 565 goto end; 566 } 567 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 568 pjdlog_error("Offset %ju is not multiple of sector size.", 569 (uintmax_t)hio->hio_offset); 570 hio->hio_error = EINVAL; 571 goto end; 572 } 573 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 574 pjdlog_error("Length %ju is not multiple of sector size.", 575 (uintmax_t)hio->hio_length); 576 hio->hio_error = EINVAL; 577 goto end; 578 } 579 if (hio->hio_offset + hio->hio_length > 580 (uint64_t)res->hr_datasize) { 581 pjdlog_error("Data offset is too large (%ju > %ju).", 582 (uintmax_t)(hio->hio_offset + hio->hio_length), 583 (uintmax_t)res->hr_datasize); 584 hio->hio_error = EINVAL; 585 goto end; 586 } 587 break; 588 default: 589 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 590 hio->hio_cmd); 591 hio->hio_error = EINVAL; 592 goto end; 593 } 594 hio->hio_error = 0; 595 end: 596 return (hio->hio_error); 597 } 598 599 static __dead2 void 600 secondary_exit(int exitcode, const char *fmt, ...) 601 { 602 va_list ap; 603 604 PJDLOG_ASSERT(exitcode != EX_OK); 605 va_start(ap, fmt); 606 pjdlogv_errno(LOG_ERR, fmt, ap); 607 va_end(ap); 608 event_send(gres, EVENT_DISCONNECT); 609 exit(exitcode); 610 } 611 612 /* 613 * Thread receives requests from the primary node. 614 */ 615 static void * 616 recv_thread(void *arg) 617 { 618 struct hast_resource *res = arg; 619 struct hio *hio; 620 struct nv *nv; 621 622 for (;;) { 623 pjdlog_debug(2, "recv: Taking free request."); 624 QUEUE_TAKE(free, hio); 625 pjdlog_debug(2, "recv: (%p) Got request.", hio); 626 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 627 secondary_exit(EX_TEMPFAIL, 628 "Unable to receive request header"); 629 } 630 if (requnpack(res, hio, nv) != 0) { 631 nv_free(nv); 632 pjdlog_debug(2, 633 "recv: (%p) Moving request to the send queue.", 634 hio); 635 QUEUE_INSERT(send, hio); 636 continue; 637 } 638 switch (hio->hio_cmd) { 639 case HIO_READ: 640 res->hr_stat_read++; 641 break; 642 case HIO_WRITE: 643 res->hr_stat_write++; 644 break; 645 case HIO_DELETE: 646 res->hr_stat_delete++; 647 break; 648 case HIO_FLUSH: 649 res->hr_stat_flush++; 650 break; 651 case HIO_KEEPALIVE: 652 break; 653 default: 654 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 655 hio->hio_cmd); 656 } 657 reqlog(LOG_DEBUG, 2, -1, hio, 658 "recv: (%p) Got request header: ", hio); 659 if (hio->hio_cmd == HIO_KEEPALIVE) { 660 nv_free(nv); 661 pjdlog_debug(2, 662 "recv: (%p) Moving request to the free queue.", 663 hio); 664 hio_clear(hio); 665 QUEUE_INSERT(free, hio); 666 continue; 667 } else if (hio->hio_cmd == HIO_WRITE) { 668 if (hast_proto_recv_data(res, res->hr_remotein, nv, 669 hio->hio_data, MAXPHYS) == -1) { 670 secondary_exit(EX_TEMPFAIL, 671 "Unable to receive request data"); 672 } 673 } 674 nv_free(nv); 675 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 676 hio); 677 QUEUE_INSERT(disk, hio); 678 } 679 /* NOTREACHED */ 680 return (NULL); 681 } 682 683 /* 684 * Thread reads from or writes to local component and also handles DELETE and 685 * FLUSH requests. 686 */ 687 static void * 688 disk_thread(void *arg) 689 { 690 struct hast_resource *res = arg; 691 struct hio *hio; 692 ssize_t ret; 693 bool clear_activemap, logerror; 694 695 clear_activemap = true; 696 697 for (;;) { 698 pjdlog_debug(2, "disk: Taking request."); 699 QUEUE_TAKE(disk, hio); 700 while (clear_activemap) { 701 unsigned char *map; 702 size_t mapsize; 703 704 /* 705 * When first request is received, it means that primary 706 * already received our activemap, merged it and stored 707 * locally. We can now safely clear our activemap. 708 */ 709 mapsize = 710 activemap_calc_ondisk_size(res->hr_local_mediasize - 711 METADATA_SIZE, res->hr_extentsize, 712 res->hr_local_sectorsize); 713 map = calloc(1, mapsize); 714 if (map == NULL) { 715 pjdlog_warning("Unable to allocate memory to clear local activemap."); 716 break; 717 } 718 if (pwrite(res->hr_localfd, map, mapsize, 719 METADATA_SIZE) != (ssize_t)mapsize) { 720 pjdlog_errno(LOG_WARNING, 721 "Unable to store cleared activemap"); 722 free(map); 723 break; 724 } 725 free(map); 726 clear_activemap = false; 727 pjdlog_debug(1, "Local activemap cleared."); 728 break; 729 } 730 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 731 logerror = true; 732 /* Handle the actual request. */ 733 switch (hio->hio_cmd) { 734 case HIO_READ: 735 ret = pread(res->hr_localfd, hio->hio_data, 736 hio->hio_length, 737 hio->hio_offset + res->hr_localoff); 738 if (ret == -1) 739 hio->hio_error = errno; 740 else if (ret != (int64_t)hio->hio_length) 741 hio->hio_error = EIO; 742 else 743 hio->hio_error = 0; 744 break; 745 case HIO_WRITE: 746 ret = pwrite(res->hr_localfd, hio->hio_data, 747 hio->hio_length, 748 hio->hio_offset + res->hr_localoff); 749 if (ret == -1) 750 hio->hio_error = errno; 751 else if (ret != (int64_t)hio->hio_length) 752 hio->hio_error = EIO; 753 else 754 hio->hio_error = 0; 755 break; 756 case HIO_DELETE: 757 ret = g_delete(res->hr_localfd, 758 hio->hio_offset + res->hr_localoff, 759 hio->hio_length); 760 if (ret == -1) 761 hio->hio_error = errno; 762 else 763 hio->hio_error = 0; 764 break; 765 case HIO_FLUSH: 766 if (!res->hr_localflush) { 767 ret = -1; 768 hio->hio_error = EOPNOTSUPP; 769 logerror = false; 770 break; 771 } 772 ret = g_flush(res->hr_localfd); 773 if (ret == -1) { 774 if (errno == EOPNOTSUPP) 775 res->hr_localflush = false; 776 hio->hio_error = errno; 777 } else { 778 hio->hio_error = 0; 779 } 780 break; 781 default: 782 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 783 hio->hio_cmd); 784 } 785 if (logerror && hio->hio_error != 0) { 786 reqlog(LOG_ERR, 0, hio->hio_error, hio, 787 "Request failed: "); 788 } 789 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 790 hio); 791 QUEUE_INSERT(send, hio); 792 } 793 /* NOTREACHED */ 794 return (NULL); 795 } 796 797 /* 798 * Thread sends requests back to primary node. 799 */ 800 static void * 801 send_thread(void *arg) 802 { 803 struct hast_resource *res = arg; 804 struct nv *nvout; 805 struct hio *hio; 806 void *data; 807 size_t length; 808 809 for (;;) { 810 pjdlog_debug(2, "send: Taking request."); 811 QUEUE_TAKE(send, hio); 812 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 813 nvout = nv_alloc(); 814 /* Copy sequence number. */ 815 nv_add_uint64(nvout, hio->hio_seq, "seq"); 816 switch (hio->hio_cmd) { 817 case HIO_READ: 818 if (hio->hio_error == 0) { 819 data = hio->hio_data; 820 length = hio->hio_length; 821 break; 822 } 823 /* 824 * We send no data in case of an error. 825 */ 826 /* FALLTHROUGH */ 827 case HIO_DELETE: 828 case HIO_FLUSH: 829 case HIO_WRITE: 830 data = NULL; 831 length = 0; 832 break; 833 default: 834 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 835 hio->hio_cmd); 836 } 837 if (hio->hio_error != 0) 838 nv_add_int16(nvout, hio->hio_error, "error"); 839 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 840 length) == -1) { 841 secondary_exit(EX_TEMPFAIL, "Unable to send reply."); 842 } 843 nv_free(nvout); 844 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 845 hio); 846 hio_clear(hio); 847 QUEUE_INSERT(free, hio); 848 } 849 /* NOTREACHED */ 850 return (NULL); 851 } 852