1 /* 2 * util/tube.c - pipe service 3 * 4 * Copyright (c) 2008, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 /** 37 * \file 38 * 39 * This file contains pipe service functions. 40 */ 41 #include "config.h" 42 #include "util/tube.h" 43 #include "util/log.h" 44 #include "util/net_help.h" 45 #include "util/netevent.h" 46 #include "util/fptr_wlist.h" 47 #include "util/ub_event.h" 48 49 #ifndef USE_WINSOCK 50 /* on unix */ 51 52 #ifndef HAVE_SOCKETPAIR 53 /** no socketpair() available, like on Minix 3.1.7, use pipe */ 54 #define socketpair(f, t, p, sv) pipe(sv) 55 #endif /* HAVE_SOCKETPAIR */ 56 57 struct tube* tube_create(void) 58 { 59 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 60 int sv[2]; 61 if(!tube) { 62 int err = errno; 63 log_err("tube_create: out of memory"); 64 errno = err; 65 return NULL; 66 } 67 tube->sr = -1; 68 tube->sw = -1; 69 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 70 int err = errno; 71 log_err("socketpair: %s", strerror(errno)); 72 free(tube); 73 errno = err; 74 return NULL; 75 } 76 tube->sr = sv[0]; 77 tube->sw = sv[1]; 78 if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { 79 int err = errno; 80 log_err("tube: cannot set nonblocking"); 81 tube_delete(tube); 82 errno = err; 83 return NULL; 84 } 85 return tube; 86 } 87 88 void tube_delete(struct tube* tube) 89 { 90 if(!tube) return; 91 tube_remove_bg_listen(tube); 92 tube_remove_bg_write(tube); 93 /* close fds after deleting commpoints, to be sure. 94 * Also epoll does not like closing fd before event_del */ 95 tube_close_read(tube); 96 tube_close_write(tube); 97 free(tube); 98 } 99 100 void tube_close_read(struct tube* tube) 101 { 102 if(tube->sr != -1) { 103 close(tube->sr); 104 tube->sr = -1; 105 } 106 } 107 108 void tube_close_write(struct tube* tube) 109 { 110 if(tube->sw != -1) { 111 close(tube->sw); 112 tube->sw = -1; 113 } 114 } 115 116 void tube_remove_bg_listen(struct tube* tube) 117 { 118 if(tube->listen_com) { 119 comm_point_delete(tube->listen_com); 120 tube->listen_com = NULL; 121 } 122 free(tube->cmd_msg); 123 tube->cmd_msg = NULL; 124 } 125 126 void tube_remove_bg_write(struct tube* tube) 127 { 128 if(tube->res_com) { 129 comm_point_delete(tube->res_com); 130 tube->res_com = NULL; 131 } 132 if(tube->res_list) { 133 struct tube_res_list* np, *p = tube->res_list; 134 tube->res_list = NULL; 135 tube->res_last = NULL; 136 while(p) { 137 np = p->next; 138 free(p->buf); 139 free(p); 140 p = np; 141 } 142 } 143 } 144 145 int 146 tube_handle_listen(struct comm_point* c, void* arg, int error, 147 struct comm_reply* ATTR_UNUSED(reply_info)) 148 { 149 struct tube* tube = (struct tube*)arg; 150 ssize_t r; 151 if(error != NETEVENT_NOERROR) { 152 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 153 (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg); 154 return 0; 155 } 156 157 if(tube->cmd_read < sizeof(tube->cmd_len)) { 158 /* complete reading the length of control msg */ 159 r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read, 160 sizeof(tube->cmd_len) - tube->cmd_read); 161 if(r==0) { 162 /* error has happened or */ 163 /* parent closed pipe, must have exited somehow */ 164 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 165 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 166 tube->listen_arg); 167 return 0; 168 } 169 if(r==-1) { 170 if(errno != EAGAIN && errno != EINTR) { 171 log_err("rpipe error: %s", strerror(errno)); 172 } 173 /* nothing to read now, try later */ 174 return 0; 175 } 176 tube->cmd_read += r; 177 if(tube->cmd_read < sizeof(tube->cmd_len)) { 178 /* not complete, try later */ 179 return 0; 180 } 181 tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len); 182 if(!tube->cmd_msg) { 183 log_err("malloc failure"); 184 tube->cmd_read = 0; 185 return 0; 186 } 187 } 188 /* cmd_len has been read, read remainder */ 189 r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len), 190 tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len))); 191 if(r==0) { 192 /* error has happened or */ 193 /* parent closed pipe, must have exited somehow */ 194 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 195 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 196 tube->listen_arg); 197 return 0; 198 } 199 if(r==-1) { 200 /* nothing to read now, try later */ 201 if(errno != EAGAIN && errno != EINTR) { 202 log_err("rpipe error: %s", strerror(errno)); 203 } 204 return 0; 205 } 206 tube->cmd_read += r; 207 if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) { 208 /* not complete, try later */ 209 return 0; 210 } 211 tube->cmd_read = 0; 212 213 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 214 (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 215 NETEVENT_NOERROR, tube->listen_arg); 216 /* also frees the buf */ 217 tube->cmd_msg = NULL; 218 return 0; 219 } 220 221 int 222 tube_handle_write(struct comm_point* c, void* arg, int error, 223 struct comm_reply* ATTR_UNUSED(reply_info)) 224 { 225 struct tube* tube = (struct tube*)arg; 226 struct tube_res_list* item = tube->res_list; 227 ssize_t r; 228 if(error != NETEVENT_NOERROR) { 229 log_err("tube_handle_write net error %d", error); 230 return 0; 231 } 232 233 if(!item) { 234 comm_point_stop_listening(c); 235 return 0; 236 } 237 238 if(tube->res_write < sizeof(item->len)) { 239 r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write, 240 sizeof(item->len) - tube->res_write); 241 if(r == -1) { 242 if(errno != EAGAIN && errno != EINTR) { 243 log_err("wpipe error: %s", strerror(errno)); 244 } 245 return 0; /* try again later */ 246 } 247 if(r == 0) { 248 /* error on pipe, must have exited somehow */ 249 /* cannot signal this to pipe user */ 250 return 0; 251 } 252 tube->res_write += r; 253 if(tube->res_write < sizeof(item->len)) 254 return 0; 255 } 256 r = write(c->fd, item->buf + tube->res_write - sizeof(item->len), 257 item->len - (tube->res_write - sizeof(item->len))); 258 if(r == -1) { 259 if(errno != EAGAIN && errno != EINTR) { 260 log_err("wpipe error: %s", strerror(errno)); 261 } 262 return 0; /* try again later */ 263 } 264 if(r == 0) { 265 /* error on pipe, must have exited somehow */ 266 /* cannot signal this to pipe user */ 267 return 0; 268 } 269 tube->res_write += r; 270 if(tube->res_write < sizeof(item->len) + item->len) 271 return 0; 272 /* done this result, remove it */ 273 free(item->buf); 274 item->buf = NULL; 275 tube->res_list = tube->res_list->next; 276 free(item); 277 if(!tube->res_list) { 278 tube->res_last = NULL; 279 comm_point_stop_listening(c); 280 } 281 tube->res_write = 0; 282 return 0; 283 } 284 285 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 286 int nonblock) 287 { 288 ssize_t r, d; 289 int fd = tube->sw; 290 291 /* test */ 292 if(nonblock) { 293 r = write(fd, &len, sizeof(len)); 294 if(r == -1) { 295 if(errno==EINTR || errno==EAGAIN) 296 return -1; 297 log_err("tube msg write failed: %s", strerror(errno)); 298 return -1; /* can still continue, perhaps */ 299 } 300 } else r = 0; 301 if(!fd_set_block(fd)) 302 return 0; 303 /* write remainder */ 304 d = r; 305 while(d != (ssize_t)sizeof(len)) { 306 if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) { 307 if(errno == EAGAIN) 308 continue; /* temporarily unavail: try again*/ 309 log_err("tube msg write failed: %s", strerror(errno)); 310 (void)fd_set_nonblock(fd); 311 return 0; 312 } 313 d += r; 314 } 315 d = 0; 316 while(d != (ssize_t)len) { 317 if((r=write(fd, buf+d, len-d)) == -1) { 318 if(errno == EAGAIN) 319 continue; /* temporarily unavail: try again*/ 320 log_err("tube msg write failed: %s", strerror(errno)); 321 (void)fd_set_nonblock(fd); 322 return 0; 323 } 324 d += r; 325 } 326 if(!fd_set_nonblock(fd)) 327 return 0; 328 return 1; 329 } 330 331 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 332 int nonblock) 333 { 334 ssize_t r, d; 335 int fd = tube->sr; 336 337 /* test */ 338 *len = 0; 339 if(nonblock) { 340 r = read(fd, len, sizeof(*len)); 341 if(r == -1) { 342 if(errno==EINTR || errno==EAGAIN) 343 return -1; 344 log_err("tube msg read failed: %s", strerror(errno)); 345 return -1; /* we can still continue, perhaps */ 346 } 347 if(r == 0) /* EOF */ 348 return 0; 349 } else r = 0; 350 if(!fd_set_block(fd)) 351 return 0; 352 /* read remainder */ 353 d = r; 354 while(d != (ssize_t)sizeof(*len)) { 355 if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) { 356 log_err("tube msg read failed: %s", strerror(errno)); 357 (void)fd_set_nonblock(fd); 358 return 0; 359 } 360 if(r == 0) /* EOF */ { 361 (void)fd_set_nonblock(fd); 362 return 0; 363 } 364 d += r; 365 } 366 log_assert(*len < 65536*2); 367 *buf = (uint8_t*)malloc(*len); 368 if(!*buf) { 369 log_err("tube read out of memory"); 370 (void)fd_set_nonblock(fd); 371 return 0; 372 } 373 d = 0; 374 while(d < (ssize_t)*len) { 375 if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) { 376 log_err("tube msg read failed: %s", strerror(errno)); 377 (void)fd_set_nonblock(fd); 378 free(*buf); 379 return 0; 380 } 381 if(r == 0) { /* EOF */ 382 (void)fd_set_nonblock(fd); 383 free(*buf); 384 return 0; 385 } 386 d += r; 387 } 388 if(!fd_set_nonblock(fd)) { 389 free(*buf); 390 return 0; 391 } 392 return 1; 393 } 394 395 /** perform a select() on the fd */ 396 static int 397 pollit(int fd, struct timeval* t) 398 { 399 fd_set r; 400 #ifndef S_SPLINT_S 401 FD_ZERO(&r); 402 FD_SET(FD_SET_T fd, &r); 403 #endif 404 if(select(fd+1, &r, NULL, NULL, t) == -1) { 405 return 0; 406 } 407 errno = 0; 408 return (int)(FD_ISSET(fd, &r)); 409 } 410 411 int tube_poll(struct tube* tube) 412 { 413 struct timeval t; 414 memset(&t, 0, sizeof(t)); 415 return pollit(tube->sr, &t); 416 } 417 418 int tube_wait(struct tube* tube) 419 { 420 return pollit(tube->sr, NULL); 421 } 422 423 int tube_read_fd(struct tube* tube) 424 { 425 return tube->sr; 426 } 427 428 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 429 tube_callback_type* cb, void* arg) 430 { 431 tube->listen_cb = cb; 432 tube->listen_arg = arg; 433 if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 434 0, tube_handle_listen, tube))) { 435 int err = errno; 436 log_err("tube_setup_bg_l: commpoint creation failed"); 437 errno = err; 438 return 0; 439 } 440 return 1; 441 } 442 443 int tube_setup_bg_write(struct tube* tube, struct comm_base* base) 444 { 445 if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 446 1, tube_handle_write, tube))) { 447 int err = errno; 448 log_err("tube_setup_bg_w: commpoint creation failed"); 449 errno = err; 450 return 0; 451 } 452 return 1; 453 } 454 455 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 456 { 457 struct tube_res_list* item; 458 if(!tube || !tube->res_com) return 0; 459 item = (struct tube_res_list*)malloc(sizeof(*item)); 460 if(!item) { 461 free(msg); 462 log_err("out of memory for async answer"); 463 return 0; 464 } 465 item->buf = msg; 466 item->len = len; 467 item->next = NULL; 468 /* add at back of list, since the first one may be partially written */ 469 if(tube->res_last) 470 tube->res_last->next = item; 471 else tube->res_list = item; 472 tube->res_last = item; 473 if(tube->res_list == tube->res_last) { 474 /* first added item, start the write process */ 475 comm_point_start_listening(tube->res_com, -1, -1); 476 } 477 return 1; 478 } 479 480 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 481 void* ATTR_UNUSED(arg)) 482 { 483 log_assert(0); 484 } 485 486 #else /* USE_WINSOCK */ 487 /* on windows */ 488 489 490 struct tube* tube_create(void) 491 { 492 /* windows does not have forks like unix, so we only support 493 * threads on windows. And thus the pipe need only connect 494 * threads. We use a mutex and a list of datagrams. */ 495 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 496 if(!tube) { 497 int err = errno; 498 log_err("tube_create: out of memory"); 499 errno = err; 500 return NULL; 501 } 502 tube->event = WSACreateEvent(); 503 if(tube->event == WSA_INVALID_EVENT) { 504 free(tube); 505 log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError())); 506 } 507 if(!WSAResetEvent(tube->event)) { 508 log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError())); 509 } 510 lock_basic_init(&tube->res_lock); 511 verbose(VERB_ALGO, "tube created"); 512 return tube; 513 } 514 515 void tube_delete(struct tube* tube) 516 { 517 if(!tube) return; 518 tube_remove_bg_listen(tube); 519 tube_remove_bg_write(tube); 520 tube_close_read(tube); 521 tube_close_write(tube); 522 if(!WSACloseEvent(tube->event)) 523 log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError())); 524 lock_basic_destroy(&tube->res_lock); 525 verbose(VERB_ALGO, "tube deleted"); 526 free(tube); 527 } 528 529 void tube_close_read(struct tube* ATTR_UNUSED(tube)) 530 { 531 verbose(VERB_ALGO, "tube close_read"); 532 } 533 534 void tube_close_write(struct tube* ATTR_UNUSED(tube)) 535 { 536 verbose(VERB_ALGO, "tube close_write"); 537 /* wake up waiting reader with an empty queue */ 538 if(!WSASetEvent(tube->event)) { 539 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 540 } 541 } 542 543 void tube_remove_bg_listen(struct tube* tube) 544 { 545 verbose(VERB_ALGO, "tube remove_bg_listen"); 546 ub_winsock_unregister_wsaevent(tube->ev_listen); 547 } 548 549 void tube_remove_bg_write(struct tube* tube) 550 { 551 verbose(VERB_ALGO, "tube remove_bg_write"); 552 if(tube->res_list) { 553 struct tube_res_list* np, *p = tube->res_list; 554 tube->res_list = NULL; 555 tube->res_last = NULL; 556 while(p) { 557 np = p->next; 558 free(p->buf); 559 free(p); 560 p = np; 561 } 562 } 563 } 564 565 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 566 int ATTR_UNUSED(nonblock)) 567 { 568 uint8_t* a; 569 verbose(VERB_ALGO, "tube write_msg len %d", (int)len); 570 a = (uint8_t*)memdup(buf, len); 571 if(!a) { 572 log_err("out of memory in tube_write_msg"); 573 return 0; 574 } 575 /* always nonblocking, this pipe cannot get full */ 576 return tube_queue_item(tube, a, len); 577 } 578 579 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 580 int nonblock) 581 { 582 struct tube_res_list* item = NULL; 583 verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking"); 584 *buf = NULL; 585 if(!tube_poll(tube)) { 586 verbose(VERB_ALGO, "tube read_msg nodata"); 587 /* nothing ready right now, wait if we want to */ 588 if(nonblock) 589 return -1; /* would block waiting for items */ 590 if(!tube_wait(tube)) 591 return 0; 592 } 593 lock_basic_lock(&tube->res_lock); 594 if(tube->res_list) { 595 item = tube->res_list; 596 tube->res_list = item->next; 597 if(tube->res_last == item) { 598 /* the list is now empty */ 599 tube->res_last = NULL; 600 verbose(VERB_ALGO, "tube read_msg lastdata"); 601 if(!WSAResetEvent(tube->event)) { 602 log_err("WSAResetEvent: %s", 603 wsa_strerror(WSAGetLastError())); 604 } 605 } 606 } 607 lock_basic_unlock(&tube->res_lock); 608 if(!item) 609 return 0; /* would block waiting for items */ 610 *buf = item->buf; 611 *len = item->len; 612 free(item); 613 verbose(VERB_ALGO, "tube read_msg len %d", (int)*len); 614 return 1; 615 } 616 617 int tube_poll(struct tube* tube) 618 { 619 struct tube_res_list* item = NULL; 620 lock_basic_lock(&tube->res_lock); 621 item = tube->res_list; 622 lock_basic_unlock(&tube->res_lock); 623 if(item) 624 return 1; 625 return 0; 626 } 627 628 int tube_wait(struct tube* tube) 629 { 630 /* block on eventhandle */ 631 DWORD res = WSAWaitForMultipleEvents( 632 1 /* one event in array */, 633 &tube->event /* the event to wait for, our pipe signal */, 634 0 /* wait for all events is false */, 635 WSA_INFINITE /* wait, no timeout */, 636 0 /* we are not alertable for IO completion routines */ 637 ); 638 if(res == WSA_WAIT_TIMEOUT) { 639 return 0; 640 } 641 if(res == WSA_WAIT_IO_COMPLETION) { 642 /* a bit unexpected, since we were not alertable */ 643 return 0; 644 } 645 return 1; 646 } 647 648 int tube_read_fd(struct tube* ATTR_UNUSED(tube)) 649 { 650 /* nothing sensible on Windows */ 651 return -1; 652 } 653 654 int 655 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 656 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 657 { 658 log_assert(0); 659 return 0; 660 } 661 662 int 663 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 664 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 665 { 666 log_assert(0); 667 return 0; 668 } 669 670 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 671 tube_callback_type* cb, void* arg) 672 { 673 tube->listen_cb = cb; 674 tube->listen_arg = arg; 675 if(!comm_base_internal(base)) 676 return 1; /* ignore when no comm base - testing */ 677 tube->ev_listen = ub_winsock_register_wsaevent( 678 comm_base_internal(base), tube->event, &tube_handle_signal, tube); 679 return tube->ev_listen ? 1 : 0; 680 } 681 682 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 683 struct comm_base* ATTR_UNUSED(base)) 684 { 685 /* the queue item routine performs the signaling */ 686 return 1; 687 } 688 689 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 690 { 691 struct tube_res_list* item; 692 if(!tube) return 0; 693 item = (struct tube_res_list*)malloc(sizeof(*item)); 694 verbose(VERB_ALGO, "tube queue_item len %d", (int)len); 695 if(!item) { 696 free(msg); 697 log_err("out of memory for async answer"); 698 return 0; 699 } 700 item->buf = msg; 701 item->len = len; 702 item->next = NULL; 703 lock_basic_lock(&tube->res_lock); 704 /* add at back of list, since the first one may be partially written */ 705 if(tube->res_last) 706 tube->res_last->next = item; 707 else tube->res_list = item; 708 tube->res_last = item; 709 /* signal the eventhandle */ 710 if(!WSASetEvent(tube->event)) { 711 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 712 } 713 lock_basic_unlock(&tube->res_lock); 714 return 1; 715 } 716 717 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 718 void* arg) 719 { 720 struct tube* tube = (struct tube*)arg; 721 uint8_t* buf; 722 uint32_t len = 0; 723 verbose(VERB_ALGO, "tube handle_signal"); 724 while(tube_poll(tube)) { 725 if(tube_read_msg(tube, &buf, &len, 1)) { 726 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 727 (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 728 tube->listen_arg); 729 } 730 } 731 } 732 733 #endif /* USE_WINSOCK */ 734