xref: /freebsd/contrib/unbound/util/tube.c (revision 95eb4b873b6a8b527c5bd78d7191975dfca38998)
1 /*
2  * util/tube.c - pipe service
3  *
4  * Copyright (c) 2008, NLnet Labs. All rights reserved.
5  *
6  * This software is open source.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * Redistributions of source code must retain the above copyright notice,
13  * this list of conditions and the following disclaimer.
14  *
15  * Redistributions in binary form must reproduce the above copyright notice,
16  * this list of conditions and the following disclaimer in the documentation
17  * and/or other materials provided with the distribution.
18  *
19  * Neither the name of the NLNET LABS nor the names of its contributors may
20  * be used to endorse or promote products derived from this software without
21  * specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /**
37  * \file
38  *
39  * This file contains pipe service functions.
40  */
41 #include "config.h"
42 #include "util/tube.h"
43 #include "util/log.h"
44 #include "util/net_help.h"
45 #include "util/netevent.h"
46 #include "util/fptr_wlist.h"
47 #include "util/ub_event.h"
48 #ifdef HAVE_POLL_H
49 #include <poll.h>
50 #endif
51 
52 #ifndef USE_WINSOCK
53 /* on unix */
54 
#ifndef HAVE_SOCKETPAIR
/**
 * Fallback for platforms without socketpair() (Minix 3.1.7 is one):
 * the domain, type and protocol arguments are dropped and a plain
 * pipe() is opened on the descriptor array instead.
 */
#define socketpair(domain, type, protocol, fds) pipe(fds)
#endif /* HAVE_SOCKETPAIR */
59 
60 struct tube* tube_create(void)
61 {
62 	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
63 	int sv[2];
64 	if(!tube) {
65 		int err = errno;
66 		log_err("tube_create: out of memory");
67 		errno = err;
68 		return NULL;
69 	}
70 	tube->sr = -1;
71 	tube->sw = -1;
72 	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
73 		int err = errno;
74 		log_err("socketpair: %s", strerror(errno));
75 		free(tube);
76 		errno = err;
77 		return NULL;
78 	}
79 	tube->sr = sv[0];
80 	tube->sw = sv[1];
81 	if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
82 		int err = errno;
83 		log_err("tube: cannot set nonblocking");
84 		tube_delete(tube);
85 		errno = err;
86 		return NULL;
87 	}
88 	return tube;
89 }
90 
/**
 * Destroy a tube: remove both background handlers, close both
 * descriptors and release the structure. A NULL tube is ignored.
 */
