1 /*
2 * util/tube.c - pipe service
3 *
4 * Copyright (c) 2008, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36 /**
37 * \file
38 *
39 * This file contains pipe service functions.
40 */
41 #include "config.h"
42 #include "util/tube.h"
43 #include "util/log.h"
44 #include "util/net_help.h"
45 #include "util/netevent.h"
46 #include "util/fptr_wlist.h"
47 #include "util/ub_event.h"
48 #ifdef HAVE_POLL_H
49 #include <poll.h>
50 #endif
51
52 #ifndef USE_WINSOCK
53 /* on unix */
54
55 #ifndef HAVE_SOCKETPAIR
56 /** no socketpair() available, like on Minix 3.1.7, use pipe */
57 #define socketpair(f, t, p, sv) pipe(sv)
58 #endif /* HAVE_SOCKETPAIR */
59
tube_create(void)60 struct tube* tube_create(void)
61 {
62 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
63 int sv[2];
64 if(!tube) {
65 int err = errno;
66 log_err("tube_create: out of memory");
67 errno = err;
68 return NULL;
69 }
70 tube->sr = -1;
71 tube->sw = -1;
72 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
73 int err = errno;
74 log_err("socketpair: %s", strerror(errno));
75 free(tube);
76 errno = err;
77 return NULL;
78 }
79 tube->sr = sv[0];
80 tube->sw = sv[1];
81 if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
82 int err = errno;
83 log_err("tube: cannot set nonblocking");
84 tube_delete(tube);
85 errno = err;
86 return NULL;
87 }
88 return tube;
89 }
90
/**
 * Tear down a tube: remove the background listen and write state, close
 * both descriptors and free the structure.
 * @param tube: the tube to delete; NULL is accepted and ignored.
 */
