1 /* 2 * util/tube.c - pipe service 3 * 4 * Copyright (c) 2008, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 /** 37 * \file 38 * 39 * This file contains pipe service functions. 40 */ 41 #include "config.h" 42 #include "util/tube.h" 43 #include "util/log.h" 44 #include "util/net_help.h" 45 #include "util/netevent.h" 46 #include "util/fptr_wlist.h" 47 #include "util/ub_event.h" 48 49 #ifndef USE_WINSOCK 50 /* on unix */ 51 52 #ifndef HAVE_SOCKETPAIR 53 /** no socketpair() available, like on Minix 3.1.7, use pipe */ 54 #define socketpair(f, t, p, sv) pipe(sv) 55 #endif /* HAVE_SOCKETPAIR */ 56 57 struct tube* tube_create(void) 58 { 59 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 60 int sv[2]; 61 if(!tube) { 62 int err = errno; 63 log_err("tube_create: out of memory"); 64 errno = err; 65 return NULL; 66 } 67 tube->sr = -1; 68 tube->sw = -1; 69 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 70 int err = errno; 71 log_err("socketpair: %s", strerror(errno)); 72 free(tube); 73 errno = err; 74 return NULL; 75 } 76 tube->sr = sv[0]; 77 tube->sw = sv[1]; 78 if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { 79 int err = errno; 80 log_err("tube: cannot set nonblocking"); 81 tube_delete(tube); 82 errno = err; 83 return NULL; 84 } 85 return tube; 86 } 87 88 void tube_delete(struct tube* tube) 89 { 90 if(!tube) return; 91 tube_remove_bg_listen(tube); 92 tube_remove_bg_write(tube); 93 /* close fds after deleting commpoints, to be sure. 94 * Also epoll does not like closing fd before event_del */ 95 tube_close_read(tube); 96 tube_close_write(tube); 97 free(tube); 98 } 99 100 void tube_close_read(struct tube* tube) 101 { 102 if(tube->sr != -1) { 103 close(tube->sr); 104 tube->sr = -1; 105 } 106 } 107 108 void tube_close_write(struct tube* tube) 109 { 110 if(tube->sw != -1) { 111 close(tube->sw); 112 tube->sw = -1; 113 } 114 } 115 116 void tube_remove_bg_listen(struct tube* tube) 117 { 118 if(tube->listen_com) { 119 comm_point_delete(tube->listen_com); 120 tube->listen_com = NULL; 121 } 122 free(tube->cmd_msg); 123 tube->cmd_msg = NULL; 124 } 125 126 void tube_remove_bg_write(struct tube* tube) 127 { 128 if(tube->res_com) { 129 comm_point_delete(tube->res_com); 130 tube->res_com = NULL; 131 } 132 if(tube->res_list) { 133 struct tube_res_list* np, *p = tube->res_list; 134 tube->res_list = NULL; 135 tube->res_last = NULL; 136 while(p) { 137 np = p->next; 138 free(p->buf); 139 free(p); 140 p = np; 141 } 142 } 143 } 144 145 int 146 tube_handle_listen(struct comm_point* c, void* arg, int error, 147 struct comm_reply* ATTR_UNUSED(reply_info)) 148 { 149 struct tube* tube = (struct tube*)arg; 150 ssize_t r; 151 if(error != NETEVENT_NOERROR) { 152 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 153 (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg); 154 return 0; 155 } 156 157 if(tube->cmd_read < sizeof(tube->cmd_len)) { 158 /* complete reading the length of control msg */ 159 r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read, 160 sizeof(tube->cmd_len) - tube->cmd_read); 161 if(r==0) { 162 /* error has happened or */ 163 /* parent closed pipe, must have exited somehow */ 164 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 165 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 166 tube->listen_arg); 167 return 0; 168 } 169 if(r==-1) { 170 if(errno != EAGAIN && errno != EINTR) { 171 log_err("rpipe error: %s", strerror(errno)); 172 } 173 /* nothing to read now, try later */ 174 return 0; 175 } 176 tube->cmd_read += r; 177 if(tube->cmd_read < sizeof(tube->cmd_len)) { 178 /* not complete, try later */ 179 return 0; 180 } 181 tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len); 182 if(!tube->cmd_msg) { 183 log_err("malloc failure"); 184 tube->cmd_read = 0; 185 return 0; 186 } 187 } 188 /* cmd_len has been read, read remainder */ 189 r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len), 190 tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len))); 191 if(r==0) { 192 /* error has happened or */ 193 /* parent closed pipe, must have exited somehow */ 194 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 195 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 196 tube->listen_arg); 197 return 0; 198 } 199 if(r==-1) { 200 /* nothing to read now, try later */ 201 if(errno != EAGAIN && errno != EINTR) { 202 log_err("rpipe error: %s", strerror(errno)); 203 } 204 return 0; 205 } 206 tube->cmd_read += r; 207 if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) { 208 /* not complete, try later */ 209 return 0; 210 } 211 tube->cmd_read = 0; 212 213 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 214 (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 215 NETEVENT_NOERROR, tube->listen_arg); 216 /* also frees the buf */ 217 tube->cmd_msg = NULL; 218 return 0; 219 } 220 221 int 222 tube_handle_write(struct comm_point* c, void* arg, int error, 223 struct comm_reply* ATTR_UNUSED(reply_info)) 224 { 225 struct tube* tube = (struct tube*)arg; 226 struct tube_res_list* item = tube->res_list; 227 ssize_t r; 228 if(error != NETEVENT_NOERROR) { 229 log_err("tube_handle_write net error %d", error); 230 return 0; 231 } 232 233 if(!item) { 234 comm_point_stop_listening(c); 235 return 0; 236 } 237 238 if(tube->res_write < sizeof(item->len)) { 239 r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write, 240 sizeof(item->len) - tube->res_write); 241 if(r == -1) { 242 if(errno != EAGAIN && errno != EINTR) { 243 log_err("wpipe error: %s", strerror(errno)); 244 } 245 return 0; /* try again later */ 246 } 247 if(r == 0) { 248 /* error on pipe, must have exited somehow */ 249 /* cannot signal this to pipe user */ 250 return 0; 251 } 252 tube->res_write += r; 253 if(tube->res_write < sizeof(item->len)) 254 return 0; 255 } 256 r = write(c->fd, item->buf + tube->res_write - sizeof(item->len), 257 item->len - (tube->res_write - sizeof(item->len))); 258 if(r == -1) { 259 if(errno != EAGAIN && errno != EINTR) { 260 log_err("wpipe error: %s", strerror(errno)); 261 } 262 return 0; /* try again later */ 263 } 264 if(r == 0) { 265 /* error on pipe, must have exited somehow */ 266 /* cannot signal this to pipe user */ 267 return 0; 268 } 269 tube->res_write += r; 270 if(tube->res_write < sizeof(item->len) + item->len) 271 return 0; 272 /* done this result, remove it */ 273 free(item->buf); 274 item->buf = NULL; 275 tube->res_list = tube->res_list->next; 276 free(item); 277 if(!tube->res_list) { 278 tube->res_last = NULL; 279 comm_point_stop_listening(c); 280 } 281 tube->res_write = 0; 282 return 0; 283 } 284 285 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 286 int nonblock) 287 { 288 ssize_t r, d; 289 int fd = tube->sw; 290 291 /* test */ 292 if(nonblock) { 293 r = write(fd, &len, sizeof(len)); 294 if(r == -1) { 295 if(errno==EINTR || errno==EAGAIN) 296 return -1; 297 log_err("tube msg write failed: %s", strerror(errno)); 298 return -1; /* can still continue, perhaps */ 299 } 300 } else r = 0; 301 if(!fd_set_block(fd)) 302 return 0; 303 /* write remainder */ 304 d = r; 305 while(d != (ssize_t)sizeof(len)) { 306 if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) { 307 if(errno == EAGAIN) 308 continue; /* temporarily unavail: try again*/ 309 log_err("tube msg write failed: %s", strerror(errno)); 310 (void)fd_set_nonblock(fd); 311 return 0; 312 } 313 d += r; 314 } 315 d = 0; 316 while(d != (ssize_t)len) { 317 if((r=write(fd, buf+d, len-d)) == -1) { 318 if(errno == EAGAIN) 319 continue; /* temporarily unavail: try again*/ 320 log_err("tube msg write failed: %s", strerror(errno)); 321 (void)fd_set_nonblock(fd); 322 return 0; 323 } 324 d += r; 325 } 326 if(!fd_set_nonblock(fd)) 327 return 0; 328 return 1; 329 } 330 331 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 332 int nonblock) 333 { 334 ssize_t r, d; 335 int fd = tube->sr; 336 337 /* test */ 338 *len = 0; 339 if(nonblock) { 340 r = read(fd, len, sizeof(*len)); 341 if(r == -1) { 342 if(errno==EINTR || errno==EAGAIN) 343 return -1; 344 log_err("tube msg read failed: %s", strerror(errno)); 345 return -1; /* we can still continue, perhaps */ 346 } 347 if(r == 0) /* EOF */ 348 return 0; 349 } else r = 0; 350 if(!fd_set_block(fd)) 351 return 0; 352 /* read remainder */ 353 d = r; 354 while(d != (ssize_t)sizeof(*len)) { 355 if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) { 356 log_err("tube msg read failed: %s", strerror(errno)); 357 (void)fd_set_nonblock(fd); 358 return 0; 359 } 360 if(r == 0) /* EOF */ { 361 (void)fd_set_nonblock(fd); 362 return 0; 363 } 364 d += r; 365 } 366 log_assert(*len < 65536*2); 367 *buf = (uint8_t*)malloc(*len); 368 if(!*buf) { 369 log_err("tube read out of memory"); 370 (void)fd_set_nonblock(fd); 371 return 0; 372 } 373 d = 0; 374 while(d < (ssize_t)*len) { 375 if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) { 376 log_err("tube msg read failed: %s", strerror(errno)); 377 (void)fd_set_nonblock(fd); 378 free(*buf); 379 return 0; 380 } 381 if(r == 0) { /* EOF */ 382 (void)fd_set_nonblock(fd); 383 free(*buf); 384 return 0; 385 } 386 d += r; 387 } 388 if(!fd_set_nonblock(fd)) { 389 free(*buf); 390 return 0; 391 } 392 return 1; 393 } 394 395 /** perform a select() on the fd */ 396 static int 397 pollit(int fd, struct timeval* t) 398 { 399 fd_set r; 400 #ifndef S_SPLINT_S 401 FD_ZERO(&r); 402 FD_SET(FD_SET_T fd, &r); 403 #endif 404 if(select(fd+1, &r, NULL, NULL, t) == -1) { 405 return 0; 406 } 407 errno = 0; 408 return (int)(FD_ISSET(fd, &r)); 409 } 410 411 int tube_poll(struct tube* tube) 412 { 413 struct timeval t; 414 memset(&t, 0, sizeof(t)); 415 return pollit(tube->sr, &t); 416 } 417 418 int tube_wait(struct tube* tube) 419 { 420 return pollit(tube->sr, NULL); 421 } 422 423 int tube_read_fd(struct tube* tube) 424 { 425 return tube->sr; 426 } 427 428 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 429 tube_callback_t* cb, void* arg) 430 { 431 tube->listen_cb = cb; 432 tube->listen_arg = arg; 433 if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 434 0, tube_handle_listen, tube))) { 435 int err = errno; 436 log_err("tube_setup_bg_l: commpoint creation failed"); 437 errno = err; 438 return 0; 439 } 440 return 1; 441 } 442 443 int tube_setup_bg_write(struct tube* tube, struct comm_base* base) 444 { 445 if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 446 1, tube_handle_write, tube))) { 447 int err = errno; 448 log_err("tube_setup_bg_w: commpoint creation failed"); 449 errno = err; 450 return 0; 451 } 452 return 1; 453 } 454 455 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 456 { 457 struct tube_res_list* item = 458 (struct tube_res_list*)malloc(sizeof(*item)); 459 if(!item) { 460 free(msg); 461 log_err("out of memory for async answer"); 462 return 0; 463 } 464 item->buf = msg; 465 item->len = len; 466 item->next = NULL; 467 /* add at back of list, since the first one may be partially written */ 468 if(tube->res_last) 469 tube->res_last->next = item; 470 else tube->res_list = item; 471 tube->res_last = item; 472 if(tube->res_list == tube->res_last) { 473 /* first added item, start the write process */ 474 comm_point_start_listening(tube->res_com, -1, -1); 475 } 476 return 1; 477 } 478 479 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 480 void* ATTR_UNUSED(arg)) 481 { 482 log_assert(0); 483 } 484 485 #else /* USE_WINSOCK */ 486 /* on windows */ 487 488 489 struct tube* tube_create(void) 490 { 491 /* windows does not have forks like unix, so we only support 492 * threads on windows. And thus the pipe need only connect 493 * threads. We use a mutex and a list of datagrams. */ 494 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 495 if(!tube) { 496 int err = errno; 497 log_err("tube_create: out of memory"); 498 errno = err; 499 return NULL; 500 } 501 tube->event = WSACreateEvent(); 502 if(tube->event == WSA_INVALID_EVENT) { 503 free(tube); 504 log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError())); 505 } 506 if(!WSAResetEvent(tube->event)) { 507 log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError())); 508 } 509 lock_basic_init(&tube->res_lock); 510 verbose(VERB_ALGO, "tube created"); 511 return tube; 512 } 513 514 void tube_delete(struct tube* tube) 515 { 516 if(!tube) return; 517 tube_remove_bg_listen(tube); 518 tube_remove_bg_write(tube); 519 tube_close_read(tube); 520 tube_close_write(tube); 521 if(!WSACloseEvent(tube->event)) 522 log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError())); 523 lock_basic_destroy(&tube->res_lock); 524 verbose(VERB_ALGO, "tube deleted"); 525 free(tube); 526 } 527 528 void tube_close_read(struct tube* ATTR_UNUSED(tube)) 529 { 530 verbose(VERB_ALGO, "tube close_read"); 531 } 532 533 void tube_close_write(struct tube* ATTR_UNUSED(tube)) 534 { 535 verbose(VERB_ALGO, "tube close_write"); 536 /* wake up waiting reader with an empty queue */ 537 if(!WSASetEvent(tube->event)) { 538 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 539 } 540 } 541 542 void tube_remove_bg_listen(struct tube* tube) 543 { 544 verbose(VERB_ALGO, "tube remove_bg_listen"); 545 ub_winsock_unregister_wsaevent(tube->ev_listen); 546 } 547 548 void tube_remove_bg_write(struct tube* tube) 549 { 550 verbose(VERB_ALGO, "tube remove_bg_write"); 551 if(tube->res_list) { 552 struct tube_res_list* np, *p = tube->res_list; 553 tube->res_list = NULL; 554 tube->res_last = NULL; 555 while(p) { 556 np = p->next; 557 free(p->buf); 558 free(p); 559 p = np; 560 } 561 } 562 } 563 564 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 565 int ATTR_UNUSED(nonblock)) 566 { 567 uint8_t* a; 568 verbose(VERB_ALGO, "tube write_msg len %d", (int)len); 569 a = (uint8_t*)memdup(buf, len); 570 if(!a) { 571 log_err("out of memory in tube_write_msg"); 572 return 0; 573 } 574 /* always nonblocking, this pipe cannot get full */ 575 return tube_queue_item(tube, a, len); 576 } 577 578 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 579 int nonblock) 580 { 581 struct tube_res_list* item = NULL; 582 verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking"); 583 *buf = NULL; 584 if(!tube_poll(tube)) { 585 verbose(VERB_ALGO, "tube read_msg nodata"); 586 /* nothing ready right now, wait if we want to */ 587 if(nonblock) 588 return -1; /* would block waiting for items */ 589 if(!tube_wait(tube)) 590 return 0; 591 } 592 lock_basic_lock(&tube->res_lock); 593 if(tube->res_list) { 594 item = tube->res_list; 595 tube->res_list = item->next; 596 if(tube->res_last == item) { 597 /* the list is now empty */ 598 tube->res_last = NULL; 599 verbose(VERB_ALGO, "tube read_msg lastdata"); 600 if(!WSAResetEvent(tube->event)) { 601 log_err("WSAResetEvent: %s", 602 wsa_strerror(WSAGetLastError())); 603 } 604 } 605 } 606 lock_basic_unlock(&tube->res_lock); 607 if(!item) 608 return 0; /* would block waiting for items */ 609 *buf = item->buf; 610 *len = item->len; 611 free(item); 612 verbose(VERB_ALGO, "tube read_msg len %d", (int)*len); 613 return 1; 614 } 615 616 int tube_poll(struct tube* tube) 617 { 618 struct tube_res_list* item = NULL; 619 lock_basic_lock(&tube->res_lock); 620 item = tube->res_list; 621 lock_basic_unlock(&tube->res_lock); 622 if(item) 623 return 1; 624 return 0; 625 } 626 627 int tube_wait(struct tube* tube) 628 { 629 /* block on eventhandle */ 630 DWORD res = WSAWaitForMultipleEvents( 631 1 /* one event in array */, 632 &tube->event /* the event to wait for, our pipe signal */, 633 0 /* wait for all events is false */, 634 WSA_INFINITE /* wait, no timeout */, 635 0 /* we are not alertable for IO completion routines */ 636 ); 637 if(res == WSA_WAIT_TIMEOUT) { 638 return 0; 639 } 640 if(res == WSA_WAIT_IO_COMPLETION) { 641 /* a bit unexpected, since we were not alertable */ 642 return 0; 643 } 644 return 1; 645 } 646 647 int tube_read_fd(struct tube* ATTR_UNUSED(tube)) 648 { 649 /* nothing sensible on Windows */ 650 return -1; 651 } 652 653 int 654 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 655 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 656 { 657 log_assert(0); 658 return 0; 659 } 660 661 int 662 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 663 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 664 { 665 log_assert(0); 666 return 0; 667 } 668 669 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 670 tube_callback_t* cb, void* arg) 671 { 672 tube->listen_cb = cb; 673 tube->listen_arg = arg; 674 if(!comm_base_internal(base)) 675 return 1; /* ignore when no comm base - testing */ 676 tube->ev_listen = ub_winsock_register_wsaevent( 677 comm_base_internal(base), tube->event, &tube_handle_signal, tube); 678 return tube->ev_listen ? 1 : 0; 679 } 680 681 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 682 struct comm_base* ATTR_UNUSED(base)) 683 { 684 /* the queue item routine performs the signaling */ 685 return 1; 686 } 687 688 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 689 { 690 struct tube_res_list* item = 691 (struct tube_res_list*)malloc(sizeof(*item)); 692 verbose(VERB_ALGO, "tube queue_item len %d", (int)len); 693 if(!item) { 694 free(msg); 695 log_err("out of memory for async answer"); 696 return 0; 697 } 698 item->buf = msg; 699 item->len = len; 700 item->next = NULL; 701 lock_basic_lock(&tube->res_lock); 702 /* add at back of list, since the first one may be partially written */ 703 if(tube->res_last) 704 tube->res_last->next = item; 705 else tube->res_list = item; 706 tube->res_last = item; 707 /* signal the eventhandle */ 708 if(!WSASetEvent(tube->event)) { 709 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 710 } 711 lock_basic_unlock(&tube->res_lock); 712 return 1; 713 } 714 715 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 716 void* arg) 717 { 718 struct tube* tube = (struct tube*)arg; 719 uint8_t* buf; 720 uint32_t len = 0; 721 verbose(VERB_ALGO, "tube handle_signal"); 722 while(tube_poll(tube)) { 723 if(tube_read_msg(tube, &buf, &len, 1)) { 724 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 725 (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 726 tube->listen_arg); 727 } 728 } 729 } 730 731 #endif /* USE_WINSOCK */ 732