xref: /freebsd/sys/cam/ctl/ctl_ha.c (revision ae41709ab46305df80f7f35bb478a3c8ebf22ebb)
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3  *
4  * Copyright (c) 2015 Alexander Motin <mav@FreeBSD.org>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer,
12  *    without modification, immediately at the beginning of the file.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include <sys/cdefs.h>
30 __FBSDID("$FreeBSD$");
31 
32 #include <sys/param.h>
33 #include <sys/condvar.h>
34 #include <sys/conf.h>
35 #include <sys/eventhandler.h>
36 #include <sys/kernel.h>
37 #include <sys/kthread.h>
38 #include <sys/limits.h>
39 #include <sys/lock.h>
40 #include <sys/malloc.h>
41 #include <sys/mbuf.h>
42 #include <sys/module.h>
43 #include <sys/mutex.h>
44 #include <sys/proc.h>
45 #include <sys/queue.h>
46 #include <sys/socket.h>
47 #include <sys/socketvar.h>
48 #include <sys/sysctl.h>
49 #include <sys/systm.h>
50 #include <sys/uio.h>
51 #include <netinet/in.h>
52 #include <netinet/tcp.h>
53 #include <vm/uma.h>
54 
55 #include <cam/cam.h>
56 #include <cam/scsi/scsi_all.h>
57 #include <cam/scsi/scsi_da.h>
58 #include <cam/ctl/ctl_io.h>
59 #include <cam/ctl/ctl.h>
60 #include <cam/ctl/ctl_frontend.h>
61 #include <cam/ctl/ctl_util.h>
62 #include <cam/ctl/ctl_backend.h>
63 #include <cam/ctl/ctl_ioctl.h>
64 #include <cam/ctl/ctl_ha.h>
65 #include <cam/ctl/ctl_private.h>
66 #include <cam/ctl/ctl_debug.h>
67 #include <cam/ctl/ctl_error.h>
68 
/*
 * Header that precedes every HA message on the TCP stream.  It is written
 * by ctl_ha_msg_send2() and consumed by ctl_ha_rx_thread().
 */
struct ha_msg_wire {
	uint32_t	 channel;	/* channel the payload is addressed to */
	uint32_t	 length;	/* payload bytes following this header */
};
73 
/*
 * Data-transfer descriptor exchanged on CTL_HA_CHAN_DATA.  It is filled in
 * from a struct ctl_ha_dt_req by ctl_dt_single() and interpreted by
 * ctl_dt_event_handler().
 * NOTE(review): the structure is sent as raw memory, so both peers
 * presumably need the same pointer width and layout -- confirm.
 */
struct ha_dt_msg_wire {
	ctl_ha_dt_cmd	command;	/* CTL_HA_DT_CMD_READ or _WRITE */
	uint32_t	size;		/* number of data bytes to transfer */
	uint8_t		*local;		/* buffer address on the side that built the message */
	uint8_t		*remote;	/* buffer address on the side receiving the message */
};
80 
/*
 * State of the single HA link.  There is exactly one instance, the global
 * ha_softc defined together with the type.
 */
struct ha_softc {
	struct ctl_softc *ha_ctl_softc;	/* owning CTL instance, set by ctl_ha_msg_init() */
	ctl_evt_handler	 ha_handler[CTL_HA_CHAN_MAX]; /* per-channel event callbacks */
	char		 ha_peer[128];	/* last accepted "ha_peer" sysctl string */
	struct sockaddr_in  ha_peer_in;	/* address parsed from ha_peer */
	struct socket	*ha_lso;	/* listening socket, or NULL */
	struct socket	*ha_so;		/* data socket, or NULL */
	struct mbufq	 ha_sendq;	/* messages queued for transmission */
	struct mbuf	*ha_sending;	/* message dequeued but not yet passed to sosend() */
	struct mtx	 ha_lock;	/* protects ha_sendq, ha_dts and the wakeup flag */
	int		 ha_connect;	/* configured to connect to the peer */
	int		 ha_listen;	/* configured to listen for the peer */
	int		 ha_connected;	/* link is currently considered up */
	int		 ha_receiving;	/* rx thread is running; also its sleep channel */
	int		 ha_wakeup;	/* wakeup pending; also conn thread sleep channel */
	int		 ha_disconnect;	/* 1: close data socket, 2: also close listener */
	int		 ha_shutdown;	/* 1: shutdown requested, 2: conn thread exited */
	eventhandler_tag ha_shutdown_eh; /* shutdown_pre_sync registration */
	TAILQ_HEAD(, ctl_ha_dt_req) ha_dts; /* READ requests waiting for their callback */
} ha_softc;
101 
/*
 * Wake the connection thread.  The ha_wakeup flag is set under ha_lock so
 * that a thread about to sleep in ctl_ha_conn_thread() notices the request
 * even if the wakeup() itself arrives before it sleeps.
 */