void tube_delete(struct tube* tube)
{
	if(tube != NULL) {
		tube_remove_bg_listen(tube);
		tube_remove_bg_write(tube);
		/* the fds are closed only after the commpoints are gone;
		 * epoll does not like closing an fd before event_del */
		tube_close_read(tube);
		tube_close_write(tube);
		free(tube);
	}
}
102
tube_close_read(struct tube * tube)103 void tube_close_read(struct tube* tube)
104 {
105 if(tube->sr != -1) {
106 close(tube->sr);
107 tube->sr = -1;
108 }
109 }
110
tube_close_write(struct tube * tube)111 void tube_close_write(struct tube* tube)
112 {
113 if(tube->sw != -1) {
114 close(tube->sw);
115 tube->sw = -1;
116 }
117 }
118
tube_remove_bg_listen(struct tube * tube)119 void tube_remove_bg_listen(struct tube* tube)
120 {
121 if(tube->listen_com) {
122 comm_point_delete(tube->listen_com);
123 tube->listen_com = NULL;
124 }
125 free(tube->cmd_msg);
126 tube->cmd_msg = NULL;
127 }
128
tube_remove_bg_write(struct tube * tube)129 void tube_remove_bg_write(struct tube* tube)
130 {
131 if(tube->res_com) {
132 comm_point_delete(tube->res_com);
133 tube->res_com = NULL;
134 }
135 if(tube->res_list) {
136 struct tube_res_list* np, *p = tube->res_list;
137 tube->res_list = NULL;
138 tube->res_last = NULL;
139 while(p) {
140 np = p->next;
141 free(p->buf);
142 free(p);
143 p = np;
144 }
145 }
146 }
147
148 int
tube_handle_listen(struct comm_point * c,void * arg,int error,struct comm_reply * ATTR_UNUSED (reply_info))149 tube_handle_listen(struct comm_point* c, void* arg, int error,
150 struct comm_reply* ATTR_UNUSED(reply_info))
151 {
152 struct tube* tube = (struct tube*)arg;
153 ssize_t r;
154 if(error != NETEVENT_NOERROR) {
155 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
156 (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
157 return 0;
158 }
159
160 if(tube->cmd_read < sizeof(tube->cmd_len)) {
161 /* complete reading the length of control msg */
162 r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
163 sizeof(tube->cmd_len) - tube->cmd_read);
164 if(r==0) {
165 /* error has happened or */
166 /* parent closed pipe, must have exited somehow */
167 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
168 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
169 tube->listen_arg);
170 return 0;
171 }
172 if(r==-1) {
173 if(errno != EAGAIN && errno != EINTR) {
174 log_err("rpipe error: %s", strerror(errno));
175 }
176 /* nothing to read now, try later */
177 return 0;
178 }
179 tube->cmd_read += r;
180 if(tube->cmd_read < sizeof(tube->cmd_len)) {
181 /* not complete, try later */
182 return 0;
183 }
184 tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
185 if(!tube->cmd_msg) {
186 log_err("malloc failure");
187 tube->cmd_read = 0;
188 return 0;
189 }
190 }
191 /* cmd_len has been read, read remainder */
192 r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
193 tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
194 if(r==0) {
195 /* error has happened or */
196 /* parent closed pipe, must have exited somehow */
197 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
198 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
199 tube->listen_arg);
200 return 0;
201 }
202 if(r==-1) {
203 /* nothing to read now, try later */
204 if(errno != EAGAIN && errno != EINTR) {
205 log_err("rpipe error: %s", strerror(errno));
206 }
207 return 0;
208 }
209 tube->cmd_read += r;
210 if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
211 /* not complete, try later */
212 return 0;
213 }
214 tube->cmd_read = 0;
215
216 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
217 (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
218 NETEVENT_NOERROR, tube->listen_arg);
219 /* also frees the buf */
220 tube->cmd_msg = NULL;
221 return 0;
222 }
223
224 int
tube_handle_write(struct comm_point * c,void * arg,int error,struct comm_reply * ATTR_UNUSED (reply_info))225 tube_handle_write(struct comm_point* c, void* arg, int error,
226 struct comm_reply* ATTR_UNUSED(reply_info))
227 {
228 struct tube* tube = (struct tube*)arg;
229 struct tube_res_list* item = tube->res_list;
230 ssize_t r;
231 if(error != NETEVENT_NOERROR) {
232 log_err("tube_handle_write net error %d", error);
233 return 0;
234 }
235
236 if(!item) {
237 comm_point_stop_listening(c);
238 return 0;
239 }
240
241 if(tube->res_write < sizeof(item->len)) {
242 r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
243 sizeof(item->len) - tube->res_write);
244 if(r == -1) {
245 if(errno != EAGAIN && errno != EINTR) {
246 log_err("wpipe error: %s", strerror(errno));
247 }
248 return 0; /* try again later */
249 }
250 if(r == 0) {
251 /* error on pipe, must have exited somehow */
252 /* cannot signal this to pipe user */
253 return 0;
254 }
255 tube->res_write += r;
256 if(tube->res_write < sizeof(item->len))
257 return 0;
258 }
259 r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
260 item->len - (tube->res_write - sizeof(item->len)));
261 if(r == -1) {
262 if(errno != EAGAIN && errno != EINTR) {
263 log_err("wpipe error: %s", strerror(errno));
264 }
265 return 0; /* try again later */
266 }
267 if(r == 0) {
268 /* error on pipe, must have exited somehow */
269 /* cannot signal this to pipe user */
270 return 0;
271 }
272 tube->res_write += r;
273 if(tube->res_write < sizeof(item->len) + item->len)
274 return 0;
275 /* done this result, remove it */
276 free(item->buf);
277 item->buf = NULL;
278 tube->res_list = tube->res_list->next;
279 free(item);
280 if(!tube->res_list) {
281 tube->res_last = NULL;
282 comm_point_stop_listening(c);
283 }
284 tube->res_write = 0;
285 return 0;
286 }
287
/**
 * Write one length-prefixed message to the write end of the tube.
 *
 * With nonblock set, a first write of the length word is attempted while
 * the descriptor is still in nonblocking mode; if that write fails nothing
 * has been sent and -1 is returned. After that (or immediately when
 * nonblock is not set) the descriptor is switched to blocking mode, the
 * rest of the length word and the payload are written, and the descriptor
 * is switched back to nonblocking mode.
 *
 * The blocking loops retry on EAGAIN and on EINTR: giving up after a
 * signal would leave a partially written message on the stream.
 *
 * @param tube: the tube; tube->sw is written.
 * @param buf: payload to send.
 * @param len: number of payload bytes.
 * @param nonblock: if true, do not block before the first byte is sent.
 * @return 1 on success, 0 on error, -1 if the initial nonblocking write
 *	failed (including the would-block case).
 */
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
	int nonblock)
{
	ssize_t r, d;
	int fd = tube->sw;

	/* test */
	if(nonblock) {
		r = write(fd, &len, sizeof(len));
		if(r == -1) {
			if(errno==EINTR || errno==EAGAIN)
				return -1;
			log_err("tube msg write failed: %s", strerror(errno));
			return -1; /* can still continue, perhaps */
		}
	} else r = 0;
	if(!fd_set_block(fd))
		return 0;
	/* write remainder */
	d = r;
	while(d != (ssize_t)sizeof(len)) {
		if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
			if(errno == EAGAIN || errno == EINTR)
				continue; /* temporarily unavail: try again*/
			log_err("tube msg write failed: %s", strerror(errno));
			(void)fd_set_nonblock(fd);
			return 0;
		}
		d += r;
	}
	d = 0;
	while(d != (ssize_t)len) {
		if((r=write(fd, buf+d, len-d)) == -1) {
			if(errno == EAGAIN || errno == EINTR)
				continue; /* temporarily unavail: try again*/
			log_err("tube msg write failed: %s", strerror(errno));
			(void)fd_set_nonblock(fd);
			return 0;
		}
		d += r;
	}
	if(!fd_set_nonblock(fd))
		return 0;
	return 1;
}
333
/**
 * Read one length-prefixed message from the read end of the tube.
 *
 * With nonblock set, a first read of the length word is attempted while
 * the descriptor is still in nonblocking mode; if that read fails -1 is
 * returned. After that (or immediately when nonblock is not set) the
 * descriptor is switched to blocking mode, the rest of the length word and
 * the payload are read, and the descriptor is switched back to nonblocking
 * mode.
 *
 * On every path that does not return 1, *buf is NULL on return, so the
 * caller never sees an uninitialized or already freed pointer.
 *
 * @param tube: the tube; tube->sr is read.
 * @param buf: on success set to a malloced buffer with the payload, which
 *	the caller must free.
 * @param len: set to the payload length (0 is stored first).
 * @param nonblock: if true, do not block when no data is available yet.
 * @return 1 on success, 0 on error or end of file, -1 if the initial
 *	nonblocking read failed (including the would-block case).
 */
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
	int nonblock)
{
	ssize_t r, d;
	int fd = tube->sr;

	/* test */
	*buf = NULL;
	*len = 0;
	if(nonblock) {
		r = read(fd, len, sizeof(*len));
		if(r == -1) {
			if(errno==EINTR || errno==EAGAIN)
				return -1;
			log_err("tube msg read failed: %s", strerror(errno));
			return -1; /* we can still continue, perhaps */
		}
		if(r == 0) /* EOF */
			return 0;
	} else r = 0;
	if(!fd_set_block(fd))
		return 0;
	/* read remainder */
	d = r;
	while(d != (ssize_t)sizeof(*len)) {
		if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
			log_err("tube msg read failed: %s", strerror(errno));
			(void)fd_set_nonblock(fd);
			return 0;
		}
		if(r == 0) /* EOF */ {
			(void)fd_set_nonblock(fd);
			return 0;
		}
		d += r;
	}
	if (*len >= 65536*2) {
		log_err("tube msg length %u is too big", (unsigned)*len);
		(void)fd_set_nonblock(fd);
		return 0;
	}
	/* allocate at least one byte: malloc(0) is allowed to return NULL,
	 * which must not be reported as out of memory */
	*buf = (uint8_t*)malloc(*len ? *len : 1);
	if(!*buf) {
		log_err("tube read out of memory");
		(void)fd_set_nonblock(fd);
		return 0;
	}
	d = 0;
	while(d < (ssize_t)*len) {
		if((r=read(fd, (*buf)+d, (size_t)*len - (size_t)d)) == -1) {
			log_err("tube msg read failed: %s", strerror(errno));
			(void)fd_set_nonblock(fd);
			free(*buf);
			*buf = NULL;
			return 0;
		}
		if(r == 0) { /* EOF */
			(void)fd_set_nonblock(fd);
			free(*buf);
			*buf = NULL;
			return 0;
		}
		d += r;
	}
	if(!fd_set_nonblock(fd)) {
		free(*buf);
		*buf = NULL;
		return 0;
	}
	return 1;
}
401
/**
 * perform poll() on the fd
 * @param fd: descriptor to watch for POLLIN, POLLERR and POLLHUP.
 * @param t: timeout, converted to milliseconds; NULL waits without timeout.
 * @return 1 if poll() reported an event, 0 on timeout or poll() error.
 */
