1 /* 2 * util/tube.c - pipe service 3 * 4 * Copyright (c) 2008, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 /** 37 * \file 38 * 39 * This file contains pipe service functions. 40 */ 41 #include "config.h" 42 #include "util/tube.h" 43 #include "util/log.h" 44 #include "util/net_help.h" 45 #include "util/netevent.h" 46 #include "util/fptr_wlist.h" 47 #include "util/ub_event.h" 48 49 #ifndef USE_WINSOCK 50 /* on unix */ 51 52 #ifndef HAVE_SOCKETPAIR 53 /** no socketpair() available, like on Minix 3.1.7, use pipe */ 54 #define socketpair(f, t, p, sv) pipe(sv) 55 #endif /* HAVE_SOCKETPAIR */ 56 57 struct tube* tube_create(void) 58 { 59 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 60 int sv[2]; 61 if(!tube) { 62 int err = errno; 63 log_err("tube_create: out of memory"); 64 errno = err; 65 return NULL; 66 } 67 tube->sr = -1; 68 tube->sw = -1; 69 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 70 int err = errno; 71 log_err("socketpair: %s", strerror(errno)); 72 free(tube); 73 errno = err; 74 return NULL; 75 } 76 tube->sr = sv[0]; 77 tube->sw = sv[1]; 78 if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { 79 int err = errno; 80 log_err("tube: cannot set nonblocking"); 81 tube_delete(tube); 82 errno = err; 83 return NULL; 84 } 85 return tube; 86 } 87 88 void tube_delete(struct tube* tube) 89 { 90 if(!tube) return; 91 tube_remove_bg_listen(tube); 92 tube_remove_bg_write(tube); 93 /* close fds after deleting commpoints, to be sure. 94 * Also epoll does not like closing fd before event_del */ 95 tube_close_read(tube); 96 tube_close_write(tube); 97 free(tube); 98 } 99 100 void tube_close_read(struct tube* tube) 101 { 102 if(tube->sr != -1) { 103 close(tube->sr); 104 tube->sr = -1; 105 } 106 } 107 108 void tube_close_write(struct tube* tube) 109 { 110 if(tube->sw != -1) { 111 close(tube->sw); 112 tube->sw = -1; 113 } 114 } 115 116 void tube_remove_bg_listen(struct tube* tube) 117 { 118 if(tube->listen_com) { 119 comm_point_delete(tube->listen_com); 120 tube->listen_com = NULL; 121 } 122 free(tube->cmd_msg); 123 tube->cmd_msg = NULL; 124 } 125 126 void tube_remove_bg_write(struct tube* tube) 127 { 128 if(tube->res_com) { 129 comm_point_delete(tube->res_com); 130 tube->res_com = NULL; 131 } 132 if(tube->res_list) { 133 struct tube_res_list* np, *p = tube->res_list; 134 tube->res_list = NULL; 135 tube->res_last = NULL; 136 while(p) { 137 np = p->next; 138 free(p->buf); 139 free(p); 140 p = np; 141 } 142 } 143 } 144 145 int 146 tube_handle_listen(struct comm_point* c, void* arg, int error, 147 struct comm_reply* ATTR_UNUSED(reply_info)) 148 { 149 struct tube* tube = (struct tube*)arg; 150 ssize_t r; 151 if(error != NETEVENT_NOERROR) { 152 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 153 (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg); 154 return 0; 155 } 156 157 if(tube->cmd_read < sizeof(tube->cmd_len)) { 158 /* complete reading the length of control msg */ 159 r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read, 160 sizeof(tube->cmd_len) - tube->cmd_read); 161 if(r==0) { 162 /* error has happened or */ 163 /* parent closed pipe, must have exited somehow */ 164 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 165 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 166 tube->listen_arg); 167 return 0; 168 } 169 if(r==-1) { 170 if(errno != EAGAIN && errno != EINTR) { 171 log_err("rpipe error: %s", strerror(errno)); 172 } 173 /* nothing to read now, try later */ 174 return 0; 175 } 176 tube->cmd_read += r; 177 if(tube->cmd_read < sizeof(tube->cmd_len)) { 178 /* not complete, try later */ 179 return 0; 180 } 181 tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len); 182 if(!tube->cmd_msg) { 183 log_err("malloc failure"); 184 tube->cmd_read = 0; 185 return 0; 186 } 187 } 188 /* cmd_len has been read, read remainder */ 189 r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len), 190 tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len))); 191 if(r==0) { 192 /* error has happened or */ 193 /* parent closed pipe, must have exited somehow */ 194 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 195 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 196 tube->listen_arg); 197 return 0; 198 } 199 if(r==-1) { 200 /* nothing to read now, try later */ 201 if(errno != EAGAIN && errno != EINTR) { 202 log_err("rpipe error: %s", strerror(errno)); 203 } 204 return 0; 205 } 206 tube->cmd_read += r; 207 if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) { 208 /* not complete, try later */ 209 return 0; 210 } 211 tube->cmd_read = 0; 212 213 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 214 (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 215 NETEVENT_NOERROR, tube->listen_arg); 216 /* also frees the buf */ 217 tube->cmd_msg = NULL; 218 return 0; 219 } 220 221 int 222 tube_handle_write(struct comm_point* c, void* arg, int error, 223 struct comm_reply* ATTR_UNUSED(reply_info)) 224 { 225 struct tube* tube = (struct tube*)arg; 226 struct tube_res_list* item = tube->res_list; 227 ssize_t r; 228 if(error != NETEVENT_NOERROR) { 229 log_err("tube_handle_write net error %d", error); 230 return 0; 231 } 232 233 if(!item) { 234 comm_point_stop_listening(c); 235 return 0; 236 } 237 238 if(tube->res_write < sizeof(item->len)) { 239 r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write, 240 sizeof(item->len) - tube->res_write); 241 if(r == -1) { 242 if(errno != EAGAIN && errno != EINTR) { 243 log_err("wpipe error: %s", strerror(errno)); 244 } 245 return 0; /* try again later */ 246 } 247 if(r == 0) { 248 /* error on pipe, must have exited somehow */ 249 /* cannot signal this to pipe user */ 250 return 0; 251 } 252 tube->res_write += r; 253 if(tube->res_write < sizeof(item->len)) 254 return 0; 255 } 256 r = write(c->fd, item->buf + tube->res_write - sizeof(item->len), 257 item->len - (tube->res_write - sizeof(item->len))); 258 if(r == -1) { 259 if(errno != EAGAIN && errno != EINTR) { 260 log_err("wpipe error: %s", strerror(errno)); 261 } 262 return 0; /* try again later */ 263 } 264 if(r == 0) { 265 /* error on pipe, must have exited somehow */ 266 /* cannot signal this to pipe user */ 267 return 0; 268 } 269 tube->res_write += r; 270 if(tube->res_write < sizeof(item->len) + item->len) 271 return 0; 272 /* done this result, remove it */ 273 free(item->buf); 274 item->buf = NULL; 275 tube->res_list = tube->res_list->next; 276 free(item); 277 if(!tube->res_list) { 278 tube->res_last = NULL; 279 comm_point_stop_listening(c); 280 } 281 tube->res_write = 0; 282 return 0; 283 } 284 285 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 286 int nonblock) 287 { 288 ssize_t r, d; 289 int fd = tube->sw; 290 291 /* test */ 292 if(nonblock) { 293 r = write(fd, &len, sizeof(len)); 294 if(r == -1) { 295 if(errno==EINTR || errno==EAGAIN) 296 return -1; 297 log_err("tube msg write failed: %s", strerror(errno)); 298 return -1; /* can still continue, perhaps */ 299 } 300 } else r = 0; 301 if(!fd_set_block(fd)) 302 return 0; 303 /* write remainder */ 304 d = r; 305 while(d != (ssize_t)sizeof(len)) { 306 if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) { 307 if(errno == EAGAIN) 308 continue; /* temporarily unavail: try again*/ 309 log_err("tube msg write failed: %s", strerror(errno)); 310 (void)fd_set_nonblock(fd); 311 return 0; 312 } 313 d += r; 314 } 315 d = 0; 316 while(d != (ssize_t)len) { 317 if((r=write(fd, buf+d, len-d)) == -1) { 318 if(errno == EAGAIN) 319 continue; /* temporarily unavail: try again*/ 320 log_err("tube msg write failed: %s", strerror(errno)); 321 (void)fd_set_nonblock(fd); 322 return 0; 323 } 324 d += r; 325 } 326 if(!fd_set_nonblock(fd)) 327 return 0; 328 return 1; 329 } 330 331 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 332 int nonblock) 333 { 334 ssize_t r, d; 335 int fd = tube->sr; 336 337 /* test */ 338 *len = 0; 339 if(nonblock) { 340 r = read(fd, len, sizeof(*len)); 341 if(r == -1) { 342 if(errno==EINTR || errno==EAGAIN) 343 return -1; 344 log_err("tube msg read failed: %s", strerror(errno)); 345 return -1; /* we can still continue, perhaps */ 346 } 347 if(r == 0) /* EOF */ 348 return 0; 349 } else r = 0; 350 if(!fd_set_block(fd)) 351 return 0; 352 /* read remainder */ 353 d = r; 354 while(d != (ssize_t)sizeof(*len)) { 355 if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) { 356 log_err("tube msg read failed: %s", strerror(errno)); 357 (void)fd_set_nonblock(fd); 358 return 0; 359 } 360 if(r == 0) /* EOF */ { 361 (void)fd_set_nonblock(fd); 362 return 0; 363 } 364 d += r; 365 } 366 if (*len >= 65536*2) { 367 log_err("tube msg length %u is too big", (unsigned)*len); 368 (void)fd_set_nonblock(fd); 369 return 0; 370 } 371 *buf = (uint8_t*)malloc(*len); 372 if(!*buf) { 373 log_err("tube read out of memory"); 374 (void)fd_set_nonblock(fd); 375 return 0; 376 } 377 d = 0; 378 while(d < (ssize_t)*len) { 379 if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) { 380 log_err("tube msg read failed: %s", strerror(errno)); 381 (void)fd_set_nonblock(fd); 382 free(*buf); 383 return 0; 384 } 385 if(r == 0) { /* EOF */ 386 (void)fd_set_nonblock(fd); 387 free(*buf); 388 return 0; 389 } 390 d += r; 391 } 392 if(!fd_set_nonblock(fd)) { 393 free(*buf); 394 return 0; 395 } 396 return 1; 397 } 398 399 /** perform a select() on the fd */ 400 static int 401 pollit(int fd, struct timeval* t) 402 { 403 fd_set r; 404 #ifndef S_SPLINT_S 405 FD_ZERO(&r); 406 FD_SET(FD_SET_T fd, &r); 407 #endif 408 if(select(fd+1, &r, NULL, NULL, t) == -1) { 409 return 0; 410 } 411 errno = 0; 412 return (int)(FD_ISSET(fd, &r)); 413 } 414 415 int tube_poll(struct tube* tube) 416 { 417 struct timeval t; 418 memset(&t, 0, sizeof(t)); 419 return pollit(tube->sr, &t); 420 } 421 422 int tube_wait(struct tube* tube) 423 { 424 return pollit(tube->sr, NULL); 425 } 426 427 int tube_wait_timeout(struct tube* tube, int msec) 428 { 429 struct timeval t; 430 int fd = tube->sr; 431 fd_set r; 432 t.tv_sec = msec/1000; 433 t.tv_usec = (msec%1000)*1000; 434 #ifndef S_SPLINT_S 435 FD_ZERO(&r); 436 FD_SET(FD_SET_T fd, &r); 437 #endif 438 while(1) { 439 if(select(fd+1, &r, NULL, NULL, &t) == -1) { 440 if(errno == EAGAIN || errno == EINTR) 441 continue; 442 return -1; 443 } 444 break; 445 } 446 return (int)(FD_ISSET(fd, &r)); 447 } 448 449 int tube_read_fd(struct tube* tube) 450 { 451 return tube->sr; 452 } 453 454 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 455 tube_callback_type* cb, void* arg) 456 { 457 tube->listen_cb = cb; 458 tube->listen_arg = arg; 459 if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 460 0, tube_handle_listen, tube))) { 461 int err = errno; 462 log_err("tube_setup_bg_l: commpoint creation failed"); 463 errno = err; 464 return 0; 465 } 466 return 1; 467 } 468 469 int tube_setup_bg_write(struct tube* tube, struct comm_base* base) 470 { 471 if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 472 1, tube_handle_write, tube))) { 473 int err = errno; 474 log_err("tube_setup_bg_w: commpoint creation failed"); 475 errno = err; 476 return 0; 477 } 478 return 1; 479 } 480 481 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 482 { 483 struct tube_res_list* item; 484 if(!tube || !tube->res_com) return 0; 485 item = (struct tube_res_list*)malloc(sizeof(*item)); 486 if(!item) { 487 free(msg); 488 log_err("out of memory for async answer"); 489 return 0; 490 } 491 item->buf = msg; 492 item->len = len; 493 item->next = NULL; 494 /* add at back of list, since the first one may be partially written */ 495 if(tube->res_last) 496 tube->res_last->next = item; 497 else tube->res_list = item; 498 tube->res_last = item; 499 if(tube->res_list == tube->res_last) { 500 /* first added item, start the write process */ 501 comm_point_start_listening(tube->res_com, -1, -1); 502 } 503 return 1; 504 } 505 506 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 507 void* ATTR_UNUSED(arg)) 508 { 509 log_assert(0); 510 } 511 512 #else /* USE_WINSOCK */ 513 /* on windows */ 514 515 516 struct tube* tube_create(void) 517 { 518 /* windows does not have forks like unix, so we only support 519 * threads on windows. And thus the pipe need only connect 520 * threads. We use a mutex and a list of datagrams. */ 521 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 522 if(!tube) { 523 int err = errno; 524 log_err("tube_create: out of memory"); 525 errno = err; 526 return NULL; 527 } 528 tube->event = WSACreateEvent(); 529 if(tube->event == WSA_INVALID_EVENT) { 530 free(tube); 531 log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError())); 532 } 533 if(!WSAResetEvent(tube->event)) { 534 log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError())); 535 } 536 lock_basic_init(&tube->res_lock); 537 verbose(VERB_ALGO, "tube created"); 538 return tube; 539 } 540 541 void tube_delete(struct tube* tube) 542 { 543 if(!tube) return; 544 tube_remove_bg_listen(tube); 545 tube_remove_bg_write(tube); 546 tube_close_read(tube); 547 tube_close_write(tube); 548 if(!WSACloseEvent(tube->event)) 549 log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError())); 550 lock_basic_destroy(&tube->res_lock); 551 verbose(VERB_ALGO, "tube deleted"); 552 free(tube); 553 } 554 555 void tube_close_read(struct tube* ATTR_UNUSED(tube)) 556 { 557 verbose(VERB_ALGO, "tube close_read"); 558 } 559 560 void tube_close_write(struct tube* ATTR_UNUSED(tube)) 561 { 562 verbose(VERB_ALGO, "tube close_write"); 563 /* wake up waiting reader with an empty queue */ 564 if(!WSASetEvent(tube->event)) { 565 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 566 } 567 } 568 569 void tube_remove_bg_listen(struct tube* tube) 570 { 571 verbose(VERB_ALGO, "tube remove_bg_listen"); 572 ub_winsock_unregister_wsaevent(tube->ev_listen); 573 } 574 575 void tube_remove_bg_write(struct tube* tube) 576 { 577 verbose(VERB_ALGO, "tube remove_bg_write"); 578 if(tube->res_list) { 579 struct tube_res_list* np, *p = tube->res_list; 580 tube->res_list = NULL; 581 tube->res_last = NULL; 582 while(p) { 583 np = p->next; 584 free(p->buf); 585 free(p); 586 p = np; 587 } 588 } 589 } 590 591 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 592 int ATTR_UNUSED(nonblock)) 593 { 594 uint8_t* a; 595 verbose(VERB_ALGO, "tube write_msg len %d", (int)len); 596 a = (uint8_t*)memdup(buf, len); 597 if(!a) { 598 log_err("out of memory in tube_write_msg"); 599 return 0; 600 } 601 /* always nonblocking, this pipe cannot get full */ 602 return tube_queue_item(tube, a, len); 603 } 604 605 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 606 int nonblock) 607 { 608 struct tube_res_list* item = NULL; 609 verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking"); 610 *buf = NULL; 611 if(!tube_poll(tube)) { 612 verbose(VERB_ALGO, "tube read_msg nodata"); 613 /* nothing ready right now, wait if we want to */ 614 if(nonblock) 615 return -1; /* would block waiting for items */ 616 if(!tube_wait(tube)) 617 return 0; 618 } 619 lock_basic_lock(&tube->res_lock); 620 if(tube->res_list) { 621 item = tube->res_list; 622 tube->res_list = item->next; 623 if(tube->res_last == item) { 624 /* the list is now empty */ 625 tube->res_last = NULL; 626 verbose(VERB_ALGO, "tube read_msg lastdata"); 627 if(!WSAResetEvent(tube->event)) { 628 log_err("WSAResetEvent: %s", 629 wsa_strerror(WSAGetLastError())); 630 } 631 } 632 } 633 lock_basic_unlock(&tube->res_lock); 634 if(!item) 635 return 0; /* would block waiting for items */ 636 *buf = item->buf; 637 *len = item->len; 638 free(item); 639 verbose(VERB_ALGO, "tube read_msg len %d", (int)*len); 640 return 1; 641 } 642 643 int tube_poll(struct tube* tube) 644 { 645 struct tube_res_list* item = NULL; 646 lock_basic_lock(&tube->res_lock); 647 item = tube->res_list; 648 lock_basic_unlock(&tube->res_lock); 649 if(item) 650 return 1; 651 return 0; 652 } 653 654 int tube_wait(struct tube* tube) 655 { 656 /* block on eventhandle */ 657 DWORD res = WSAWaitForMultipleEvents( 658 1 /* one event in array */, 659 &tube->event /* the event to wait for, our pipe signal */, 660 0 /* wait for all events is false */, 661 WSA_INFINITE /* wait, no timeout */, 662 0 /* we are not alertable for IO completion routines */ 663 ); 664 if(res == WSA_WAIT_TIMEOUT) { 665 return 0; 666 } 667 if(res == WSA_WAIT_IO_COMPLETION) { 668 /* a bit unexpected, since we were not alertable */ 669 return 0; 670 } 671 return 1; 672 } 673 674 int tube_wait_timeout(struct tube* tube, int msec) 675 { 676 /* block on eventhandle */ 677 DWORD res = WSAWaitForMultipleEvents( 678 1 /* one event in array */, 679 &tube->event /* the event to wait for, our pipe signal */, 680 0 /* wait for all events is false */, 681 msec /* wait for timeout */, 682 0 /* we are not alertable for IO completion routines */ 683 ); 684 if(res == WSA_WAIT_TIMEOUT) { 685 return 0; 686 } 687 if(res == WSA_WAIT_IO_COMPLETION) { 688 /* a bit unexpected, since we were not alertable */ 689 return -1; 690 } 691 return 1; 692 } 693 694 int tube_read_fd(struct tube* ATTR_UNUSED(tube)) 695 { 696 /* nothing sensible on Windows */ 697 return -1; 698 } 699 700 int 701 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 702 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 703 { 704 log_assert(0); 705 return 0; 706 } 707 708 int 709 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 710 int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 711 { 712 log_assert(0); 713 return 0; 714 } 715 716 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 717 tube_callback_type* cb, void* arg) 718 { 719 tube->listen_cb = cb; 720 tube->listen_arg = arg; 721 if(!comm_base_internal(base)) 722 return 1; /* ignore when no comm base - testing */ 723 tube->ev_listen = ub_winsock_register_wsaevent( 724 comm_base_internal(base), tube->event, &tube_handle_signal, tube); 725 return tube->ev_listen ? 1 : 0; 726 } 727 728 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 729 struct comm_base* ATTR_UNUSED(base)) 730 { 731 /* the queue item routine performs the signaling */ 732 return 1; 733 } 734 735 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 736 { 737 struct tube_res_list* item; 738 if(!tube) return 0; 739 item = (struct tube_res_list*)malloc(sizeof(*item)); 740 verbose(VERB_ALGO, "tube queue_item len %d", (int)len); 741 if(!item) { 742 free(msg); 743 log_err("out of memory for async answer"); 744 return 0; 745 } 746 item->buf = msg; 747 item->len = len; 748 item->next = NULL; 749 lock_basic_lock(&tube->res_lock); 750 /* add at back of list, since the first one may be partially written */ 751 if(tube->res_last) 752 tube->res_last->next = item; 753 else tube->res_list = item; 754 tube->res_last = item; 755 /* signal the eventhandle */ 756 if(!WSASetEvent(tube->event)) { 757 log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 758 } 759 lock_basic_unlock(&tube->res_lock); 760 return 1; 761 } 762 763 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 764 void* arg) 765 { 766 struct tube* tube = (struct tube*)arg; 767 uint8_t* buf; 768 uint32_t len = 0; 769 verbose(VERB_ALGO, "tube handle_signal"); 770 while(tube_poll(tube)) { 771 if(tube_read_msg(tube, &buf, &len, 1)) { 772 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 773 (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 774 tube->listen_arg); 775 } 776 } 777 } 778 779 #endif /* USE_WINSOCK */ 780