static void
ctl_ha_conn_wake(struct ha_softc *softc)
{

	mtx_lock(&softc->ha_lock);
	softc->ha_wakeup = 1;
	mtx_unlock(&softc->ha_lock);
	wakeup(&softc->ha_wakeup);
}
111 
112 static int
113 ctl_ha_lupcall(struct socket *so, void *arg, int waitflag)
114 {
115 	struct ha_softc *softc = arg;
116 
117 	ctl_ha_conn_wake(softc);
118 	return (SU_OK);
119 }
120 
121 static int
122 ctl_ha_rupcall(struct socket *so, void *arg, int waitflag)
123 {
124 	struct ha_softc *softc = arg;
125 
126 	wakeup(&softc->ha_receiving);
127 	return (SU_OK);
128 }
129 
130 static int
131 ctl_ha_supcall(struct socket *so, void *arg, int waitflag)
132 {
133 	struct ha_softc *softc = arg;
134 
135 	ctl_ha_conn_wake(softc);
136 	return (SU_OK);
137 }
138 
139 static void
140 ctl_ha_evt(struct ha_softc *softc, ctl_ha_channel ch, ctl_ha_event evt,
141     int param)
142 {
143 	int i;
144 
145 	if (ch < CTL_HA_CHAN_MAX) {
146 		if (softc->ha_handler[ch])
147 			softc->ha_handler[ch](ch, evt, param);
148 		return;
149 	}
150 	for (i = 0; i < CTL_HA_CHAN_MAX; i++) {
151 		if (softc->ha_handler[i])
152 			softc->ha_handler[i](i, evt, param);
153 	}
154 }
155 
/*
 * Tear down the data connection, if there is one, and report the resulting
 * link state to all channel handlers.  Called from ctl_ha_conn_thread() and
 * from ctl_ha_connect() when soconnect() fails.
 */
static void
ctl_ha_close(struct ha_softc *softc)
{
	struct socket *so = softc->ha_so;
	int report = 0;

	/*
	 * If the link was up, or a disconnect was requested, discard
	 * everything pending transmission and remember to notify handlers.
	 */
	if (softc->ha_connected || softc->ha_disconnect) {
		softc->ha_connected = 0;
		mbufq_drain(&softc->ha_sendq);
		m_freem(softc->ha_sending);
		softc->ha_sending = NULL;
		report = 1;
	}
	if (so) {
		/*
		 * Remove the receive upcall, then keep waking the rx thread
		 * until it clears ha_receiving on its way out.  Both sides
		 * sleep on &ha_receiving under the receive sockbuf mutex.
		 */
		SOCKBUF_LOCK(&so->so_rcv);
		soupcall_clear(so, SO_RCV);
		while (softc->ha_receiving) {
			wakeup(&softc->ha_receiving);
			msleep(&softc->ha_receiving, SOCKBUF_MTX(&so->so_rcv),
			    0, "ha_rx exit", 0);
		}
		SOCKBUF_UNLOCK(&so->so_rcv);
		SOCKBUF_LOCK(&so->so_snd);
		soupcall_clear(so, SO_SND);
		SOCKBUF_UNLOCK(&so->so_snd);
		softc->ha_so = NULL;
		/* Half-second pause when in connect mode; presumably paces
		 * reconnect attempts -- confirm. */
		if (softc->ha_connect)
			pause("reconnect", hz / 2);
		soclose(so);
	}
	if (report) {
		/*
		 * With a peer still configured the link state is reported as
		 * unknown; otherwise it is reported as offline.
		 */
		ctl_ha_evt(softc, CTL_HA_CHAN_MAX, CTL_HA_EVT_LINK_CHANGE,
		    (softc->ha_connect || softc->ha_listen) ?
		    CTL_HA_LINK_UNKNOWN : CTL_HA_LINK_OFFLINE);
	}
}
192 
193 static void
194 ctl_ha_lclose(struct ha_softc *softc)
195 {
196 
197 	if (softc->ha_lso) {
198 		if (SOLISTENING(softc->ha_lso)) {
199 			SOLISTEN_LOCK(softc->ha_lso);
200 			solisten_upcall_set(softc->ha_lso, NULL, NULL);
201 			SOLISTEN_UNLOCK(softc->ha_lso);
202 		}
203 		soclose(softc->ha_lso);
204 		softc->ha_lso = NULL;
205 	}
206 }
207 
/*
 * Receive thread, started by ctl_ha_conn_thread() once the link is up.
 *
 * It alternates between two states, keyed on wire_hdr.length:
 *   length == 0: wait for and read the next struct ha_msg_wire header;
 *   length != 0: wait until the whole payload is buffered, then raise
 *                CTL_HA_EVT_MSG_RECV on the message's channel.
 * The thread does not read the payload itself; handlers are expected to
 * fetch it (ctl_dt_event_handler() does so through ctl_ha_msg_recv()).
 *
 * The thread exits on disconnect or socket error, clearing ha_receiving
 * and waking both ctl_ha_close() and the connection thread.
 */