void tube_delete(struct tube* tube)
{
	if(tube) {
		tube_remove_bg_listen(tube);
		tube_remove_bg_write(tube);
		/* the descriptors are closed only after the commpoints are
		 * deleted, to be sure; epoll also does not like an fd being
		 * closed before its event_del */
		tube_close_read(tube);
		tube_close_write(tube);
		free(tube);
	}
}
102 
103 void tube_close_read(struct tube* tube)
104 {
105 	if(tube->sr != -1) {
106 		close(tube->sr);
107 		tube->sr = -1;
108 	}
109 }
110 
111 void tube_close_write(struct tube* tube)
112 {
113 	if(tube->sw != -1) {
114 		close(tube->sw);
115 		tube->sw = -1;
116 	}
117 }
118 
119 void tube_remove_bg_listen(struct tube* tube)
120 {
121 	if(tube->listen_com) {
122 		comm_point_delete(tube->listen_com);
123 		tube->listen_com = NULL;
124 	}
125 	free(tube->cmd_msg);
126 	tube->cmd_msg = NULL;
127 }
128 
129 void tube_remove_bg_write(struct tube* tube)
130 {
131 	if(tube->res_com) {
132 		comm_point_delete(tube->res_com);
133 		tube->res_com = NULL;
134 	}
135 	if(tube->res_list) {
136 		struct tube_res_list* np, *p = tube->res_list;
137 		tube->res_list = NULL;
138 		tube->res_last = NULL;
139 		while(p) {
140 			np = p->next;
141 			free(p->buf);
142 			free(p);
143 			p = np;
144 		}
145 	}
146 }
147 
148 int
149 tube_handle_listen(struct comm_point* c, void* arg, int error,
150         struct comm_reply* ATTR_UNUSED(reply_info))
151 {
152 	struct tube* tube = (struct tube*)arg;
153 	ssize_t r;
154 	if(error != NETEVENT_NOERROR) {
155 		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
156 		(*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
157 		return 0;
158 	}
159 
160 	if(tube->cmd_read < sizeof(tube->cmd_len)) {
161 		/* complete reading the length of control msg */
162 		r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
163 			sizeof(tube->cmd_len) - tube->cmd_read);
164 		if(r==0) {
165 			/* error has happened or */
166 			/* parent closed pipe, must have exited somehow */
167 			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
168 			(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
169 				tube->listen_arg);
170 			return 0;
171 		}
172 		if(r==-1) {
173 			if(errno != EAGAIN && errno != EINTR) {
174 				log_err("rpipe error: %s", strerror(errno));
175 			}
176 			/* nothing to read now, try later */
177 			return 0;
178 		}
179 		tube->cmd_read += r;
180 		if(tube->cmd_read < sizeof(tube->cmd_len)) {
181 			/* not complete, try later */
182 			return 0;
183 		}
184 		tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
185 		if(!tube->cmd_msg) {
186 			log_err("malloc failure");
187 			tube->cmd_read = 0;
188 			return 0;
189 		}
190 	}
191 	/* cmd_len has been read, read remainder */
192 	r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
193 		tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
194 	if(r==0) {
195 		/* error has happened or */
196 		/* parent closed pipe, must have exited somehow */
197 		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
198 		(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
199 			tube->listen_arg);
200 		return 0;
201 	}
202 	if(r==-1) {
203 		/* nothing to read now, try later */
204 		if(errno != EAGAIN && errno != EINTR) {
205 			log_err("rpipe error: %s", strerror(errno));
206 		}
207 		return 0;
208 	}
209 	tube->cmd_read += r;
210 	if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
211 		/* not complete, try later */
212 		return 0;
213 	}
214 	tube->cmd_read = 0;
215 
216 	fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
217 	(*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
218 		NETEVENT_NOERROR, tube->listen_arg);
219 		/* also frees the buf */
220 	tube->cmd_msg = NULL;
221 	return 0;
222 }
223 
224 int
225 tube_handle_write(struct comm_point* c, void* arg, int error,
226         struct comm_reply* ATTR_UNUSED(reply_info))
227 {
228 	struct tube* tube = (struct tube*)arg;
229 	struct tube_res_list* item = tube->res_list;
230 	ssize_t r;
231 	if(error != NETEVENT_NOERROR) {
232 		log_err("tube_handle_write net error %d", error);
233 		return 0;
234 	}
235 
236 	if(!item) {
237 		comm_point_stop_listening(c);
238 		return 0;
239 	}
240 
241 	if(tube->res_write < sizeof(item->len)) {
242 		r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
243 			sizeof(item->len) - tube->res_write);
244 		if(r == -1) {
245 			if(errno != EAGAIN && errno != EINTR) {
246 				log_err("wpipe error: %s", strerror(errno));
247 			}
248 			return 0; /* try again later */
249 		}
250 		if(r == 0) {
251 			/* error on pipe, must have exited somehow */
252 			/* cannot signal this to pipe user */
253 			return 0;
254 		}
255 		tube->res_write += r;
256 		if(tube->res_write < sizeof(item->len))
257 			return 0;
258 	}
259 	r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
260 		item->len - (tube->res_write - sizeof(item->len)));
261 	if(r == -1) {
262 		if(errno != EAGAIN && errno != EINTR) {
263 			log_err("wpipe error: %s", strerror(errno));
264 		}
265 		return 0; /* try again later */
266 	}
267 	if(r == 0) {
268 		/* error on pipe, must have exited somehow */
269 		/* cannot signal this to pipe user */
270 		return 0;
271 	}
272 	tube->res_write += r;
273 	if(tube->res_write < sizeof(item->len) + item->len)
274 		return 0;
275 	/* done this result, remove it */
276 	free(item->buf);
277 	item->buf = NULL;
278 	tube->res_list = tube->res_list->next;
279 	free(item);
280 	if(!tube->res_list) {
281 		tube->res_last = NULL;
282 		comm_point_stop_listening(c);
283 	}
284 	tube->res_write = 0;
285 	return 0;
286 }
287 
288 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
289         int nonblock)
290 {
291 	ssize_t r, d;
292 	int fd = tube->sw;
293 
294 	/* test */
295 	if(nonblock) {
296 		r = write(fd, &len, sizeof(len));
297 		if(r == -1) {
298 			if(errno==EINTR || errno==EAGAIN)
299 				return -1;
300 			log_err("tube msg write failed: %s", strerror(errno));
301 			return -1; /* can still continue, perhaps */
302 		}
303 	} else r = 0;
304 	if(!fd_set_block(fd))
305 		return 0;
306 	/* write remainder */
307 	d = r;
308 	while(d != (ssize_t)sizeof(len)) {
309 		if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
310 			if(errno == EAGAIN)
311 				continue; /* temporarily unavail: try again*/
312 			log_err("tube msg write failed: %s", strerror(errno));
313 			(void)fd_set_nonblock(fd);
314 			return 0;
315 		}
316 		d += r;
317 	}
318 	d = 0;
319 	while(d != (ssize_t)len) {
320 		if((r=write(fd, buf+d, len-d)) == -1) {
321 			if(errno == EAGAIN)
322 				continue; /* temporarily unavail: try again*/
323 			log_err("tube msg write failed: %s", strerror(errno));
324 			(void)fd_set_nonblock(fd);
325 			return 0;
326 		}
327 		d += r;
328 	}
329 	if(!fd_set_nonblock(fd))
330 		return 0;
331 	return 1;
332 }
333 
334 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
335         int nonblock)
336 {
337 	ssize_t r, d;
338 	int fd = tube->sr;
339 
340 	/* test */
341 	*len = 0;
342 	if(nonblock) {
343 		r = read(fd, len, sizeof(*len));
344 		if(r == -1) {
345 			if(errno==EINTR || errno==EAGAIN)
346 				return -1;
347 			log_err("tube msg read failed: %s", strerror(errno));
348 			return -1; /* we can still continue, perhaps */
349 		}
350 		if(r == 0) /* EOF */
351 			return 0;
352 	} else r = 0;
353 	if(!fd_set_block(fd))
354 		return 0;
355 	/* read remainder */
356 	d = r;
357 	while(d != (ssize_t)sizeof(*len)) {
358 		if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
359 			log_err("tube msg read failed: %s", strerror(errno));
360 			(void)fd_set_nonblock(fd);
361 			return 0;
362 		}
363 		if(r == 0) /* EOF */ {
364 			(void)fd_set_nonblock(fd);
365 			return 0;
366 		}
367 		d += r;
368 	}
369 	if (*len >= 65536*2) {
370 		log_err("tube msg length %u is too big", (unsigned)*len);
371 		(void)fd_set_nonblock(fd);
372 		return 0;
373 	}
374 	*buf = (uint8_t*)malloc(*len);
375 	if(!*buf) {
376 		log_err("tube read out of memory");
377 		(void)fd_set_nonblock(fd);
378 		return 0;
379 	}
380 	d = 0;
381 	while(d < (ssize_t)*len) {
382 		if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
383 			log_err("tube msg read failed: %s", strerror(errno));
384 			(void)fd_set_nonblock(fd);
385 			free(*buf);
386 			return 0;
387 		}
388 		if(r == 0) { /* EOF */
389 			(void)fd_set_nonblock(fd);
390 			free(*buf);
391 			return 0;
392 		}
393 		d += r;
394 	}
395 	if(!fd_set_nonblock(fd)) {
396 		free(*buf);
397 		return 0;
398 	}
399 	return 1;
400 }
401 
/**
 * Perform poll() on one fd, for POLLIN, POLLERR and POLLHUP.
 * @param fd: descriptor to poll.
 * @param t: maximum time to wait, converted to milliseconds; with NULL
 *	the poll timeout is -1.
 * @return 1 if poll() returned a nonzero count, 0 if it returned 0 or
 *	failed with -1.
 */
