/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * ident "%Z%%M% %I% %E% SMI"
 *
 * Copyright 1999-2002 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 *
 */

//  Transact.java:    Low level details of performing an SLP
//                    network transaction.

package com.sun.slp;

import java.util.*;
import java.net.*;
import java.io.*;

/**
 * Transact performs the low level details for transacting an SLP network
 * query: unicast UDP to DAs (falling back to TCP on overflow) and
 * multicast/broadcast convergence. An instance is a Runnable that runs one
 * multicast convergence in its own thread. Note that, in the future, this
 * class may spin separate threads for DA requests as well.
 */

class Transact extends Object implements Runnable {

    // Cache of open TCP sockets, keyed by the peer InetAddress.

    private static final Hashtable TCPSocketCache = new Hashtable();

    // SLP config object. Assigned by the static initializer at the end of
    // the class and lazily re-fetched by the entry points if still null.

    protected static SLPConfig config = null;

    // Message to send.

    protected SrvLocMsg msgOut = null;

    // Vector of return values.

    protected Vector returns = null;

    // Timeouts for multicast convergence. Varies if it's DA discovery or
    // request multicast.

    protected int[] MSTimeouts;

    // Maximum results desired for multicast.

    protected int maxResults = 0;

    // Exception recorded by run() for the thread's creator to inspect.

    protected ServiceLocationException exErr = null;

    // Multicast address to use.

    protected InetAddress address = null;

    // If this is true, continue multicast after the first set of stuff
    // is found. Exit when three tries have happened without finding
    // anything.

    boolean continueAfterFound = false;

    /**
     * Perform a query to the SLP network. The multicast query is performed
     * in a separate thread for performance reasons. DAs having the
     * same scope set are queried until one answers. These DAs form
     * an equivalence class.
     *
     * @param daEquivClasses Vector of DATable.DARecord objects in the
     *			  same equivalence class w.r.t. scopes. May be
     *			  null, in which case no unicast is done.
     * @param uniMsg A unicast message to send.
     * @param multiMsg A multicast message to send. May be null, in which
     *			  case no multicast is done.
     * @param address Multicast address to use.
     * @return Vector of SrvLocMsg objects with results.
     * @exception ServiceLocationException
     *			  If no results came back and either the unicast
     *			  or the multicast transaction recorded an error
     *			  other than VERSION_NOT_SUPPORTED, INTERNAL_ERROR,
     *			  OPTION_NOT_SUPPORTED or REQUEST_NOT_SUPPORTED.
     */

    static Vector
        transactUA(Vector daEquivClasses,
                   SrvLocMsg uniMsg,
                   SrvLocMsg multiMsg,
                   InetAddress address)
        throws ServiceLocationException {

        Vector ret = new Vector();
        Thread multiThread = null;
        Transact tracon = null;

        // If we need to multicast, then start the multicast thread.

        if (multiMsg != null) {

            // The final argument to the constructor of Transact determines
            // whether to return after the first result or to continue to
            // gather more than one result. continueAfterFound MUST be
            // 'true' here or else multicast based discovery will find the
            // first result, not all results, as it should.

            tracon =
                new Transact(multiMsg,
                             ret,
                             config.getMulticastTimeouts(),
                             config.getMaximumResults(),
                             address,
                             true);  // continueAfterFound

            multiThread = new Thread(tracon);
            multiThread.start();

        }

        // Go through the DA equivalence classes, querying the DAs.

        ServiceLocationException exx = null;

        if (daEquivClasses != null) {
            exx =
                transactUnicastMsg(daEquivClasses,
                                   uniMsg,
                                   ret,
                                   config.getMaximumResults());

        }

        // Wait until the multicast thread is done, if necessary.

        if (multiThread != null) {

            try {
                multiThread.join();

            } catch (InterruptedException ex) {

                // Stop waiting, but keep the interrupt visible to the
                // caller instead of silently discarding it.

                Thread.currentThread().interrupt();

            }

        }

        // If there was a problem in either the multicast thread or in
        // the unicast call, throw an exception, but *only* if no
        // results came back.

        if (ret.isEmpty()) {

            if (exx != null && isReportable(exx)) {
                throw exx;

            }

            if (tracon != null &&
                tracon.exErr != null &&
                isReportable(tracon.exErr)) {
                throw tracon.exErr;

            }
        }

        // Return the result to the client.

        return ret;

    }