static void
ctl_ha_rx_thread(void *arg)
{
	struct ha_softc *softc = arg;
	struct socket *so = softc->ha_so;
	struct ha_msg_wire wire_hdr;
	struct uio uio;
	struct iovec iov;
	int error, flags, next;

	bzero(&wire_hdr, sizeof(wire_hdr));
	while (1) {
		/* Number of bytes needed before the next step can proceed. */
		if (wire_hdr.length > 0)
			next = wire_hdr.length;
		else
			next = sizeof(wire_hdr);
		SOCKBUF_LOCK(&so->so_rcv);
		/*
		 * A pending disconnect forces entry into the loop body so
		 * that the exit checks run even when data is available.
		 */
		while (sbavail(&so->so_rcv) < next || softc->ha_disconnect) {
			if (softc->ha_connected == 0 || softc->ha_disconnect ||
			    so->so_error ||
			    (so->so_rcv.sb_state & SBS_CANTRCVMORE)) {
				goto errout;
			}
			/* Record how many bytes we need, then sleep until
			 * ctl_ha_rupcall() or ctl_ha_close() wakes us. */
			so->so_rcv.sb_lowat = next;
			msleep(&softc->ha_receiving, SOCKBUF_MTX(&so->so_rcv),
			    0, "-", 0);
		}
		SOCKBUF_UNLOCK(&so->so_rcv);

		if (wire_hdr.length == 0) {
			/* A full header is buffered; read it without blocking. */
			iov.iov_base = &wire_hdr;
			iov.iov_len = sizeof(wire_hdr);
			uio.uio_iov = &iov;
			uio.uio_iovcnt = 1;
			uio.uio_rw = UIO_READ;
			uio.uio_segflg = UIO_SYSSPACE;
			uio.uio_td = curthread;
			uio.uio_resid = sizeof(wire_hdr);
			flags = MSG_DONTWAIT;
			error = soreceive(softc->ha_so, NULL, &uio, NULL,
			    NULL, &flags);
			if (error != 0) {
				printf("%s: header receive error %d\n",
				    __func__, error);
				/* errout expects the sockbuf lock to be held. */
				SOCKBUF_LOCK(&so->so_rcv);
				goto errout;
			}
		} else {
			/* Payload is buffered; let the channel handler take it. */
			ctl_ha_evt(softc, wire_hdr.channel,
			    CTL_HA_EVT_MSG_RECV, wire_hdr.length);
			wire_hdr.length = 0;
		}
	}

errout:
	/* Entered with the receive sockbuf lock held. */
	softc->ha_receiving = 0;
	wakeup(&softc->ha_receiving);
	SOCKBUF_UNLOCK(&so->so_rcv);
	ctl_ha_conn_wake(softc);
	kthread_exit();
}
269 
/*
 * Push as many queued messages as currently fit into the data socket.
 * Called from ctl_ha_conn_thread() whenever a data socket exists.
 *
 * The message being worked on is kept in softc->ha_sending so that one
 * that does not fit yet is retried on the next call instead of being
 * dequeued again.
 */
