1 /*- 2 * Copyright (c) 2009-2010 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31 #include <sys/cdefs.h> 32 __FBSDID("$FreeBSD$"); 33 34 #include <sys/param.h> 35 #include <sys/time.h> 36 #include <sys/bio.h> 37 #include <sys/disk.h> 38 #include <sys/stat.h> 39 40 #include <err.h> 41 #include <errno.h> 42 #include <fcntl.h> 43 #include <libgeom.h> 44 #include <pthread.h> 45 #include <signal.h> 46 #include <stdint.h> 47 #include <stdio.h> 48 #include <string.h> 49 #include <sysexits.h> 50 #include <unistd.h> 51 52 #include <activemap.h> 53 #include <nv.h> 54 #include <pjdlog.h> 55 56 #include "control.h" 57 #include "event.h" 58 #include "hast.h" 59 #include "hast_proto.h" 60 #include "hastd.h" 61 #include "hooks.h" 62 #include "metadata.h" 63 #include "proto.h" 64 #include "subr.h" 65 #include "synch.h" 66 67 struct hio { 68 uint64_t hio_seq; 69 int hio_error; 70 void *hio_data; 71 uint8_t hio_cmd; 72 uint64_t hio_offset; 73 uint64_t hio_length; 74 TAILQ_ENTRY(hio) hio_next; 75 }; 76 77 static struct hast_resource *gres; 78 79 /* 80 * Free list holds unused structures. When free list is empty, we have to wait 81 * until some in-progress requests are freed. 82 */ 83 static TAILQ_HEAD(, hio) hio_free_list; 84 static pthread_mutex_t hio_free_list_lock; 85 static pthread_cond_t hio_free_list_cond; 86 /* 87 * Disk thread (the one that do I/O requests) takes requests from this list. 88 */ 89 static TAILQ_HEAD(, hio) hio_disk_list; 90 static pthread_mutex_t hio_disk_list_lock; 91 static pthread_cond_t hio_disk_list_cond; 92 /* 93 * There is one recv list for every component, although local components don't 94 * use recv lists as local requests are done synchronously. 95 */ 96 static TAILQ_HEAD(, hio) hio_send_list; 97 static pthread_mutex_t hio_send_list_lock; 98 static pthread_cond_t hio_send_list_cond; 99 100 /* 101 * Maximum number of outstanding I/O requests. 102 */ 103 #define HAST_HIO_MAX 256 104 105 static void *recv_thread(void *arg); 106 static void *disk_thread(void *arg); 107 static void *send_thread(void *arg); 108 109 #define QUEUE_INSERT(name, hio) do { \ 110 bool _wakeup; \ 111 \ 112 mtx_lock(&hio_##name##_list_lock); \ 113 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 114 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 115 mtx_unlock(&hio_##name##_list_lock); \ 116 if (_wakeup) \ 117 cv_signal(&hio_##name##_list_cond); \ 118 } while (0) 119 #define QUEUE_TAKE(name, hio) do { \ 120 mtx_lock(&hio_##name##_list_lock); \ 121 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 122 cv_wait(&hio_##name##_list_cond, \ 123 &hio_##name##_list_lock); \ 124 } \ 125 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 126 mtx_unlock(&hio_##name##_list_lock); \ 127 } while (0) 128 129 static void 130 hio_clear(struct hio *hio) 131 { 132 133 hio->hio_seq = 0; 134 hio->hio_error = 0; 135 hio->hio_cmd = HIO_UNDEF; 136 hio->hio_offset = 0; 137 hio->hio_length = 0; 138 } 139 140 static void 141 init_environment(void) 142 { 143 struct hio *hio; 144 unsigned int ii; 145 146 /* 147 * Initialize lists, their locks and theirs condition variables. 148 */ 149 TAILQ_INIT(&hio_free_list); 150 mtx_init(&hio_free_list_lock); 151 cv_init(&hio_free_list_cond); 152 TAILQ_INIT(&hio_disk_list); 153 mtx_init(&hio_disk_list_lock); 154 cv_init(&hio_disk_list_cond); 155 TAILQ_INIT(&hio_send_list); 156 mtx_init(&hio_send_list_lock); 157 cv_init(&hio_send_list_cond); 158 159 /* 160 * Allocate requests pool and initialize requests. 161 */ 162 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 163 hio = malloc(sizeof(*hio)); 164 if (hio == NULL) { 165 pjdlog_exitx(EX_TEMPFAIL, 166 "Unable to allocate memory (%zu bytes) for hio request.", 167 sizeof(*hio)); 168 } 169 hio->hio_data = malloc(MAXPHYS); 170 if (hio->hio_data == NULL) { 171 pjdlog_exitx(EX_TEMPFAIL, 172 "Unable to allocate memory (%zu bytes) for gctl_data.", 173 (size_t)MAXPHYS); 174 } 175 hio_clear(hio); 176 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 177 } 178 } 179 180 static void 181 init_local(struct hast_resource *res) 182 { 183 184 if (metadata_read(res, true) < 0) 185 exit(EX_NOINPUT); 186 } 187 188 static void 189 init_remote(struct hast_resource *res, struct nv *nvin) 190 { 191 uint64_t resuid; 192 struct nv *nvout; 193 unsigned char *map; 194 size_t mapsize; 195 196 #ifdef notyet 197 /* Setup direction. */ 198 if (proto_send(res->hr_remoteout, NULL, 0) == -1) 199 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 200 #endif 201 202 map = NULL; 203 mapsize = 0; 204 nvout = nv_alloc(); 205 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 206 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 207 resuid = nv_get_uint64(nvin, "resuid"); 208 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 209 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 210 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 211 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 212 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 213 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 214 map = malloc(mapsize); 215 if (map == NULL) { 216 pjdlog_exitx(EX_TEMPFAIL, 217 "Unable to allocate memory (%zu bytes) for activemap.", 218 mapsize); 219 } 220 /* 221 * When we work as primary and secondary is missing we will increase 222 * localcnt in our metadata. When secondary is connected and synced 223 * we make localcnt be equal to remotecnt, which means nodes are more 224 * or less in sync. 225 * Split-brain condition is when both nodes are not able to communicate 226 * and are both configured as primary nodes. In turn, they can both 227 * make incompatible changes to the data and we have to detect that. 228 * Under split-brain condition we will increase our localcnt on first 229 * write and remote node will increase its localcnt on first write. 230 * When we connect we can see that primary's localcnt is greater than 231 * our remotecnt (primary was modified while we weren't watching) and 232 * our localcnt is greater than primary's remotecnt (we were modified 233 * while primary wasn't watching). 234 * There are many possible combinations which are all gathered below. 235 * Don't pay too much attention to exact numbers, the more important 236 * is to compare them. We compare secondary's local with primary's 237 * remote and secondary's remote with primary's local. 238 * Note that every case where primary's localcnt is smaller than 239 * secondary's remotecnt and where secondary's localcnt is smaller than 240 * primary's remotecnt should be impossible in practise. We will perform 241 * full synchronization then. Those cases are marked with an asterisk. 242 * Regular synchronization means that only extents marked as dirty are 243 * synchronized (regular synchronization). 244 * 245 * SECONDARY METADATA PRIMARY METADATA 246 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 247 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 248 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 249 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 250 * regular sync from secondary. 251 * local=3 remote=3 local=3 remote=3 Regular sync just in case. 252 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 253 * local=3 remote=3 local=4 remote=2 Split-brain condition. 254 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 255 * regular sync from primary. 256 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 257 */ 258 if (res->hr_resuid == 0) { 259 /* 260 * Provider is used for the first time. If primary node done no 261 * writes yet as well (we will find "virgin" argument) then 262 * there is no need to synchronize anything. If primary node 263 * done any writes already we have to synchronize everything. 264 */ 265 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 266 res->hr_resuid = resuid; 267 if (metadata_write(res) < 0) 268 exit(EX_NOINPUT); 269 if (nv_exists(nvin, "virgin")) { 270 free(map); 271 map = NULL; 272 mapsize = 0; 273 } else { 274 memset(map, 0xff, mapsize); 275 } 276 nv_add_int8(nvout, 1, "virgin"); 277 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 278 } else if (res->hr_resuid != resuid) { 279 char errmsg[256]; 280 281 free(map); 282 (void)snprintf(errmsg, sizeof(errmsg), 283 "Resource unique ID mismatch (primary=%ju, secondary=%ju).", 284 (uintmax_t)resuid, (uintmax_t)res->hr_resuid); 285 pjdlog_error("%s", errmsg); 286 nv_add_string(nvout, errmsg, "errmsg"); 287 if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) { 288 pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", 289 res->hr_remoteaddr); 290 } 291 nv_free(nvout); 292 exit(EX_CONFIG); 293 } else if ( 294 /* Is primary out-of-date? */ 295 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 296 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 297 /* Are the nodes more or less in sync? */ 298 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 299 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 300 /* Is secondary out-of-date? */ 301 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 302 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 303 /* 304 * Nodes are more or less in sync or one of the nodes is 305 * out-of-date. 306 * It doesn't matter at this point which one, we just have to 307 * send out local bitmap to the remote node. 308 */ 309 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 310 (ssize_t)mapsize) { 311 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 312 } 313 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 314 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 315 /* Primary is out-of-date, sync from secondary. */ 316 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 317 } else { 318 /* 319 * Secondary is out-of-date or counts match. 320 * Sync from primary. 321 */ 322 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 323 } 324 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 325 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 326 /* 327 * Not good, we have split-brain condition. 328 */ 329 free(map); 330 pjdlog_error("Split-brain detected, exiting."); 331 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 332 if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) { 333 pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", 334 res->hr_remoteaddr); 335 } 336 nv_free(nvout); 337 /* Exit on split-brain. */ 338 event_send(res, EVENT_SPLITBRAIN); 339 exit(EX_CONFIG); 340 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 341 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 342 /* 343 * This should never happen in practise, but we will perform 344 * full synchronization. 345 */ 346 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 347 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 348 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 349 METADATA_SIZE, res->hr_extentsize, 350 res->hr_local_sectorsize); 351 memset(map, 0xff, mapsize); 352 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 353 /* In this one of five cases sync from secondary. */ 354 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 355 } else { 356 /* For the rest four cases sync from primary. */ 357 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 358 } 359 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 360 (uintmax_t)res->hr_primary_localcnt, 361 (uintmax_t)res->hr_primary_remotecnt, 362 (uintmax_t)res->hr_secondary_localcnt, 363 (uintmax_t)res->hr_secondary_remotecnt); 364 } 365 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 366 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 367 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", 368 res->hr_remoteaddr); 369 } 370 if (map != NULL) 371 free(map); 372 nv_free(nvout); 373 #ifdef notyet 374 /* Setup direction. */ 375 if (proto_recv(res->hr_remotein, NULL, 0) == -1) 376 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 377 #endif 378 } 379 380 void 381 hastd_secondary(struct hast_resource *res, struct nv *nvin) 382 { 383 sigset_t mask; 384 pthread_t td; 385 pid_t pid; 386 int error, mode, debuglevel; 387 388 /* 389 * Create communication channel between parent and child. 390 */ 391 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 392 KEEP_ERRNO((void)pidfile_remove(pfh)); 393 pjdlog_exit(EX_OSERR, 394 "Unable to create control sockets between parent and child"); 395 } 396 /* 397 * Create communication channel between child and parent. 398 */ 399 if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 400 KEEP_ERRNO((void)pidfile_remove(pfh)); 401 pjdlog_exit(EX_OSERR, 402 "Unable to create event sockets between child and parent"); 403 } 404 405 pid = fork(); 406 if (pid < 0) { 407 KEEP_ERRNO((void)pidfile_remove(pfh)); 408 pjdlog_exit(EX_OSERR, "Unable to fork"); 409 } 410 411 if (pid > 0) { 412 /* This is parent. */ 413 proto_close(res->hr_remotein); 414 res->hr_remotein = NULL; 415 proto_close(res->hr_remoteout); 416 res->hr_remoteout = NULL; 417 /* Declare that we are receiver. */ 418 proto_recv(res->hr_event, NULL, 0); 419 /* Declare that we are sender. */ 420 proto_send(res->hr_ctrl, NULL, 0); 421 res->hr_workerpid = pid; 422 return; 423 } 424 425 gres = res; 426 mode = pjdlog_mode_get(); 427 debuglevel = pjdlog_debug_get(); 428 429 /* Declare that we are sender. */ 430 proto_send(res->hr_event, NULL, 0); 431 /* Declare that we are receiver. */ 432 proto_recv(res->hr_ctrl, NULL, 0); 433 descriptors_cleanup(res); 434 435 descriptors_assert(res, mode); 436 437 pjdlog_init(mode); 438 pjdlog_debug_set(debuglevel); 439 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 440 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 441 442 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 443 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 444 445 /* Error in setting timeout is not critical, but why should it fail? */ 446 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) < 0) 447 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 448 if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 449 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 450 451 init_local(res); 452 init_environment(); 453 454 if (drop_privs(res) != 0) 455 exit(EX_CONFIG); 456 pjdlog_info("Privileges successfully dropped."); 457 458 /* 459 * Create the control thread before sending any event to the parent, 460 * as we can deadlock when parent sends control request to worker, 461 * but worker has no control thread started yet, so parent waits. 462 * In the meantime worker sends an event to the parent, but parent 463 * is unable to handle the event, because it waits for control 464 * request response. 465 */ 466 error = pthread_create(&td, NULL, ctrl_thread, res); 467 PJDLOG_ASSERT(error == 0); 468 469 init_remote(res, nvin); 470 event_send(res, EVENT_CONNECT); 471 472 error = pthread_create(&td, NULL, recv_thread, res); 473 PJDLOG_ASSERT(error == 0); 474 error = pthread_create(&td, NULL, disk_thread, res); 475 PJDLOG_ASSERT(error == 0); 476 (void)send_thread(res); 477 } 478 479 static void 480 reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 481 { 482 char msg[1024]; 483 va_list ap; 484 int len; 485 486 va_start(ap, fmt); 487 len = vsnprintf(msg, sizeof(msg), fmt, ap); 488 va_end(ap); 489 if ((size_t)len < sizeof(msg)) { 490 switch (hio->hio_cmd) { 491 case HIO_READ: 492 (void)snprintf(msg + len, sizeof(msg) - len, 493 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 494 (uintmax_t)hio->hio_length); 495 break; 496 case HIO_DELETE: 497 (void)snprintf(msg + len, sizeof(msg) - len, 498 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 499 (uintmax_t)hio->hio_length); 500 break; 501 case HIO_FLUSH: 502 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 503 break; 504 case HIO_WRITE: 505 (void)snprintf(msg + len, sizeof(msg) - len, 506 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 507 (uintmax_t)hio->hio_length); 508 break; 509 case HIO_KEEPALIVE: 510 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 511 break; 512 default: 513 (void)snprintf(msg + len, sizeof(msg) - len, 514 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 515 break; 516 } 517 } 518 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 519 } 520 521 static int 522 requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv) 523 { 524 525 hio->hio_cmd = nv_get_uint8(nv, "cmd"); 526 if (hio->hio_cmd == 0) { 527 pjdlog_error("Header contains no 'cmd' field."); 528 hio->hio_error = EINVAL; 529 goto end; 530 } 531 if (hio->hio_cmd != HIO_KEEPALIVE) { 532 hio->hio_seq = nv_get_uint64(nv, "seq"); 533 if (hio->hio_seq == 0) { 534 pjdlog_error("Header contains no 'seq' field."); 535 hio->hio_error = EINVAL; 536 goto end; 537 } 538 } 539 switch (hio->hio_cmd) { 540 case HIO_FLUSH: 541 case HIO_KEEPALIVE: 542 break; 543 case HIO_READ: 544 case HIO_WRITE: 545 case HIO_DELETE: 546 hio->hio_offset = nv_get_uint64(nv, "offset"); 547 if (nv_error(nv) != 0) { 548 pjdlog_error("Header is missing 'offset' field."); 549 hio->hio_error = EINVAL; 550 goto end; 551 } 552 hio->hio_length = nv_get_uint64(nv, "length"); 553 if (nv_error(nv) != 0) { 554 pjdlog_error("Header is missing 'length' field."); 555 hio->hio_error = EINVAL; 556 goto end; 557 } 558 if (hio->hio_length == 0) { 559 pjdlog_error("Data length is zero."); 560 hio->hio_error = EINVAL; 561 goto end; 562 } 563 if (hio->hio_length > MAXPHYS) { 564 pjdlog_error("Data length is too large (%ju > %ju).", 565 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 566 hio->hio_error = EINVAL; 567 goto end; 568 } 569 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 570 pjdlog_error("Offset %ju is not multiple of sector size.", 571 (uintmax_t)hio->hio_offset); 572 hio->hio_error = EINVAL; 573 goto end; 574 } 575 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 576 pjdlog_error("Length %ju is not multiple of sector size.", 577 (uintmax_t)hio->hio_length); 578 hio->hio_error = EINVAL; 579 goto end; 580 } 581 if (hio->hio_offset + hio->hio_length > 582 (uint64_t)res->hr_datasize) { 583 pjdlog_error("Data offset is too large (%ju > %ju).", 584 (uintmax_t)(hio->hio_offset + hio->hio_length), 585 (uintmax_t)res->hr_datasize); 586 hio->hio_error = EINVAL; 587 goto end; 588 } 589 break; 590 default: 591 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 592 hio->hio_cmd); 593 hio->hio_error = EINVAL; 594 goto end; 595 } 596 hio->hio_error = 0; 597 end: 598 return (hio->hio_error); 599 } 600 601 static __dead2 void 602 secondary_exit(int exitcode, const char *fmt, ...) 603 { 604 va_list ap; 605 606 PJDLOG_ASSERT(exitcode != EX_OK); 607 va_start(ap, fmt); 608 pjdlogv_errno(LOG_ERR, fmt, ap); 609 va_end(ap); 610 event_send(gres, EVENT_DISCONNECT); 611 exit(exitcode); 612 } 613 614 /* 615 * Thread receives requests from the primary node. 616 */ 617 static void * 618 recv_thread(void *arg) 619 { 620 struct hast_resource *res = arg; 621 struct hio *hio; 622 struct nv *nv; 623 624 for (;;) { 625 pjdlog_debug(2, "recv: Taking free request."); 626 QUEUE_TAKE(free, hio); 627 pjdlog_debug(2, "recv: (%p) Got request.", hio); 628 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 629 secondary_exit(EX_TEMPFAIL, 630 "Unable to receive request header"); 631 } 632 if (requnpack(res, hio, nv) != 0) { 633 nv_free(nv); 634 pjdlog_debug(2, 635 "recv: (%p) Moving request to the send queue.", 636 hio); 637 QUEUE_INSERT(send, hio); 638 continue; 639 } 640 switch (hio->hio_cmd) { 641 case HIO_READ: 642 res->hr_stat_read++; 643 break; 644 case HIO_WRITE: 645 res->hr_stat_write++; 646 break; 647 case HIO_DELETE: 648 res->hr_stat_delete++; 649 break; 650 case HIO_FLUSH: 651 res->hr_stat_flush++; 652 break; 653 case HIO_KEEPALIVE: 654 break; 655 default: 656 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 657 hio->hio_cmd); 658 } 659 reqlog(LOG_DEBUG, 2, -1, hio, 660 "recv: (%p) Got request header: ", hio); 661 if (hio->hio_cmd == HIO_KEEPALIVE) { 662 nv_free(nv); 663 pjdlog_debug(2, 664 "recv: (%p) Moving request to the free queue.", 665 hio); 666 hio_clear(hio); 667 QUEUE_INSERT(free, hio); 668 continue; 669 } else if (hio->hio_cmd == HIO_WRITE) { 670 if (hast_proto_recv_data(res, res->hr_remotein, nv, 671 hio->hio_data, MAXPHYS) < 0) { 672 secondary_exit(EX_TEMPFAIL, 673 "Unable to receive request data"); 674 } 675 } 676 nv_free(nv); 677 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 678 hio); 679 QUEUE_INSERT(disk, hio); 680 } 681 /* NOTREACHED */ 682 return (NULL); 683 } 684 685 /* 686 * Thread reads from or writes to local component and also handles DELETE and 687 * FLUSH requests. 688 */ 689 static void * 690 disk_thread(void *arg) 691 { 692 struct hast_resource *res = arg; 693 struct hio *hio; 694 ssize_t ret; 695 bool clear_activemap, logerror; 696 697 clear_activemap = true; 698 699 for (;;) { 700 pjdlog_debug(2, "disk: Taking request."); 701 QUEUE_TAKE(disk, hio); 702 while (clear_activemap) { 703 unsigned char *map; 704 size_t mapsize; 705 706 /* 707 * When first request is received, it means that primary 708 * already received our activemap, merged it and stored 709 * locally. We can now safely clear our activemap. 710 */ 711 mapsize = 712 activemap_calc_ondisk_size(res->hr_local_mediasize - 713 METADATA_SIZE, res->hr_extentsize, 714 res->hr_local_sectorsize); 715 map = calloc(1, mapsize); 716 if (map == NULL) { 717 pjdlog_warning("Unable to allocate memory to clear local activemap."); 718 break; 719 } 720 if (pwrite(res->hr_localfd, map, mapsize, 721 METADATA_SIZE) != (ssize_t)mapsize) { 722 pjdlog_errno(LOG_WARNING, 723 "Unable to store cleared activemap"); 724 free(map); 725 break; 726 } 727 free(map); 728 clear_activemap = false; 729 pjdlog_debug(1, "Local activemap cleared."); 730 break; 731 } 732 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 733 logerror = true; 734 /* Handle the actual request. */ 735 switch (hio->hio_cmd) { 736 case HIO_READ: 737 ret = pread(res->hr_localfd, hio->hio_data, 738 hio->hio_length, 739 hio->hio_offset + res->hr_localoff); 740 if (ret < 0) 741 hio->hio_error = errno; 742 else if (ret != (int64_t)hio->hio_length) 743 hio->hio_error = EIO; 744 else 745 hio->hio_error = 0; 746 break; 747 case HIO_WRITE: 748 ret = pwrite(res->hr_localfd, hio->hio_data, 749 hio->hio_length, 750 hio->hio_offset + res->hr_localoff); 751 if (ret < 0) 752 hio->hio_error = errno; 753 else if (ret != (int64_t)hio->hio_length) 754 hio->hio_error = EIO; 755 else 756 hio->hio_error = 0; 757 break; 758 case HIO_DELETE: 759 ret = g_delete(res->hr_localfd, 760 hio->hio_offset + res->hr_localoff, 761 hio->hio_length); 762 if (ret < 0) 763 hio->hio_error = errno; 764 else 765 hio->hio_error = 0; 766 break; 767 case HIO_FLUSH: 768 if (!res->hr_localflush) { 769 ret = -1; 770 hio->hio_error = EOPNOTSUPP; 771 logerror = false; 772 break; 773 } 774 ret = g_flush(res->hr_localfd); 775 if (ret < 0) { 776 if (errno == EOPNOTSUPP) 777 res->hr_localflush = false; 778 hio->hio_error = errno; 779 } else { 780 hio->hio_error = 0; 781 } 782 break; 783 default: 784 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 785 hio->hio_cmd); 786 } 787 if (logerror && hio->hio_error != 0) { 788 reqlog(LOG_ERR, 0, hio->hio_error, hio, 789 "Request failed: "); 790 } 791 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 792 hio); 793 QUEUE_INSERT(send, hio); 794 } 795 /* NOTREACHED */ 796 return (NULL); 797 } 798 799 /* 800 * Thread sends requests back to primary node. 801 */ 802 static void * 803 send_thread(void *arg) 804 { 805 struct hast_resource *res = arg; 806 struct nv *nvout; 807 struct hio *hio; 808 void *data; 809 size_t length; 810 811 for (;;) { 812 pjdlog_debug(2, "send: Taking request."); 813 QUEUE_TAKE(send, hio); 814 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 815 nvout = nv_alloc(); 816 /* Copy sequence number. */ 817 nv_add_uint64(nvout, hio->hio_seq, "seq"); 818 switch (hio->hio_cmd) { 819 case HIO_READ: 820 if (hio->hio_error == 0) { 821 data = hio->hio_data; 822 length = hio->hio_length; 823 break; 824 } 825 /* 826 * We send no data in case of an error. 827 */ 828 /* FALLTHROUGH */ 829 case HIO_DELETE: 830 case HIO_FLUSH: 831 case HIO_WRITE: 832 data = NULL; 833 length = 0; 834 break; 835 default: 836 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 837 hio->hio_cmd); 838 } 839 if (hio->hio_error != 0) 840 nv_add_int16(nvout, hio->hio_error, "error"); 841 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 842 length) < 0) { 843 secondary_exit(EX_TEMPFAIL, "Unable to send reply."); 844 } 845 nv_free(nvout); 846 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 847 hio); 848 hio_clear(hio); 849 QUEUE_INSERT(free, hio); 850 } 851 /* NOTREACHED */ 852 return (NULL); 853 } 854