    // Return true if the exception's error code is one that transactUA
    // reports to its caller when there are no results. The four codes
    // excluded here are swallowed by transactUA.

    private static boolean isReportable(ServiceLocationException ex) {
        short err = ex.getErrorCode();

        return (err != ServiceLocationException.VERSION_NOT_SUPPORTED &&
                err != ServiceLocationException.INTERNAL_ERROR &&
                err != ServiceLocationException.OPTION_NOT_SUPPORTED &&
                err != ServiceLocationException.REQUEST_NOT_SUPPORTED);
    }

    /**
     * Transact a message with DAs. Put the returned SrvLocMsg
     * object into the Vector ret. For each equivalence class the DA
     * addresses are tried in order until one produces an accepted reply.
     * Errors are not thrown; the last one that is not DA_BUSY is returned
     * so the caller can decide whether to throw it.
     *
     * @param daEquivClasses Vector of DATable.DARecord objects in the
     *			  same equivalence class w.r.t. scopes.
     * @param msg SrvLocMsg Message to send.
     * @param ret Vector for returns.
     * @param maxResults Maximum results expected.
     * @return A ServiceLocationException object if an exception occurred
     *	       (a DA reported or caused an error other than DA_BUSY, or the
     *	       datagram socket failed with a SocketException), otherwise
     *	       null.
     */
    static ServiceLocationException
        transactUnicastMsg(Vector daEquivClasses,
                           SrvLocMsg msg,
                           Vector ret,
                           int maxResults) {

        // Get the config object if we need it.

        if (config == null) {
            config = SLPConfig.getSLPConfig();

        }

        DatagramSocket ds = null;
        int i, n = daEquivClasses.size();
        ServiceLocationException exx = null;
        InetAddress addr = null;
        int numReplies = 0;
        DATable daTable = DATable.getDATable();

        try {

            // Go through the DA address equivalence classes we need
            // to query.

            for (i = 0; i < n && numReplies < maxResults; i++) {

                DATable.DARecord rec =
                    (DATable.DARecord)daEquivClasses.elementAt(i);

                // Iterate over a copy; removeDA() below is handed the
                // record's scopes and may modify the table while we walk
                // the addresses.

                Vector daAddresses = (Vector)rec.daAddresses.clone();

                // Get a new outgoing socket, shared by all classes.

                if (ds == null) {
                    ds = new DatagramSocket();

                }

                // Go through the DA addresses until we get a reply from one.

                Enumeration en = daAddresses.elements();
                SrvLocHeader mhdr = msg.getHeader();

                while (en.hasMoreElements()) {

                    try {

                        addr = (InetAddress)en.nextElement();

                        if (config.traceDATraffic()) {
                            config.writeLog("sending_da_trace",
                                            new Object[] {
                                Integer.toHexString(mhdr.xid),
                                    addr});

                        }

                        // Get the reply message if any.

                        SrvLocMsg rply = transactDatagramMsg(ds, addr, msg);

                        if (!filterRply(msg, rply, addr)) {
                            continue;

                        }

                        SrvLocHeader rhdr = rply.getHeader();

                        if (config.traceDATraffic()) {
                            config.writeLog("reply_da_trace",
                                            new Object[] {
                                Integer.toHexString(rhdr.xid),
                                    addr});

                        }

                        // If overflow, try TCP.

                        if (rhdr.overflow) {
                            if (config.traceDATraffic()) {
                                config.writeLog("tcp_send_da_trace",
                                                new Object[] {
                                    Integer.toHexString(mhdr.xid),
                                        addr});

                            }

                            rply = transactTCPMsg(addr, msg, false);

                            // msg was already dereferenced for mhdr above,
                            // so it cannot be null here.

                            if (config.traceDATraffic()) {
                                config.writeLog("tcp_reply_da_trace",
                                                new Object[] {
                                    Integer.toHexString(mhdr.xid),
                                        addr});

                            }

                            if (rply == null) {
                                continue;

                            }

                        }

                        // Increment number of replies we received.

                        SrvLocHeader hdr = rply.getHeader();

                        numReplies += hdr.iNumReplies;

                        // Add to return vector.

                        ret.addElement(rply);

                        // Break out of the loop, since we only need one in
                        // this equivalence class.

                        break;

                    } catch (ServiceLocationException ex) {

                        config.writeLog("da_exception_trace",
                                        new Object[] {
                            new Short(ex.getErrorCode()),
                                addr,
                                ex.getMessage()});

                        // In case we are querying more than one DA, we
                        // save the exception, returning it to the caller to
                        // decide if it should be thrown. We ignore DA_BUSY,
                        // though, since the DA may free up later.

                        short errCode = ex.getErrorCode();

                        if (errCode != ServiceLocationException.DA_BUSY) {
                            exx = ex;

                        }

                        // If the error code is NETWORK_TIMED_OUT, then remove
                        // this DA from the DA table. If it's just down
                        // temporarily, we'll get it next time we go to
                        // the server to get the DA addresses.

                        if (errCode ==
                            ServiceLocationException.NETWORK_TIMED_OUT) {

                            if (config.traceDATraffic()) {
                                config.writeLog("da_drop",
                                                new Object[] {
                                    addr, rec.scopes});

                            }

                            daTable.removeDA(addr, rec.scopes);

                        }
                    }
                }
            }

        } catch (SocketException ex) {
            exx =
                new ServiceLocationException(
                                ServiceLocationException.NETWORK_ERROR,
                                "socket_creation_failure",
                                new Object[] {addr, ex.getMessage()});

        } finally {

            // Clean up socket.

            if (ds != null) {
                ds.close();
            }
        }

        return exx;
    }