static int
pollit(int fd, struct timeval* t)
{
	struct pollfd pfd;
	int timeout_ms = -1;
	int nready;

	memset(&pfd, 0, sizeof(pfd));
	pfd.events = POLLIN | POLLERR | POLLHUP;
	pfd.fd = fd;
#ifndef S_SPLINT_S
	if(t != NULL)
		timeout_ms = t->tv_sec*1000 + t->tv_usec/1000;
#endif

	nready = poll(&pfd, 1, timeout_ms);
	/* both a poll() failure and a timeout count as "nothing there" */
	if(nready == -1 || nready == 0)
		return 0;
	return 1;
}
425
tube_poll(struct tube * tube)426 int tube_poll(struct tube* tube)
427 {
428 struct timeval t;
429 memset(&t, 0, sizeof(t));
430 return pollit(tube->sr, &t);
431 }
432
tube_wait(struct tube * tube)433 int tube_wait(struct tube* tube)
434 {
435 return pollit(tube->sr, NULL);
436 }
437
tube_wait_timeout(struct tube * tube,int msec)438 int tube_wait_timeout(struct tube* tube, int msec)
439 {
440 int ret = 0;
441
442 while(1) {
443 struct pollfd fds;
444 memset(&fds, 0, sizeof(fds));
445
446 fds.fd = tube->sr;
447 fds.events = POLLIN | POLLERR | POLLHUP;
448 ret = poll(&fds, 1, msec);
449
450 if(ret == -1) {
451 if(errno == EAGAIN || errno == EINTR)
452 continue;
453 return -1;
454 }
455 break;
456 }
457
458 if(ret != 0)
459 return 1;
460 return 0;
461 }
462
tube_read_fd(struct tube * tube)463 int tube_read_fd(struct tube* tube)
464 {
465 return tube->sr;
466 }
467
/**
 * Start listening in the background on the read end of the tube. Stores
 * the callback and its argument, then creates a raw commpoint on tube->sr
 * that is served by tube_handle_listen.
 * @param tube: the tube.
 * @param base: comm base the commpoint is created in.
 * @param cb: callback stored in tube->listen_cb.
 * @param arg: user argument stored in tube->listen_arg.
 * @return 1 on success, 0 if the commpoint could not be created (errno is
 *	restored after logging).
 */
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
	tube_callback_type* cb, void* arg)
{
	int saved;

	tube->listen_cb = cb;
	tube->listen_arg = arg;
	tube->listen_com = comm_point_create_raw(base, tube->sr,
		0, tube_handle_listen, tube);
	if(tube->listen_com)
		return 1;
	saved = errno;
	log_err("tube_setup_bg_l: commpoint creation failed");
	errno = saved;
	return 0;
}
482
tube_setup_bg_write(struct tube * tube,struct comm_base * base)483 int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
484 {
485 if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
486 1, tube_handle_write, tube))) {
487 int err = errno;
488 log_err("tube_setup_bg_w: commpoint creation failed");
489 errno = err;
490 return 0;
491 }
492 return 1;
493 }
494
/**
 * Append a message to the background write queue of the tube. If it is
 * the only queued item, the write commpoint is told to start listening so
 * that tube_handle_write gets called.
 *
 * The function takes over msg: on success it is freed once it has been
 * written (or when the queue is dropped), and on every failure path it is
 * freed here, so the caller must not free it after a 0 return.
 *
 * @param tube: the tube; background write must have been set up.
 * @param msg: malloced message; ownership passes to this function.
 * @param len: length of msg.
 * @return 1 if queued, 0 on failure (msg has been freed).
 */
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
{
	struct tube_res_list* item;
	if(!tube || !tube->res_com) {
		/* same ownership rule as the out of memory case below */
		free(msg);
		return 0;
	}
	item = (struct tube_res_list*)malloc(sizeof(*item));
	if(!item) {
		free(msg);
		log_err("out of memory for async answer");
		return 0;
	}
	item->buf = msg;
	item->len = len;
	item->next = NULL;
	/* add at back of list, since the first one may be partially written */
	if(tube->res_last)
		tube->res_last->next = item;
	else	tube->res_list = item;
	tube->res_last = item;
	if(tube->res_list == tube->res_last) {
		/* first added item, start the write process */
		comm_point_start_listening(tube->res_com, -1, -1);
	}
	return 1;
}
519
/**
 * Placeholder for the event-handle callback. In this (non-winsock) part of
 * the file it has no work to do and asserts if it is ever called.
 * @param fd: unused.
 * @param events: unused.
 * @param arg: unused.
 */
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
	void* ATTR_UNUSED(arg))
{
	/* must not be reached */
	log_assert(0);
}
525
526 #else /* USE_WINSOCK */
527 /* on windows */
528
529
tube_create(void)530 struct tube* tube_create(void)
531 {
532 /* windows does not have forks like unix, so we only support
533 * threads on windows. And thus the pipe need only connect
534 * threads. We use a mutex and a list of datagrams. */
535 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
536 if(!tube) {
537 int err = errno;
538 log_err("tube_create: out of memory");
539 errno = err;
540 return NULL;
541 }
542 tube->event = WSACreateEvent();
543 if(tube->event == WSA_INVALID_EVENT) {
544 free(tube);
545 log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
546 return NULL;
547 }
548 if(!WSAResetEvent(tube->event)) {
549 log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
550 }
551 lock_basic_init(&tube->res_lock);
552 verbose(VERB_ALGO, "tube created");
553 return tube;
554 }
555
tube_delete(struct tube * tube)556 void tube_delete(struct tube* tube)
557 {
558 if(!tube) return;
559 tube_remove_bg_listen(tube);
560 tube_remove_bg_write(tube);
561 tube_close_read(tube);
562 tube_close_write(tube);
563 if(!WSACloseEvent(tube->event))
564 log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
565 lock_basic_destroy(&tube->res_lock);
566 verbose(VERB_ALGO, "tube deleted");
567 free(tube);
568 }
569
tube_close_read(struct tube * ATTR_UNUSED (tube))570 void tube_close_read(struct tube* ATTR_UNUSED(tube))
571 {
572 verbose(VERB_ALGO, "tube close_read");
573 }
574
tube_close_write(struct tube * ATTR_UNUSED (tube))575 void tube_close_write(struct tube* ATTR_UNUSED(tube))
576 {
577 verbose(VERB_ALGO, "tube close_write");
578 /* wake up waiting reader with an empty queue */
579 if(!WSASetEvent(tube->event)) {
580 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
581 }
582 }
583
tube_remove_bg_listen(struct tube * tube)584 void tube_remove_bg_listen(struct tube* tube)
585 {
586 verbose(VERB_ALGO, "tube remove_bg_listen");
587 ub_winsock_unregister_wsaevent(tube->ev_listen);
588 }
589
tube_remove_bg_write(struct tube * tube)590 void tube_remove_bg_write(struct tube* tube)
591 {
592 verbose(VERB_ALGO, "tube remove_bg_write");
593 if(tube->res_list) {
594 struct tube_res_list* np, *p = tube->res_list;
595 tube->res_list = NULL;
596 tube->res_last = NULL;
597 while(p) {
598 np = p->next;
599 free(p->buf);
600 free(p);
601 p = np;
602 }
603 }
604 }
605
tube_write_msg(struct tube * tube,uint8_t * buf,uint32_t len,int ATTR_UNUSED (nonblock))606 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
607 int ATTR_UNUSED(nonblock))
608 {
609 uint8_t* a;
610 verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
611 a = (uint8_t*)memdup(buf, len);
612 if(!a) {
613 log_err("out of memory in tube_write_msg");
614 return 0;
615 }
616 /* always nonblocking, this pipe cannot get full */
617 return tube_queue_item(tube, a, len);
618 }
619
tube_read_msg(struct tube * tube,uint8_t ** buf,uint32_t * len,int nonblock)620 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
621 int nonblock)
622 {
623 struct tube_res_list* item = NULL;
624 verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
625 *buf = NULL;
626 if(!tube_poll(tube)) {
627 verbose(VERB_ALGO, "tube read_msg nodata");
628 /* nothing ready right now, wait if we want to */
629 if(nonblock)
630 return -1; /* would block waiting for items */
631 if(!tube_wait(tube))
632 return 0;
633 }
634 lock_basic_lock(&tube->res_lock);
635 if(tube->res_list) {
636 item = tube->res_list;
637 tube->res_list = item->next;
638 if(tube->res_last == item) {
639 /* the list is now empty */
640 tube->res_last = NULL;
641 verbose(VERB_ALGO, "tube read_msg lastdata");
642 if(!WSAResetEvent(tube->event)) {
643 log_err("WSAResetEvent: %s",
644 wsa_strerror(WSAGetLastError()));
645 }
646 }
647 }
648 lock_basic_unlock(&tube->res_lock);
649 if(!item)
650 return 0; /* would block waiting for items */
651 *buf = item->buf;
652 *len = item->len;
653 free(item);
654 verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
655 return 1;
656 }
657
tube_poll(struct tube * tube)658 int tube_poll(struct tube* tube)
659 {
660 struct tube_res_list* item = NULL;
661 lock_basic_lock(&tube->res_lock);
662 item = tube->res_list;
663 lock_basic_unlock(&tube->res_lock);
664 if(item)
665 return 1;
666 return 0;
667 }
668
tube_wait(struct tube * tube)669 int tube_wait(struct tube* tube)
670 {
671 /* block on eventhandle */
672 DWORD res = WSAWaitForMultipleEvents(
673 1 /* one event in array */,
674 &tube->event /* the event to wait for, our pipe signal */,
675 0 /* wait for all events is false */,
676 WSA_INFINITE /* wait, no timeout */,
677 0 /* we are not alertable for IO completion routines */
678 );
679 if(res == WSA_WAIT_TIMEOUT) {
680 return 0;
681 }
682 if(res == WSA_WAIT_IO_COMPLETION) {
683 /* a bit unexpected, since we were not alertable */
684 return 0;
685 }
686 return 1;
687 }
688
tube_wait_timeout(struct tube * tube,int msec)689 int tube_wait_timeout(struct tube* tube, int msec)
690 {
691 /* block on eventhandle */
692 DWORD res = WSAWaitForMultipleEvents(
693 1 /* one event in array */,
694 &tube->event /* the event to wait for, our pipe signal */,
695 0 /* wait for all events is false */,
696 msec /* wait for timeout */,
697 0 /* we are not alertable for IO completion routines */
698 );
699 if(res == WSA_WAIT_TIMEOUT) {
700 return 0;
701 }
702 if(res == WSA_WAIT_IO_COMPLETION) {
703 /* a bit unexpected, since we were not alertable */
704 return -1;
705 }
706 return 1;
707 }
708
/**
 * Give the descriptor of the read end of the tube. There is no such
 * descriptor in this implementation.
 * @param tube: unused.
 * @return always -1.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	/* nothing sensible on Windows */
	const int no_fd = -1;
	return no_fd;
}
714
715 int
tube_handle_listen(struct comm_point * ATTR_UNUSED (c),void * ATTR_UNUSED (arg),int ATTR_UNUSED (error),struct comm_reply * ATTR_UNUSED (reply_info))716 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
717 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
718 {
719 log_assert(0);
720 return 0;
721 }
722
723 int
tube_handle_write(struct comm_point * ATTR_UNUSED (c),void * ATTR_UNUSED (arg),int ATTR_UNUSED (error),struct comm_reply * ATTR_UNUSED (reply_info))724 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
725 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
726 {
727 log_assert(0);
728 return 0;
729 }
730
/**
 * Start listening in the background on the tube. Stores the callback and
 * its argument and registers the tube event with the event base, with
 * tube_handle_signal as the handler.
 * @param tube: the tube.
 * @param base: comm base; when it has no internal base nothing is
 *	registered and the call still succeeds.
 * @param cb: callback stored in tube->listen_cb.
 * @param arg: user argument stored in tube->listen_arg.
 * @return 1 on success, 0 if the event could not be registered.
 */
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
	tube_callback_type* cb, void* arg)
{
	tube->listen_cb = cb;
	tube->listen_arg = arg;
	if(comm_base_internal(base)) {
		tube->ev_listen = ub_winsock_register_wsaevent(
			comm_base_internal(base), tube->event,
			&tube_handle_signal, tube);
		return tube->ev_listen != NULL;
	}
	/* ignore when no comm base - testing */
	return 1;
}
742
/**
 * Prepare background writing on the tube. Nothing has to be set up in
 * this implementation, because tube_queue_item does the signalling itself.
 * @param tube: unused.
 * @param base: unused.
 * @return always 1.
 */
