xref: /titanic_51/usr/src/lib/libslp/javalib/com/sun/slp/Transact.java (revision 7c478bd95313f5f23a4c958a745db2134aa03244)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * ident	"%Z%%M%	%I%	%E% SMI"
24  *
25  * Copyright 1999-2002 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  *
28  */
29 
30 //  Transact.java:    Low level details of performing an SLP
31 //                    network transaction.
32 
33 package com.sun.slp;
34 
35 import java.util.*;
36 import java.net.*;
37 import java.io.*;
38 
39 /**
40  * Transact performs the low level details for transacting an SLP network
41  * query. Note that, in the future, this class may spin separate threads
42  * for DA requests as well.
43  */
44 
45 class Transact extends Object implements Runnable {
46 
47     // Cache of open TCP sockets.
48 
49     private static final Hashtable TCPSocketCache = new Hashtable();
50 
51     // SLP config object.
52 
53     protected static SLPConfig config = null;
54 
55     // Message to send.
56 
57     protected SrvLocMsg msgOut = null;
58 
59     // Vector of return values.
60 
61     protected Vector returns = null;
62 
63     // Timeout for multicast convergence. Varies if it's DA discovery or
64     //  request multicast.
65 
66     protected int[] MSTimeouts;
67 
68     // Maximum results desired for multicast.
69 
70     protected int maxResults = 0;
71 
72     // Exception to throw.
73 
74     protected ServiceLocationException exErr = null;
75 
76     // Multicast address to use.
77 
78     protected InetAddress address = null;
79 
80     // If this is true, continue multicast after the first set of stuff
81     //  is found. Exit when three tries have happened without finding
82     //  anything.
83 
84     boolean continueAfterFound = false;
85 
86     /**
87      * Perform a query to the SLP network. The multicast query is performed
88      * in a separate thread for performance reasons. DAs having the
89      * same scope set are queried until one answers. These DAs form
90      * an equivalence class.
91      *
92      * @param daEquivClasses Vector of DATable.DARecord objects in the
93      *			  same equivalence clase w.r.t. scopes.
94      * @param uniMsg A unicast message to send.
95      * @param multiMsg A multicast message to send.
96      * @param address Multicast address to use.
97      * @return Vector of SrvLocMsg objects with results.
98      */
99 
100     static Vector
101 	transactUA(Vector daEquivClasses,
102 		   SrvLocMsg uniMsg,
103 		   SrvLocMsg multiMsg,
104 		   InetAddress address)
105 	throws ServiceLocationException {
106 
107 	// If we need to multicast, then start the multicast thread.
108 
109 	Vector ret = new Vector();
110 	Thread multiThread = null;
111 	Transact tracon = null;
112 
113 	if (multiMsg != null) {
114 
115 	    // Create a new Transact multicast thread.
116 
117             // The final argument to the constructor of Transact determines
118             // whether to return after the first result or to continue to
119             // gather more than one result.  The value to this field
120             // continueAfterFound MUST be set to 'true' or else multicast
121             // based discovery will find the first result, not all results,
122             // as it should.
123 	    tracon =
124 		new Transact(multiMsg,
125 			     ret,
126 			     config.getMulticastTimeouts(),
127 			     config.getMaximumResults(),
128 			     address,
129 			     true);  // continueAfterFound
130 
131 	    multiThread = new Thread(tracon);
132 
133 	    // Run it.
134 
135 	    multiThread.start();
136 
137 	}
138 
139 	// Go through the msgTable doing all the DAs.
140 
141 	ServiceLocationException exx = null;
142 
143 	if (daEquivClasses != null) {
144 	    exx =
145 		transactUnicastMsg(daEquivClasses,
146 				   uniMsg,
147 				   ret,
148 				   config.getMaximumResults());
149 
150 	}
151 
152 	// Wait until the TransactConverge thread is done, if necessary.
153 
154 	if (multiThread != null) {
155 
156 	    try {
157 		multiThread.join();
158 
159 	    } catch (InterruptedException ex) {
160 
161 	    }
162 
163 	}
164 
165 	// If there was a problem in either the multicast thread or in
166 	//  the unicast call, throw an exception, but *only* if no
167 	//  results came back.
168 
169 	if (ret.size() <= 0) {
170 
171 	    if (exx != null) {
172 		short err = exx.getErrorCode();
173 
174 		if (err != ServiceLocationException.VERSION_NOT_SUPPORTED &&
175 		    err != ServiceLocationException.INTERNAL_ERROR &&
176 		    err != ServiceLocationException.OPTION_NOT_SUPPORTED &&
177 		    err != ServiceLocationException.REQUEST_NOT_SUPPORTED) {
178 		    throw exx;
179 
180 		}
181 
182 	    }
183 
184 	    if (tracon != null && tracon.exErr != null) {
185 		short err = tracon.exErr.getErrorCode();
186 
187 		if (err != ServiceLocationException.VERSION_NOT_SUPPORTED &&
188 		    err != ServiceLocationException.INTERNAL_ERROR &&
189 		    err != ServiceLocationException.OPTION_NOT_SUPPORTED &&
190 		    err != ServiceLocationException.REQUEST_NOT_SUPPORTED) {
191 		    throw tracon.exErr;
192 
193 		}
194 	    }
195 	}
196 
197 
198 	// Return the result to the client.
199 
200 	return ret;
201 
202     }
203 
204     /**
205      * Transact a message with DAs. Put the returned SrvLocMsg
206      * object into the Vector ret.
207      *
208      * @param daEquivClasses Vector of DATable.DARecord objects in the
209      *			   same equivalence clase w.r.t. scopes.
210      * @param msg SrvLocMsg Message to send.
211      * @param ret Vector for returns.
212      * @param maxResults Maximum results expected.
213      * @return A ServiceLocationException object if an exception occured.
214      * @exception ServiceLocationException
215      *            If results cannot be obtained in the timeout interval
216      *            specified in the 'config.' or
217      *            If networking resources cannot be obtained or used
218      *            effectively.
219      */
220     static ServiceLocationException
221 	transactUnicastMsg(Vector daEquivClasses,
222 			   SrvLocMsg msg,
223 			   Vector ret,
224 			   int maxResults) {
225 
226 	// Get the config object if we need it.
227 
228 	if (config == null) {
229 	    config = SLPConfig.getSLPConfig();
230 
231 	}
232 
233 	DatagramSocket ds = null;
234 	int i, n = daEquivClasses.size();
235 	ServiceLocationException exx = null;
236 	InetAddress addr = null;
237 	int numReplies = 0;
238 	DATable daTable = DATable.getDATable();
239 
240 	try {
241 
242 	    // Go through the DA address equivalence classes we need
243 	    //  to query.
244 
245 	    for (i = 0; i < n && numReplies < maxResults; i++) {
246 
247 		DATable.DARecord rec =
248 		    (DATable.DARecord)daEquivClasses.elementAt(i);
249 		Vector daAddresses = (Vector)rec.daAddresses.clone();
250 
251 		// Get a new outgoing socket.
252 
253 		if (ds == null) {
254 		    ds = new DatagramSocket();
255 
256 		}
257 
258 		// Go through the DA addresses until we get a reply from one.
259 
260 		Enumeration en = daAddresses.elements();
261 		SrvLocHeader mhdr = msg.getHeader();
262 
263 		while (en.hasMoreElements()) {
264 
265 		    try {
266 
267 			addr = (InetAddress)en.nextElement();
268 
269 			if (config.traceDATraffic()) {
270 			    config.writeLog("sending_da_trace",
271 					    new Object[] {
272 				Integer.toHexString(mhdr.xid),
273 				    addr});
274 
275 			}
276 
277 			// Get the reply message if any.
278 
279 			SrvLocMsg rply = transactDatagramMsg(ds, addr, msg);
280 
281 			if (!filterRply(msg, rply, addr)) {
282 			    continue;
283 
284 			}
285 
286 			SrvLocHeader rhdr = rply.getHeader();
287 
288 			if (config.traceDATraffic()) {
289 			    config.writeLog("reply_da_trace",
290 					    new Object[] {
291 				Integer.toHexString(rhdr.xid),
292 				    addr});
293 
294 			}
295 
296 			// If overflow, try TCP.
297 
298 			if (rhdr.overflow) {
299 			    if (config.traceDATraffic()) {
300 				config.writeLog("tcp_send_da_trace",
301 						new Object[] {
302 				    Integer.toHexString(mhdr.xid),
303 					addr});
304 
305 			    }
306 
307 			    rply = transactTCPMsg(addr, msg, false);
308 
309 			    if (config.traceDATraffic()) {
310 				config.writeLog("tcp_reply_da_trace",
311 						new Object[] {
312 				    (msg == null ? "<null>":
313 				     Integer.toHexString(mhdr.xid)),
314 					addr});
315 
316 			    }
317 
318 			    if (rply == null) {
319 				continue;
320 
321 			    }
322 
323 			}
324 
325 			// Increment number of replies we received.
326 
327 			SrvLocHeader hdr = rply.getHeader();
328 
329 			numReplies += hdr.iNumReplies;
330 
331 			// Add to return vector.
332 
333 			ret.addElement(rply);
334 
335 			// Break out of the loop, since we only need one in
336 			//  this equivalence class.
337 
338 			break;
339 
340 		    } catch (ServiceLocationException ex) {
341 
342 			config.writeLog("da_exception_trace",
343 					new Object[] {
344 			    new Short(ex.getErrorCode()),
345 				addr,
346 				ex.getMessage()});
347 
348 			// In case we are querying more than one DA, we
349 			// save th exception, returning it to the caller to
350 			// decide if it should be thrown. We ignore DA_BUSY,
351 			// though, since the DA may free up later.
352 
353 			short errCode = ex.getErrorCode();
354 
355 			if (errCode != ServiceLocationException.DA_BUSY) {
356 			    exx = ex;
357 
358 			}
359 
360 			// If the error code is NETWORK_TIMED_OUT, then remove
361 			//  this DA from the DA table. If it's just down
362 			//  temporarily, we'll get it next time we go to
363 			//  the server to get the DA addresses.
364 
365 			if (errCode ==
366 			    ServiceLocationException.NETWORK_TIMED_OUT) {
367 
368 			    if (config.traceDATraffic()) {
369 				config.writeLog("da_drop",
370 						new Object[] {
371 				    addr, rec.scopes});
372 
373 			    }
374 
375 			    daTable.removeDA(addr, rec.scopes);
376 
377 			}
378 		    }
379 		}
380 	    }
381 
382 	} catch (SocketException ex) {
383 	    exx =
384 		new ServiceLocationException(
385 				ServiceLocationException.NETWORK_ERROR,
386 				"socket_creation_failure",
387 				new Object[] {addr, ex.getMessage()});
388 
389 	} finally {
390 
391 	    // Clean up socket.
392 
393 	    if (ds != null) {
394 		ds.close();
395 	    }
396 	}
397 
398 	return exx;
399     }
400 
401     /**
402      * Transact a message via. UDP. Try a maximum of three times if
403      * a timeout.
404      *
405      * @param ds The datagram socket to use.
406      * @param addr The DA to contact.
407      * @param msg The message to send.
408      * @return The SrvLocMsg returned or null if none.
409      * @exception ServiceLocationException Due to errors in parsing message.
410      */
411 
412     static private SrvLocMsg
413 	transactDatagramMsg(DatagramSocket ds, InetAddress addr, SrvLocMsg msg)
414 	throws ServiceLocationException {
415 
416 	SrvLocMsg rply = null;
417 	byte[] outbuf = getBytes(msg, false, false);
418 	byte[] inbuf = new byte[Defaults.iReadMaxMTU];
419 
420 	// Construct the datagram packet to send.
421 
422 	DatagramPacket dpReply =
423 	    new DatagramPacket(inbuf, inbuf.length);
424 	DatagramPacket dpRequest =
425 	    new DatagramPacket(outbuf, outbuf.length, addr, Defaults.iSLPPort);
426 	int[] timeouts = config.getDatagramTimeouts();
427 
428 	// Resend for number of timeouts in timeout interval.
429 
430 	int i;
431 
432 	for (i = 0; i < timeouts.length; i++) {
433 
434 	    // Catch timeout and IO errors.
435 
436 	    try {
437 
438 		ds.setSoTimeout(timeouts[i]);
439 		ds.send(dpRequest);
440 		ds.receive(dpReply);
441 
442 		// Process result into a reply object.
443 
444 		DataInputStream dis =
445 		    new DataInputStream(
446 			new ByteArrayInputStream(dpReply.getData()));
447 
448 		rply = internalize(dis, addr);
449 		break;
450 
451 	    } catch (InterruptedIOException ex) {
452 
453 		// Did not get it on the first timeout, try again.
454 
455 		if (config.traceDrop()|| config.traceDATraffic()) {
456 		    config.writeLog("udp_timeout",
457 				    new Object[] {addr});
458 
459 		}
460 
461 		continue;
462 
463 	    } catch (IOException ex) {
464 		Object[] message = {addr, ex.getMessage()};
465 
466 		if (config.traceDrop() || config.traceDATraffic()) {
467 		    config.writeLog("datagram_io_error",
468 				    message);
469 
470 		}
471 
472 		throw
473 		    new ServiceLocationException(
474 				ServiceLocationException.NETWORK_ERROR,
475 				"datagram_io_error",
476 				message);
477 
478 	    }
479 	}
480 
481 	// If nothing, then we've timed out. DAs with no matching
482 	//  info should at least return a reply.
483 
484 	if (rply == null) {
485 	    throw
486 		new ServiceLocationException(
487 				ServiceLocationException.NETWORK_TIMED_OUT,
488 				"udp_timeout",
489 				new Object[] {addr});
490 
491 	}
492 
493 	return rply;
494     }
495 
496     /**
497      * Transact a message using TCP, since the reply was too big.
498      * @parameter addr Address of the DA to contact.
499      * @parameter msg  The message object to use.
500      * @parameter cacheIt Cache socket, if new.
501      * @return  The SrvLocMsg returned if any.
502      * @exception ServiceLocationException
503      *            If results cannot be obtained in the timeout interval
504      *            specified in the 'config.'
505      *            If networking resources cannot be obtained or used
506      *            effectively.
507      */
508 
509     static SrvLocMsg
510 	transactTCPMsg(InetAddress addr, SrvLocMsg msg, boolean cacheIt)
511 	throws ServiceLocationException {
512 
513 	// Get the config object if we need it.
514 
515 	if (config == null) {
516 	    config = SLPConfig.getSLPConfig();
517 
518 	}
519 
520 	SrvLocMsg rply = null;
521 
522 	try {
523 
524 	    // Transact the message, taking care of socket caching.
525 
526 	    rply = transactMsg(addr, msg, cacheIt, true);
527 
528 	} catch (InterruptedIOException ex) {
529 	    Object[] message = {addr};
530 
531 	    if (config.traceDrop()|| config.traceDATraffic()) {
532 		config.writeLog("tcp_timeout",
533 				message);
534 
535 	    }
536 
537 	    throw
538 		new ServiceLocationException(
539 				ServiceLocationException.NETWORK_TIMED_OUT,
540 				"tcp_timeout",
541 				message);
542 
543 	} catch (IOException ex) {
544 	    Object[] message = {addr, ex.getMessage()};
545 
546 	    if (config.traceDrop() || config.traceDATraffic()) {
547 		config.writeLog("tcp_io_error",
548 				message);
549 
550 	    }
551 
552 	    throw
553 		new ServiceLocationException(
554 				ServiceLocationException.NETWORK_ERROR,
555 				"tcp_io_error",
556 				message);
557 
558 	}
559 
560 	// Filter reply for nulls, invalid xid.
561 
562 	if (!filterRply(msg, rply, addr)) {
563 	    return null;
564 
565 	}
566 
567 	return rply;
568     }
569 
570     // Uncache a socket.
571 
572     static private void uncacheSocket(InetAddress addr, Socket s) {
573 
574 	try {
575 
576 	    s.close();
577 
578 	} catch (IOException ex) {
579 
580 	}
581 
582 	TCPSocketCache.remove(addr);
583 
584     }
585 
586     // Get a (possibly cached) TCP socket, cache it if cache is on.
587 
588     static private Socket getTCPSocket(InetAddress addr, boolean cacheIt)
589 	throws IOException {
590 
591 	Socket s = null;
592 
593 	// We use the cached socket if we've got it.
594 
595 	s = (Socket)TCPSocketCache.get(addr);
596 
597 	if (s == null) {
598 	    s = new Socket(addr, Defaults.iSLPPort);
599 
600 	    // Set it so the socket will block for fixed timeout.
601 
602 	    s.setSoTimeout(config.getTCPTimeout());
603 
604 	}
605 
606 	// We cache it if we're supposed to.
607 
608 	if (cacheIt) {
609 	    TCPSocketCache.put(addr, s);
610 
611 	}
612 
613 	return s;
614     }
615 
616     // Transact the message, using cached socket if necessary. Retry if
617     //  flag is true.
618 
619     static private SrvLocMsg
620 	transactMsg(InetAddress addr,
621 		    SrvLocMsg msg,
622 		    boolean cacheIt,
623 		    boolean retry)
624 	throws InterruptedIOException, IOException, ServiceLocationException {
625 
626 	Socket s = null;
627 	byte outbuf[] = getBytes(msg, false, true);
628 
629 	try {
630 
631 	    s = getTCPSocket(addr, cacheIt);
632 
633 	    DataOutputStream dos = new DataOutputStream(s.getOutputStream());
634 	    DataInputStream dis = new DataInputStream(s.getInputStream());
635 
636 	    // In case the server cuts us off...
637 
638 	    try {
639 
640 		// Only one thread at a time gets to use this socket, in case
641 		//  it was cached. Otherwise, we *may* get interleaved i/o.
642 
643 		synchronized (s) {
644 
645 		    // Send the request.
646 
647 		    dos.write(outbuf, 0, outbuf.length);
648 
649 		    // Read reply.
650 
651 		    return internalize(dis, addr);
652 
653 		}
654 
655 	    } catch (IOException ex) {
656 
657 		// Uncache it, get a new one. If that one doesn't work, we're
658 		//  hosed.
659 
660 		uncacheSocket(addr, s);
661 
662 		s = null;
663 
664 		if (!retry) {
665 		    throw ex;
666 
667 		}
668 
669 		// Recursively call ourselves to take care of this, but
670 		//  don't retry it.
671 
672 		return transactMsg(addr, msg, cacheIt, false);
673 
674 	    }
675 
676 	} finally {
677 
678 	    if (s != null && !cacheIt) {
679 		uncacheSocket(addr, s);
680 
681 	    }
682 	}
683     }
684 
685     // Externalize the message into bytes.
686 
687     static protected byte[] getBytes(SrvLocMsg slm,
688 				     boolean isMulti,
689 				     boolean isTCP)
690 	throws ServiceLocationException {
691 
692 	ByteArrayOutputStream baos = new ByteArrayOutputStream();
693 	SrvLocHeader hdr = slm.getHeader();
694 
695 	hdr.externalize(baos, isMulti, isTCP);
696 
697 	byte[] outbuf = baos.toByteArray();
698 
699 	// Check if it excceds the output buffer length.
700 
701 	if (hdr.overflow) {
702 	    throw
703 		new ServiceLocationException(
704 				ServiceLocationException.BUFFER_OVERFLOW,
705 				"buffer_overflow",
706 				new Object[] {
707 		    new Integer(outbuf.length),
708 			new Integer(config.getMTU())});
709 	}
710 
711 	return outbuf;
712     }
713 
714     // Filter the reply to make sure the xid matches and that it's not null.
715 
716     static protected boolean
717 	filterRply(SrvLocMsg msg, SrvLocMsg rply, InetAddress addr) {
718 
719 	SrvLocHeader mhdr = msg.getHeader();
720 	SrvLocHeader rhdr = rply.getHeader();
721 
722 	if (rply == null) {
723 	    if (config.traceDrop()) {
724 		config.writeLog("reply_unparsable",
725 				new Object[] {addr});
726 
727 	    }
728 
729 	    return false;
730 
731 	}
732 
733 	// Check for invalid xid.
734 
735 	if (mhdr.xid != rhdr.xid) {
736 	    if (config.traceDrop()) {
737 		config.writeLog("wrong_xid",
738 				new Object[] {addr});
739 
740 	    }
741 	    return false;
742 
743 	}
744 	return true;
745 
746     }
747 
748     /**
749      * Internalize the byte array in the input stream into a SrvLocMsg
750      * subclass. It will be an appropriate subclass for the client agent.
751      * If an exception comes out of this method, it is converted into
752      * a SrvLocMsg with error code.
753      *
754      *
755      * @param dis The input stream containing the packet.
756      * @param addr The address of the replying agent (for error reporting).
757      * @return The right SrvLocMsg subclass appropriate for the Client Agent.
758      *		If null is returned, the function code wasn't recognized,
759      *		and so it may be appropriate for another agent.
760      * @exception ServiceLocationException If the character set was not valid
761      *		or an error occured during parsing.
762      * @exception IOException If DataInputStream throws it.
763      */
764 
765     static protected SrvLocMsg internalize(DataInputStream dis,
766 					   InetAddress addr)
767 	throws ServiceLocationException {
768 
769 	int ver = 0, fun = 0;
770 	SrvLocMsg msg = null;
771 	SrvLocHeader hdr = null;
772 	byte[] b = new byte[2];
773 
774 	try {
775 
776 	    dis.readFully(b, 0, 2);
777 
778 	    ver = (int) ((char)b[0] & 0XFF);
779 	    fun = (int) ((char)b[1] & 0XFF);
780 
781 	    // Unrecognized version number if header not returned.
782 
783 	    if (ver != Defaults.version) {
784 		throw
785 		    new ServiceLocationException(
786 				ServiceLocationException.VERSION_NOT_SUPPORTED,
787 				"version_number_error",
788 				new Object[] {new Integer(ver)});
789 
790 	    }
791 
792 	    // Create the header. Note that we only need to create a
793 	    //  client side header here, because that is all that
794 	    //  will be expected by the client side code. Note that we
795 	    //  *can't* use the SrvLocHeader.newInstance() mechanism
796 	    //  because Transact lives in the server as well, and
797 	    //  SrvLocHeader can only handle one header class per
798 	    //  version.
799 
800 	    hdr = new SLPHeaderV2();
801 
802 	    // Parse header.
803 
804 	    hdr.parseHeader(fun, dis);
805 
806 	    // Parse body.
807 
808 	    if ((msg = hdr.parseMsg(dis)) != null) {
809 
810 		// Parse options, if any.
811 
812 		hdr.parseOptions(dis);
813 
814 	    }
815 
816 	} catch (IllegalArgumentException ex) {
817 
818 	    // During parsing, this can be thrown if syntax errors occur.
819 
820 	    throw
821 		new ServiceLocationException(
822 				ServiceLocationException.PARSE_ERROR,
823 				"passthrough_addr",
824 				new Object[] {ex.getMessage(), addr});
825 
826 	} catch (IOException ex) {
827 
828 	    // If version code is zero, then we suspect a network error,
829 	    //  otherwise, it is probably a parse error.
830 
831 	    String fcode = (fun == 0 ? "???":Integer.toString(fun));
832 	    short exCode =
833 		(ver == 0 ? ServiceLocationException.NETWORK_ERROR:
834 		 ServiceLocationException.PARSE_ERROR);
835 
836 	    // During parsing, this can be thrown if the message stream
837 	    //  is improperly formatted.
838 
839 	    throw
840 		new ServiceLocationException(exCode,
841 					     "ioexception_parsing",
842 					     new Object[] {
843 		    ex, fcode, addr, ex.getMessage()});
844 
845 	} catch (ServiceLocationException ex) {
846 
847 	    // Add the address of the replying agent.
848 
849 	    throw
850 		new ServiceLocationException(ex.getErrorCode(),
851 					     "passthrough_addr",
852 					     new Object[] {
853 		    ex.getMessage(), addr});
854 
855 	}
856 
857 	return msg;
858     }
859 
860     // Send out the message.
861 
862     static protected void
863 	send(DatagramSocket ds, SrvLocMsg msg, InetAddress addr)
864 	throws ServiceLocationException, IOException {
865 
866 	byte[] outbuf = getBytes(msg, true, false);
867 	DatagramPacket dpsend =
868 	    new DatagramPacket(outbuf, outbuf.length, addr, Defaults.iSLPPort);
869 	ds.send(dpsend);
870 
871     }
872 
873     // Check the response and add the previous responder if it is OK.
874 
875     static protected boolean
876 	addPreviousResponder(SrvLocMsg msg, InetAddress addr) {
877 
878 	// Add incoming result to the vector.
879 
880 	SrvLocHeader hdr = msg.getHeader();
881 	Vector v = hdr.previousResponders;
882 	String srcAddr = addr.getHostAddress();
883 
884 	if (v.contains(srcAddr)) {	// the SRC ignored its PR list
885 	    if (config.traceDrop()) {
886 		config.writeLog("drop_pr",
887 				new Object[] {
888 		    srcAddr,
889 			Integer.toHexString(hdr.xid)});
890 
891 	    }
892 	    return false;
893 
894 	} else {
895 	    hdr.addPreviousResponder(addr);
896 	    return true;
897 
898 	}
899     }
900 
901     // Transact an active request for DA or SA adverts.
902 
903     static Vector transactActiveAdvertRequest(ServiceType type,
904 					      SrvLocMsg rqst,
905 					      ServerDATable daTable)
906 	throws ServiceLocationException {
907 
908 	// Perform active advertisement.
909 
910 	Vector ret = new Vector();
911 	Vector results = new Vector();
912 
913 	// Create Transact object and start.
914 
915 	Transact tran = new Transact(rqst,
916 				     results,
917 				     config.getMulticastTimeouts(),
918 				     Integer.MAX_VALUE, // config doesn't apply
919 				     config.getMulticastAddress(),
920 				     true);
921 
922 	Thread multiThread = new Thread(tran);
923 
924 	multiThread.start();
925 
926 	// Wait until the TransactConverge thread is done, if necessary.
927 
928 	try {
929 	    multiThread.join();
930 
931 	} catch (InterruptedException ex) {
932 
933 	}
934 
935 	ServiceLocationException ex = tran.exErr;
936 
937 	// Report error.
938 
939 	if (ex != null && config.traceDATraffic()) {
940 	    config.writeLog("sdat_active_err",
941 			    new Object[] {new Integer(ex.getErrorCode()),
942 					      ex.getMessage()});
943 
944 	    throw ex;
945 
946 	}
947 
948 	// Process the results.
949 
950 	int i, n = results.size();
951 
952 	for (i = 0; i < n; i++) {
953 	    Object msg = results.elementAt(i);
954 
955 	    if ((type.equals(Defaults.DA_SERVICE_TYPE) &&
956 		!(msg instanceof CDAAdvert)) ||
957 		(type.equals(Defaults.SA_SERVICE_TYPE) &&
958 		!(msg instanceof CSAAdvert))) {
959 
960 		if (config.traceDrop()) {
961 		    config.writeLog("sdat_nonadvert_err",
962 				    new Object[] {
963 			msg});
964 
965 		}
966 
967 		continue;
968 	    }
969 
970 	    // Let DA table handle it if it`s a DAAdvert.
971 
972 	    if (type.equals(Defaults.DA_SERVICE_TYPE)) {
973 		CDAAdvert advert = (CDAAdvert)msg;
974 
975 		daTable.handleAdvertIn(advert);
976 
977 	    } else {
978 
979 		// Add scopes from the SAAdvert if not already there.
980 
981 		SrvLocHeader hdr = ((SrvLocMsg)msg).getHeader();
982 
983 		int j, m = hdr.scopes.size();
984 
985 		for (j = 0; j < m; j++) {
986 		    Object o = hdr.scopes.elementAt(j);
987 
988 		    if (!ret.contains(o)) {
989 			ret.addElement(o);
990 
991 		    }
992 		}
993 	    }
994 	}
995 
996 	return ret;
997     }
998 
999     // Construct a Transact object to run a convergence transaction in
1000     //  a separate thread.
1001 
1002     Transact(SrvLocMsg msg,
1003 	     Vector ret,
1004 	     int[] msT,
1005 	     int mResults,
1006 	     InetAddress address,
1007 	     boolean continueAfterFound) {
1008 
1009 	msgOut = msg;
1010 	returns = ret;
1011 	MSTimeouts = msT;
1012 	maxResults = mResults;
1013 	this.address = address;
1014 	this.continueAfterFound = continueAfterFound;
1015     }
1016 
1017     // Run the multicast convergence algorithm.
1018 
1019     public void run() {
1020 
1021 	Exception xes = null;
1022 	DatagramSocket ds = null;
1023 
1024 	// Get the config object if we need it.
1025 
1026 	if (config == null) {
1027 	    config = SLPConfig.getSLPConfig();
1028 
1029 	}
1030 
1031 	// Set thread name.
1032 
1033 	if (config.isBroadcastOnly()) {
1034 	    Thread.currentThread().setName("SLP Broadcast Transact");
1035 	    address = config.getBroadcastAddress();
1036 
1037 	} else {
1038 	    Thread.currentThread().setName("SLP Multicast Transact");
1039 
1040 	}
1041 
1042 	try {
1043 
1044 	    // Multicast out on the default interface only.
1045 
1046 	    ds = config.getMulticastSocketOnInterface(config.getLocalHost(),
1047 						      true);
1048 
1049 	    // Perform convergence.
1050 
1051 	    transactConvergeMsg(address,
1052 				ds,
1053 				msgOut,
1054 				returns,
1055 				MSTimeouts,
1056 				maxResults,
1057 				continueAfterFound);
1058 
1059 	    ds.close();
1060 
1061 	    ds = null;
1062 
1063 	} catch (ServiceLocationException ex) {
1064 
1065 	    // Ignore DA_BUSY, the DA may free up later.
1066 
1067 	    if (ex.getErrorCode() != ServiceLocationException.DA_BUSY) {
1068 		exErr = ex;
1069 		xes = ex;
1070 
1071 	    }
1072 
1073 	} catch (Exception ex) {
1074 
1075 	    // Create new exception to be thrown.
1076 
1077 	    xes = ex;
1078 	    exErr = new ServiceLocationException(
1079 				ServiceLocationException.INTERNAL_SYSTEM_ERROR,
1080 				"passthrough",
1081 				new Object[] {ex.getMessage()});
1082 
1083 	} finally {
1084 
1085 	    // Close the socket if it's been opened.
1086 
1087 	    if (ds != null) {
1088 		ds.close();
1089 
1090 	    }
1091 	}
1092 
1093 	// Log any errors.
1094 
1095 	if (xes != null) {
1096 	    StringWriter sw = new StringWriter();
1097 	    PrintWriter pw = new PrintWriter(sw);
1098 
1099 	    xes.printStackTrace(pw);
1100 	    pw.flush();
1101 
1102 	    config.writeLog("multicast_error",
1103 			    new Object[] {xes.getMessage(),
1104 					      sw.toString()});
1105 
1106 	}
1107     }
1108 
1109     /**
1110      * Send the message using multicast and use convergence to gather the
1111      * results. Note that this routine must be synchronized because
1112      * only one multicast can be active at a time; othewise, the client
1113      * may get back an unexpected result. However, there can be many unicast
1114      * requests active along with a multicast request, hence the separate
1115      * thread for multicast.
1116      *
1117      * The subtlety of the timing routine is that it will only resend the
1118      * message when one of the multicast convergence timeout intervals
1119      * elapses.  Further, for efficiency, it will give up after a complete
1120      * interval has gone by without receiving any results.  This may mean
1121      * that the intervals have to be extended in larger networks.  In the
1122      * common case, multicast convergence will complete under 3 seconds
1123      * as all results will arrive during the first interval (1 second long)
1124      * and none will arrive during the second interval.
1125      *
1126      * @param     addr  The multicast/broadcast address to send the request to.
1127      * @param     ds  The datagram socket to send on.
1128      * @param     msg The message to send.
1129      * @param     vResult A vector in which to put the returns.
1130      * @param	msTimeouts Array of timeout values for multicast convergence.
1131      * @param     maxResults Maximum replies desired.
1132      * @param	continueAfterFound If true, continue after something is
1133      *				   found. Try three times if nothing was
1134      *				   found. If false, exit at the first
1135      *				   timeout. DA discovery should set this
1136      *				   to true so as many DAs as possible are
1137      *				   found, otherwise, it should be false.
1138      * @exception ServiceLocationException
1139      *            If results cannot be obtained in the timeout interval
1140      *            specified in the 'config.' or
1141      *            if networking resources cannot be obtained or used
1142      *            effectively.
1143      */
1144 
1145     static public void
1146 	transactConvergeMsg(InetAddress addr,
1147 			    DatagramSocket ds,
1148 			    SrvLocMsg   msg,
1149 			    Vector vResult,
1150 			    int[] msTimeouts,
1151 			    int maxResults,
1152 			    boolean continueAfterFound)
1153 	throws ServiceLocationException {
1154 
1155 	// Get the config object if we need it.
1156 
1157 	if (config == null) {
1158 	    config = SLPConfig.getSLPConfig();
1159 
1160 	}
1161 
1162 	int numReplies = 0;
1163 	int tries = 0;
1164 	SrvLocMsg rply = null;
1165 	ByteArrayOutputStream baos = null;
1166 	int multiMax = config.getMulticastMaximumWait();
1167 	long lStartTime = System.currentTimeMillis();
1168 	int mtu = config.getMTU();
1169 
1170 	try {
1171 
1172 	    // Send the request for the 1st iteration.  It will be sent again
1173 	    //  only when the timeout intervals elapse.
1174 
1175 	    send(ds, msg, addr);
1176 	    tries++;
1177 
1178 	    long lTimeSent = System.currentTimeMillis();
1179 
1180 	    // Continue collecting results only as long as we need more for
1181 	    //   the 'max results' configuration.
1182 
1183 	    while (numReplies < maxResults) {
1184 
1185 		// Set up the reply buffer.
1186 
1187 		byte [] incoming = new byte[mtu];
1188 		DatagramPacket dprecv =
1189 		    new DatagramPacket(incoming, incoming.length);
1190 
1191 		// Block on receive (no longer than max timeout - time spent).
1192 
1193 		int iTimeout =
1194 		    getTimeout(lStartTime, lTimeSent, multiMax, msTimeouts);
1195 
1196 		if (iTimeout < 0) {
1197 		    break; // we have no time left!
1198 		}
1199 
1200 		ds.setSoTimeout(iTimeout);
1201 
1202 		try {
1203 		    ds.receive(dprecv);
1204 
1205 		} catch (InterruptedIOException ex) {
1206 
1207 		    // We try sending at least three times, unless there was
1208 		    // a timeout. If continueAfterFound is false, we exit
1209 		    // after the first timeout if something was found.
1210 
1211 		    if ((!continueAfterFound && numReplies > 0) ||
1212 		(int)(System.currentTimeMillis() - lStartTime) > multiMax ||
1213 			tries >= 3) {
1214 			break;
1215 
1216 		    }
1217 
1218 		    // Now resend the request...
1219 
1220 		    send(ds, msg, addr);
1221 		    tries++;
1222 
1223 		    lTimeSent = System.currentTimeMillis();
1224 		    continue; // since we did not receive anything, continue...
1225 
1226 		}
1227 
1228 		// Data was received without timeout or fail.
1229 
1230 		DataInputStream dis =
1231 		    new DataInputStream(
1232 			new ByteArrayInputStream(dprecv.getData()));
1233 
1234 		InetAddress raddr = dprecv.getAddress();
1235 		rply = internalize(dis, raddr);
1236 
1237 		if (!filterRply(msg, rply, raddr)) {
1238 		    continue;
1239 
1240 		}
1241 
1242 		// Add this responder to previous responders. If the message
1243 		//  was already received but the SA resent because it isn't
1244 		//  doing multicast convergence correctly, then ignore it.
1245 
1246 		if (!addPreviousResponder(msg, raddr)) {
1247 		    continue;
1248 
1249 		}
1250 
1251 		// Handle any overflow thru TCP.
1252 
1253 		SrvLocHeader rhdr = rply.getHeader();
1254 
1255 		if (rhdr.overflow) {
1256 
1257 		    rply = transactTCPMsg(raddr, msg, false);
1258 
1259 		    if (rply == null) {
1260 			continue;
1261 
1262 		    }
1263 
1264 		    rhdr = rply.getHeader();
1265 		}
1266 
1267 		// Add response to list.
1268 
1269 		if (vResult.size() < maxResults) {
1270 		    vResult.addElement(rply);
1271 
1272 		}
1273 
1274 		// Increment the number of results returned.
1275 
1276 		numReplies += rhdr.iNumReplies;
1277 
1278 		// Exit if we should not continue.
1279 
1280 		if (!continueAfterFound) {
1281 		    break;
1282 
1283 		}
1284 	    }
1285 	} catch (ServiceLocationException ex) {
1286 
1287 	    // If we broke off because the previous responder's list is too
1288 	    // long, then return, otherwise throw the exception again.
1289 
1290 	    if (ex.getErrorCode() ==
1291 		ServiceLocationException.PREVIOUS_RESPONDER_OVERFLOW) {
1292 		return;
1293 
1294 	    }
1295 
1296 	    throw ex;
1297 
1298 	} catch (IOException ex) {
1299 
1300 	    throw
1301 		new ServiceLocationException(
1302 				ServiceLocationException.NETWORK_ERROR,
1303 				"ioexception_conv",
1304 				new Object[] {ex, ex.getMessage()});
1305 
1306 	}
1307     }
1308 
1309     // Calculate the multicast timeout depending on where we are in the loop.
1310 
1311     static private int
1312 	getTimeout(long lStart, long lSent, int iTimeout, int[] a_iTOs) {
1313 	int iTotal = (int)(lSent - lStart);
1314 
1315 	if (iTimeout < iTotal) {
1316 	    return -1;
1317 
1318 	}
1319 
1320 	int iWaitTotal = 0;
1321 	int i;
1322 
1323 	for (i = 0; i < a_iTOs.length; i++) {
1324 	    iWaitTotal += a_iTOs[i];
1325 
1326 	    int iTillNext = (iWaitTotal - iTotal);
1327 
1328 	    if (iTotal < iWaitTotal) {
1329 		if (iTimeout < (iTotal + iTillNext)) {
1330 		    return (iTimeout - iTotal);  // max to wait is iTimeout
1331 
1332 		} else {
1333 		    return iTillNext; // otherwise wait till next interval
1334 		}
1335 	    }
1336 	}
1337 
1338 	return -1; // if we get here we have waited past all of the timeouts
1339     }
1340 
1341     static {
1342 
1343 	config = SLPConfig.getSLPConfig();
1344 
1345     }
1346 }
1347