    /**
     * Transact a message via UDP. The request is resent once for each
     * entry in the configured datagram timeouts, using that entry as the
     * receive timeout, until a reply arrives.
     *
     * @param ds The datagram socket to use.
     * @param addr The DA to contact.
     * @param msg The message to send.
     * @return The SrvLocMsg returned. May be null if a datagram arrived
     *	       but internalize() did not produce a message for it.
     * @exception ServiceLocationException Due to errors in parsing message,
     *		  NETWORK_ERROR on an I/O failure, or NETWORK_TIMED_OUT if
     *		  every attempt timed out.
     */

    static private SrvLocMsg
        transactDatagramMsg(DatagramSocket ds, InetAddress addr, SrvLocMsg msg)
        throws ServiceLocationException {

        SrvLocMsg rply = null;
        byte[] outbuf = getBytes(msg, false, false);
        byte[] inbuf = new byte[Defaults.iReadMaxMTU];

        // Construct the datagram packets to send and receive.

        DatagramPacket dpReply =
            new DatagramPacket(inbuf, inbuf.length);
        DatagramPacket dpRequest =
            new DatagramPacket(outbuf, outbuf.length, addr, Defaults.iSLPPort);
        int[] timeouts = config.getDatagramTimeouts();

        // Resend for number of timeouts in timeout interval.

        int i;

        for (i = 0; i < timeouts.length; i++) {

            // Catch timeout and IO errors.

            try {

                ds.setSoTimeout(timeouts[i]);
                ds.send(dpRequest);
                ds.receive(dpReply);

                // Process result into a reply object.
                // NOTE(review): the stream covers the whole receive buffer,
                // not just dpReply.getLength() bytes; confirm the parser
                // does not depend on the zero padding before bounding it.

                DataInputStream dis =
                    new DataInputStream(
                        new ByteArrayInputStream(dpReply.getData()));

                rply = internalize(dis, addr);
                break;

            } catch (InterruptedIOException ex) {

                // Did not get it on this timeout, try again.

                if (config.traceDrop()|| config.traceDATraffic()) {
                    config.writeLog("udp_timeout",
                                    new Object[] {addr});

                }

                continue;

            } catch (IOException ex) {
                Object[] message = {addr, ex.getMessage()};

                if (config.traceDrop() || config.traceDATraffic()) {
                    config.writeLog("datagram_io_error",
                                    message);

                }

                throw
                    new ServiceLocationException(
                                ServiceLocationException.NETWORK_ERROR,
                                "datagram_io_error",
                                message);

            }
        }

        // If nothing, then we've timed out. DAs with no matching
        // info should at least return a reply.

        if (rply == null) {
            throw
                new ServiceLocationException(
                                ServiceLocationException.NETWORK_TIMED_OUT,
                                "udp_timeout",
                                new Object[] {addr});

        }

        return rply;
    }

    /**
     * Transact a message using TCP, since the reply was too big.
     *
     * @param addr Address of the DA to contact.
     * @param msg  The message object to use.
     * @param cacheIt Cache socket, if new.
     * @return The SrvLocMsg returned, or null if the reply was missing or
     *	       carried the wrong xid.
     * @exception ServiceLocationException
     *		  NETWORK_TIMED_OUT if the transaction was interrupted by
     *		  a timeout, NETWORK_ERROR on any other I/O failure.
     */