static void
ctl_ha_send(struct ha_softc *softc)
{
	struct socket *so = softc->ha_so;
	int error;

	while (1) {
		if (softc->ha_sending == NULL) {
			mtx_lock(&softc->ha_lock);
			softc->ha_sending = mbufq_dequeue(&softc->ha_sendq);
			mtx_unlock(&softc->ha_lock);
			if (softc->ha_sending == NULL) {
				/*
				 * Nothing left to send.  The low-water mark is
				 * raised above the high-water mark, presumably
				 * so the send upcall stays quiet -- confirm.
				 */
				so->so_snd.sb_lowat = so->so_snd.sb_hiwat + 1;
				break;
			}
		}
		SOCKBUF_LOCK(&so->so_snd);
		if (sbspace(&so->so_snd) < softc->ha_sending->m_pkthdr.len) {
			/* Not enough room: record how much space is needed
			 * and stop; the message stays in ha_sending. */
			so->so_snd.sb_lowat = softc->ha_sending->m_pkthdr.len;
			SOCKBUF_UNLOCK(&so->so_snd);
			break;
		}
		SOCKBUF_UNLOCK(&so->so_snd);
		error = sosend(softc->ha_so, NULL, NULL, softc->ha_sending,
		    NULL, MSG_DONTWAIT, curthread);
		/* The chain was handed to sosend(); forget our reference
		 * whether or not the call succeeded. */
		softc->ha_sending = NULL;
		if (error != 0) {
			printf("%s: sosend() error %d\n", __func__, error);
			return;
		}
	}
}
302 
303 static void
304 ctl_ha_sock_setup(struct ha_softc *softc)
305 {
306 	struct sockopt opt;
307 	struct socket *so = softc->ha_so;
308 	int error, val;
309 
310 	val = 1024 * 1024;
311 	error = soreserve(so, val, val);
312 	if (error)
313 		printf("%s: soreserve failed %d\n", __func__, error);
314 
315 	SOCKBUF_LOCK(&so->so_rcv);
316 	so->so_rcv.sb_lowat = sizeof(struct ha_msg_wire);
317 	soupcall_set(so, SO_RCV, ctl_ha_rupcall, softc);
318 	SOCKBUF_UNLOCK(&so->so_rcv);
319 	SOCKBUF_LOCK(&so->so_snd);
320 	so->so_snd.sb_lowat = sizeof(struct ha_msg_wire);
321 	soupcall_set(so, SO_SND, ctl_ha_supcall, softc);
322 	SOCKBUF_UNLOCK(&so->so_snd);
323 
324 	bzero(&opt, sizeof(struct sockopt));
325 	opt.sopt_dir = SOPT_SET;
326 	opt.sopt_level = SOL_SOCKET;
327 	opt.sopt_name = SO_KEEPALIVE;
328 	opt.sopt_val = &val;
329 	opt.sopt_valsize = sizeof(val);
330 	val = 1;
331 	error = sosetopt(so, &opt);
332 	if (error)
333 		printf("%s: KEEPALIVE setting failed %d\n", __func__, error);
334 
335 	opt.sopt_level = IPPROTO_TCP;
336 	opt.sopt_name = TCP_NODELAY;
337 	val = 1;
338 	error = sosetopt(so, &opt);
339 	if (error)
340 		printf("%s: NODELAY setting failed %d\n", __func__, error);
341 
342 	opt.sopt_name = TCP_KEEPINIT;
343 	val = 3;
344 	error = sosetopt(so, &opt);
345 	if (error)
346 		printf("%s: KEEPINIT setting failed %d\n", __func__, error);
347 
348 	opt.sopt_name = TCP_KEEPIDLE;
349 	val = 1;
350 	error = sosetopt(so, &opt);
351 	if (error)
352 		printf("%s: KEEPIDLE setting failed %d\n", __func__, error);
353 
354 	opt.sopt_name = TCP_KEEPINTVL;
355 	val = 1;
356 	error = sosetopt(so, &opt);
357 	if (error)
358 		printf("%s: KEEPINTVL setting failed %d\n", __func__, error);
359 
360 	opt.sopt_name = TCP_KEEPCNT;
361 	val = 5;
362 	error = sosetopt(so, &opt);
363 	if (error)
364 		printf("%s: KEEPCNT setting failed %d\n", __func__, error);
365 }
366 
367 static int
368 ctl_ha_connect(struct ha_softc *softc)
369 {
370 	struct thread *td = curthread;
371 	struct sockaddr_in sa;
372 	struct socket *so;
373 	int error;
374 
375 	/* Create the socket */
376 	error = socreate(PF_INET, &so, SOCK_STREAM,
377 	    IPPROTO_TCP, td->td_ucred, td);
378 	if (error != 0) {
379 		printf("%s: socreate() error %d\n", __func__, error);
380 		return (error);
381 	}
382 	softc->ha_so = so;
383 	ctl_ha_sock_setup(softc);
384 
385 	memcpy(&sa, &softc->ha_peer_in, sizeof(sa));
386 	error = soconnect(so, (struct sockaddr *)&sa, td);
387 	if (error != 0) {
388 		if (bootverbose)
389 			printf("%s: soconnect() error %d\n", __func__, error);
390 		goto out;
391 	}
392 	return (0);
393 
394 out:
395 	ctl_ha_close(softc);
396 	return (error);
397 }
398 
399 static int
400 ctl_ha_accept(struct ha_softc *softc)
401 {
402 	struct socket *lso, *so;
403 	struct sockaddr *sap;
404 	int error;
405 
406 	lso = softc->ha_lso;
407 	SOLISTEN_LOCK(lso);
408 	error = solisten_dequeue(lso, &so, 0);
409 	if (error == EWOULDBLOCK)
410 		return (error);
411 	if (error) {
412 		printf("%s: socket error %d\n", __func__, error);
413 		goto out;
414 	}
415 
416 	sap = NULL;
417 	error = soaccept(so, &sap);
418 	if (error != 0) {
419 		printf("%s: soaccept() error %d\n", __func__, error);
420 		if (sap != NULL)
421 			free(sap, M_SONAME);
422 		goto out;
423 	}
424 	if (sap != NULL)
425 		free(sap, M_SONAME);
426 	softc->ha_so = so;
427 	ctl_ha_sock_setup(softc);
428 	return (0);
429 
430 out:
431 	ctl_ha_lclose(softc);
432 	return (error);
433 }
434 
435 static int
436 ctl_ha_listen(struct ha_softc *softc)
437 {
438 	struct thread *td = curthread;
439 	struct sockaddr_in sa;
440 	struct sockopt opt;
441 	int error, val;
442 
443 	/* Create the socket */
444 	if (softc->ha_lso == NULL) {
445 		error = socreate(PF_INET, &softc->ha_lso, SOCK_STREAM,
446 		    IPPROTO_TCP, td->td_ucred, td);
447 		if (error != 0) {
448 			printf("%s: socreate() error %d\n", __func__, error);
449 			return (error);
450 		}
451 		bzero(&opt, sizeof(struct sockopt));
452 		opt.sopt_dir = SOPT_SET;
453 		opt.sopt_level = SOL_SOCKET;
454 		opt.sopt_name = SO_REUSEADDR;
455 		opt.sopt_val = &val;
456 		opt.sopt_valsize = sizeof(val);
457 		val = 1;
458 		error = sosetopt(softc->ha_lso, &opt);
459 		if (error) {
460 			printf("%s: REUSEADDR setting failed %d\n",
461 			    __func__, error);
462 		}
463 		bzero(&opt, sizeof(struct sockopt));
464 		opt.sopt_dir = SOPT_SET;
465 		opt.sopt_level = SOL_SOCKET;
466 		opt.sopt_name = SO_REUSEPORT;
467 		opt.sopt_val = &val;
468 		opt.sopt_valsize = sizeof(val);
469 		val = 1;
470 		error = sosetopt(softc->ha_lso, &opt);
471 		if (error) {
472 			printf("%s: REUSEPORT setting failed %d\n",
473 			    __func__, error);
474 		}
475 	}
476 
477 	memcpy(&sa, &softc->ha_peer_in, sizeof(sa));
478 	error = sobind(softc->ha_lso, (struct sockaddr *)&sa, td);
479 	if (error != 0) {
480 		printf("%s: sobind() error %d\n", __func__, error);
481 		goto out;
482 	}
483 	error = solisten(softc->ha_lso, 1, td);
484 	if (error != 0) {
485 		printf("%s: solisten() error %d\n", __func__, error);
486 		goto out;
487 	}
488 	SOLISTEN_LOCK(softc->ha_lso);
489 	softc->ha_lso->so_state |= SS_NBIO;
490 	solisten_upcall_set(softc->ha_lso, ctl_ha_lupcall, softc);
491 	SOLISTEN_UNLOCK(softc->ha_lso);
492 	return (0);
493 
494 out:
495 	ctl_ha_lclose(softc);
496 	return (error);
497 }
498 
/*
 * Connection management thread, started by ctl_ha_msg_init().
 *
 * Each pass of the loop:
 *   1. honours pending disconnect/shutdown requests, or closes a data
 *      socket that shows an error or end-of-stream;
 *   2. without a data socket, tries to accept, listen or connect,
 *      depending on the current state and configuration;
 *   3. with a data socket, announces the link once it is established,
 *      starts the rx thread, and transmits queued messages;
 *   4. sleeps until woken through ha_wakeup or for at most hz ticks.
 *
 * On shutdown the thread sets ha_shutdown to 2 and wakes the waiter in
 * ctl_ha_msg_shutdown() before exiting.
 */
