1 /* 2 * util/tube.c - pipe service 3 * 4 * Copyright (c) 2008, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 /** 37 * \file 38 * 39 * This file contains pipe service functions. 40 */ 41 #include "config.h" 42 #include "util/tube.h" 43 #include "util/log.h" 44 #include "util/net_help.h" 45 #include "util/netevent.h" 46 #include "util/fptr_wlist.h" 47 #include "util/ub_event.h" 48 49 #ifndef USE_WINSOCK 50 /* on unix */ 51 52 #ifndef HAVE_SOCKETPAIR 53 /** no socketpair() available, like on Minix 3.1.7, use pipe */ 54 #define socketpair(f, t, p, sv) pipe(sv) 55 #endif /* HAVE_SOCKETPAIR */ 56 57 struct tube* tube_create(void) 58 { 59 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 60 int sv[2]; 61 if(!tube) { 62 int err = errno; 63 log_err("tube_create: out of memory"); 64 errno = err; 65 return NULL; 66 } 67 tube->sr = -1; 68 tube->sw = -1; 69 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 70 int err = errno; 71 log_err("socketpair: %s", strerror(errno)); 72 free(tube); 73 errno = err; 74 return NULL; 75 } 76 tube->sr = sv[0]; 77 tube->sw = sv[1]; 78 if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { 79 int err = errno; 80 log_err("tube: cannot set nonblocking"); 81 tube_delete(tube); 82 errno = err; 83 return NULL; 84 } 85 return tube; 86 } 87 88 void tube_delete(struct tube* tube) 89 { 90 if(!tube) return; 91 tube_remove_bg_listen(tube); 92 tube_remove_bg_write(tube); 93 /* close fds after deleting commpoints, to be sure. 94 * Also epoll does not like closing fd before event_del */ 95 tube_close_read(tube); 96 tube_close_write(tube); 97 free(tube); 98 } 99 100 void tube_close_read(struct tube* tube) 101 { 102 if(tube->sr != -1) { 103 close(tube->sr); 104 tube->sr = -1; 105 } 106 } 107 108 void tube_close_write(struct tube* tube) 109 { 110 if(tube->sw != -1) { 111 close(tube->sw); 112 tube->sw = -1; 113 } 114 } 115 116 void tube_remove_bg_listen(struct tube* tube) 117 { 118 if(tube->listen_com) { 119 comm_point_delete(tube->listen_com); 120 tube->listen_com = NULL; 121 } 122 free(tube->cmd_msg); 123 tube->cmd_msg = NULL; 124 } 125 126 void tube_remove_bg_write(struct tube* tube) 127 { 128 if(tube->res_com) { 129 comm_point_delete(tube->res_com); 130 tube->res_com = NULL; 131 } 132 if(tube->res_list) { 133 struct tube_res_list* np, *p = tube->res_list; 134 tube->res_list = NULL; 135 tube->res_last = NULL; 136 while(p) { 137 np = p->next; 138 free(p->buf); 139 free(p); 140 p = np; 141 } 142 } 143 } 144 145 int 146 tube_handle_listen(struct comm_point* c, void* arg, int error, 147 struct comm_reply* ATTR_UNUSED(reply_info)) 148 { 149 struct tube* tube = (struct tube*)arg; 150 ssize_t r; 151 if(error != NETEVENT_NOERROR) { 152 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 153 (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg); 154 return 0; 155 } 156 157 if(tube->cmd_read < sizeof(tube->cmd_len)) { 158 /* complete reading the length of control msg */ 159 r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read, 160 sizeof(tube->cmd_len) - tube->cmd_read); 161 if(r==0) { 162 /* error has happened or */ 163 /* parent closed pipe, must have exited somehow */ 164 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 165 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 166 tube->listen_arg); 167 return 0; 168 } 169 if(r==-1) { 170 if(errno != EAGAIN && errno != EINTR) { 171 log_err("rpipe error: %s", strerror(errno)); 172 } 173 /* nothing to read now, try later */ 174 return 0; 175 } 176 tube->cmd_read += r; 177 if(tube->cmd_read < sizeof(tube->cmd_len)) { 178 /* not complete, try later */ 179 return 0; 180 } 181 tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len); 182 if(!tube->cmd_msg) { 183 log_err("malloc failure"); 184 tube->cmd_read = 0; 185 return 0; 186 } 187 } 188 /* cmd_len has been read, read remainder */ 189 r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len), 190 tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len))); 191 if(r==0) { 192 /* error has happened or */ 193 /* parent closed pipe, must have exited somehow */ 194 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 195 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 196 tube->listen_arg); 197 return 0; 198 } 199 if(r==-1) { 200 /* nothing to read now, try later */ 201 if(errno != EAGAIN && errno != EINTR) { 202 log_err("rpipe error: %s", strerror(errno)); 203 } 204 return 0; 205 } 206 tube->cmd_read += r; 207 if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) { 208 /* not complete, try later */ 209 return 0; 210 } 211 tube->cmd_read = 0; 212 213 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 214 (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 215 NETEVENT_NOERROR, tube->listen_arg); 216 /* also frees the buf */ 217 tube->cmd_msg = NULL; 218 return 0; 219 } 220 221 int 222 tube_handle_write(struct comm_point* c, void* arg, int error, 223 struct comm_reply* ATTR_UNUSED(reply_info)) 224 { 225 struct tube* tube = (struct tube*)arg; 226 struct tube_res_list* item = tube->res_list; 227 ssize_t r; 228 if(error != NETEVENT_NOERROR) { 229 log_err("tube_handle_write net error %d", error); 230 return 0; 231 } 232 233 if(!item) { 234 comm_point_stop_listening(c); 235 return 0; 236 } 237 238 if(tube->res_write < sizeof(item->len)) { 239 r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write, 240 sizeof(item->len) - tube->res_write); 241 if(r == -1) { 242 if(errno != EAGAIN && errno != EINTR) { 243 log_err("wpipe error: %s", strerror(errno)); 244 } 245 return 0; /* try again later */ 246 } 247 if(r == 0) { 248 /* error on pipe, must have exited somehow */ 249 /* cannot signal this to pipe user */ 250 return 0; 251 } 252 tube->res_write += r; 253 if(tube->res_write < sizeof(item->len)) 254 return 0; 255 } 256 r = write(c->fd, item->buf + tube->res_write - sizeof(item->len), 257 item->len - (tube->res_write - sizeof(item->len))); 258 if(r == -1) { 259 if(errno != EAGAIN && errno != EINTR) { 260 log_err("wpipe error: %s", strerror(errno)); 261 } 262 return 0; /* try again later */ 263 } 264 if(r == 0) { 265 /* error on pipe, must have exited somehow */ 266 /* cannot signal this to pipe user */ 267 return 0; 268 } 269 tube->res_write += r; 270 if(tube->res_write < sizeof(item->len) + item->len) 271 return 0; 272 /* done this result, remove it */ 273 free(item->buf); 274 item->buf = NULL; 275 tube->res_list = tube->res_list->next; 276 free(item); 277 if(!tube->res_list) { 278 tube->res_last = NULL; 279 comm_point_stop_listening(c); 280 } 281 tube->res_write = 0; 282 return 0; 283 } 284 285 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 286 int nonblock) 287 { 288 ssize_t r, d; 289 int fd = tube->sw; 290 291 /* test */ 292 if(nonblock) { 293 r = write(fd, &len, sizeof(len)); 294 if(r == -1) { 295 if(errno==EINTR || errno==EAGAIN) 296 return -1; 297 log_err("tube msg write failed: %s", strerror(errno)); 298 return -1; /* can still continue, perhaps */ 299 } 300 } else r = 0; 301 if(!fd_set_block(fd)) 302 return 0; 303 /* write remainder */ 304 d = r; 305 while(d != (ssize_t)sizeof(len)) { 306 if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) { 307 if(errno == EAGAIN) 308 continue; /* temporarily unavail: try again*/ 309 log_err("tube msg write failed: %s", strerror(errno)); 310 (void)fd_set_nonblock(fd); 311 return 0; 312 } 313 d += r; 314 } 315 d = 0; 316 while(d != (ssize_t)len) { 317 if((r=write(fd, buf+d, len-d)) == -1) { 318 if(errno == EAGAIN) 319 continue; /* temporarily unavail: try again*/ 320 log_err("tube msg write failed: %s", strerror(errno)); 321 (void)fd_set_nonblock(fd); 322 return 0; 323 } 324 d += r; 325 } 326 if(!fd_set_nonblock(fd)) 327 return 0; 328 return 1; 329 } 330 331 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 332 int nonblock) 333 { 334 ssize_t r, d; 335 int fd = tube->sr; 336 337 /* test */ 338 *len = 0; 339 if(nonblock) { 340 r = read(fd, len, sizeof(*len)); 341 if(r == -1) { 342 if(errno==EINTR || errno==EAGAIN) 343 return -1; 344 log_err("tube msg read failed: %s", strerror(errno)); 345 return -1; /* we can still continue, perhaps */ 346 } 347 if(r == 0) /* EOF */ 348 return 0; 349 } else r = 0; 350 if(!fd_set_block(fd)) 351 return 0; 352 /* read remainder */ 353 d = r; 354 while(d != (ssize_t)sizeof(*len)) { 355 if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) { 356 log_err("tube msg read failed: %s", strerror(errno)); 357 (void)fd_set_nonblock(fd); 358 return 0; 359 } 360 if(r == 0) /* EOF */ { 361 (void)fd_set_nonblock(fd); 362 return 0; 363 } 364 d += r; 365 } 366 if (*len >= 65536*2) { 367 log_err("tube msg length %u is too big", (unsigned)*len); 368 (void)fd_set_nonblock(fd); 369 return 0; 370 } 371 *buf = (uint8_t*)malloc(*len); 372 if(!*buf) { 373 log_err("tube read out of memory"); 374 (void)fd_set_nonblock(fd); 375 return 0; 376 } 377 d = 0; 378 while(d < (ssize_t)*len) { 379 if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) { 380 log_err("tube msg read failed: %s", strerror(errno)); 381 (void)fd_set_nonblock(fd); 382 free(*buf); 383 return 0; 384 } 385 if(r == 0) { /* EOF */ 386 (void)fd_set_nonblock(fd); 387 free(*buf); 388 return 0; 389 } 390 d += r; 391 } 392 if(!fd_set_nonblock(fd)) { 393 free(*buf); 394 return 0; 395 } 396 return 1; 397 } 398 399 /** perform a select() on the fd */ 400 static int 401 pollit(int fd, struct timeval* t) 402 { 403 fd_set r; 404 #ifndef S_SPLINT_S 405 FD_ZERO(&r); 406 FD_SET(FD_SET_T fd, &r); 407 #endif 408 if(select(fd+1, &r, NULL, NULL, t) == -1) { 409 return 0; 410 } 411 errno = 0; 412 return (int)(FD_ISSET(fd, &r)); 413 } 414 415 int tube_poll(struct tube* tube) 416 { 417 struct timeval t; 418 memset(&t, 0, sizeof(t)); 419 return pollit(tube->sr, &t); 420 } 421 422 int tube_wait(struct tube* tube) 423 { 424 return pollit(tube->sr, NULL); 425 } 426 427 int tube_read_fd(struct tube* tube) 428 { 429 return tube->sr; 430 } 431 432 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 433 tube_callback_type* cb, void* arg) 434 { 435 tube->listen_cb = cb; 436 tube->listen_arg = arg; 437 if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 438 0, tube_handle_listen, tube))) { 439 int err = errno; 440 log_err("tube_setup_bg_l: commpoint creation failed"); 441 errno = err; 442 return 0; 443 } 444 return 1; 445 } 446 447 int tube_setup_bg_write(struct tube* tube, struct comm_base* base) 448 { 449 if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 450 1, tube_handle_write, tube))) { 451 int err = errno; 452 log_err("tube_setup_bg_w: commpoint creation failed"); 453 errno = err; 454 return 0; 455 } 456 return 1; 457 } 458 459 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 460 { 461 struct tube_res_list* item; 462 if(!tube || !tube->res_com) return 0; 463 item = (struct tube_res_list*)malloc(sizeof(*item)); 464 if(!item) { 465 free(msg); 466 log_err("out of memory for async answer"); 467 return 0; 468 } 469 item->buf = msg; 470 item->len = len; 471 item->next = NULL; 472 /* add at back of list, since the first one may be partially written */ 473 if(tube->res_last) 474 tube->res_last->next = item; 475 else tube->res_list = item; 476 tube->res_last = item; 477 if(tube->res_list == tube->res_last) { 478 /* first added item, start the write process */ 479 comm_point_start_listening(tube->res_com, -1, -1); 480 } 481 return 1; 482 } 483 484 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 485 void* ATTR_UNUSED(arg)) 486 { 487 log_assert(0); 488 } 489 490 #else /* USE_WINSOCK */ 491 /* on windows */ 492 493 494 struct tube* tube_create(void) 495 { 496 /* windows does not have forks like unix, so we only support 497 * threads on windows. And thus the pipe need only connect 498 * threads. We use a mutex and a list of datagrams. */ 499 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 500 if(!tube) { 501 int err = errno; 502 log_err("tube_create: out of memory"); 503 errno = err; 504 return NULL; 505 } 506 tube->event = WSACreateEvent(); 507 if(tube->event == WSA_INVALID_EVENT) { 508 free(tube); 509 log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError())); 510 } 511 if(!WSAResetEvent(tube->event)) { 512 log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError())); 513 } 514 lock_basic_init(&tube->res_lock); 515 verbose(VERB_ALGO, "tube created"); 516 return tube; 517 } 518 519 void tube_delete(struct tube* tube) 520 { 521 if(!tube) return; 522 tube_remove_bg_listen(tube); 523 tube_remove_bg_write(tube); 524 tube_close_read(tube); 525 tube_close_write(tube); 526 if(!WSACloseEvent(tube->event)) 527 log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError())); 528 lock_basic_destroy(&tube->res_lock); 529 verbose(VERB_ALGO, "tube deleted"); 530 free(tube); 531 } 532 533 void tube_close_read(struct tube* ATTR_UNUSED(tube)) 534 { 535 verbose(VERB_ALGO, "tube close_read"); 536 } 537 538 void tube_close_write(struct tube* ATTR_UNUSED(tube)) 539 { 540 verbose(VERB_ALGO, "tube close_write"); 541 /* wake up waiting reader with an empty queue */ 542 if(!WSASetEvent(tube->event)) { 543 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 544 } 545 } 546 547 void tube_remove_bg_listen(struct tube* tube) 548 { 549 verbose(VERB_ALGO, "tube remove_bg_listen"); 550 ub_winsock_unregister_wsaevent(tube->ev_listen); 551 } 552 553 void tube_remove_bg_write(struct tube* tube) 554 { 555 verbose(VERB_ALGO, "tube remove_bg_write"); 556 if(tube->res_list) { 557 struct tube_res_list* np, *p = tube->res_list; 558 tube->res_list = NULL; 559 tube->res_last = NULL; 560 while(p) { 561 np = p->next; 562 free(p->buf); 563 free(p); 564 p = np; 565 } 566 } 567 } 568 569 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 570 int ATTR_UNUSED(nonblock)) 571 { 572 uint8_t* a; 573 verbose(VERB_ALGO, "tube write_msg len %d", (int)len); 574 a = (uint8_t*)memdup(buf, len); 575 if(!a) { 576 log_err("out of memory in tube_write_msg"); 577 return 0; 578 } 579 /* always nonblocking, this pipe cannot get full */ 580 return tube_queue_item(tube, a, len); 581 } 582 583 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 584 int nonblock) 585 { 586 struct tube_res_list* item = NULL; 587 verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking"); 588 *buf = NULL; 589 if(!tube_poll(tube)) { 590 verbose(VERB_ALGO, "tube read_msg nodata"); 591 /* nothing ready right now, wait if we want to */ 592 if(nonblock) 593 return -1; /* would block waiting for items */ 594 if(!tube_wait(tube)) 595 return 0; 596 } 597 lock_basic_lock(&tube->res_lock); 598 if(tube->res_list) { 599 item = tube->res_list; 600 tube->res_list = item->next; 601 if(tube->res_last == item) { 602 /* the list is now empty */ 603 tube->res_last = NULL; 604 verbose(VERB_ALGO, "tube read_msg lastdata"); 605 if(!WSAResetEvent(tube->event)) { 606 log_err("WSAResetEvent: %s", 607 wsa_strerror(WSAGetLastError())); 608 } 609 } 610 } 611 lock_basic_unlock(&tube->res_lock); 612 if(!item) 613 return 0; /* would block waiting for items */ 614 *buf = item->buf; 615 *len = item->len; 616 free(item); 617 verbose(VERB_ALGO, "tube read_msg len %d", (int)*len); 618 return 1; 619 } 620 621 int tube_poll(struct tube* tube) 622 { 623 struct tube_res_list* item = NULL; 624 lock_basic_lock(&tube->res_lock); 625 item = tube->res_list; 626 lock_basic_unlock(&tube->res_lock); 627 if(item) 628 return 1; 629 return 0; 630 } 631 632 int tube_wait(struct tube* tube) 633 { 634 /* block on eventhandle */ 635 DWORD res = WSAWaitForMultipleEvents( 636 1 /* one event in array */, 637 &tube->event /* the event to wait for, our pipe signal */, 638 0 /* wait for all events is false */, 639 WSA_INFINITE /* wait, no timeout */, 640 0 /* we are not alertable for IO completion routines */ 641 ); 642 if(res == WSA_WAIT_TIMEOUT) { 643 return 0; 644 } 645 if(res == WSA_WAIT_IO_COMPLETION) { 646 /* a bit unexpected, since we were not alertable */ 647 return 0; 648 } 649 return 1; 650 } 651 652 int tube_read_fd(struct tube* ATTR_UNUSED(tube)) 653 { 654 /* nothing sensible on Windows */ 655 return -1; 656 } 657 658 int 659 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 660 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 661 { 662 log_assert(0); 663 return 0; 664 } 665 666 int 667 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 668 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 669 { 670 log_assert(0); 671 return 0; 672 } 673 674 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 675 tube_callback_type* cb, void* arg) 676 { 677 tube->listen_cb = cb; 678 tube->listen_arg = arg; 679 if(!comm_base_internal(base)) 680 return 1; /* ignore when no comm base - testing */ 681 tube->ev_listen = ub_winsock_register_wsaevent( 682 comm_base_internal(base), tube->event, &tube_handle_signal, tube); 683 return tube->ev_listen ? 1 : 0; 684 } 685 686 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 687 struct comm_base* ATTR_UNUSED(base)) 688 { 689 /* the queue item routine performs the signaling */ 690 return 1; 691 } 692 693 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 694 { 695 struct tube_res_list* item; 696 if(!tube) return 0; 697 item = (struct tube_res_list*)malloc(sizeof(*item)); 698 verbose(VERB_ALGO, "tube queue_item len %d", (int)len); 699 if(!item) { 700 free(msg); 701 log_err("out of memory for async answer"); 702 return 0; 703 } 704 item->buf = msg; 705 item->len = len; 706 item->next = NULL; 707 lock_basic_lock(&tube->res_lock); 708 /* add at back of list, since the first one may be partially written */ 709 if(tube->res_last) 710 tube->res_last->next = item; 711 else tube->res_list = item; 712 tube->res_last = item; 713 /* signal the eventhandle */ 714 if(!WSASetEvent(tube->event)) { 715 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 716 } 717 lock_basic_unlock(&tube->res_lock); 718 return 1; 719 } 720 721 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 722 void* arg) 723 { 724 struct tube* tube = (struct tube*)arg; 725 uint8_t* buf; 726 uint32_t len = 0; 727 verbose(VERB_ALGO, "tube handle_signal"); 728 while(tube_poll(tube)) { 729 if(tube_read_msg(tube, &buf, &len, 1)) { 730 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 731 (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 732 tube->listen_arg); 733 } 734 } 735 } 736 737 #endif /* USE_WINSOCK */ 738