    static SrvLocMsg
        transactTCPMsg(InetAddress addr, SrvLocMsg msg, boolean cacheIt)
        throws ServiceLocationException {

        // Get the config object if we need it.

        if (config == null) {
            config = SLPConfig.getSLPConfig();

        }

        SrvLocMsg rply = null;

        try {

            // Transact the message, taking care of socket caching.

            rply = transactMsg(addr, msg, cacheIt, true);

        } catch (InterruptedIOException ex) {
            Object[] message = {addr};

            if (config.traceDrop()|| config.traceDATraffic()) {
                config.writeLog("tcp_timeout",
                                message);

            }

            throw
                new ServiceLocationException(
                                ServiceLocationException.NETWORK_TIMED_OUT,
                                "tcp_timeout",
                                message);

        } catch (IOException ex) {
            Object[] message = {addr, ex.getMessage()};

            if (config.traceDrop() || config.traceDATraffic()) {
                config.writeLog("tcp_io_error",
                                message);

            }

            throw
                new ServiceLocationException(
                                ServiceLocationException.NETWORK_ERROR,
                                "tcp_io_error",
                                message);

        }

        // Filter reply for nulls, invalid xid.

        if (!filterRply(msg, rply, addr)) {
            return null;

        }

        return rply;
    }

    // Close a socket (ignoring close errors, there is nothing useful to
    // do with them) and remove the cache entry for its address.

    static private void uncacheSocket(InetAddress addr, Socket s) {

        try {

            s.close();

        } catch (IOException ex) {

            // Best effort: the socket is being discarded anyway.

        }

        TCPSocketCache.remove(addr);

    }

    // Get a (possibly cached) TCP socket, cache it if cache is on.

    static private Socket getTCPSocket(InetAddress addr, boolean cacheIt)
        throws IOException {

        // We use the cached socket if we've got it.

        Socket s = (Socket)TCPSocketCache.get(addr);

        if (s == null) {
            s = new Socket(addr, Defaults.iSLPPort);

            // Set it so the socket will block for fixed timeout. If that
            // fails, close the brand new socket so it does not leak.

            try {
                s.setSoTimeout(config.getTCPTimeout());

            } catch (SocketException ex) {

                try {
                    s.close();

                } catch (IOException ignored) {

                    // The original failure is the one worth reporting.

                }

                throw ex;

            }

        }

        // We cache it if we're supposed to.

        if (cacheIt) {
            TCPSocketCache.put(addr, s);

        }

        return s;
    }

    // Transact the message, using cached socket if necessary. If the
    // exchange fails with an IOException the socket is uncached and, when
    // the retry flag is true, the exchange is attempted once more on a
    // fresh socket.

    static private SrvLocMsg
        transactMsg(InetAddress addr,
                    SrvLocMsg msg,
                    boolean cacheIt,
                    boolean retry)
        throws InterruptedIOException, IOException, ServiceLocationException {

        Socket s = null;
        byte outbuf[] = getBytes(msg, false, true);

        try {

            s = getTCPSocket(addr, cacheIt);

            DataOutputStream dos = new DataOutputStream(s.getOutputStream());
            DataInputStream dis = new DataInputStream(s.getInputStream());

            // In case the server cuts us off...

            try {

                // Only one thread at a time gets to use this socket, in case
                // it was cached. Otherwise, we *may* get interleaved i/o.

                synchronized (s) {

                    // Send the request.

                    dos.write(outbuf, 0, outbuf.length);

                    // Read reply.

                    return internalize(dis, addr);

                }

            } catch (IOException ex) {

                // Uncache it, get a new one. If that one doesn't work, we're
                // hosed.

                uncacheSocket(addr, s);

                // Already closed; keep the finally clause from closing it
                // a second time.

                s = null;

                if (!retry) {
                    throw ex;

                }

                // Recursively call ourselves to take care of this, but
                // don't retry it.

                return transactMsg(addr, msg, cacheIt, false);

            }

        } finally {

            // A socket that is not to be cached is closed after one use.

            if (s != null && !cacheIt) {
                uncacheSocket(addr, s);

            }
        }
    }

    // Externalize the message into bytes. Throws BUFFER_OVERFLOW if the
    // header reports overflow after externalizing.