int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
	struct comm_base* ATTR_UNUSED(base))
{
	/* the queue item routine performs the signaling */
	const int ok = 1;
	return ok;
}
749
tube_queue_item(struct tube * tube,uint8_t * msg,size_t len)750 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
751 {
752 struct tube_res_list* item;
753 if(!tube) return 0;
754 item = (struct tube_res_list*)malloc(sizeof(*item));
755 verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
756 if(!item) {
757 free(msg);
758 log_err("out of memory for async answer");
759 return 0;
760 }
761 item->buf = msg;
762 item->len = len;
763 item->next = NULL;
764 lock_basic_lock(&tube->res_lock);
765 /* add at back of list, since the first one may be partially written */
766 if(tube->res_last)
767 tube->res_last->next = item;
768 else tube->res_list = item;
769 tube->res_last = item;
770 /* signal the eventhandle */
771 if(!WSASetEvent(tube->event)) {
772 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
773 }
774 lock_basic_unlock(&tube->res_lock);
775 return 1;
776 }
777
tube_handle_signal(int ATTR_UNUSED (fd),short ATTR_UNUSED (events),void * arg)778 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
779 void* arg)
780 {
781 struct tube* tube = (struct tube*)arg;
782 uint8_t* buf;
783 uint32_t len = 0;
784 verbose(VERB_ALGO, "tube handle_signal");
785 while(tube_poll(tube)) {
786 if(tube_read_msg(tube, &buf, &len, 1)) {
787 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
788 (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
789 tube->listen_arg);
790 }
791 }
792 }
793
794 #endif /* USE_WINSOCK */
795