static void
ctl_ha_conn_thread(void *arg)
{
	struct ha_softc *softc = arg;
	int error;

	while (1) {
		if (softc->ha_disconnect || softc->ha_shutdown) {
			ctl_ha_close(softc);
			/* ha_disconnect == 2 (set by the sysctl handler) and
			 * shutdown also drop the listening socket. */
			if (softc->ha_disconnect == 2 || softc->ha_shutdown)
				ctl_ha_lclose(softc);
			softc->ha_disconnect = 0;
			if (softc->ha_shutdown)
				break;
		} else if (softc->ha_so != NULL &&
		    (softc->ha_so->so_error ||
		     softc->ha_so->so_rcv.sb_state & SBS_CANTRCVMORE))
			ctl_ha_close(softc);
		if (softc->ha_so == NULL) {
			/* An existing listener takes precedence over the
			 * configured mode. */
			if (softc->ha_lso != NULL)
				ctl_ha_accept(softc);
			else if (softc->ha_listen)
				ctl_ha_listen(softc);
			else if (softc->ha_connect)
				ctl_ha_connect(softc);
		}
		if (softc->ha_so != NULL) {
			/* First pass on which the socket is neither in error
			 * nor still connecting: bring the link up. */
			if (softc->ha_connected == 0 &&
			    softc->ha_so->so_error == 0 &&
			    (softc->ha_so->so_state & SS_ISCONNECTING) == 0) {
				softc->ha_connected = 1;
				ctl_ha_evt(softc, CTL_HA_CHAN_MAX,
				    CTL_HA_EVT_LINK_CHANGE,
				    CTL_HA_LINK_ONLINE);
				/* Set before the thread starts so that
				 * ctl_ha_close() waits for it to exit. */
				softc->ha_receiving = 1;
				error = kproc_kthread_add(ctl_ha_rx_thread,
				    softc, &softc->ha_ctl_softc->ctl_proc,
				    NULL, 0, 0, "ctl", "ha_rx");
				if (error != 0) {
					printf("Error creating CTL HA rx thread!\n");
					softc->ha_receiving = 0;
					softc->ha_disconnect = 1;
				}
			}
			ctl_ha_send(softc);
		}
		mtx_lock(&softc->ha_lock);
		/* Do not sleep if the socket already needs closing; the next
		 * pass handles it right away. */
		if (softc->ha_so != NULL &&
		    (softc->ha_so->so_error ||
		     softc->ha_so->so_rcv.sb_state & SBS_CANTRCVMORE))
			;
		else if (!softc->ha_wakeup)
			msleep(&softc->ha_wakeup, &softc->ha_lock, 0, "-", hz);
		softc->ha_wakeup = 0;
		mtx_unlock(&softc->ha_lock);
	}
	/* Tell ctl_ha_msg_shutdown() that the thread is done. */
	mtx_lock(&softc->ha_lock);
	softc->ha_shutdown = 2;
	wakeup(&softc->ha_wakeup);
	mtx_unlock(&softc->ha_lock);
	kthread_exit();
}
561 
562 static int
563 ctl_ha_peer_sysctl(SYSCTL_HANDLER_ARGS)
564 {
565 	struct ha_softc *softc = (struct ha_softc *)arg1;
566 	struct sockaddr_in *sa;
567 	int error, b1, b2, b3, b4, p, num;
568 	char buf[128];
569 
570 	strlcpy(buf, softc->ha_peer, sizeof(buf));
571 	error = sysctl_handle_string(oidp, buf, sizeof(buf), req);
572 	if ((error != 0) || (req->newptr == NULL) ||
573 	    strncmp(buf, softc->ha_peer, sizeof(buf)) == 0)
574 		return (error);
575 
576 	sa = &softc->ha_peer_in;
577 	mtx_lock(&softc->ha_lock);
578 	if ((num = sscanf(buf, "connect %d.%d.%d.%d:%d",
579 	    &b1, &b2, &b3, &b4, &p)) >= 4) {
580 		softc->ha_connect = 1;
581 		softc->ha_listen = 0;
582 	} else if ((num = sscanf(buf, "listen %d.%d.%d.%d:%d",
583 	    &b1, &b2, &b3, &b4, &p)) >= 4) {
584 		softc->ha_connect = 0;
585 		softc->ha_listen = 1;
586 	} else {
587 		softc->ha_connect = 0;
588 		softc->ha_listen = 0;
589 		if (buf[0] != 0) {
590 			buf[0] = 0;
591 			error = EINVAL;
592 		}
593 	}
594 	strlcpy(softc->ha_peer, buf, sizeof(softc->ha_peer));
595 	if (softc->ha_connect || softc->ha_listen) {
596 		memset(sa, 0, sizeof(*sa));
597 		sa->sin_len = sizeof(struct sockaddr_in);
598 		sa->sin_family = AF_INET;
599 		sa->sin_port = htons((num >= 5) ? p : 999);
600 		sa->sin_addr.s_addr =
601 		    htonl((b1 << 24) + (b2 << 16) + (b3 << 8) + b4);
602 	}
603 	softc->ha_disconnect = 2;
604 	softc->ha_wakeup = 1;
605 	mtx_unlock(&softc->ha_lock);
606 	wakeup(&softc->ha_wakeup);
607 	return (error);
608 }
609 
610 ctl_ha_status
611 ctl_ha_msg_register(ctl_ha_channel channel, ctl_evt_handler handler)
612 {
613 	struct ha_softc *softc = &ha_softc;
614 
615 	KASSERT(channel < CTL_HA_CHAN_MAX,
616 	    ("Wrong CTL HA channel %d", channel));
617 	softc->ha_handler[channel] = handler;
618 	return (CTL_HA_STATUS_SUCCESS);
619 }
620 
621 ctl_ha_status
622 ctl_ha_msg_deregister(ctl_ha_channel channel)
623 {
624 	struct ha_softc *softc = &ha_softc;
625 
626 	KASSERT(channel < CTL_HA_CHAN_MAX,
627 	    ("Wrong CTL HA channel %d", channel));
628 	softc->ha_handler[channel] = NULL;
629 	return (CTL_HA_STATUS_SUCCESS);
630 }
631 
632 /*
633  * Receive a message of the specified size.
634  */
635 ctl_ha_status
636 ctl_ha_msg_recv(ctl_ha_channel channel, void *addr, size_t len,
637 		int wait)
638 {
639 	struct ha_softc *softc = &ha_softc;
640 	struct uio uio;
641 	struct iovec iov;
642 	int error, flags;
643 
644 	if (!softc->ha_connected)
645 		return (CTL_HA_STATUS_DISCONNECT);
646 
647 	iov.iov_base = addr;
648 	iov.iov_len = len;
649 	uio.uio_iov = &iov;
650 	uio.uio_iovcnt = 1;
651 	uio.uio_rw = UIO_READ;
652 	uio.uio_segflg = UIO_SYSSPACE;
653 	uio.uio_td = curthread;
654 	uio.uio_resid = len;
655 	flags = wait ? 0 : MSG_DONTWAIT;
656 	error = soreceive(softc->ha_so, NULL, &uio, NULL, NULL, &flags);
657 	if (error == 0)
658 		return (CTL_HA_STATUS_SUCCESS);
659 
660 	/* Consider all errors fatal for HA sanity. */
661 	mtx_lock(&softc->ha_lock);
662 	if (softc->ha_connected) {
663 		softc->ha_disconnect = 1;
664 		softc->ha_wakeup = 1;
665 		wakeup(&softc->ha_wakeup);
666 	}
667 	mtx_unlock(&softc->ha_lock);
668 	return (CTL_HA_STATUS_ERROR);
669 }
670 
671 /*
672  * Send a message of the specified size.
673  */
674 ctl_ha_status
675 ctl_ha_msg_send2(ctl_ha_channel channel, const void *addr, size_t len,
676     const void *addr2, size_t len2, int wait)
677 {
678 	struct ha_softc *softc = &ha_softc;
679 	struct mbuf *mb, *newmb;
680 	struct ha_msg_wire hdr;
681 	size_t copylen, off;
682 
683 	if (!softc->ha_connected)
684 		return (CTL_HA_STATUS_DISCONNECT);
685 
686 	newmb = m_getm2(NULL, sizeof(hdr) + len + len2, wait, MT_DATA,
687 	    M_PKTHDR);
688 	if (newmb == NULL) {
689 		/* Consider all errors fatal for HA sanity. */
690 		mtx_lock(&softc->ha_lock);
691 		if (softc->ha_connected) {
692 			softc->ha_disconnect = 1;
693 			softc->ha_wakeup = 1;
694 			wakeup(&softc->ha_wakeup);
695 		}
696 		mtx_unlock(&softc->ha_lock);
697 		printf("%s: Can't allocate mbuf chain\n", __func__);
698 		return (CTL_HA_STATUS_ERROR);
699 	}
700 	hdr.channel = channel;
701 	hdr.length = len + len2;
702 	mb = newmb;
703 	memcpy(mtodo(mb, 0), &hdr, sizeof(hdr));
704 	mb->m_len += sizeof(hdr);
705 	off = 0;
706 	for (; mb != NULL && off < len; mb = mb->m_next) {
707 		copylen = min(M_TRAILINGSPACE(mb), len - off);
708 		memcpy(mtodo(mb, mb->m_len), (const char *)addr + off, copylen);
709 		mb->m_len += copylen;
710 		off += copylen;
711 		if (off == len)
712 			break;
713 	}
714 	KASSERT(off == len, ("%s: off (%zu) != len (%zu)", __func__,
715 	    off, len));
716 	off = 0;
717 	for (; mb != NULL && off < len2; mb = mb->m_next) {
718 		copylen = min(M_TRAILINGSPACE(mb), len2 - off);
719 		memcpy(mtodo(mb, mb->m_len), (const char *)addr2 + off, copylen);
720 		mb->m_len += copylen;
721 		off += copylen;
722 	}
723 	KASSERT(off == len2, ("%s: off (%zu) != len2 (%zu)", __func__,
724 	    off, len2));
725 	newmb->m_pkthdr.len = sizeof(hdr) + len + len2;
726 
727 	mtx_lock(&softc->ha_lock);
728 	if (!softc->ha_connected) {
729 		mtx_unlock(&softc->ha_lock);
730 		m_freem(newmb);
731 		return (CTL_HA_STATUS_DISCONNECT);
732 	}
733 	mbufq_enqueue(&softc->ha_sendq, newmb);
734 	softc->ha_wakeup = 1;
735 	mtx_unlock(&softc->ha_lock);
736 	wakeup(&softc->ha_wakeup);
737 	return (CTL_HA_STATUS_SUCCESS);
738 }
739 
/*
 * Send a message consisting of a single buffer.  Shorthand for
 * ctl_ha_msg_send2() with no second buffer; returns its status.
 */