    static protected byte[] getBytes(SrvLocMsg slm,
                                     boolean isMulti,
                                     boolean isTCP)
        throws ServiceLocationException {

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        SrvLocHeader hdr = slm.getHeader();

        hdr.externalize(baos, isMulti, isTCP);

        byte[] outbuf = baos.toByteArray();

        // Check if it exceeds the output buffer length.

        if (hdr.overflow) {
            throw
                new ServiceLocationException(
                                ServiceLocationException.BUFFER_OVERFLOW,
                                "buffer_overflow",
                                new Object[] {
                                    new Integer(outbuf.length),
                                        new Integer(config.getMTU())});
        }

        return outbuf;
    }

    // Filter the reply to make sure the xid matches and that it's not null.
    // Returns true if the reply should be accepted.

    static protected boolean
        filterRply(SrvLocMsg msg, SrvLocMsg rply, InetAddress addr) {

        // The null test must come before any use of rply: internalize()
        // may return null, and dereferencing it first would raise a
        // NullPointerException instead of dropping the reply.

        if (rply == null) {
            if (config.traceDrop()) {
                config.writeLog("reply_unparsable",
                                new Object[] {addr});

            }

            return false;

        }

        SrvLocHeader mhdr = msg.getHeader();
        SrvLocHeader rhdr = rply.getHeader();

        // Check for invalid xid.

        if (mhdr.xid != rhdr.xid) {
            if (config.traceDrop()) {
                config.writeLog("wrong_xid",
                                new Object[] {addr});

            }
            return false;

        }
        return true;

    }

    /**
     * Internalize the byte array in the input stream into a SrvLocMsg
     * subclass. It will be an appropriate subclass for the client agent.
     * Parsing failures (IllegalArgumentException, IOException, or a
     * ServiceLocationException from the parser) are rethrown as a
     * ServiceLocationException that includes the replying address.
     *
     * @param dis The input stream containing the packet.
     * @param addr The address of the replying agent (for error reporting).
     * @return The right SrvLocMsg subclass appropriate for the Client Agent.
     *		If null is returned, the function code wasn't recognized,
     *		and so it may be appropriate for another agent.
     * @exception ServiceLocationException If the version is not supported,
     *		the character set was not valid, or an error occurred
     *		during parsing or reading.
     */

    static protected SrvLocMsg internalize(DataInputStream dis,
                                           InetAddress addr)
        throws ServiceLocationException {

        int ver = 0, fun = 0;
        SrvLocMsg msg = null;
        SrvLocHeader hdr = null;
        byte[] b = new byte[2];

        try {

            dis.readFully(b, 0, 2);

            // Version and function code are unsigned bytes.

            ver = b[0] & 0xFF;
            fun = b[1] & 0xFF;

            // Unrecognized version number if header not returned.

            if (ver != Defaults.version) {
                throw
                    new ServiceLocationException(
                                ServiceLocationException.VERSION_NOT_SUPPORTED,
                                "version_number_error",
                                new Object[] {new Integer(ver)});

            }

            // Create the header. Note that we only need to create a
            // client side header here, because that is all that
            // will be expected by the client side code. Note that we
            // *can't* use the SrvLocHeader.newInstance() mechanism
            // because Transact lives in the server as well, and
            // SrvLocHeader can only handle one header class per
            // version.

            hdr = new SLPHeaderV2();

            // Parse header.

            hdr.parseHeader(fun, dis);

            // Parse body.

            if ((msg = hdr.parseMsg(dis)) != null) {

                // Parse options, if any.

                hdr.parseOptions(dis);

            }

        } catch (IllegalArgumentException ex) {

            // During parsing, this can be thrown if syntax errors occur.

            throw
                new ServiceLocationException(
                                ServiceLocationException.PARSE_ERROR,
                                "passthrough_addr",
                                new Object[] {ex.getMessage(), addr});

        } catch (IOException ex) {

            // If version code is zero, then we suspect a network error,
            // otherwise, it is probably a parse error.

            String fcode = (fun == 0 ? "???":Integer.toString(fun));
            short exCode =
                (ver == 0 ? ServiceLocationException.NETWORK_ERROR:
                 ServiceLocationException.PARSE_ERROR);

            // During parsing, this can be thrown if the message stream
            // is improperly formatted.

            throw
                new ServiceLocationException(exCode,
                                             "ioexception_parsing",
                                             new Object[] {
                    ex, fcode, addr, ex.getMessage()});

        } catch (ServiceLocationException ex) {

            // Add the address of the replying agent.

            throw
                new ServiceLocationException(ex.getErrorCode(),
                                             "passthrough_addr",
                                             new Object[] {
                    ex.getMessage(), addr});

        }

        return msg;
    }

