Lines Matching +full:sync +full:- +full:update +full:- +full:mask

1 /*-
2 * SPDX-License-Identifier: BSD-2-Clause
5 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
116 * until some in-progress requests are freed.
124 * send lists - each component gets the same request, but each component is
151 * Structure below are for interaction with sync thread.
179 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
211 hio_##name##_list_size[(ncomp)]--; \
224 hio_##name##_list_size--; \
229 #define ISFULLSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
230 #define ISMEMSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
231 #define ISASYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_ASYNC)
234 (hio)->hio_ggio.gctl_unit = -1; \
235 (hio)->hio_ggio.gctl_seq = 1; \
237 #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1)
238 #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
239 #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2)
242 (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))
288 if (res->hr_ggateunit >= 0) { in cleanup()
293 ggiod.gctl_unit = res->hr_ggateunit; in cleanup()
295 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { in cleanup()
298 res->hr_provname); in cleanup()
300 res->hr_ggateunit = -1; in cleanup()
333 hast_activemap_flush(struct hast_resource *res) __unlocks(res->hr_amp_lock) in hast_activemap_flush()
339 mtx_lock(&res->hr_amp_diskmap_lock); in hast_activemap_flush()
340 buf = activemap_bitmap(res->hr_amp, &size); in hast_activemap_flush()
341 mtx_unlock(&res->hr_amp_lock); in hast_activemap_flush()
343 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); in hast_activemap_flush()
345 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != in hast_activemap_flush()
348 res->hr_stat_activemap_write_error++; in hast_activemap_flush()
349 ret = -1; in hast_activemap_flush()
351 if (ret == 0 && res->hr_metaflush == 1 && in hast_activemap_flush()
352 g_flush(res->hr_localfd) == -1) { in hast_activemap_flush()
355 res->hr_localpath); in hast_activemap_flush()
356 res->hr_metaflush = 0; in hast_activemap_flush()
359 "Unable to flush disk cache on activemap update"); in hast_activemap_flush()
360 res->hr_stat_activemap_flush_error++; in hast_activemap_flush()
361 ret = -1; in hast_activemap_flush()
364 mtx_unlock(&res->hr_amp_diskmap_lock); in hast_activemap_flush()
372 return (strcmp(res->hr_remoteaddr, "none") != 0); in real_remote()
382 * In the future it might be per-resource value. in init_environment()
476 refcnt_init(&hio->hio_countdown, 0); in init_environment()
477 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); in init_environment()
478 if (hio->hio_errors == NULL) { in init_environment()
481 sizeof(hio->hio_errors[0]) * ncomps); in init_environment()
483 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); in init_environment()
484 if (hio->hio_next == NULL) { in init_environment()
487 sizeof(hio->hio_next[0]) * ncomps); in init_environment()
489 hio->hio_ggio.gctl_version = G_GATE_VERSION; in init_environment()
490 hio->hio_ggio.gctl_data = malloc(MAXPHYS); in init_environment()
491 if (hio->hio_ggio.gctl_data == NULL) { in init_environment()
496 hio->hio_ggio.gctl_length = MAXPHYS; in init_environment()
497 hio->hio_ggio.gctl_error = 0; in init_environment()
508 if (res->hr_resuid != 0) { in init_resuid()
513 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); in init_resuid()
515 if (metadata_write(res) == -1) in init_resuid()
527 if (metadata_read(res, true) == -1) in init_local()
529 mtx_init(&res->hr_amp_lock); in init_local()
530 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, in init_local()
531 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { in init_local()
536 if (rangelock_init(&range_regular) == -1) in init_local()
539 if (rangelock_init(&range_sync) == -1) in init_local()
540 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); in init_local()
541 mapsize = activemap_ondisk_size(res->hr_amp); in init_local()
547 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != in init_local()
551 activemap_copyin(res->hr_amp, buf, mapsize); in init_local()
553 if (res->hr_resuid != 0) in init_local()
562 res->hr_primary_localcnt = 0; in init_local()
563 res->hr_primary_remotecnt = 0; in init_local()
564 if (metadata_write(res) == -1) in init_local()
575 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { in primary_connect()
579 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { in primary_connect()
586 res->hr_remoteaddr); in primary_connect()
587 return (-1); in primary_connect()
589 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { in primary_connect()
593 if (proto_connect_wait(conn, res->hr_timeout) == -1) { in primary_connect()
595 res->hr_remoteaddr); in primary_connect()
597 return (-1); in primary_connect()
600 if (proto_timeout(conn, res->hr_timeout) == -1) in primary_connect()
618 ggiomodify.gctl_unit = res->hr_ggateunit; in enable_direct_reads()
620 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, in enable_direct_reads()
622 ggiomodify.gctl_readoffset = res->hr_localoff; in enable_direct_reads()
623 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) in enable_direct_reads()
651 if (primary_connect(res, &out) == -1) in init_remote()
661 nv_add_string(nvout, res->hr_name, "resource"); in init_remote()
666 res->hr_remoteaddr); in init_remote()
670 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { in init_remote()
673 res->hr_remoteaddr); in init_remote()
678 if (hast_proto_recv_hdr(out, &nvin) == -1) { in init_remote()
681 res->hr_remoteaddr); in init_remote()
704 res->hr_version = version; in init_remote()
705 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); in init_remote()
709 res->hr_remoteaddr); in init_remote()
713 if (size != sizeof(res->hr_token)) { in init_remote()
715 res->hr_remoteaddr, size, sizeof(res->hr_token)); in init_remote()
719 bcopy(token, res->hr_token, sizeof(res->hr_token)); in init_remote()
726 if (primary_connect(res, &in) == -1) in init_remote()
730 nv_add_string(nvout, res->hr_name, "resource"); in init_remote()
731 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), in init_remote()
733 if (res->hr_resuid == 0) { in init_remote()
746 nv_add_uint64(nvout, res->hr_resuid, "resuid"); in init_remote()
747 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); in init_remote()
748 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); in init_remote()
752 res->hr_remoteaddr); in init_remote()
756 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { in init_remote()
759 res->hr_remoteaddr); in init_remote()
764 if (hast_proto_recv_hdr(out, &nvin) == -1) { in init_remote()
767 res->hr_remoteaddr); in init_remote()
777 if (datasize != res->hr_datasize) { in init_remote()
779 (intmax_t)res->hr_datasize, (intmax_t)datasize); in init_remote()
784 if (extentsize != res->hr_extentsize) { in init_remote()
786 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); in init_remote()
790 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); in init_remote()
791 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); in init_remote()
792 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); in init_remote()
793 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) in init_remote()
800 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); in init_remote()
801 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); in init_remote()
803 if (res->hr_primary_localcnt == 0) { in init_remote()
804 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); in init_remote()
807 res->hr_primary_localcnt++; in init_remote()
809 (uintmax_t)res->hr_primary_localcnt); in init_remote()
829 mapsize) == -1) { in init_remote()
836 mtx_lock(&res->hr_amp_lock); in init_remote()
840 activemap_merge(res->hr_amp, map, mapsize); in init_remote()
851 if (proto_send(out, NULL, 0) == -1) in init_remote()
853 if (proto_recv(in, NULL, 0) == -1) in init_remote()
856 pjdlog_info("Connected to %s.", res->hr_remoteaddr); in init_remote()
857 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && in init_remote()
858 res->hr_version < 2) { in init_remote()
860 res->hr_replication = HAST_REPLICATION_FULLSYNC; in init_remote()
861 } else if (res->hr_replication != res->hr_original_replication) { in init_remote()
865 res->hr_replication = res->hr_original_replication; in init_remote()
871 res->hr_remotein = in; in init_remote()
872 res->hr_remoteout = out; in init_remote()
877 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) in init_remote()
914 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); in init_ggate()
915 if (res->hr_ggatefd == -1) in init_ggate()
923 ggiocreate.gctl_mediasize = res->hr_datasize; in init_ggate()
924 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; in init_ggate()
930 res->hr_provname); in init_ggate()
931 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { in init_ggate()
932 pjdlog_info("Device hast/%s created.", res->hr_provname); in init_ggate()
933 res->hr_ggateunit = ggiocreate.gctl_unit; in init_ggate()
938 res->hr_provname); in init_ggate()
942 res->hr_provname); in init_ggate()
952 res->hr_provname); in init_ggate()
953 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { in init_ggate()
954 pjdlog_info("Device hast/%s recovered.", res->hr_provname); in init_ggate()
955 res->hr_ggateunit = ggiocancel.gctl_unit; in init_ggate()
959 res->hr_provname); in init_ggate()
973 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { in hastd_primary()
982 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { in hastd_primary()
992 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { in hastd_primary()
1000 if (pid == -1) { in hastd_primary()
1009 proto_recv(res->hr_event, NULL, 0); in hastd_primary()
1010 proto_recv(res->hr_conn, NULL, 0); in hastd_primary()
1012 proto_send(res->hr_ctrl, NULL, 0); in hastd_primary()
1013 res->hr_workerpid = pid; in hastd_primary()
1018 res->output_status_aux = output_status_aux; in hastd_primary()
1023 proto_send(res->hr_event, NULL, 0); in hastd_primary()
1024 proto_send(res->hr_conn, NULL, 0); in hastd_primary()
1026 proto_recv(res->hr_ctrl, NULL, 0); in hastd_primary()
1033 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); in hastd_primary()
1034 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); in hastd_primary()
1071 res->hr_timeout); in hastd_primary()
1077 if (time(NULL) > start + res->hr_timeout) in hastd_primary()
1110 switch (ggio->gctl_cmd) { in reqlog()
1113 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); in reqlog()
1117 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); in reqlog()
1124 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); in reqlog()
1128 (unsigned int)ggio->gctl_cmd); in reqlog()
1131 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); in reqlog()
1140 * Check for a race between dropping rlock and acquiring wlock - in remote_close()
1141 * another thread can close connection in-between. in remote_close()
1144 PJDLOG_ASSERT(res->hr_remotein == NULL); in remote_close()
1145 PJDLOG_ASSERT(res->hr_remoteout == NULL); in remote_close()
1150 PJDLOG_ASSERT(res->hr_remotein != NULL); in remote_close()
1151 PJDLOG_ASSERT(res->hr_remoteout != NULL); in remote_close()
1154 res->hr_remoteaddr); in remote_close()
1155 proto_close(res->hr_remotein); in remote_close()
1156 res->hr_remotein = NULL; in remote_close()
1158 res->hr_remoteaddr); in remote_close()
1159 proto_close(res->hr_remoteout); in remote_close()
1160 res->hr_remoteout = NULL; in remote_close()
1164 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); in remote_close()
1167 * Stop synchronization if in-progress. in remote_close()
1175 * Acknowledge write completion to the kernel, but don't update activemap yet.
1183 PJDLOG_ASSERT(!hio->hio_done); in write_complete()
1185 ggio = &hio->hio_ggio; in write_complete()
1186 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); in write_complete()
1196 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { in write_complete()
1197 res->hr_primary_localcnt++; in write_complete()
1199 (uintmax_t)res->hr_primary_localcnt); in write_complete()
1205 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) in write_complete()
1207 hio->hio_done = true; in write_complete()
1213 * WRITE - always goes to both local_send and remote_send threads
1214 * READ (when the block is up-to-date on local component) -
1216 * READ (when the block isn't up-to-date on local component) -
1218 * DELETE - always goes to both local_send and remote_send threads
1219 * FLUSH - always goes to both local_send and remote_send threads
1234 ggio = &hio->hio_ggio; in ggate_recv_thread()
1235 ggio->gctl_unit = res->hr_ggateunit; in ggate_recv_thread()
1236 ggio->gctl_length = MAXPHYS; in ggate_recv_thread()
1237 ggio->gctl_error = 0; in ggate_recv_thread()
1238 hio->hio_done = false; in ggate_recv_thread()
1239 hio->hio_replication = res->hr_replication; in ggate_recv_thread()
1243 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { in ggate_recv_thread()
1248 error = ggio->gctl_error; in ggate_recv_thread()
1264 * bytes - request can't be bigger than that. in ggate_recv_thread()
1277 hio->hio_errors[ii] = EINVAL; in ggate_recv_thread()
1285 * range is out-of-date, then use remote component. in ggate_recv_thread()
1287 switch (ggio->gctl_cmd) { in ggate_recv_thread()
1289 res->hr_stat_read++; in ggate_recv_thread()
1292 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || in ggate_recv_thread()
1293 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { in ggate_recv_thread()
1295 * This range is up-to-date on local component, in ggate_recv_thread()
1300 } else /* if (res->hr_syncsrc == in ggate_recv_thread()
1302 PJDLOG_ASSERT(res->hr_syncsrc == in ggate_recv_thread()
1305 * This range is out-of-date on local component, in ggate_recv_thread()
1314 res->hr_stat_write++; in ggate_recv_thread()
1315 if (res->hr_resuid == 0 && in ggate_recv_thread()
1316 res->hr_primary_localcnt == 0) { in ggate_recv_thread()
1318 res->hr_primary_localcnt = 1; in ggate_recv_thread()
1323 ggio->gctl_offset, ggio->gctl_length)) { in ggate_recv_thread()
1326 (intmax_t)ggio->gctl_offset, in ggate_recv_thread()
1327 (size_t)ggio->gctl_length); in ggate_recv_thread()
1335 ggio->gctl_offset, ggio->gctl_length) == -1) { in ggate_recv_thread()
1339 (intmax_t)ggio->gctl_offset, in ggate_recv_thread()
1340 (size_t)ggio->gctl_length); in ggate_recv_thread()
1347 mtx_lock(&res->hr_amp_lock); in ggate_recv_thread()
1348 if (activemap_write_start(res->hr_amp, in ggate_recv_thread()
1349 ggio->gctl_offset, ggio->gctl_length)) { in ggate_recv_thread()
1350 res->hr_stat_activemap_update++; in ggate_recv_thread()
1353 mtx_unlock(&res->hr_amp_lock); in ggate_recv_thread()
1356 hio->hio_memsyncacked = false; in ggate_recv_thread()
1357 refcnt_init(&hio->hio_writecount, ncomps); in ggate_recv_thread()
1361 res->hr_stat_delete++; in ggate_recv_thread()
1364 res->hr_stat_flush++; in ggate_recv_thread()
1369 refcnt_init(&hio->hio_countdown, ncomps); in ggate_recv_thread()
1399 ggio = &hio->hio_ggio; in local_send_thread()
1400 switch (ggio->gctl_cmd) { in local_send_thread()
1402 ret = pread(res->hr_localfd, ggio->gctl_data, in local_send_thread()
1403 ggio->gctl_length, in local_send_thread()
1404 ggio->gctl_offset + res->hr_localoff); in local_send_thread()
1405 if (ret == ggio->gctl_length) in local_send_thread()
1406 hio->hio_errors[ncomp] = 0; in local_send_thread()
1411 if (ret == -1) { in local_send_thread()
1415 } else if (ret != ggio->gctl_length) { in local_send_thread()
1418 ret, (intmax_t)ggio->gctl_length); in local_send_thread()
1425 ret = pwrite(res->hr_localfd, ggio->gctl_data, in local_send_thread()
1426 ggio->gctl_length, in local_send_thread()
1427 ggio->gctl_offset + res->hr_localoff); in local_send_thread()
1428 if (ret == -1) { in local_send_thread()
1429 hio->hio_errors[ncomp] = errno; in local_send_thread()
1433 } else if (ret != ggio->gctl_length) { in local_send_thread()
1434 hio->hio_errors[ncomp] = EIO; in local_send_thread()
1437 ret, (intmax_t)ggio->gctl_length); in local_send_thread()
1439 hio->hio_errors[ncomp] = 0; in local_send_thread()
1441 ggio->gctl_error = 0; in local_send_thread()
1447 ret = g_delete(res->hr_localfd, in local_send_thread()
1448 ggio->gctl_offset + res->hr_localoff, in local_send_thread()
1449 ggio->gctl_length); in local_send_thread()
1450 if (ret == -1) { in local_send_thread()
1451 hio->hio_errors[ncomp] = errno; in local_send_thread()
1456 hio->hio_errors[ncomp] = 0; in local_send_thread()
1460 if (!res->hr_localflush) { in local_send_thread()
1461 ret = -1; in local_send_thread()
1465 ret = g_flush(res->hr_localfd); in local_send_thread()
1466 if (ret == -1) { in local_send_thread()
1468 res->hr_localflush = false; in local_send_thread()
1469 hio->hio_errors[ncomp] = errno; in local_send_thread()
1474 hio->hio_errors[ncomp] = 0; in local_send_thread()
1479 if (refcnt_release(&hio->hio_writecount) == 0) { in local_send_thread()
1483 if (refcnt_release(&hio->hio_countdown) > 0) in local_send_thread()
1513 PJDLOG_ASSERT(res->hr_remotein != NULL); in keepalive_send()
1514 PJDLOG_ASSERT(res->hr_remoteout != NULL); in keepalive_send()
1525 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { in keepalive_send()
1572 ggio = &hio->hio_ggio; in remote_send_thread()
1573 switch (ggio->gctl_cmd) { in remote_send_thread()
1577 offset = ggio->gctl_offset; in remote_send_thread()
1578 length = ggio->gctl_length; in remote_send_thread()
1582 data = ggio->gctl_data; in remote_send_thread()
1583 offset = ggio->gctl_offset; in remote_send_thread()
1584 length = ggio->gctl_length; in remote_send_thread()
1589 offset = ggio->gctl_offset; in remote_send_thread()
1590 length = ggio->gctl_length; in remote_send_thread()
1603 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); in remote_send_thread()
1609 hio->hio_errors[ncomp] = nv_error(nv); in remote_send_thread()
1625 hio->hio_errors[ncomp] = ENOTCONN; in remote_send_thread()
1641 if (hast_proto_send(res, res->hr_remoteout, nv, data, in remote_send_thread()
1642 data != NULL ? length : 0) == -1) { in remote_send_thread()
1643 hio->hio_errors[ncomp] = errno; in remote_send_thread()
1649 strerror(hio->hio_errors[ncomp])); in remote_send_thread()
1661 if (refcnt_release(&hio->hio_countdown) > 0) in remote_send_thread()
1669 if (ggio->gctl_cmd == BIO_WRITE) { in remote_send_thread()
1670 mtx_lock(&res->hr_amp_lock); in remote_send_thread()
1671 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, in remote_send_thread()
1672 ggio->gctl_length)) { in remote_send_thread()
1675 mtx_unlock(&res->hr_amp_lock); in remote_send_thread()
1678 if (refcnt_release(&hio->hio_writecount) == 0) { in remote_send_thread()
1679 if (hio->hio_errors[0] == 0) in remote_send_thread()
1684 if (refcnt_release(&hio->hio_countdown) > 0) in remote_send_thread()
1731 * the done queue (one-by-one). in remote_recv_thread()
1738 hio_recv_list_size[ncomp]--; in remote_recv_thread()
1740 hio->hio_errors[ncomp] = ENOTCONN; in remote_recv_thread()
1743 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { in remote_recv_thread()
1760 if (hio->hio_ggio.gctl_seq == seq) { in remote_recv_thread()
1763 hio_recv_list_size[ncomp]--; in remote_recv_thread()
1774 ggio = &hio->hio_ggio; in remote_recv_thread()
1778 hio->hio_errors[ncomp] = error; in remote_recv_thread()
1784 switch (ggio->gctl_cmd) { in remote_recv_thread()
1792 if (hast_proto_recv_data(res, res->hr_remotein, nv, in remote_recv_thread()
1793 ggio->gctl_data, ggio->gctl_length) == -1) { in remote_recv_thread()
1794 hio->hio_errors[ncomp] = errno; in remote_recv_thread()
1811 hio->hio_errors[ncomp] = 0; in remote_recv_thread()
1815 if (!hio->hio_memsyncacked) { in remote_recv_thread()
1817 hio->hio_errors[ncomp] != 0); in remote_recv_thread()
1819 if (refcnt_release(&hio->hio_writecount) == 0) { in remote_recv_thread()
1820 if (hio->hio_errors[0] == 0) in remote_recv_thread()
1823 hio->hio_memsyncacked = true; in remote_recv_thread()
1824 if (hio->hio_errors[ncomp] == 0) { in remote_recv_thread()
1840 if (refcnt_release(&hio->hio_countdown) > 0) in remote_recv_thread()
1875 ggio = &hio->hio_ggio; in ggate_send_thread()
1877 if (hio->hio_errors[ii] == 0) { in ggate_send_thread()
1882 ggio->gctl_error = 0; in ggate_send_thread()
1892 if (ggio->gctl_cmd == BIO_READ && in ggate_send_thread()
1893 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) in ggate_send_thread()
1894 ggio->gctl_error = hio->hio_errors[1]; in ggate_send_thread()
1896 ggio->gctl_error = hio->hio_errors[0]; in ggate_send_thread()
1898 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { in ggate_send_thread()
1899 mtx_lock(&res->hr_amp_lock); in ggate_send_thread()
1900 if (activemap_write_complete(res->hr_amp, in ggate_send_thread()
1901 ggio->gctl_offset, ggio->gctl_length)) { in ggate_send_thread()
1902 res->hr_stat_activemap_update++; in ggate_send_thread()
1905 mtx_unlock(&res->hr_amp_lock); in ggate_send_thread()
1908 if (ggio->gctl_cmd == BIO_WRITE) { in ggate_send_thread()
1913 rangelock_del(range_regular, ggio->gctl_offset, in ggate_send_thread()
1914 ggio->gctl_length); in ggate_send_thread()
1918 if (!hio->hio_done) in ggate_send_thread()
1921 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { in ggate_send_thread()
1926 if (hio->hio_errors[0]) { in ggate_send_thread()
1927 switch (ggio->gctl_cmd) { in ggate_send_thread()
1929 res->hr_stat_read_error++; in ggate_send_thread()
1932 res->hr_stat_write_error++; in ggate_send_thread()
1935 res->hr_stat_delete_error++; in ggate_send_thread()
1938 res->hr_stat_flush_error++; in ggate_send_thread()
1968 offset = -1; in sync_thread()
1991 mtx_lock(&res->hr_amp_lock); in sync_thread()
1993 activemap_sync_rewind(res->hr_amp); in sync_thread()
1994 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); in sync_thread()
1995 if (syncext != -1) { in sync_thread()
2000 if (activemap_extent_complete(res->hr_amp, syncext)) in sync_thread()
2003 mtx_unlock(&res->hr_amp_lock); in sync_thread()
2005 mtx_unlock(&res->hr_amp_lock); in sync_thread()
2009 if (offset == -1) in sync_thread()
2010 pjdlog_info("Nodes are in sync."); in sync_thread()
2013 (intmax_t)(res->hr_extentsize * in sync_thread()
2014 activemap_ndirty(res->hr_amp))); in sync_thread()
2019 if (offset == -1) { in sync_thread()
2044 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) in sync_thread()
2046 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; in sync_thread()
2047 res->hr_primary_localcnt = in sync_thread()
2048 res->hr_secondary_remotecnt; in sync_thread()
2049 res->hr_primary_remotecnt = in sync_thread()
2050 res->hr_secondary_localcnt; in sync_thread()
2053 (uintmax_t)res->hr_primary_localcnt, in sync_thread()
2054 (uintmax_t)res->hr_primary_remotecnt); in sync_thread()
2065 pjdlog_debug(2, "sync: Taking free request."); in sync_thread()
2067 pjdlog_debug(2, "sync: (%p) Got free request.", hio); in sync_thread()
2076 "sync: Range offset=%jd length=%jd locked.", in sync_thread()
2084 if (rangelock_add(range_sync, offset, length) == -1) { in sync_thread()
2087 "sync: Range offset=%jd length=%jd is already locked, waiting.", in sync_thread()
2099 ggio = &hio->hio_ggio; in sync_thread()
2100 ggio->gctl_cmd = BIO_READ; in sync_thread()
2101 ggio->gctl_offset = offset; in sync_thread()
2102 ggio->gctl_length = length; in sync_thread()
2103 ggio->gctl_error = 0; in sync_thread()
2104 hio->hio_done = false; in sync_thread()
2105 hio->hio_replication = res->hr_replication; in sync_thread()
2107 hio->hio_errors[ii] = EINVAL; in sync_thread()
2108 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", in sync_thread()
2110 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", in sync_thread()
2113 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { in sync_thread()
2115 * This range is up-to-date on local component, in sync_thread()
2120 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { in sync_thread()
2121 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); in sync_thread()
2123 * This range is out-of-date on local component, in sync_thread()
2130 refcnt_init(&hio->hio_countdown, 1); in sync_thread()
2141 if (hio->hio_errors[ncomp] != 0) { in sync_thread()
2143 strerror(hio->hio_errors[ncomp])); in sync_thread()
2152 ggio->gctl_cmd = BIO_WRITE; in sync_thread()
2154 hio->hio_errors[ii] = EINVAL; in sync_thread()
2155 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", in sync_thread()
2157 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", in sync_thread()
2160 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { in sync_thread()
2162 * This range is up-to-date on local component, in sync_thread()
2163 * so we update remote component. in sync_thread()
2167 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { in sync_thread()
2168 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); in sync_thread()
2170 * This range is out-of-date on local component, in sync_thread()
2171 * so we update it. in sync_thread()
2178 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", in sync_thread()
2180 refcnt_init(&hio->hio_countdown, 1); in sync_thread()
2191 if (hio->hio_errors[ncomp] != 0) { in sync_thread()
2193 strerror(hio->hio_errors[ncomp])); in sync_thread()
2204 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", in sync_thread()
2221 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); in primary_config_reload()
2245 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { in primary_config_reload()
2247 * Don't copy res->hr_remoteaddr to gres just yet. in primary_config_reload()
2254 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { in primary_config_reload()
2255 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); in primary_config_reload()
2259 if (gres->hr_replication != vint) { in primary_config_reload()
2260 gres->hr_replication = vint; in primary_config_reload()
2264 if (gres->hr_checksum != vint) { in primary_config_reload()
2265 gres->hr_checksum = vint; in primary_config_reload()
2269 if (gres->hr_compression != vint) { in primary_config_reload()
2270 gres->hr_compression = vint; in primary_config_reload()
2274 if (gres->hr_timeout != vint) { in primary_config_reload()
2275 gres->hr_timeout = vint; in primary_config_reload()
2279 if (strcmp(gres->hr_exec, vstr) != 0) { in primary_config_reload()
2280 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); in primary_config_reload()
2284 if (gres->hr_metaflush != vint) { in primary_config_reload()
2285 gres->hr_metaflush = vint; in primary_config_reload()
2304 if (proto_timeout(gres->hr_remotein, in primary_config_reload()
2305 gres->hr_timeout) == -1) { in primary_config_reload()
2309 if (proto_timeout(gres->hr_remoteout, in primary_config_reload()
2310 gres->hr_timeout) == -1) { in primary_config_reload()
2324 strlcpy(gres->hr_remoteaddr, vstr, in primary_config_reload()
2325 sizeof(gres->hr_remoteaddr)); in primary_config_reload()
2356 PJDLOG_ASSERT(res->hr_remotein != NULL); in guard_one()
2357 PJDLOG_ASSERT(res->hr_remoteout != NULL); in guard_one()
2360 res->hr_remoteaddr); in guard_one()
2364 PJDLOG_ASSERT(res->hr_remotein == NULL); in guard_one()
2365 PJDLOG_ASSERT(res->hr_remoteout == NULL); in guard_one()
2372 res->hr_remoteaddr); in guard_one()
2376 PJDLOG_ASSERT(res->hr_remotein == NULL); in guard_one()
2377 PJDLOG_ASSERT(res->hr_remoteout == NULL); in guard_one()
2379 res->hr_remotein = in; in guard_one()
2380 res->hr_remoteout = out; in guard_one()
2383 res->hr_remoteaddr); in guard_one()
2387 PJDLOG_ASSERT(res->hr_remotein == NULL); in guard_one()
2388 PJDLOG_ASSERT(res->hr_remoteout == NULL); in guard_one()
2391 res->hr_remoteaddr); in guard_one()
2406 sigset_t mask; in guard_thread() local
2412 PJDLOG_VERIFY(sigemptyset(&mask) == 0); in guard_thread()
2413 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); in guard_thread()
2414 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); in guard_thread()
2418 signo = -1; in guard_thread()
2446 signo = sigtimedwait(&mask, NULL, &timeout); in guard_thread()