ctl_ha_status
ctl_ha_msg_send(ctl_ha_channel channel, const void *addr, size_t len,
    int wait)
{

	return (ctl_ha_msg_send2(channel, addr, len, NULL, 0, wait));
}
747 
748 ctl_ha_status
749 ctl_ha_msg_abort(ctl_ha_channel channel)
750 {
751 	struct ha_softc *softc = &ha_softc;
752 
753 	mtx_lock(&softc->ha_lock);
754 	softc->ha_disconnect = 1;
755 	softc->ha_wakeup = 1;
756 	mtx_unlock(&softc->ha_lock);
757 	wakeup(&softc->ha_wakeup);
758 	return (CTL_HA_STATUS_SUCCESS);
759 }
760 
761 /*
762  * Allocate a data transfer request structure.
763  */
764 struct ctl_ha_dt_req *
765 ctl_dt_req_alloc(void)
766 {
767 
768 	return (malloc(sizeof(struct ctl_ha_dt_req), M_CTL, M_WAITOK | M_ZERO));
769 }
770 
/*
 * Free a data transfer request structure.
 *
 * Releases req back to the M_CTL malloc type, the same type that
 * ctl_dt_req_alloc() allocates from.
 */
void
ctl_dt_req_free(struct ctl_ha_dt_req *req)
{

	free(req, M_CTL);
}
780 
781 /*
782  * Issue a DMA request for a single buffer.
783  */
784 ctl_ha_status
785 ctl_dt_single(struct ctl_ha_dt_req *req)
786 {
787 	struct ha_softc *softc = &ha_softc;
788 	struct ha_dt_msg_wire wire_dt;
789 	ctl_ha_status status;
790 
791 	wire_dt.command = req->command;
792 	wire_dt.size = req->size;
793 	wire_dt.local = req->local;
794 	wire_dt.remote = req->remote;
795 	if (req->command == CTL_HA_DT_CMD_READ && req->callback != NULL) {
796 		mtx_lock(&softc->ha_lock);
797 		TAILQ_INSERT_TAIL(&softc->ha_dts, req, links);
798 		mtx_unlock(&softc->ha_lock);
799 		ctl_ha_msg_send(CTL_HA_CHAN_DATA, &wire_dt, sizeof(wire_dt),
800 		    M_WAITOK);
801 		return (CTL_HA_STATUS_WAIT);
802 	}
803 	if (req->command == CTL_HA_DT_CMD_READ) {
804 		status = ctl_ha_msg_send(CTL_HA_CHAN_DATA, &wire_dt,
805 		    sizeof(wire_dt), M_WAITOK);
806 	} else {
807 		status = ctl_ha_msg_send2(CTL_HA_CHAN_DATA, &wire_dt,
808 		    sizeof(wire_dt), req->local, req->size, M_WAITOK);
809 	}
810 	return (status);
811 }
812 
/*
 * Event handler for CTL_HA_CHAN_DATA, registered by ctl_ha_msg_init().
 *
 * CTL_HA_EVT_MSG_RECV: param is the payload length announced in the message
 * header.  The descriptor is read first.  A READ descriptor is answered
 * with a WRITE carrying the requested bytes.  A WRITE descriptor is
 * followed by data, which is received into the destination buffer before
 * the matching pending request (if any) is completed.
 *
 * CTL_HA_EVT_LINK_CHANGE: on any state other than online, every pending
 * request is completed with CTL_HA_STATUS_DISCONNECT.
 */