    // Send out the message as a multicast-flagged datagram to the SLP port.

    static protected void
        send(DatagramSocket ds, SrvLocMsg msg, InetAddress addr)
        throws ServiceLocationException, IOException {

        byte[] outbuf = getBytes(msg, true, false);
        DatagramPacket dpsend =
            new DatagramPacket(outbuf, outbuf.length, addr, Defaults.iSLPPort);
        ds.send(dpsend);

    }

    // Check the response and add the previous responder if it is OK.
    // Returns false if the address is already in the previous responder
    // list of the outgoing message, true after adding it otherwise.

    static protected boolean
        addPreviousResponder(SrvLocMsg msg, InetAddress addr) {

        SrvLocHeader hdr = msg.getHeader();
        Vector v = hdr.previousResponders;
        String srcAddr = addr.getHostAddress();

        if (v.contains(srcAddr)) {	// the SRC ignored its PR list
            if (config.traceDrop()) {
                config.writeLog("drop_pr",
                                new Object[] {
                    srcAddr,
                        Integer.toHexString(hdr.xid)});

            }
            return false;

        }

        hdr.addPreviousResponder(addr);
        return true;
    }

    // Transact an active request for DA or SA adverts. DAAdverts are
    // handed to the DA table; for SAAdverts the distinct scopes found are
    // returned.

    static Vector transactActiveAdvertRequest(ServiceType type,
                                              SrvLocMsg rqst,
                                              ServerDATable daTable)
        throws ServiceLocationException {

        // Perform active advertisement.

        Vector ret = new Vector();
        Vector results = new Vector();

        // Create Transact object and start.

        Transact tran = new Transact(rqst,
                                     results,
                                     config.getMulticastTimeouts(),
                                     Integer.MAX_VALUE, // config doesn't apply
                                     config.getMulticastAddress(),
                                     true);

        Thread multiThread = new Thread(tran);

        multiThread.start();

        // Wait until the multicast thread is done.

        try {
            multiThread.join();

        } catch (InterruptedException ex) {

            // Stop waiting, but keep the interrupt visible to the caller.

            Thread.currentThread().interrupt();

        }

        ServiceLocationException ex = tran.exErr;

        // Report error.
        // NOTE(review): the exception is only thrown when DA traffic
        // tracing is enabled; with tracing off the error is ignored and
        // whatever results arrived are processed. Confirm this is intended
        // before changing it, callers may rely on it.

        if (ex != null && config.traceDATraffic()) {
            config.writeLog("sdat_active_err",
                            new Object[] {new Integer(ex.getErrorCode()),
                                              ex.getMessage()});

            throw ex;

        }

        // Process the results.

        int i, n = results.size();

        for (i = 0; i < n; i++) {
            Object msg = results.elementAt(i);

            // Drop anything that is not the advert type we asked for.

            if ((type.equals(Defaults.DA_SERVICE_TYPE) &&
                 !(msg instanceof CDAAdvert)) ||
                (type.equals(Defaults.SA_SERVICE_TYPE) &&
                 !(msg instanceof CSAAdvert))) {

                if (config.traceDrop()) {
                    config.writeLog("sdat_nonadvert_err",
                                    new Object[] {
                        msg});

                }

                continue;
            }

            // Let DA table handle it if it's a DAAdvert.

            if (type.equals(Defaults.DA_SERVICE_TYPE)) {
                CDAAdvert advert = (CDAAdvert)msg;

                daTable.handleAdvertIn(advert);

            } else {

                // Add scopes from the SAAdvert if not already there.

                SrvLocHeader hdr = ((SrvLocMsg)msg).getHeader();

                int j, m = hdr.scopes.size();

                for (j = 0; j < m; j++) {
                    Object o = hdr.scopes.elementAt(j);

                    if (!ret.contains(o)) {
                        ret.addElement(o);

                    }
                }
            }
        }

        return ret;
    }

    // Construct a Transact object to run a convergence transaction in
    // a separate thread.