static int
pollit(int fd, struct timeval* t)
{
	struct pollfd pfd;
	int timeout_ms = -1;
	int n;
	memset(&pfd, 0, sizeof(pfd));
	pfd.events = POLLIN | POLLERR | POLLHUP;
	pfd.fd = fd;
#ifndef S_SPLINT_S
	if(t)
		timeout_ms = t->tv_sec*1000 + t->tv_usec/1000;
#endif

	n = poll(&pfd, 1, timeout_ms);

	/* an error and a timeout are both reported as "nothing there" */
	return (n != -1 && n != 0) ? 1 : 0;
}
425 
426 int tube_poll(struct tube* tube)
427 {
428 	struct timeval t;
429 	memset(&t, 0, sizeof(t));
430 	return pollit(tube->sr, &t);
431 }
432 
433 int tube_wait(struct tube* tube)
434 {
435 	return pollit(tube->sr, NULL);
436 }
437 
438 int tube_wait_timeout(struct tube* tube, int msec)
439 {
440 	int ret = 0;
441 
442 	while(1) {
443 		struct pollfd fds;
444 		memset(&fds, 0, sizeof(fds));
445 
446 		fds.fd = tube->sr;
447 		fds.events = POLLIN | POLLERR | POLLHUP;
448 		ret = poll(&fds, 1, msec);
449 
450 		if(ret == -1) {
451 			if(errno == EAGAIN || errno == EINTR)
452 				continue;
453 			return -1;
454 		}
455 		break;
456 	}
457 
458 	if(ret != 0)
459 		return 1;
460 	return 0;
461 }
462 
463 int tube_read_fd(struct tube* tube)
464 {
465 	return tube->sr;
466 }
467 
468 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
469         tube_callback_type* cb, void* arg)
470 {
471 	tube->listen_cb = cb;
472 	tube->listen_arg = arg;
473 	if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
474 		0, tube_handle_listen, tube))) {
475 		int err = errno;
476 		log_err("tube_setup_bg_l: commpoint creation failed");
477 		errno = err;
478 		return 0;
479 	}
480 	return 1;
481 }
482 
483 int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
484 {
485 	if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
486 		1, tube_handle_write, tube))) {
487 		int err = errno;
488 		log_err("tube_setup_bg_w: commpoint creation failed");
489 		errno = err;
490 		return 0;
491 	}
492 	return 1;
493 }
494 
495 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
496 {
497 	struct tube_res_list* item;
498 	if(!tube || !tube->res_com) return 0;
499 	item = (struct tube_res_list*)malloc(sizeof(*item));
500 	if(!item) {
501 		free(msg);
502 		log_err("out of memory for async answer");
503 		return 0;
504 	}
505 	item->buf = msg;
506 	item->len = len;
507 	item->next = NULL;
508 	/* add at back of list, since the first one may be partially written */
509 	if(tube->res_last)
510 		tube->res_last->next = item;
511 	else    tube->res_list = item;
512 	tube->res_last = item;
513 	if(tube->res_list == tube->res_last) {
514 		/* first added item, start the write process */
515 		comm_point_start_listening(tube->res_com, -1, -1);
516 	}
517 	return 1;
518 }
519 
520 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
521 	void* ATTR_UNUSED(arg))
522 {
523 	log_assert(0);
524 }
525 
526 #else /* USE_WINSOCK */
527 /* on windows */
528 
529 
530 struct tube* tube_create(void)
531 {
532 	/* windows does not have forks like unix, so we only support
533 	 * threads on windows. And thus the pipe need only connect
534 	 * threads. We use a mutex and a list of datagrams. */
535 	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
536 	if(!tube) {
537 		int err = errno;
538 		log_err("tube_create: out of memory");
539 		errno = err;
540 		return NULL;
541 	}
542 	tube->event = WSACreateEvent();
543 	if(tube->event == WSA_INVALID_EVENT) {
544 		free(tube);
545 		log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
546 		return NULL;
547 	}
548 	if(!WSAResetEvent(tube->event)) {
549 		log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
550 	}
551 	lock_basic_init(&tube->res_lock);
552 	verbose(VERB_ALGO, "tube created");
553 	return tube;
554 }
555 
556 void tube_delete(struct tube* tube)
557 {
558 	if(!tube) return;
559 	tube_remove_bg_listen(tube);
560 	tube_remove_bg_write(tube);
561 	tube_close_read(tube);
562 	tube_close_write(tube);
563 	if(!WSACloseEvent(tube->event))
564 		log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
565 	lock_basic_destroy(&tube->res_lock);
566 	verbose(VERB_ALGO, "tube deleted");
567 	free(tube);
568 }
569 
570 void tube_close_read(struct tube* ATTR_UNUSED(tube))
571 {
572 	verbose(VERB_ALGO, "tube close_read");
573 }
574 
575 void tube_close_write(struct tube* ATTR_UNUSED(tube))
576 {
577 	verbose(VERB_ALGO, "tube close_write");
578 	/* wake up waiting reader with an empty queue */
579 	if(!WSASetEvent(tube->event)) {
580 		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
581 	}
582 }
583 
584 void tube_remove_bg_listen(struct tube* tube)
585 {
586 	verbose(VERB_ALGO, "tube remove_bg_listen");
587 	ub_winsock_unregister_wsaevent(tube->ev_listen);
588 }
589 
590 void tube_remove_bg_write(struct tube* tube)
591 {
592 	verbose(VERB_ALGO, "tube remove_bg_write");
593 	if(tube->res_list) {
594 		struct tube_res_list* np, *p = tube->res_list;
595 		tube->res_list = NULL;
596 		tube->res_last = NULL;
597 		while(p) {
598 			np = p->next;
599 			free(p->buf);
600 			free(p);
601 			p = np;
602 		}
603 	}
604 }
605 
606 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
607         int ATTR_UNUSED(nonblock))
608 {
609 	uint8_t* a;
610 	verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
611 	a = (uint8_t*)memdup(buf, len);
612 	if(!a) {
613 		log_err("out of memory in tube_write_msg");
614 		return 0;
615 	}
616 	/* always nonblocking, this pipe cannot get full */
617 	return tube_queue_item(tube, a, len);
618 }
619 
620 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
621         int nonblock)
622 {
623 	struct tube_res_list* item = NULL;
624 	verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
625 	*buf = NULL;
626 	if(!tube_poll(tube)) {
627 		verbose(VERB_ALGO, "tube read_msg nodata");
628 		/* nothing ready right now, wait if we want to */
629 		if(nonblock)
630 			return -1; /* would block waiting for items */
631 		if(!tube_wait(tube))
632 			return 0;
633 	}
634 	lock_basic_lock(&tube->res_lock);
635 	if(tube->res_list) {
636 		item = tube->res_list;
637 		tube->res_list = item->next;
638 		if(tube->res_last == item) {
639 			/* the list is now empty */
640 			tube->res_last = NULL;
641 			verbose(VERB_ALGO, "tube read_msg lastdata");
642 			if(!WSAResetEvent(tube->event)) {
643 				log_err("WSAResetEvent: %s",
644 					wsa_strerror(WSAGetLastError()));
645 			}
646 		}
647 	}
648 	lock_basic_unlock(&tube->res_lock);
649 	if(!item)
650 		return 0; /* would block waiting for items */
651 	*buf = item->buf;
652 	*len = item->len;
653 	free(item);
654 	verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
655 	return 1;
656 }
657 
658 int tube_poll(struct tube* tube)
659 {
660 	struct tube_res_list* item = NULL;
661 	lock_basic_lock(&tube->res_lock);
662 	item = tube->res_list;
663 	lock_basic_unlock(&tube->res_lock);
664 	if(item)
665 		return 1;
666 	return 0;
667 }
668 
669 int tube_wait(struct tube* tube)
670 {
671 	/* block on eventhandle */
672 	DWORD res = WSAWaitForMultipleEvents(
673 		1 /* one event in array */,
674 		&tube->event /* the event to wait for, our pipe signal */,
675 		0 /* wait for all events is false */,
676 		WSA_INFINITE /* wait, no timeout */,
677 		0 /* we are not alertable for IO completion routines */
678 		);
679 	if(res == WSA_WAIT_TIMEOUT) {
680 		return 0;
681 	}
682 	if(res == WSA_WAIT_IO_COMPLETION) {
683 		/* a bit unexpected, since we were not alertable */
684 		return 0;
685 	}
686 	return 1;
687 }
688 
689 int tube_wait_timeout(struct tube* tube, int msec)
690 {
691 	/* block on eventhandle */
692 	DWORD res = WSAWaitForMultipleEvents(
693 		1 /* one event in array */,
694 		&tube->event /* the event to wait for, our pipe signal */,
695 		0 /* wait for all events is false */,
696 		msec /* wait for timeout */,
697 		0 /* we are not alertable for IO completion routines */
698 		);
699 	if(res == WSA_WAIT_TIMEOUT) {
700 		return 0;
701 	}
702 	if(res == WSA_WAIT_IO_COMPLETION) {
703 		/* a bit unexpected, since we were not alertable */
704 		return -1;
705 	}
706 	return 1;
707 }
708 
/**
 * Give the read descriptor (windows version). There is nothing sensible
 * to return on Windows, so this is always -1; the argument is unused.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	const int no_fd = -1;
	return no_fd;
}
714 
715 int
716 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
717 	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
718 {
719 	log_assert(0);
720 	return 0;
721 }
722 
723 int
724 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
725 	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
726 {
727 	log_assert(0);
728 	return 0;
729 }
730 
731 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
732         tube_callback_type* cb, void* arg)
733 {
734 	tube->listen_cb = cb;
735 	tube->listen_arg = arg;
736 	if(!comm_base_internal(base))
737 		return 1; /* ignore when no comm base - testing */
738 	tube->ev_listen = ub_winsock_register_wsaevent(
739 	    comm_base_internal(base), tube->event, &tube_handle_signal, tube);
740 	return tube->ev_listen ? 1 : 0;
741 }
742 
743 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
744 	struct comm_base* ATTR_UNUSED(base))
745 {
746 	/* the queue item routine performs the signaling */
747 	return 1;
748 }
749 
750 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
751 {
752 	struct tube_res_list* item;
753 	if(!tube) return 0;
754 	item = (struct tube_res_list*)malloc(sizeof(*item));
755 	verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
756 	if(!item) {
757 		free(msg);
758 		log_err("out of memory for async answer");
759 		return 0;
760 	}
761 	item->buf = msg;
762 	item->len = len;
763 	item->next = NULL;
764 	lock_basic_lock(&tube->res_lock);
765 	/* add at back of list, since the first one may be partially written */
766 	if(tube->res_last)
767 		tube->res_last->next = item;
768 	else    tube->res_list = item;
769 	tube->res_last = item;
770 	/* signal the eventhandle */
771 	if(!WSASetEvent(tube->event)) {
772 		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
773 	}
774 	lock_basic_unlock(&tube->res_lock);
775 	return 1;
776 }
777 
778 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
779 	void* arg)
780 {
781 	struct tube* tube = (struct tube*)arg;
782 	uint8_t* buf;
783 	uint32_t len = 0;
784 	verbose(VERB_ALGO, "tube handle_signal");
785 	while(tube_poll(tube)) {
786 		if(tube_read_msg(tube, &buf, &len, 1)) {
787 			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
788 			(*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
789 				tube->listen_arg);
790 		}
791 	}
792 }
793 
794 #endif /* USE_WINSOCK */
795