static void
ctl_dt_event_handler(ctl_ha_channel channel, ctl_ha_event event, int param)
{
	struct ha_softc *softc = &ha_softc;
	struct ctl_ha_dt_req *req;
	ctl_ha_status isc_status;

	if (event == CTL_HA_EVT_MSG_RECV) {
		struct ha_dt_msg_wire wire_dt;
		uint8_t *tmp;
		int size;

		/*
		 * NOTE(review): when param is smaller than sizeof(wire_dt)
		 * only part of wire_dt is filled in, yet all of its fields
		 * are used below -- confirm that short messages cannot occur.
		 */
		size = min(sizeof(wire_dt), param);
		isc_status = ctl_ha_msg_recv(CTL_HA_CHAN_DATA, &wire_dt,
					     size, M_WAITOK);
		if (isc_status != CTL_HA_STATUS_SUCCESS) {
			printf("%s: Error receiving message: %d\n",
			    __func__, isc_status);
			return;
		}

		if (wire_dt.command == CTL_HA_DT_CMD_READ) {
			/*
			 * Turn the request around: swap the two addresses and
			 * send the data back as a WRITE.
			 */
			wire_dt.command = CTL_HA_DT_CMD_WRITE;
			tmp = wire_dt.local;
			wire_dt.local = wire_dt.remote;
			wire_dt.remote = tmp;
			ctl_ha_msg_send2(CTL_HA_CHAN_DATA, &wire_dt,
			    sizeof(wire_dt), wire_dt.local, wire_dt.size,
			    M_WAITOK);
		} else if (wire_dt.command == CTL_HA_DT_CMD_WRITE) {
			/* The data follows the descriptor; store it at the
			 * address the sender named as remote. */
			isc_status = ctl_ha_msg_recv(CTL_HA_CHAN_DATA,
			    wire_dt.remote, wire_dt.size, M_WAITOK);
			/* Look for the pending READ this answers; req ends
			 * up NULL when nothing matches. */
			mtx_lock(&softc->ha_lock);
			TAILQ_FOREACH(req, &softc->ha_dts, links) {
				if (req->local == wire_dt.remote) {
					TAILQ_REMOVE(&softc->ha_dts, req, links);
					break;
				}
			}
			mtx_unlock(&softc->ha_lock);
			if (req) {
				req->ret = isc_status;
				req->callback(req);
			}
		}
	} else if (event == CTL_HA_EVT_LINK_CHANGE) {
		CTL_DEBUG_PRINT(("%s: Link state change to %d\n", __func__,
		    param));
		if (param != CTL_HA_LINK_ONLINE) {
			/* Fail every pending request; the lock is dropped
			 * around each callback. */
			mtx_lock(&softc->ha_lock);
			while ((req = TAILQ_FIRST(&softc->ha_dts)) != NULL) {
				TAILQ_REMOVE(&softc->ha_dts, req, links);
				mtx_unlock(&softc->ha_lock);
				req->ret = CTL_HA_STATUS_DISCONNECT;
				req->callback(req);
				mtx_lock(&softc->ha_lock);
			}
			mtx_unlock(&softc->ha_lock);
		}
	} else {
		printf("%s: Unknown event %d\n", __func__, event);
	}
}
876 
877 ctl_ha_status
878 ctl_ha_msg_init(struct ctl_softc *ctl_softc)
879 {
880 	struct ha_softc *softc = &ha_softc;
881 	int error;
882 
883 	softc->ha_ctl_softc = ctl_softc;
884 	mtx_init(&softc->ha_lock, "CTL HA mutex", NULL, MTX_DEF);
885 	mbufq_init(&softc->ha_sendq, INT_MAX);
886 	TAILQ_INIT(&softc->ha_dts);
887 	error = kproc_kthread_add(ctl_ha_conn_thread, softc,
888 	    &ctl_softc->ctl_proc, NULL, 0, 0, "ctl", "ha_tx");
889 	if (error != 0) {
890 		printf("error creating CTL HA connection thread!\n");
891 		mtx_destroy(&softc->ha_lock);
892 		return (CTL_HA_STATUS_ERROR);
893 	}
894 	softc->ha_shutdown_eh = EVENTHANDLER_REGISTER(shutdown_pre_sync,
895 	    ctl_ha_msg_shutdown, ctl_softc, SHUTDOWN_PRI_FIRST);
896 	SYSCTL_ADD_PROC(&ctl_softc->sysctl_ctx,
897 	    SYSCTL_CHILDREN(ctl_softc->sysctl_tree),
898 	    OID_AUTO, "ha_peer",
899 	    CTLTYPE_STRING | CTLFLAG_RWTUN | CTLFLAG_NEEDGIANT,
900 	    softc, 0, ctl_ha_peer_sysctl, "A", "HA peer connection method");
901 
902 	if (ctl_ha_msg_register(CTL_HA_CHAN_DATA, ctl_dt_event_handler)
903 	    != CTL_HA_STATUS_SUCCESS) {
904 		printf("%s: ctl_ha_msg_register failed.\n", __func__);
905 	}
906 
907 	return (CTL_HA_STATUS_SUCCESS);
908 };
909 
910 void
911 ctl_ha_msg_shutdown(struct ctl_softc *ctl_softc)
912 {
913 	struct ha_softc *softc = &ha_softc;
914 
915 	/* Disconnect and shutdown threads. */
916 	mtx_lock(&softc->ha_lock);
917 	if (softc->ha_shutdown < 2) {
918 		softc->ha_shutdown = 1;
919 		softc->ha_wakeup = 1;
920 		wakeup(&softc->ha_wakeup);
921 		while (softc->ha_shutdown < 2 && !SCHEDULER_STOPPED()) {
922 			msleep(&softc->ha_wakeup, &softc->ha_lock, 0,
923 			    "shutdown", hz);
924 		}
925 	}
926 	mtx_unlock(&softc->ha_lock);
927 };
928 
929 ctl_ha_status
930 ctl_ha_msg_destroy(struct ctl_softc *ctl_softc)
931 {
932 	struct ha_softc *softc = &ha_softc;
933 
934 	if (softc->ha_shutdown_eh != NULL) {
935 		EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
936 		    softc->ha_shutdown_eh);
937 		softc->ha_shutdown_eh = NULL;
938 	}
939 
940 	ctl_ha_msg_shutdown(ctl_softc);	/* Just in case. */
941 
942 	if (ctl_ha_msg_deregister(CTL_HA_CHAN_DATA) != CTL_HA_STATUS_SUCCESS)
943 		printf("%s: ctl_ha_msg_deregister failed.\n", __func__);
944 
945 	mtx_destroy(&softc->ha_lock);
946 	return (CTL_HA_STATUS_SUCCESS);
947 };
948