    Transact(SrvLocMsg msg,
             Vector ret,
             int[] msT,
             int mResults,
             InetAddress address,
             boolean continueAfterFound) {

        msgOut = msg;
        returns = ret;
        MSTimeouts = msT;
        maxResults = mResults;
        this.address = address;
        this.continueAfterFound = continueAfterFound;
    }

    // Run the multicast convergence algorithm. Any failure other than
    // DA_BUSY is recorded in exErr for the creating thread and logged.

    public void run() {

        Exception xes = null;
        DatagramSocket ds = null;

        // Get the config object if we need it.

        if (config == null) {
            config = SLPConfig.getSLPConfig();

        }

        // Set thread name. In broadcast-only mode the broadcast address
        // replaces the address given to the constructor.

        if (config.isBroadcastOnly()) {
            Thread.currentThread().setName("SLP Broadcast Transact");
            address = config.getBroadcastAddress();

        } else {
            Thread.currentThread().setName("SLP Multicast Transact");

        }

        try {

            // Multicast out on the default interface only.

            ds = config.getMulticastSocketOnInterface(config.getLocalHost(),
                                                      true);

            // Perform convergence.

            transactConvergeMsg(address,
                                ds,
                                msgOut,
                                returns,
                                MSTimeouts,
                                maxResults,
                                continueAfterFound);

        } catch (ServiceLocationException ex) {

            // Ignore DA_BUSY, the DA may free up later.

            if (ex.getErrorCode() != ServiceLocationException.DA_BUSY) {
                exErr = ex;
                xes = ex;

            }

        } catch (Exception ex) {

            // Create new exception to be thrown.

            xes = ex;
            exErr = new ServiceLocationException(
                                ServiceLocationException.INTERNAL_SYSTEM_ERROR,
                                "passthrough",
                                new Object[] {ex.getMessage()});

        } finally {

            // Close the socket if it's been opened, on every path.

            if (ds != null) {
                ds.close();

            }
        }

        // Log any errors, with the stack trace.

        if (xes != null) {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);

            xes.printStackTrace(pw);
            pw.flush();

            config.writeLog("multicast_error",
                            new Object[] {xes.getMessage(),
                                              sw.toString()});

        }
    }

    /**
     * Send the message using multicast and use convergence to gather the
     * results. There can be many unicast requests active along with a
     * multicast request, hence the separate thread for multicast.
     * <p>
     * NOTE(review): earlier documentation said this routine must be
     * synchronized because only one multicast can be active at a time;
     * otherwise the client may get back an unexpected result. The method
     * is not declared synchronized; confirm whether callers serialize
     * access.
     * <p>
     * The subtlety of the timing routine is that it will only resend the
     * message when one of the multicast convergence timeout intervals
     * elapses. Further, for efficiency, it will give up after a complete
     * interval has gone by without receiving any results. This may mean
     * that the intervals have to be extended in larger networks. In the
     * common case, multicast convergence will complete under 3 seconds
     * as all results will arrive during the first interval (1 second long)
     * and none will arrive during the second interval.
     *
     * @param     addr  The multicast/broadcast address to send the request to.
     * @param     ds  The datagram socket to send on.
     * @param     msg The message to send.
     * @param     vResult A vector in which to put the returns.
     * @param	  msTimeouts Array of timeout values for multicast convergence.
     * @param	  maxResults Maximum replies desired.
     * @param	  continueAfterFound If true, keep collecting replies after
     *			     one has been accepted; the request is sent at
     *			     most three times. If false, return as soon as
     *			     one reply has been accepted.
     * @exception ServiceLocationException
     *            If results cannot be obtained in the timeout interval
     *            specified in the 'config.' or
     *            if networking resources cannot be obtained or used
     *            effectively. A PREVIOUS_RESPONDER_OVERFLOW error ends
     *            the convergence quietly instead of being thrown.
     */

    static public void
        transactConvergeMsg(InetAddress addr,
                            DatagramSocket ds,
                            SrvLocMsg msg,
                            Vector vResult,
                            int[] msTimeouts,
                            int maxResults,
                            boolean continueAfterFound)
        throws ServiceLocationException {

        // Get the config object if we need it.

        if (config == null) {
            config = SLPConfig.getSLPConfig();

        }

        int numReplies = 0;
        int tries = 0;
        int multiMax = config.getMulticastMaximumWait();
        long lStartTime = System.currentTimeMillis();
        int mtu = config.getMTU();

        try {

            // Send the request for the 1st iteration. It will be sent again
            // only when the timeout intervals elapse.

            send(ds, msg, addr);
            tries++;

            long lTimeSent = System.currentTimeMillis();

            // Continue collecting results only as long as we need more for
            // the 'max results' configuration.

            while (numReplies < maxResults) {

                // Set up the reply buffer.

                byte [] incoming = new byte[mtu];
                DatagramPacket dprecv =
                    new DatagramPacket(incoming, incoming.length);

                // Block on receive (no longer than max timeout - time spent).

                int iTimeout =
                    getTimeout(lStartTime, lTimeSent, multiMax, msTimeouts);

                // Zero must be treated as "no time left" too: a socket
                // timeout of 0 means wait forever, which would hang the
                // convergence thread.

                if (iTimeout <= 0) {
                    break; // we have no time left!
                }

                ds.setSoTimeout(iTimeout);

                try {
                    ds.receive(dprecv);

                } catch (InterruptedIOException ex) {

                    // The receive timed out. Give up if a reply was already
                    // found and we are not asked to continue, if the overall
                    // maximum wait has passed, or if the request has been
                    // sent three times.

                    if ((!continueAfterFound && numReplies > 0) ||
                        (int)(System.currentTimeMillis() - lStartTime) >
                            multiMax ||
                        tries >= 3) {
                        break;

                    }

                    // Now resend the request...

                    send(ds, msg, addr);
                    tries++;

                    lTimeSent = System.currentTimeMillis();
                    continue; // since we did not receive anything, continue...

                }

                // Data was received without timeout or fail.

                DataInputStream dis =
                    new DataInputStream(
                        new ByteArrayInputStream(dprecv.getData()));

                InetAddress raddr = dprecv.getAddress();
                SrvLocMsg rply = internalize(dis, raddr);

                if (!filterRply(msg, rply, raddr)) {
                    continue;

                }

                // Add this responder to previous responders. If the message
                // was already received but the SA resent because it isn't
                // doing multicast convergence correctly, then ignore it.

                if (!addPreviousResponder(msg, raddr)) {
                    continue;

                }

                // Handle any overflow thru TCP.

                SrvLocHeader rhdr = rply.getHeader();

                if (rhdr.overflow) {

                    rply = transactTCPMsg(raddr, msg, false);

                    if (rply == null) {
                        continue;

                    }

                    rhdr = rply.getHeader();
                }

                // Add response to list.

                if (vResult.size() < maxResults) {
                    vResult.addElement(rply);

                }

                // Increment the number of results returned.

                numReplies += rhdr.iNumReplies;

                // Exit if we should not continue.

                if (!continueAfterFound) {
                    break;

                }
            }
        } catch (ServiceLocationException ex) {

            // If we broke off because the previous responder's list is too
            // long, then return, otherwise throw the exception again.

            if (ex.getErrorCode() ==
                ServiceLocationException.PREVIOUS_RESPONDER_OVERFLOW) {
                return;

            }

            throw ex;

        } catch (IOException ex) {

            throw
                new ServiceLocationException(
                                ServiceLocationException.NETWORK_ERROR,
                                "ioexception_conv",
                                new Object[] {ex, ex.getMessage()});

        }
    }

    // Calculate the multicast timeout depending on where we are in the loop.
    //
    // lStart is when convergence began, lSent when the request was last
    // sent, iTimeout the overall maximum wait and a_iTOs the successive
    // interval lengths, all in milliseconds. Returns the number of
    // milliseconds to wait (always positive), or -1 if no time is left.

    static private int
        getTimeout(long lStart, long lSent, int iTimeout, int[] a_iTOs) {
        int iTotal = (int)(lSent - lStart);

        // "<=" rather than "<": when exactly the maximum wait has elapsed
        // the remaining time is zero, and zero must never be handed to
        // setSoTimeout(), where it means an infinite wait.

        if (iTimeout <= iTotal) {
            return -1;

        }

        int iWaitTotal = 0;
        int i;

        for (i = 0; i < a_iTOs.length; i++) {
            iWaitTotal += a_iTOs[i];

            // First interval boundary that still lies ahead of us.

            if (iTotal < iWaitTotal) {
                if (iTimeout < iWaitTotal) {
                    return (iTimeout - iTotal); // max to wait is iTimeout

                } else {
                    return (iWaitTotal - iTotal); // wait till next interval
                }
            }
        }

        return -1; // if we get here we have waited past all of the timeouts
    }

    static {

        config = SLPConfig.getSLPConfig();

    }
}