xref: /freebsd/sbin/hastd/secondary.c (revision b2db760808f74bb53c232900091c9da801ebbfcc)
1 /*-
2  * Copyright (c) 2009-2010 The FreeBSD Foundation
3  * All rights reserved.
4  *
5  * This software was developed by Pawel Jakub Dawidek under sponsorship from
6  * the FreeBSD Foundation.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29 
30 #include <sys/cdefs.h>
31 __FBSDID("$FreeBSD$");
32 
33 #include <sys/param.h>
34 #include <sys/time.h>
35 #include <sys/bio.h>
36 #include <sys/disk.h>
37 #include <sys/stat.h>
38 
39 #include <assert.h>
40 #include <err.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <libgeom.h>
44 #include <pthread.h>
45 #include <stdint.h>
46 #include <stdio.h>
47 #include <string.h>
48 #include <sysexits.h>
49 #include <unistd.h>
50 
51 #include <activemap.h>
52 #include <nv.h>
53 #include <pjdlog.h>
54 
55 #include "control.h"
56 #include "hast.h"
57 #include "hast_proto.h"
58 #include "hastd.h"
59 #include "metadata.h"
60 #include "proto.h"
61 #include "subr.h"
62 #include "synch.h"
63 
64 struct hio {
65 	uint64_t 	 hio_seq;
66 	int	 	 hio_error;
67 	struct nv	*hio_nv;
68 	void		*hio_data;
69 	uint8_t		 hio_cmd;
70 	uint64_t	 hio_offset;
71 	uint64_t	 hio_length;
72 	TAILQ_ENTRY(hio) hio_next;
73 };
74 
75 /*
76  * Free list holds unused structures. When free list is empty, we have to wait
77  * until some in-progress requests are freed.
78  */
79 static TAILQ_HEAD(, hio) hio_free_list;
80 static pthread_mutex_t hio_free_list_lock;
81 static pthread_cond_t hio_free_list_cond;
82 /*
83  * Disk thread (the one that do I/O requests) takes requests from this list.
84  */
85 static TAILQ_HEAD(, hio) hio_disk_list;
86 static pthread_mutex_t hio_disk_list_lock;
87 static pthread_cond_t hio_disk_list_cond;
88 /*
89  * There is one recv list for every component, although local components don't
90  * use recv lists as local requests are done synchronously.
91  */
92 static TAILQ_HEAD(, hio) hio_send_list;
93 static pthread_mutex_t hio_send_list_lock;
94 static pthread_cond_t hio_send_list_cond;
95 
96 /*
97  * Maximum number of outstanding I/O requests.
98  */
99 #define	HAST_HIO_MAX	256
100 
101 static void *recv_thread(void *arg);
102 static void *disk_thread(void *arg);
103 static void *send_thread(void *arg);
104 
105 static void
106 init_environment(void)
107 {
108 	struct hio *hio;
109 	unsigned int ii;
110 
111 	/*
112 	 * Initialize lists, their locks and theirs condition variables.
113 	 */
114 	TAILQ_INIT(&hio_free_list);
115 	mtx_init(&hio_free_list_lock);
116 	cv_init(&hio_free_list_cond);
117 	TAILQ_INIT(&hio_disk_list);
118 	mtx_init(&hio_disk_list_lock);
119 	cv_init(&hio_disk_list_cond);
120 	TAILQ_INIT(&hio_send_list);
121 	mtx_init(&hio_send_list_lock);
122 	cv_init(&hio_send_list_cond);
123 
124 	/*
125 	 * Allocate requests pool and initialize requests.
126 	 */
127 	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
128 		hio = malloc(sizeof(*hio));
129 		if (hio == NULL) {
130 			pjdlog_exitx(EX_TEMPFAIL,
131 			    "Unable to allocate memory (%zu bytes) for hio request.",
132 			    sizeof(*hio));
133 		}
134 		hio->hio_error = 0;
135 		hio->hio_data = malloc(MAXPHYS);
136 		if (hio->hio_data == NULL) {
137 			pjdlog_exitx(EX_TEMPFAIL,
138 			    "Unable to allocate memory (%zu bytes) for gctl_data.",
139 			    (size_t)MAXPHYS);
140 		}
141 		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
142 	}
143 }
144 
145 static void
146 init_local(struct hast_resource *res)
147 {
148 
149 	if (metadata_read(res, true) < 0)
150 		exit(EX_NOINPUT);
151 }
152 
153 static void
154 init_remote(struct hast_resource *res, struct nv *nvin)
155 {
156 	uint64_t resuid;
157 	struct nv *nvout;
158 	unsigned char *map;
159 	size_t mapsize;
160 
161 	map = NULL;
162 	mapsize = 0;
163 	nvout = nv_alloc();
164 	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
165 	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
166 	resuid = nv_get_uint64(nvin, "resuid");
167 	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
168 	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
169 	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
170 	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
171 	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
172 	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
173 	map = malloc(mapsize);
174 	if (map == NULL) {
175 		pjdlog_exitx(EX_TEMPFAIL,
176 		    "Unable to allocate memory (%zu bytes) for activemap.",
177 		    mapsize);
178 	}
179 	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
180 	/*
181 	 * When we work as primary and secondary is missing we will increase
182 	 * localcnt in our metadata. When secondary is connected and synced
183 	 * we make localcnt be equal to remotecnt, which means nodes are more
184 	 * or less in sync.
185 	 * Split-brain condition is when both nodes are not able to communicate
186 	 * and are both configured as primary nodes. In turn, they can both
187 	 * make incompatible changes to the data and we have to detect that.
188 	 * Under split-brain condition we will increase our localcnt on first
189 	 * write and remote node will increase its localcnt on first write.
190 	 * When we connect we can see that primary's localcnt is greater than
191 	 * our remotecnt (primary was modified while we weren't watching) and
192 	 * our localcnt is greater than primary's remotecnt (we were modified
193 	 * while primary wasn't watching).
194 	 * There are many possible combinations which are all gathered below.
195 	 * Don't pay too much attention to exact numbers, the more important
196 	 * is to compare them. We compare secondary's local with primary's
197 	 * remote and secondary's remote with primary's local.
198 	 * Note that every case where primary's localcnt is smaller than
199 	 * secondary's remotecnt and where secondary's localcnt is smaller than
200 	 * primary's remotecnt should be impossible in practise. We will perform
201 	 * full synchronization then. Those cases are marked with an asterisk.
202 	 * Regular synchronization means that only extents marked as dirty are
203 	 * synchronized (regular synchronization).
204 	 *
205 	 * SECONDARY METADATA PRIMARY METADATA
206 	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
207 	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
208 	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
209 	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
210 	 *                                       regular sync from secondary.
211 	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
212 	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
213 	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
214 	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
215 	 *                                       regular sync from primary.
216 	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
217 	 */
218 	if (res->hr_resuid == 0) {
219 		/*
220 		 * Provider is used for the first time. Initialize everything.
221 		 */
222 		assert(res->hr_secondary_localcnt == 0);
223 		res->hr_resuid = resuid;
224 		if (metadata_write(res) < 0)
225 			exit(EX_NOINPUT);
226 		memset(map, 0xff, mapsize);
227 		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
228 	} else if (
229 	    /* Is primary is out-of-date? */
230 	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
231 	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
232 	    /* Node are more or less in sync? */
233 	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
234 	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
235 	    /* Is secondary is out-of-date? */
236 	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
237 	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
238 		/*
239 		 * Nodes are more or less in sync or one of the nodes is
240 		 * out-of-date.
241 		 * It doesn't matter at this point which one, we just have to
242 		 * send out local bitmap to the remote node.
243 		 */
244 		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
245 		    (ssize_t)mapsize) {
246 			pjdlog_exit(LOG_ERR, "Unable to read activemap");
247 		}
248 		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
249 		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
250 			/* Primary is out-of-date, sync from secondary. */
251 			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
252 		} else {
253 			/*
254 			 * Secondary is out-of-date or counts match.
255 			 * Sync from primary.
256 			 */
257 			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
258 		}
259 	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
260 	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
261 		/*
262 		 * Not good, we have split-brain condition.
263 		 */
264 		pjdlog_error("Split-brain detected, exiting.");
265 		nv_add_string(nvout, "Split-brain condition!", "errmsg");
266 		free(map);
267 		map = NULL;
268 		mapsize = 0;
269 	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
270 	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
271 		/*
272 		 * This should never happen in practise, but we will perform
273 		 * full synchronization.
274 		 */
275 		assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
276 		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
277 		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
278 		    METADATA_SIZE, res->hr_extentsize,
279 		    res->hr_local_sectorsize);
280 		memset(map, 0xff, mapsize);
281 		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
282 			/* In this one of five cases sync from secondary. */
283 			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
284 		} else {
285 			/* For the rest four cases sync from primary. */
286 			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
287 		}
288 		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
289 		    (uintmax_t)res->hr_primary_localcnt,
290 		    (uintmax_t)res->hr_primary_remotecnt,
291 		    (uintmax_t)res->hr_secondary_localcnt,
292 		    (uintmax_t)res->hr_secondary_remotecnt);
293 	}
294 	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) {
295 		pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s",
296 		    res->hr_remoteaddr);
297 		nv_free(nvout);
298 		exit(EX_TEMPFAIL);
299 	}
300 	nv_free(nvout);
301 	if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
302 	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
303 		/* Exit on split-brain. */
304 		exit(EX_CONFIG);
305 	}
306 }
307 
308 void
309 hastd_secondary(struct hast_resource *res, struct nv *nvin)
310 {
311 	pthread_t td;
312 	pid_t pid;
313 	int error;
314 
315 	/*
316 	 * Create communication channel between parent and child.
317 	 */
318 	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
319 		KEEP_ERRNO((void)pidfile_remove(pfh));
320 		pjdlog_exit(EX_OSERR,
321 		    "Unable to create control sockets between parent and child");
322 	}
323 
324 	pid = fork();
325 	if (pid < 0) {
326 		KEEP_ERRNO((void)pidfile_remove(pfh));
327 		pjdlog_exit(EX_OSERR, "Unable to fork");
328 	}
329 
330 	if (pid > 0) {
331 		/* This is parent. */
332 		proto_close(res->hr_remotein);
333 		res->hr_remotein = NULL;
334 		proto_close(res->hr_remoteout);
335 		res->hr_remoteout = NULL;
336 		res->hr_workerpid = pid;
337 		return;
338 	}
339 	(void)pidfile_close(pfh);
340 
341 	setproctitle("%s (secondary)", res->hr_name);
342 
343 	signal(SIGHUP, SIG_DFL);
344 	signal(SIGCHLD, SIG_DFL);
345 
346 	/* Error in setting timeout is not critical, but why should it fail? */
347 	if (proto_timeout(res->hr_remotein, 0) < 0)
348 		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
349 	if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0)
350 		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
351 
352 	init_local(res);
353 	init_remote(res, nvin);
354 	init_environment();
355 
356 	error = pthread_create(&td, NULL, recv_thread, res);
357 	assert(error == 0);
358 	error = pthread_create(&td, NULL, disk_thread, res);
359 	assert(error == 0);
360 	error = pthread_create(&td, NULL, send_thread, res);
361 	assert(error == 0);
362 	(void)ctrl_thread(res);
363 }
364 
365 static void
366 reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...)
367 {
368 	char msg[1024];
369 	va_list ap;
370 	int len;
371 
372 	va_start(ap, fmt);
373 	len = vsnprintf(msg, sizeof(msg), fmt, ap);
374 	va_end(ap);
375 	if ((size_t)len < sizeof(msg)) {
376 		switch (hio->hio_cmd) {
377 		case HIO_READ:
378 			(void)snprintf(msg + len, sizeof(msg) - len,
379 			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
380 			    (uintmax_t)hio->hio_length);
381 			break;
382 		case HIO_DELETE:
383 			(void)snprintf(msg + len, sizeof(msg) - len,
384 			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
385 			    (uintmax_t)hio->hio_length);
386 			break;
387 		case HIO_FLUSH:
388 			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
389 			break;
390 		case HIO_WRITE:
391 			(void)snprintf(msg + len, sizeof(msg) - len,
392 			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
393 			    (uintmax_t)hio->hio_length);
394 			break;
395 		default:
396 			(void)snprintf(msg + len, sizeof(msg) - len,
397 			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
398 			break;
399 		}
400 	}
401 	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
402 }
403 
404 static int
405 requnpack(struct hast_resource *res, struct hio *hio)
406 {
407 
408 	hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd");
409 	if (hio->hio_cmd == 0) {
410 		pjdlog_error("Header contains no 'cmd' field.");
411 		hio->hio_error = EINVAL;
412 		goto end;
413 	}
414 	switch (hio->hio_cmd) {
415 	case HIO_READ:
416 	case HIO_WRITE:
417 	case HIO_DELETE:
418 		hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset");
419 		if (nv_error(hio->hio_nv) != 0) {
420 			pjdlog_error("Header is missing 'offset' field.");
421 			hio->hio_error = EINVAL;
422 			goto end;
423 		}
424 		hio->hio_length = nv_get_uint64(hio->hio_nv, "length");
425 		if (nv_error(hio->hio_nv) != 0) {
426 			pjdlog_error("Header is missing 'length' field.");
427 			hio->hio_error = EINVAL;
428 			goto end;
429 		}
430 		if (hio->hio_length == 0) {
431 			pjdlog_error("Data length is zero.");
432 			hio->hio_error = EINVAL;
433 			goto end;
434 		}
435 		if (hio->hio_length > MAXPHYS) {
436 			pjdlog_error("Data length is too large (%ju > %ju).",
437 			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
438 			hio->hio_error = EINVAL;
439 			goto end;
440 		}
441 		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
442 			pjdlog_error("Offset %ju is not multiple of sector size.",
443 			    (uintmax_t)hio->hio_offset);
444 			hio->hio_error = EINVAL;
445 			goto end;
446 		}
447 		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
448 			pjdlog_error("Length %ju is not multiple of sector size.",
449 			    (uintmax_t)hio->hio_length);
450 			hio->hio_error = EINVAL;
451 			goto end;
452 		}
453 		if (hio->hio_offset + hio->hio_length >
454 		    (uint64_t)res->hr_datasize) {
455 			pjdlog_error("Data offset is too large (%ju > %ju).",
456 			    (uintmax_t)(hio->hio_offset + hio->hio_length),
457 			    (uintmax_t)res->hr_datasize);
458 			hio->hio_error = EINVAL;
459 			goto end;
460 		}
461 		break;
462 	default:
463 		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
464 		    hio->hio_cmd);
465 		hio->hio_error = EINVAL;
466 		goto end;
467 	}
468 	hio->hio_error = 0;
469 end:
470 	return (hio->hio_error);
471 }
472 
473 /*
474  * Thread receives requests from the primary node.
475  */
476 static void *
477 recv_thread(void *arg)
478 {
479 	struct hast_resource *res = arg;
480 	struct hio *hio;
481 	bool wakeup;
482 
483 	for (;;) {
484 		pjdlog_debug(2, "recv: Taking free request.");
485 		mtx_lock(&hio_free_list_lock);
486 		while ((hio = TAILQ_FIRST(&hio_free_list)) == NULL) {
487 			pjdlog_debug(2, "recv: No free requests, waiting.");
488 			cv_wait(&hio_free_list_cond, &hio_free_list_lock);
489 		}
490 		TAILQ_REMOVE(&hio_free_list, hio, hio_next);
491 		mtx_unlock(&hio_free_list_lock);
492 		pjdlog_debug(2, "recv: (%p) Got request.", hio);
493 		if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) {
494 			pjdlog_exit(EX_TEMPFAIL,
495 			    "Unable to receive request header");
496 		}
497 		if (requnpack(res, hio) != 0)
498 			goto send_queue;
499 		reqlog(LOG_DEBUG, 2, -1, hio,
500 		    "recv: (%p) Got request header: ", hio);
501 		if (hio->hio_cmd == HIO_WRITE) {
502 			if (hast_proto_recv_data(res, res->hr_remotein,
503 			    hio->hio_nv, hio->hio_data, MAXPHYS) < 0) {
504 				pjdlog_exit(EX_TEMPFAIL,
505 				    "Unable to receive reply data");
506 			}
507 		}
508 		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
509 		    hio);
510 		mtx_lock(&hio_disk_list_lock);
511 		wakeup = TAILQ_EMPTY(&hio_disk_list);
512 		TAILQ_INSERT_TAIL(&hio_disk_list, hio, hio_next);
513 		mtx_unlock(&hio_disk_list_lock);
514 		if (wakeup)
515 			cv_signal(&hio_disk_list_cond);
516 		continue;
517 send_queue:
518 		pjdlog_debug(2, "recv: (%p) Moving request to the send queue.",
519 		    hio);
520 		mtx_lock(&hio_send_list_lock);
521 		wakeup = TAILQ_EMPTY(&hio_send_list);
522 		TAILQ_INSERT_TAIL(&hio_send_list, hio, hio_next);
523 		mtx_unlock(&hio_send_list_lock);
524 		if (wakeup)
525 			cv_signal(&hio_send_list_cond);
526 	}
527 	/* NOTREACHED */
528 	return (NULL);
529 }
530 
531 /*
532  * Thread reads from or writes to local component and also handles DELETE and
533  * FLUSH requests.
534  */
535 static void *
536 disk_thread(void *arg)
537 {
538 	struct hast_resource *res = arg;
539 	struct hio *hio;
540 	ssize_t ret;
541 	bool clear_activemap, wakeup;
542 
543 	clear_activemap = true;
544 
545 	for (;;) {
546 		pjdlog_debug(2, "disk: Taking request.");
547 		mtx_lock(&hio_disk_list_lock);
548 		while ((hio = TAILQ_FIRST(&hio_disk_list)) == NULL) {
549 			pjdlog_debug(2, "disk: No requests, waiting.");
550 			cv_wait(&hio_disk_list_cond, &hio_disk_list_lock);
551 		}
552 		TAILQ_REMOVE(&hio_disk_list, hio, hio_next);
553 		mtx_unlock(&hio_disk_list_lock);
554 		while (clear_activemap) {
555 			unsigned char *map;
556 			size_t mapsize;
557 
558 			/*
559 			 * When first request is received, it means that primary
560 			 * already received our activemap, merged it and stored
561 			 * locally. We can now safely clear our activemap.
562 			 */
563 			mapsize =
564 			    activemap_calc_ondisk_size(res->hr_local_mediasize -
565 			    METADATA_SIZE, res->hr_extentsize,
566 			    res->hr_local_sectorsize);
567 			map = calloc(1, mapsize);
568 			if (map == NULL) {
569 				pjdlog_warning("Unable to allocate memory to clear local activemap.");
570 				break;
571 			}
572 			if (pwrite(res->hr_localfd, map, mapsize,
573 			    METADATA_SIZE) != (ssize_t)mapsize) {
574 				pjdlog_errno(LOG_WARNING,
575 				    "Unable to store cleared activemap");
576 				free(map);
577 				break;
578 			}
579 			free(map);
580 			clear_activemap = false;
581 			pjdlog_debug(1, "Local activemap cleared.");
582 		}
583 		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
584 		/* Handle the actual request. */
585 		switch (hio->hio_cmd) {
586 		case HIO_READ:
587 			ret = pread(res->hr_localfd, hio->hio_data,
588 			    hio->hio_length,
589 			    hio->hio_offset + res->hr_localoff);
590 			if (ret < 0)
591 				hio->hio_error = errno;
592 			else if (ret != (int64_t)hio->hio_length)
593 				hio->hio_error = EIO;
594 			else
595 				hio->hio_error = 0;
596 			break;
597 		case HIO_WRITE:
598 			ret = pwrite(res->hr_localfd, hio->hio_data,
599 			    hio->hio_length,
600 			    hio->hio_offset + res->hr_localoff);
601 			if (ret < 0)
602 				hio->hio_error = errno;
603 			else if (ret != (int64_t)hio->hio_length)
604 				hio->hio_error = EIO;
605 			else
606 				hio->hio_error = 0;
607 			break;
608 		case HIO_DELETE:
609 			ret = g_delete(res->hr_localfd,
610 			    hio->hio_offset + res->hr_localoff,
611 			    hio->hio_length);
612 			if (ret < 0)
613 				hio->hio_error = errno;
614 			else
615 				hio->hio_error = 0;
616 			break;
617 		case HIO_FLUSH:
618 			ret = g_flush(res->hr_localfd);
619 			if (ret < 0)
620 				hio->hio_error = errno;
621 			else
622 				hio->hio_error = 0;
623 			break;
624 		}
625 		if (hio->hio_error != 0) {
626 			reqlog(LOG_ERR, 0, hio->hio_error, hio,
627 			    "Request failed: ");
628 		}
629 		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
630 		    hio);
631 		mtx_lock(&hio_send_list_lock);
632 		wakeup = TAILQ_EMPTY(&hio_send_list);
633 		TAILQ_INSERT_TAIL(&hio_send_list, hio, hio_next);
634 		mtx_unlock(&hio_send_list_lock);
635 		if (wakeup)
636 			cv_signal(&hio_send_list_cond);
637 	}
638 	/* NOTREACHED */
639 	return (NULL);
640 }
641 
642 /*
643  * Thread sends requests back to primary node.
644  */
645 static void *
646 send_thread(void *arg)
647 {
648 	struct hast_resource *res = arg;
649 	struct nv *nvout;
650 	struct hio *hio;
651 	void *data;
652 	size_t length;
653 	bool wakeup;
654 
655 	for (;;) {
656 		pjdlog_debug(2, "send: Taking request.");
657 		mtx_lock(&hio_send_list_lock);
658 		while ((hio = TAILQ_FIRST(&hio_send_list)) == NULL) {
659 			pjdlog_debug(2, "send: No requests, waiting.");
660 			cv_wait(&hio_send_list_cond, &hio_send_list_lock);
661 		}
662 		TAILQ_REMOVE(&hio_send_list, hio, hio_next);
663 		mtx_unlock(&hio_send_list_lock);
664 		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
665 		nvout = nv_alloc();
666 		/* Copy sequence number. */
667 		nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq");
668 		switch (hio->hio_cmd) {
669 		case HIO_READ:
670 			if (hio->hio_error == 0) {
671 				data = hio->hio_data;
672 				length = hio->hio_length;
673 				break;
674 			}
675 			/*
676 			 * We send no data in case of an error.
677 			 */
678 			/* FALLTHROUGH */
679 		case HIO_DELETE:
680 		case HIO_FLUSH:
681 		case HIO_WRITE:
682 			data = NULL;
683 			length = 0;
684 			break;
685 		default:
686 			abort();
687 			break;
688 		}
689 		if (hio->hio_error != 0)
690 			nv_add_int16(nvout, hio->hio_error, "error");
691 		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
692 		    length) < 0) {
693 			pjdlog_exit(EX_TEMPFAIL, "Unable to send reply.");
694 		}
695 		nv_free(nvout);
696 		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
697 		    hio);
698 		nv_free(hio->hio_nv);
699 		hio->hio_error = 0;
700 		mtx_lock(&hio_free_list_lock);
701 		wakeup = TAILQ_EMPTY(&hio_free_list);
702 		TAILQ_INSERT_TAIL(&hio_free_list, hio, hio_next);
703 		mtx_unlock(&hio_free_list_lock);
704 		if (wakeup)
705 			cv_signal(&hio_free_list_cond);
706 	}
707 	/* NOTREACHED */
708 	return (NULL);
709 }
710