Lines Matching +full:send +full:- +full:flush +full:- +full:out +full:- +full:sequence
1 /*-
2 * SPDX-License-Identifier: BSD-2-Clause
4 * Copyright (c) 2009-2010 The FreeBSD Foundation
29 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
81 * until some in-progress requests are freed.
126 hio_##name##_list_size--; \
144 hio->hio_seq = 0; in hio_clear()
145 hio->hio_error = 0; in hio_clear()
146 hio->hio_cmd = HIO_UNDEF; in hio_clear()
147 hio->hio_offset = 0; in hio_clear()
148 hio->hio_length = 0; in hio_clear()
149 hio->hio_memsync = false; in hio_clear()
160 dsthio->hio_seq = srchio->hio_seq; in hio_copy()
161 dsthio->hio_cmd = srchio->hio_cmd; in hio_copy()
162 dsthio->hio_offset = srchio->hio_offset; in hio_copy()
163 dsthio->hio_length = srchio->hio_length; in hio_copy()
164 dsthio->hio_memsync = srchio->hio_memsync; in hio_copy()
196 hio->hio_data = malloc(MAXPHYS); in init_environment()
197 if (hio->hio_data == NULL) { in init_environment()
212 if (metadata_read(res, true) == -1) in init_local()
226 if (proto_send(res->hr_remoteout, NULL, 0) == -1) in init_remote()
231 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); in init_remote()
232 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); in init_remote()
234 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); in init_remote()
235 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); in init_remote()
236 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); in init_remote()
237 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); in init_remote()
238 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - in init_remote()
239 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); in init_remote()
251 * Split-brain condition is when both nodes are not able to communicate in init_remote()
254 * Under split-brain condition we will increase our localcnt on first in init_remote()
275 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, in init_remote()
279 * local=3 remote=3 local=4 remote=2 Split-brain condition. in init_remote()
280 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, in init_remote()
284 if (res->hr_resuid == 0) { in init_remote()
291 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); in init_remote()
292 res->hr_resuid = resuid; in init_remote()
293 if (metadata_write(res) == -1) in init_remote()
304 } else if (res->hr_resuid != resuid) { in init_remote()
310 (uintmax_t)resuid, (uintmax_t)res->hr_resuid); in init_remote()
313 if (hast_proto_send(res, res->hr_remotein, nvout, in init_remote()
314 NULL, 0) == -1) { in init_remote()
316 "Unable to send response to %s", in init_remote()
317 res->hr_remoteaddr); in init_remote()
322 /* Is primary out-of-date? */ in init_remote()
323 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && in init_remote()
324 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || in init_remote()
326 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && in init_remote()
327 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || in init_remote()
328 /* Is secondary out-of-date? */ in init_remote()
329 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && in init_remote()
330 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { in init_remote()
333 * out-of-date. in init_remote()
335 * send out local bitmap to the remote node. in init_remote()
337 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != in init_remote()
341 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && in init_remote()
342 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { in init_remote()
343 /* Primary is out-of-date, sync from secondary. */ in init_remote()
347 * Secondary is out-of-date or counts match. in init_remote()
352 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && in init_remote()
353 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { in init_remote()
355 * Not good, we have split-brain condition. in init_remote()
358 pjdlog_error("Split-brain detected, exiting."); in init_remote()
359 nv_add_string(nvout, "Split-brain condition!", "errmsg"); in init_remote()
360 if (hast_proto_send(res, res->hr_remotein, nvout, in init_remote()
361 NULL, 0) == -1) { in init_remote()
363 "Unable to send response to %s", in init_remote()
364 res->hr_remoteaddr); in init_remote()
367 /* Exit on split-brain. */ in init_remote()
370 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || in init_remote()
371 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { in init_remote()
376 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || in init_remote()
377 res->hr_primary_localcnt < res->hr_secondary_remotecnt); in init_remote()
378 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - in init_remote()
379 METADATA_SIZE, res->hr_extentsize, in init_remote()
380 res->hr_local_sectorsize); in init_remote()
382 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { in init_remote()
390 (uintmax_t)res->hr_primary_localcnt, in init_remote()
391 (uintmax_t)res->hr_primary_remotecnt, in init_remote()
392 (uintmax_t)res->hr_secondary_localcnt, in init_remote()
393 (uintmax_t)res->hr_secondary_remotecnt); in init_remote()
396 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) { in init_remote()
397 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", in init_remote()
398 res->hr_remoteaddr); in init_remote()
405 if (proto_recv(res->hr_remotein, NULL, 0) == -1) in init_remote()
421 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { in hastd_secondary()
429 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { in hastd_secondary()
436 if (pid == -1) { in hastd_secondary()
443 proto_close(res->hr_remotein); in hastd_secondary()
444 res->hr_remotein = NULL; in hastd_secondary()
445 proto_close(res->hr_remoteout); in hastd_secondary()
446 res->hr_remoteout = NULL; in hastd_secondary()
448 proto_recv(res->hr_event, NULL, 0); in hastd_secondary()
450 proto_send(res->hr_ctrl, NULL, 0); in hastd_secondary()
451 res->hr_workerpid = pid; in hastd_secondary()
456 res->output_status_aux = output_status_aux; in hastd_secondary()
461 proto_send(res->hr_event, NULL, 0); in hastd_secondary()
463 proto_recv(res->hr_ctrl, NULL, 0); in hastd_secondary()
470 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); in hastd_secondary()
471 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); in hastd_secondary()
477 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1) in hastd_secondary()
479 if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1) in hastd_secondary()
522 switch (hio->hio_cmd) { in reqlog()
524 (void)snprintf(msg + len, sizeof(msg) - len, in reqlog()
525 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, in reqlog()
526 (uintmax_t)hio->hio_length); in reqlog()
529 (void)snprintf(msg + len, sizeof(msg) - len, in reqlog()
530 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, in reqlog()
531 (uintmax_t)hio->hio_length); in reqlog()
534 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); in reqlog()
537 (void)snprintf(msg + len, sizeof(msg) - len, in reqlog()
538 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, in reqlog()
539 (uintmax_t)hio->hio_length); in reqlog()
542 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); in reqlog()
545 (void)snprintf(msg + len, sizeof(msg) - len, in reqlog()
546 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); in reqlog()
557 hio->hio_cmd = nv_get_uint8(nv, "cmd"); in requnpack()
558 if (hio->hio_cmd == 0) { in requnpack()
560 hio->hio_error = EINVAL; in requnpack()
563 if (hio->hio_cmd != HIO_KEEPALIVE) { in requnpack()
564 hio->hio_seq = nv_get_uint64(nv, "seq"); in requnpack()
565 if (hio->hio_seq == 0) { in requnpack()
567 hio->hio_error = EINVAL; in requnpack()
571 switch (hio->hio_cmd) { in requnpack()
576 hio->hio_memsync = nv_exists(nv, "memsync"); in requnpack()
580 hio->hio_offset = nv_get_uint64(nv, "offset"); in requnpack()
583 hio->hio_error = EINVAL; in requnpack()
586 hio->hio_length = nv_get_uint64(nv, "length"); in requnpack()
589 hio->hio_error = EINVAL; in requnpack()
592 if (hio->hio_length == 0) { in requnpack()
594 hio->hio_error = EINVAL; in requnpack()
597 if (hio->hio_cmd != HIO_DELETE && hio->hio_length > MAXPHYS) { in requnpack()
599 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); in requnpack()
600 hio->hio_error = EINVAL; in requnpack()
603 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { in requnpack()
605 (uintmax_t)hio->hio_offset); in requnpack()
606 hio->hio_error = EINVAL; in requnpack()
609 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { in requnpack()
611 (uintmax_t)hio->hio_length); in requnpack()
612 hio->hio_error = EINVAL; in requnpack()
615 if (hio->hio_offset + hio->hio_length > in requnpack()
616 (uint64_t)res->hr_datasize) { in requnpack()
618 (uintmax_t)(hio->hio_offset + hio->hio_length), in requnpack()
619 (uintmax_t)res->hr_datasize); in requnpack()
620 hio->hio_error = EINVAL; in requnpack()
626 hio->hio_cmd); in requnpack()
627 hio->hio_error = EINVAL; in requnpack()
630 hio->hio_error = 0; in requnpack()
632 return (hio->hio_error); in requnpack()
662 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { in recv_thread()
669 "recv: (%p) Moving request to the send queue.", in recv_thread()
671 QUEUE_INSERT(send, hio); in recv_thread()
674 switch (hio->hio_cmd) { in recv_thread()
676 res->hr_stat_read++; in recv_thread()
679 res->hr_stat_write++; in recv_thread()
682 res->hr_stat_delete++; in recv_thread()
685 res->hr_stat_flush++; in recv_thread()
691 hio->hio_cmd); in recv_thread()
693 reqlog(LOG_DEBUG, 2, -1, hio, in recv_thread()
695 if (hio->hio_cmd == HIO_KEEPALIVE) { in recv_thread()
703 } else if (hio->hio_cmd == HIO_WRITE) { in recv_thread()
704 if (hast_proto_recv_data(res, res->hr_remotein, nv, in recv_thread()
705 hio->hio_data, MAXPHYS) == -1) { in recv_thread()
709 if (hio->hio_memsync) { in recv_thread()
719 mshio->hio_error = 0; in recv_thread()
722 * request going onto send queue (mshio). in recv_thread()
724 hio->hio_memsync = false; in recv_thread()
726 "recv: (%p) Moving memsync request to the send queue.", in recv_thread()
728 QUEUE_INSERT(send, mshio); in recv_thread()
742 * FLUSH requests.
767 activemap_calc_ondisk_size(res->hr_local_mediasize - in disk_thread()
768 METADATA_SIZE, res->hr_extentsize, in disk_thread()
769 res->hr_local_sectorsize); in disk_thread()
775 if (pwrite(res->hr_localfd, map, mapsize, in disk_thread()
780 res->hr_stat_activemap_write_error++; in disk_thread()
788 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); in disk_thread()
791 switch (hio->hio_cmd) { in disk_thread()
793 ret = pread(res->hr_localfd, hio->hio_data, in disk_thread()
794 hio->hio_length, in disk_thread()
795 hio->hio_offset + res->hr_localoff); in disk_thread()
796 if (ret == -1) in disk_thread()
797 hio->hio_error = errno; in disk_thread()
798 else if (ret != (int64_t)hio->hio_length) in disk_thread()
799 hio->hio_error = EIO; in disk_thread()
801 hio->hio_error = 0; in disk_thread()
804 ret = pwrite(res->hr_localfd, hio->hio_data, in disk_thread()
805 hio->hio_length, in disk_thread()
806 hio->hio_offset + res->hr_localoff); in disk_thread()
807 if (ret == -1) in disk_thread()
808 hio->hio_error = errno; in disk_thread()
809 else if (ret != (int64_t)hio->hio_length) in disk_thread()
810 hio->hio_error = EIO; in disk_thread()
812 hio->hio_error = 0; in disk_thread()
815 ret = g_delete(res->hr_localfd, in disk_thread()
816 hio->hio_offset + res->hr_localoff, in disk_thread()
817 hio->hio_length); in disk_thread()
818 if (ret == -1) in disk_thread()
819 hio->hio_error = errno; in disk_thread()
821 hio->hio_error = 0; in disk_thread()
824 if (!res->hr_localflush) { in disk_thread()
825 ret = -1; in disk_thread()
826 hio->hio_error = EOPNOTSUPP; in disk_thread()
830 ret = g_flush(res->hr_localfd); in disk_thread()
831 if (ret == -1) { in disk_thread()
833 res->hr_localflush = false; in disk_thread()
834 hio->hio_error = errno; in disk_thread()
836 hio->hio_error = 0; in disk_thread()
841 hio->hio_cmd); in disk_thread()
843 if (logerror && hio->hio_error != 0) { in disk_thread()
844 reqlog(LOG_ERR, 0, hio->hio_error, hio, in disk_thread()
847 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", in disk_thread()
849 QUEUE_INSERT(send, hio); in disk_thread()
868 pjdlog_debug(2, "send: Taking request."); in send_thread()
869 QUEUE_TAKE(send, hio); in send_thread()
870 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); in send_thread()
872 /* Copy sequence number. */ in send_thread()
873 nv_add_uint64(nvout, hio->hio_seq, "seq"); in send_thread()
874 if (hio->hio_memsync) { in send_thread()
875 PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE); in send_thread()
878 switch (hio->hio_cmd) { in send_thread()
880 if (hio->hio_error == 0) { in send_thread()
881 data = hio->hio_data; in send_thread()
882 length = hio->hio_length; in send_thread()
886 * We send no data in case of an error. in send_thread()
897 hio->hio_cmd); in send_thread()
899 if (hio->hio_error != 0) { in send_thread()
900 switch (hio->hio_cmd) { in send_thread()
902 res->hr_stat_read_error++; in send_thread()
905 res->hr_stat_write_error++; in send_thread()
908 res->hr_stat_delete_error++; in send_thread()
911 res->hr_stat_flush_error++; in send_thread()
914 nv_add_int16(nvout, hio->hio_error, "error"); in send_thread()
916 if (hast_proto_send(res, res->hr_remoteout, nvout, data, in send_thread()
917 length) == -1) { in send_thread()
918 secondary_exit(EX_TEMPFAIL, "Unable to send reply"); in send_thread()
921 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", in send_thread()