xref: /titanic_44/usr/src/uts/sun4v/io/ds_common.c (revision 455859fb7ded7d1b5b1bac9cdcdf36c456fdfb1b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * Domain Services Module Common Code.
29  *
30  * This module is intended to be used by both Solaris and the VBSC
31  * module.
32  */
33 
34 #include <sys/modctl.h>
35 #include <sys/ksynch.h>
36 #include <sys/taskq.h>
37 #include <sys/disp.h>
38 #include <sys/cmn_err.h>
39 #include <sys/note.h>
40 #include <sys/mach_descrip.h>
41 #include <sys/mdesc.h>
42 #include <sys/ldc.h>
43 #include <sys/ds.h>
44 #include <sys/ds_impl.h>
45 
46 #ifndef MIN
47 #define	MIN(a, b)	((a) < (b) ? (a) : (b))
48 #endif
49 
50 #define	DS_DECODE_BUF_LEN		30
51 
52 /*
53  * All DS ports in the system
54  *
55  * The list of DS ports is read in from the MD when the DS module is
56  * initialized and is never modified. This eliminates the need for
57  * locking to access the port array itself. Access to the individual
58  * ports are synchronized at the port level.
59  */
60 ds_port_t	ds_ports[DS_MAX_PORTS];
61 ds_portset_t	ds_allports;	/* all DS ports in the system */
62 
63 /*
64  * Table of registered services
65  *
 * Locking: Accesses to the table of services are synchronized using
 *   a single mutex (ds_svcs.lock). The lock must be held both when
 *   looking up service information in the table and when any
 *   service information is being modified.
70  */
71 ds_svcs_t	ds_svcs;
72 
73 /*
74  * Flag to prevent callbacks while in the middle of DS teardown.
75  */
76 boolean_t ds_enabled = B_FALSE;	/* enable/disable taskq processing */
77 
78 /*
79  * Retry count and delay for LDC reads and writes
80  */
81 #ifndef DS_DEFAULT_RETRIES
82 #define	DS_DEFAULT_RETRIES	10000	/* number of times to retry */
83 #endif
84 #ifndef DS_DEFAULT_DELAY
85 #define	DS_DEFAULT_DELAY	1000	/* usecs to wait between retries */
86 #endif
87 
88 static int ds_retries = DS_DEFAULT_RETRIES;
89 static clock_t ds_delay = DS_DEFAULT_DELAY;
90 
91 /*
92  * Supported versions of the DS message protocol
93  *
94  * The version array must be sorted in order from the highest
95  * supported version to the lowest. Support for a particular
96  * <major>.<minor> version implies all lower minor versions of
97  * that same major version are supported as well.
98  */
99 static ds_ver_t ds_vers[] = { { 1, 0 } };
100 
101 #define	DS_NUM_VER	(sizeof (ds_vers) / sizeof (ds_vers[0]))
102 
103 
104 /* incoming message handling functions */
105 typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len);
106 static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len);
107 static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len);
108 static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len);
109 static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len);
110 static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len);
111 static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len);
112 static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len);
113 static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len);
114 static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len);
115 static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len);
116 static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len);
117 
/*
 * DS Message Handler Dispatch Table
 *
 * A table used to dispatch all incoming messages. This table
 * contains handlers for all the fixed message types, as well as
 * the messages defined in the 1.0 version of the DS protocol.
 * The handlers are indexed based on the DS header msg_type values,
 * so the order of the entries must follow the numeric order of the
 * message type named in each trailing comment. ds_dispatch_event()
 * checks DS_MSG_TYPE_VALID() on msg_type before indexing the table.
 */
static const ds_msg_handler_t ds_msg_handlers[] = {
	ds_handle_init_req,		/* DS_INIT_REQ */
	ds_handle_init_ack,		/* DS_INIT_ACK */
	ds_handle_init_nack,		/* DS_INIT_NACK */
	ds_handle_reg_req,		/* DS_REG_REQ */
	ds_handle_reg_ack,		/* DS_REG_ACK */
	ds_handle_reg_nack,		/* DS_REG_NACK */
	ds_handle_unreg_req,		/* DS_UNREG */
	ds_handle_unreg_ack,		/* DS_UNREG_ACK */
	ds_handle_unreg_nack,		/* DS_UNREG_NACK */
	ds_handle_data,			/* DS_DATA */
	ds_handle_nack			/* DS_NACK */
};
139 
140 
141 
142 /* initialization functions */
143 static int ds_ldc_init(ds_port_t *port);
144 
145 /* event processing functions */
146 static uint_t ds_ldc_cb(uint64_t event, caddr_t arg);
147 static int ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep);
148 static void ds_handle_up_event(ds_port_t *port);
149 static void ds_handle_down_reset_events(ds_port_t *port);
150 static void ds_handle_recv(void *arg);
151 static void ds_dispatch_event(void *arg);
152 
153 /* message sending functions */
154 static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen);
155 static int ds_send_reg_req(ds_svc_t *svc, ds_port_t *port);
156 static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
157 static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
158 
159 /* walker functions */
160 static int ds_svc_isfree(ds_svc_t *svc, void *arg);
161 static int ds_svc_unregister(ds_svc_t *svc, void *arg);
162 static int ds_svc_port_up(ds_svc_t *svc, void *arg);
163 
164 /* service utilities */
165 static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port);
166 static int ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port);
167 
168 /* port utilities */
169 static void ds_port_reset(ds_port_t *port);
170 static ldc_status_t ds_update_ldc_state(ds_port_t *port);
171 
172 /* misc utilities */
173 static void min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
174     uint16_t *min_major, uint16_t *max_major);
175 
176 /* debug */
177 static char *decode_ldc_events(uint64_t event, char *buf);
178 
179 /* loopback */
180 static void ds_loopback_register(ds_svc_hdl_t hdl);
181 static void ds_loopback_unregister(ds_svc_hdl_t hdl);
182 static void ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen);
183 static int ds_loopback_set_svc(ds_svc_t *svc, ds_svc_hdl_t lb_hdl);
184 
185 /* client handling */
186 static int i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
187     uint_t maxhdls);
188 static ds_svc_t *ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl,
189     ds_port_t *port);
190 static ds_svc_t *ds_find_svc_by_id_port(char *svc_id, int is_client,
191     ds_port_t *port);
192 static ds_svc_t *ds_svc_clone(ds_svc_t *svc);
193 static void ds_portset_del_active_clients(char *service, ds_portset_t *portsp);
194 static void ds_check_for_dup_services(ds_svc_t *svc);
195 static void ds_delete_svc_entry(ds_svc_t *svc);
196 
/*
 * Return a copy of the NUL-terminated string 'str' in a buffer
 * obtained from DS_MALLOC. The buffer is strlen(str) + 1 bytes long.
 */
char *
ds_strdup(char *str)
{
	size_t	nbytes = strlen(str) + 1;	/* room for the terminator */
	char	*copy = DS_MALLOC(nbytes);

	(void) memcpy(copy, str, nbytes);
	return (copy);
}
206 
207 void
208 ds_common_init(void)
209 {
210 	/* Validate version table */
211 	ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK);
212 
213 	/* Initialize services table */
214 	ds_init_svcs_tbl(DS_MAXSVCS_INIT);
215 
216 	/* enable callback processing */
217 	ds_enabled = B_TRUE;
218 }
219 
220 /* BEGIN LDC SUPPORT FUNCTIONS */
221 
222 static char *
223 decode_ldc_events(uint64_t event, char *buf)
224 {
225 	buf[0] = 0;
226 	if (event & LDC_EVT_DOWN)	(void) strcat(buf, " DOWN");
227 	if (event & LDC_EVT_RESET)	(void) strcat(buf, " RESET");
228 	if (event & LDC_EVT_UP)		(void) strcat(buf, " UP");
229 	if (event & LDC_EVT_READ)	(void) strcat(buf, " READ");
230 	if (event & LDC_EVT_WRITE)	(void) strcat(buf, " WRITE");
231 	return (buf);
232 }
233 
234 static ldc_status_t
235 ds_update_ldc_state(ds_port_t *port)
236 {
237 	ldc_status_t	ldc_state;
238 	int		rv;
239 	char		ebuf[DS_EBUFSIZE];
240 
241 	ASSERT(MUTEX_HELD(&port->lock));
242 
243 	/*
244 	 * Read status and update ldc state info in port structure.
245 	 */
246 	if ((rv = ldc_status(port->ldc.hdl, &ldc_state)) != 0) {
247 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_status error: %s" DS_EOL,
248 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
249 		ldc_state = port->ldc.state;
250 	} else {
251 		port->ldc.state = ldc_state;
252 	}
253 
254 	return (ldc_state);
255 }
256 
/*
 * Handle an LDC DOWN or RESET event (also used when a read or write
 * reports ECONNRESET) for 'port'.
 *
 * With the services table lock and the port lock held, pending events
 * are drained, the cached LDC state is refreshed, the port is reset and
 * the channel is asked to come back up. The locks are then dropped and
 * the UP handling is run in case the channel is already up again.
 */
static void
ds_handle_down_reset_events(ds_port_t *port)
{
	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
	    __func__);

	/* lock order: services table lock first, then the port lock */
	mutex_enter(&ds_svcs.lock);
	mutex_enter(&port->lock);

	ds_sys_drain_events(port);

	(void) ds_update_ldc_state(port);

	/* reset the port state */
	ds_port_reset(port);

	/* acknowledge the reset */
	(void) ldc_up(port->ldc.hdl);

	mutex_exit(&port->lock);
	mutex_exit(&ds_svcs.lock);

	/*
	 * ds_handle_up_event() acquires port->lock itself, so it must
	 * only be called once the lock has been released above.
	 */
	ds_handle_up_event(port);

	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
}
283 
284 static void
285 ds_handle_up_event(ds_port_t *port)
286 {
287 	ldc_status_t	ldc_state;
288 
289 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
290 	    __func__);
291 
292 	mutex_enter(&port->lock);
293 
294 	ldc_state = ds_update_ldc_state(port);
295 
296 	mutex_exit(&port->lock);
297 
298 	if ((ldc_state == LDC_UP) && IS_DS_PORT(port)) {
299 		/*
300 		 * Initiate the handshake.
301 		 */
302 		ds_send_init_req(port);
303 	}
304 
305 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
306 }
307 
308 static uint_t
309 ds_ldc_cb(uint64_t event, caddr_t arg)
310 {
311 	ds_port_t	*port = (ds_port_t *)arg;
312 	char		evstring[DS_DECODE_BUF_LEN];
313 
314 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: %s event (%llx) received" DS_EOL,
315 	    PORTID(port), __func__, decode_ldc_events(event, evstring),
316 	    (u_longlong_t)event);
317 
318 	if (!ds_enabled) {
319 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: callback handling is disabled"
320 		    DS_EOL, PORTID(port), __func__);
321 		return (LDC_SUCCESS);
322 	}
323 
324 	if (event & (LDC_EVT_DOWN | LDC_EVT_RESET)) {
325 		ds_handle_down_reset_events(port);
326 		goto done;
327 	}
328 
329 	if (event & LDC_EVT_UP) {
330 		ds_handle_up_event(port);
331 	}
332 
333 	if (event & LDC_EVT_READ) {
334 		if (port->ldc.state != LDC_UP) {
335 			cmn_err(CE_WARN, "ds@%lx: %s: LDC READ event while "
336 			    "port not up" DS_EOL, PORTID(port), __func__);
337 			goto done;
338 		}
339 
340 		if (ds_sys_dispatch_func(ds_handle_recv, port)) {
341 			cmn_err(CE_WARN, "ds@%lx: error initiating LDC READ "
342 			    " event", PORTID(port));
343 		}
344 	}
345 
346 	if (event & LDC_EVT_WRITE) {
347 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: LDC WRITE event received, "
348 		    "not supported" DS_EOL, PORTID(port), __func__);
349 	}
350 
351 	if (event & ~(LDC_EVT_UP | LDC_EVT_READ)) {
352 		cmn_err(CE_WARN, "ds@%lx: %s: Unexpected LDC event received: "
353 		    "0x%llx" DS_EOL, PORTID(port), __func__,
354 		    (u_longlong_t)event);
355 	}
356 done:
357 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
358 
359 	return (LDC_SUCCESS);
360 }
361 
362 static int
363 ds_ldc_init(ds_port_t *port)
364 {
365 	int		rv;
366 	ldc_attr_t	ldc_attr;
367 	caddr_t		ldc_cb_arg = (caddr_t)port;
368 	char		ebuf[DS_EBUFSIZE];
369 
370 	ASSERT(MUTEX_HELD(&port->lock));
371 
372 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%lld" DS_EOL,
373 	    PORTID(port), __func__, (u_longlong_t)port->ldc.id);
374 
375 	ldc_attr.devclass = LDC_DEV_GENERIC;
376 	ldc_attr.instance = 0;
377 	ldc_attr.mode = LDC_MODE_RELIABLE;
378 	ldc_attr.mtu = DS_STREAM_MTU;
379 
380 	if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) {
381 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_id: %lx, ldc_init error: %s"
382 		    DS_EOL, PORTID(port), __func__, port->ldc.id,
383 		    ds_errno_to_str(rv, ebuf));
384 		return (rv);
385 	}
386 
387 	rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, ldc_cb_arg);
388 	if (rv != 0) {
389 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_reg_callback error: %s"
390 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
391 		return (rv);
392 	}
393 
394 	ds_sys_ldc_init(port);
395 	return (0);
396 }
397 
398 int
399 ds_ldc_fini(ds_port_t *port)
400 {
401 	int	rv;
402 	char	ebuf[DS_EBUFSIZE];
403 
404 	ASSERT(port->state >= DS_PORT_LDC_INIT);
405 
406 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%ld" DS_EOL, PORTID(port),
407 	    __func__, port->ldc.id);
408 
409 	if ((rv = ldc_close(port->ldc.hdl)) != 0) {
410 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_close error: %s" DS_EOL,
411 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
412 		return (rv);
413 	}
414 
415 	if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) {
416 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_unreg_callback error: %s"
417 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
418 		return (rv);
419 	}
420 
421 	if ((rv = ldc_fini(port->ldc.hdl)) != 0) {
422 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_fini error: %s" DS_EOL,
423 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
424 		return (rv);
425 	}
426 
427 	return (rv);
428 }
429 
/*
 * Attempt to read a specified number of bytes from a particular LDC.
 * Returns zero for success or the return code from the LDC read on
 * failure. The actual number of bytes read from the LDC is returned
 * in the size parameter.
 *
 * On entry *sizep holds the number of bytes wanted; on return it holds
 * the number of bytes actually stored at msgp, which may be less than
 * requested. The caller must hold port->rcv_lock.
 *
 * Note that a zero return does not by itself mean the full request was
 * satisfied: when no data is available at all, or when the retry limit
 * is reached after a zero length read, the function returns 0 with a
 * short count in *sizep. When the retry limit is reached after EAGAIN,
 * EAGAIN is returned. Callers must compare *sizep with what they asked
 * for.
 */
static int
ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep)
{
	int	rv = 0;
	size_t	bytes_req = *sizep;
	size_t	bytes_left = bytes_req;
	size_t	nbytes;
	int	retry_count = 0;
	char	ebuf[DS_EBUFSIZE];

	ASSERT(MUTEX_HELD(&port->rcv_lock));

	*sizep = 0;

	DS_DBG_LDC(CE_NOTE, "ds@%lx: attempting to read %ld bytes" DS_EOL,
	    PORTID(port), bytes_req);

	while (bytes_left > 0) {

		/* ask for everything still outstanding on each attempt */
		nbytes = bytes_left;

		if ((rv = ldc_read(port->ldc.hdl, msgp, &nbytes)) != 0) {
			if (rv == ECONNRESET) {
				/* not logged here; the caller acts on it */
				break;
			} else if (rv != EAGAIN) {
				cmn_err(CE_NOTE, "ds@%lx: %s: %s" DS_EOL,
				    PORTID(port), __func__,
				    ds_errno_to_str(rv, ebuf));
				break;
			}
			/* EAGAIN falls through to the retry logic below */
		} else {
			if (nbytes != 0) {
				DS_DBG_LDC(CE_NOTE, "ds@%lx: "
				    "read %ld bytes, %d retries" DS_EOL,
				    PORTID(port), nbytes, retry_count);

				*sizep += nbytes;
				msgp += nbytes;
				bytes_left -= nbytes;

				/* reset counter on a successful read */
				retry_count = 0;
				continue;
			}

			/*
			 * No data was read. Check if this is the
			 * first attempt. If so, just return since
			 * nothing has been read yet.
			 */
			if (bytes_left == bytes_req) {
				DS_DBG_LDC(CE_NOTE, "ds@%lx: read zero bytes, "
				    " no data available" DS_EOL, PORTID(port));
				break;
			}
		}

		/*
		 * A retry is necessary because the read returned
		 * EAGAIN, or a zero length read occurred after
		 * reading a partial message.
		 */
		if (retry_count++ >= ds_retries) {
			DS_DBG_LDC(CE_NOTE, "ds@%lx: timed out waiting for "
			    "message" DS_EOL, PORTID(port));
			break;
		}

		/* wait ds_delay microseconds before the next attempt */
		drv_usecwait(ds_delay);
	}

	return (rv);
}
509 
510 static void
511 ds_handle_recv(void *arg)
512 {
513 	ds_port_t	*port = (ds_port_t *)arg;
514 	char		*hbuf;
515 	size_t		msglen;
516 	size_t		read_size;
517 	boolean_t	hasdata;
518 	ds_hdr_t	hdr;
519 	uint8_t		*msg;
520 	char		*currp;
521 	int		rv;
522 	ds_event_t	*devent;
523 
524 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s..." DS_EOL, PORTID(port), __func__);
525 
526 	/*
527 	 * Read messages from the channel until there are none
528 	 * pending. Valid messages are dispatched to be handled
529 	 * by a separate thread while any malformed messages are
530 	 * dropped.
531 	 */
532 
533 	mutex_enter(&port->rcv_lock);
534 
535 	while (((rv = ldc_chkq(port->ldc.hdl, &hasdata)) == 0) && hasdata) {
536 
537 		DS_DBG(CE_NOTE, "ds@%lx: %s: reading next message" DS_EOL,
538 		    PORTID(port), __func__);
539 
540 		/*
541 		 * Read in the next message.
542 		 */
543 		hbuf = (char *)&hdr;
544 		bzero(hbuf, DS_HDR_SZ);
545 		read_size = DS_HDR_SZ;
546 		currp = hbuf;
547 
548 		/* read in the message header */
549 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
550 			break;
551 		}
552 
553 		if (read_size < DS_HDR_SZ) {
554 			/*
555 			 * A zero length read is a valid signal that
556 			 * there is no data left on the channel.
557 			 */
558 			if (read_size != 0) {
559 				cmn_err(CE_WARN, "ds@%lx: invalid message "
560 				    "length, received %ld bytes, expected %ld"
561 				    DS_EOL, PORTID(port), read_size, DS_HDR_SZ);
562 			}
563 			continue;
564 		}
565 
566 		/* get payload size and allocate a buffer */
567 		read_size = ((ds_hdr_t *)hbuf)->payload_len;
568 		msglen = DS_HDR_SZ + read_size;
569 		msg = DS_MALLOC(msglen);
570 		if (!msg) {
571 			cmn_err(CE_WARN, "Memory allocation failed attempting "
572 			    " to allocate %d bytes." DS_EOL, (int)msglen);
573 			continue;
574 		}
575 
576 		DS_DBG(CE_NOTE, "ds@%lx: %s: message payload len %d" DS_EOL,
577 		    PORTID(port), __func__, (int)read_size);
578 
579 		/* move message header into buffer */
580 		(void) memcpy(msg, hbuf, DS_HDR_SZ);
581 		currp = (char *)(msg) + DS_HDR_SZ;
582 
583 		/* read in the message body */
584 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
585 			DS_FREE(msg, msglen);
586 			break;
587 		}
588 
589 		/* validate the size of the message */
590 		if ((DS_HDR_SZ + read_size) != msglen) {
591 			cmn_err(CE_WARN, "ds@%lx: %s: invalid message length, "
592 			    "received %ld bytes, expected %ld" DS_EOL,
593 			    PORTID(port), __func__, (DS_HDR_SZ + read_size),
594 			    msglen);
595 			DS_FREE(msg, msglen);
596 			continue;
597 		}
598 
599 		DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
600 
601 		/*
602 		 * Send the message for processing, and store it
603 		 * in the log. The memory is deallocated only when
604 		 * the message is removed from the log.
605 		 */
606 
607 		devent = DS_MALLOC(sizeof (ds_event_t));
608 		devent->port = port;
609 		devent->buf = (char *)msg;
610 		devent->buflen = msglen;
611 
612 		/* log the message */
613 		(void) ds_log_add_msg(DS_LOG_IN(port->id), msg, msglen);
614 
615 		if (ds_sys_dispatch_func(ds_dispatch_event, devent)) {
616 			cmn_err(CE_WARN, "ds@%lx: error initiating "
617 			    "event handler", PORTID(port));
618 			DS_FREE(devent, sizeof (ds_event_t));
619 		}
620 	}
621 
622 	mutex_exit(&port->rcv_lock);
623 
624 	/* handle connection reset errors returned from ds_recv_msg */
625 	if (rv == ECONNRESET) {
626 		ds_handle_down_reset_events(port);
627 	}
628 
629 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s done" DS_EOL, PORTID(port), __func__);
630 }
631 
632 static void
633 ds_dispatch_event(void *arg)
634 {
635 	ds_event_t	*event = (ds_event_t *)arg;
636 	ds_hdr_t	*hdr;
637 	ds_port_t	*port;
638 
639 	port = event->port;
640 
641 	hdr = (ds_hdr_t *)event->buf;
642 
643 	if (DS_MSG_TYPE_VALID(hdr->msg_type)) {
644 		DS_DBG(CE_NOTE, "ds@%lx: dispatch_event: msg_type=%d" DS_EOL,
645 		    PORTID(port), hdr->msg_type);
646 
647 		(*ds_msg_handlers[hdr->msg_type])(port, event->buf,
648 		    event->buflen);
649 	} else {
650 		cmn_err(CE_WARN, "ds@%lx: dispatch_event: invalid msg "
651 		    "type (%d)" DS_EOL, PORTID(port), hdr->msg_type);
652 	}
653 
654 	DS_FREE(event->buf, event->buflen);
655 	DS_FREE(event, sizeof (ds_event_t));
656 }
657 
658 int
659 ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen)
660 {
661 	int	rv;
662 	caddr_t	currp = msg;
663 	size_t	amt_left = msglen;
664 	int	loopcnt = 0;
665 
666 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s msglen: %ld" DS_EOL, PORTID(port),
667 	    __func__, msglen);
668 	DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
669 
670 	/*
671 	 * Ensure that no other messages can be sent on this port by holding
672 	 * the tx_lock mutex in case the write doesn't get sent with one write.
673 	 * This guarantees that the message doesn't become fragmented.
674 	 */
675 	mutex_enter(&port->tx_lock);
676 
677 	do {
678 		if ((rv = ldc_write(port->ldc.hdl, currp, &msglen)) != 0) {
679 			if (rv == ECONNRESET) {
680 				mutex_exit(&port->tx_lock);
681 				ds_handle_down_reset_events(port);
682 				return (rv);
683 			} else if ((rv == EWOULDBLOCK) &&
684 			    (loopcnt++ < ds_retries)) {
685 				drv_usecwait(ds_delay);
686 			} else {
687 				cmn_err(CE_WARN, "ds@%lx: send_msg: ldc_write "
688 				    "failed (%d), %d bytes remaining" DS_EOL,
689 				    PORTID(port), rv, (int)amt_left);
690 				goto error;
691 			}
692 		} else {
693 			amt_left -= msglen;
694 			currp += msglen;
695 			msglen = amt_left;
696 			loopcnt = 0;
697 		}
698 	} while (amt_left > 0);
699 error:
700 	mutex_exit(&port->tx_lock);
701 
702 	return (rv);
703 }
704 
705 /* END LDC SUPPORT FUNCTIONS */
706 
707 
708 /* BEGIN DS PROTOCOL SUPPORT FUNCTIONS */
709 
710 static void
711 ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len)
712 {
713 	ds_hdr_t	*hdr;
714 	ds_init_ack_t	*ack;
715 	ds_init_nack_t	*nack;
716 	char		*msg;
717 	size_t		msglen;
718 	ds_init_req_t	*req;
719 	size_t		explen = DS_MSG_LEN(ds_init_req_t);
720 	uint16_t	new_major;
721 	uint16_t	new_minor;
722 	boolean_t	match;
723 
724 	/* sanity check the incoming message */
725 	if (len != explen) {
726 		cmn_err(CE_WARN, "ds@%lx: <init_req: invalid message "
727 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
728 		    explen);
729 		return;
730 	}
731 
732 	req = (ds_init_req_t *)(buf + DS_HDR_SZ);
733 
734 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_req: ver=%d.%d" DS_EOL,
735 	    PORTID(port), req->major_vers, req->minor_vers);
736 
737 	match = negotiate_version(DS_NUM_VER, &ds_vers[0],
738 	    req->major_vers, &new_major, &new_minor);
739 
740 	/*
741 	 * Check version info. ACK only if the major numbers exactly
742 	 * match. The service entity can retry with a new minor
743 	 * based on the response sent as part of the NACK.
744 	 */
745 	if (match) {
746 		msglen = DS_MSG_LEN(ds_init_ack_t);
747 		msg = DS_MALLOC(msglen);
748 
749 		hdr = (ds_hdr_t *)msg;
750 		hdr->msg_type = DS_INIT_ACK;
751 		hdr->payload_len = sizeof (ds_init_ack_t);
752 
753 		ack = (ds_init_ack_t *)(msg + DS_HDR_SZ);
754 		ack->minor_vers = MIN(new_minor, req->minor_vers);
755 
756 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_ack>: minor=0x%04X" DS_EOL,
757 		    PORTID(port), MIN(new_minor, req->minor_vers));
758 	} else {
759 		msglen = DS_MSG_LEN(ds_init_nack_t);
760 		msg = DS_MALLOC(msglen);
761 
762 		hdr = (ds_hdr_t *)msg;
763 		hdr->msg_type = DS_INIT_NACK;
764 		hdr->payload_len = sizeof (ds_init_nack_t);
765 
766 		nack = (ds_init_nack_t *)(msg + DS_HDR_SZ);
767 		nack->major_vers = new_major;
768 
769 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_nack>: major=0x%04X" DS_EOL,
770 		    PORTID(port), new_major);
771 	}
772 
773 	/*
774 	 * Send the response
775 	 */
776 	(void) ds_send_msg(port, msg, msglen);
777 	DS_FREE(msg, msglen);
778 }
779 
780 static void
781 ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len)
782 {
783 	ds_init_ack_t	*ack;
784 	ds_ver_t	*ver;
785 	size_t		explen = DS_MSG_LEN(ds_init_ack_t);
786 
787 	/* sanity check the incoming message */
788 	if (len != explen) {
789 		cmn_err(CE_WARN, "ds@%lx: <init_ack: invalid message "
790 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
791 		    explen);
792 		return;
793 	}
794 
795 	ack = (ds_init_ack_t *)(buf + DS_HDR_SZ);
796 
797 	mutex_enter(&port->lock);
798 
799 	if (port->state != DS_PORT_INIT_REQ) {
800 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: invalid state: %d"
801 		    DS_EOL, PORTID(port), port->state);
802 		mutex_exit(&port->lock);
803 		return;
804 	}
805 
806 	ver = &(ds_vers[port->ver_idx]);
807 
808 	/* agreed upon a major version */
809 	port->ver.major = ver->major;
810 
811 	/*
812 	 * If the returned minor version is larger than
813 	 * the requested minor version, use the lower of
814 	 * the two, i.e. the requested version.
815 	 */
816 	if (ack->minor_vers >= ver->minor) {
817 		/*
818 		 * Use the minor version specified in the
819 		 * original request.
820 		 */
821 		port->ver.minor = ver->minor;
822 	} else {
823 		/*
824 		 * Use the lower minor version returned in
825 		 * the ack. By definition, all lower minor
826 		 * versions must be supported.
827 		 */
828 		port->ver.minor = ack->minor_vers;
829 	}
830 
831 	port->state = DS_PORT_READY;
832 
833 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready v%d.%d" DS_EOL,
834 	    PORTID(port), port->ver.major, port->ver.minor);
835 
836 	mutex_exit(&port->lock);
837 
838 	/*
839 	 * The port came up, so update all the services
840 	 * with this information. Follow that up with an
841 	 * attempt to register any service that is not
842 	 * already registered.
843 	 */
844 	mutex_enter(&ds_svcs.lock);
845 
846 	(void) ds_walk_svcs(ds_svc_port_up, port);
847 	(void) ds_walk_svcs(ds_svc_register, NULL);
848 
849 	mutex_exit(&ds_svcs.lock);
850 }
851 
852 static void
853 ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len)
854 {
855 	int		idx;
856 	ds_init_nack_t	*nack;
857 	ds_ver_t	*ver;
858 	size_t		explen = DS_MSG_LEN(ds_init_nack_t);
859 
860 	/* sanity check the incoming message */
861 	if (len != explen) {
862 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: invalid message "
863 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
864 		    explen);
865 		return;
866 	}
867 
868 	nack = (ds_init_nack_t *)(buf + DS_HDR_SZ);
869 
870 	mutex_enter(&port->lock);
871 
872 	if (port->state != DS_PORT_INIT_REQ) {
873 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: invalid state: %d"
874 		    DS_EOL, PORTID(port), port->state);
875 		mutex_exit(&port->lock);
876 		return;
877 	}
878 
879 	ver = &(ds_vers[port->ver_idx]);
880 
881 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: req=v%d.%d, nack=v%d.x"
882 	    DS_EOL, PORTID(port), ver->major, ver->minor, nack->major_vers);
883 
884 	if (nack->major_vers == 0) {
885 		/* no supported protocol version */
886 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS not supported"
887 		    DS_EOL, PORTID(port));
888 		mutex_exit(&port->lock);
889 		return;
890 	}
891 
892 	/*
893 	 * Walk the version list, looking for a major version
894 	 * that is as close to the requested major version as
895 	 * possible.
896 	 */
897 	for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) {
898 		if (ds_vers[idx].major <= nack->major_vers) {
899 			/* found a version to try */
900 			goto done;
901 		}
902 	}
903 
904 	if (idx == DS_NUM_VER) {
905 		/* no supported version */
906 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS v%d.x not "
907 		    "supported" DS_EOL, PORTID(port), nack->major_vers);
908 
909 		mutex_exit(&port->lock);
910 		return;
911 	}
912 
913 done:
914 	/* start the handshake again */
915 	port->ver_idx = idx;
916 	port->state = DS_PORT_LDC_INIT;
917 	mutex_exit(&port->lock);
918 
919 	ds_send_init_req(port);
920 
921 }
922 
923 static ds_svc_t *
924 ds_find_svc_by_id_port(char *svc_id, int is_client, ds_port_t *port)
925 {
926 	int		idx;
927 	ds_svc_t	*svc, *found_svc = 0;
928 	uint32_t	flag_match = is_client ? DSSF_ISCLIENT : 0;
929 
930 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
931 
932 	/* walk every table entry */
933 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
934 		svc = ds_svcs.tbl[idx];
935 		if (DS_SVC_ISFREE(svc))
936 			continue;
937 		if (strcmp(svc->cap.svc_id, svc_id) != 0)
938 			continue;
939 		if ((svc->flags & DSSF_ISCLIENT) != flag_match)
940 			continue;
941 		if (port != NULL && svc->port == port) {
942 			return (svc);
943 		} else if (svc->state == DS_SVC_INACTIVE) {
944 			found_svc = svc;
945 		} else if (!found_svc) {
946 			found_svc = svc;
947 		}
948 	}
949 
950 	return (found_svc);
951 }
952 
953 static void
954 ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len)
955 {
956 	ds_reg_req_t	*req;
957 	ds_hdr_t	*hdr;
958 	ds_reg_ack_t	*ack;
959 	ds_reg_nack_t	*nack;
960 	char		*msg;
961 	size_t		msglen;
962 	size_t		explen = DS_MSG_LEN(ds_reg_req_t);
963 	ds_svc_t	*svc = NULL;
964 	ds_ver_t	version;
965 	uint16_t	new_major;
966 	uint16_t	new_minor;
967 	boolean_t	match;
968 
969 	/* sanity check the incoming message */
970 	if (len < explen) {
971 		cmn_err(CE_WARN, "ds@%lx: <reg_req: invalid message "
972 		    "length (%ld), expected at least %ld" DS_EOL,
973 		    PORTID(port), len, explen);
974 		return;
975 	}
976 
977 	req = (ds_reg_req_t *)(buf + DS_HDR_SZ);
978 
979 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' ver=%d.%d, hdl=0x%llx"
980 	    DS_EOL, PORTID(port), req->svc_id, req->major_vers, req->minor_vers,
981 	    (u_longlong_t)req->svc_handle);
982 
983 	mutex_enter(&ds_svcs.lock);
984 	svc = ds_find_svc_by_id_port(req->svc_id,
985 	    DS_HDL_ISCLIENT(req->svc_handle) == 0, port);
986 	if (svc == NULL) {
987 
988 do_reg_nack:
989 		mutex_exit(&ds_svcs.lock);
990 
991 		msglen = DS_MSG_LEN(ds_reg_nack_t);
992 		msg = DS_MALLOC(msglen);
993 
994 		hdr = (ds_hdr_t *)msg;
995 		hdr->msg_type = DS_REG_NACK;
996 		hdr->payload_len = sizeof (ds_reg_nack_t);
997 
998 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
999 		nack->svc_handle = req->svc_handle;
1000 		nack->result = DS_REG_VER_NACK;
1001 		nack->major_vers = 0;
1002 
1003 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s'" DS_EOL,
1004 		    PORTID(port), req->svc_id);
1005 		/*
1006 		 * Send the response
1007 		 */
1008 		(void) ds_send_msg(port, msg, msglen);
1009 		DS_FREE(msg, msglen);
1010 		return;
1011 	}
1012 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' found, hdl: 0x%llx" DS_EOL,
1013 	    PORTID(port), req->svc_id, (u_longlong_t)svc->hdl);
1014 
1015 	/*
1016 	 * A client sends out a reg req in order to force service providers to
1017 	 * initiate a reg req from their end (limitation in the protocol).  If a
1018 	 * service provider already thinks it's talking to someone on that port,
1019 	 * something's gone wrong (probably an unreg req request was dropped).
1020 	 * If we see that the service is in the wrong state, reset it.
1021 	 */
1022 
1023 	if (DS_HDL_ISCLIENT(req->svc_handle)) {
1024 		if (svc->state != DS_SVC_INACTIVE) {
1025 			(void) ds_svc_unregister(svc, port);
1026 		}
1027 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging client"
1028 		    DS_EOL, PORTID(port), req->svc_id);
1029 		(void) ds_svc_port_up(svc, port);
1030 		(void) ds_svc_register_onport(svc, port);
1031 		mutex_exit(&ds_svcs.lock);
1032 		return;
1033 	}
1034 
1035 	/*
1036 	 * Only remote service providers can initiate a registration.  The
1037 	 * local sevice from here must be a client service.
1038 	 */
1039 
1040 	match = negotiate_version(svc->cap.nvers, svc->cap.vers,
1041 	    req->major_vers, &new_major, &new_minor);
1042 
1043 	/*
1044 	 * Check version info. ACK only if the major numbers exactly
1045 	 * match. The service entity can retry with a new minor
1046 	 * based on the response sent as part of the NACK.
1047 	 */
1048 	if (match) {
1049 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' svc%d: state: %x "
1050 		    "svc_portid: %d" DS_EOL, PORTID(port), req->svc_id,
1051 		    (int)DS_HDL2IDX(svc->hdl), svc->state,
1052 		    (int)(svc->port == NULL ? -1 : PORTID(svc->port)));
1053 		/*
1054 		 * If the current local service is already in use and
1055 		 * it's not on this port, clone it.
1056 		 */
1057 		if (svc->state != DS_SVC_INACTIVE) {
1058 			if (svc->port != NULL && port == svc->port) {
1059 				/*
1060 				 * Someone probably dropped an unreg req
1061 				 * somewhere.  Force a local unreg.
1062 				 */
1063 				(void) ds_svc_unregister(svc, port);
1064 			} else if (!DS_HDL_ISCLIENT(svc->hdl)) {
1065 				/*
1066 				 * Can't clone a non-client (service provider)
1067 				 * handle.  This is because old in-kernel
1068 				 * service providers can't deal with multiple
1069 				 * handles.
1070 				 */
1071 				goto do_reg_nack;
1072 			} else {
1073 				svc = ds_svc_clone(svc);
1074 			}
1075 		}
1076 		svc->port = port;
1077 		svc->svc_hdl = req->svc_handle;
1078 		svc->state = DS_SVC_ACTIVE;
1079 
1080 		msglen = DS_MSG_LEN(ds_reg_ack_t);
1081 		msg = DS_MALLOC(msglen);
1082 
1083 		hdr = (ds_hdr_t *)msg;
1084 		hdr->msg_type = DS_REG_ACK;
1085 		hdr->payload_len = sizeof (ds_reg_ack_t);
1086 
1087 		ack = (ds_reg_ack_t *)(msg + DS_HDR_SZ);
1088 		ack->svc_handle = req->svc_handle;
1089 		ack->minor_vers = MIN(new_minor, req->minor_vers);
1090 
1091 
1092 		if (svc->ops.ds_reg_cb) {
1093 			/* Call the registration callback */
1094 			version.major = req->major_vers;
1095 			version.minor = ack->minor_vers;
1096 			(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &version,
1097 			    svc->hdl);
1098 		}
1099 		mutex_exit(&ds_svcs.lock);
1100 
1101 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_ack>: '%s' minor=0x%04X"
1102 		    DS_EOL, PORTID(port), svc->cap.svc_id,
1103 		    MIN(new_minor, req->minor_vers));
1104 	} else {
1105 		mutex_exit(&ds_svcs.lock);
1106 
1107 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1108 		msg = DS_MALLOC(msglen);
1109 
1110 		hdr = (ds_hdr_t *)msg;
1111 		hdr->msg_type = DS_REG_NACK;
1112 		hdr->payload_len = sizeof (ds_reg_nack_t);
1113 
1114 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1115 		nack->svc_handle = req->svc_handle;
1116 		nack->result = DS_REG_VER_NACK;
1117 		nack->major_vers = new_major;
1118 
1119 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s' major=0x%04X"
1120 		    DS_EOL, PORTID(port), svc->cap.svc_id, new_major);
1121 	}
1122 
1123 	/* send message */
1124 	(void) ds_send_msg(port, msg, msglen);
1125 	DS_FREE(msg, msglen);
1126 }
1127 
1128 static void
1129 ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len)
1130 {
1131 	ds_reg_ack_t	*ack;
1132 	ds_ver_t	*ver;
1133 	ds_ver_t	tmpver;
1134 	ds_svc_t	*svc;
1135 	size_t		explen = DS_MSG_LEN(ds_reg_ack_t);
1136 
1137 	/* sanity check the incoming message */
1138 	if (len != explen) {
1139 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid message "
1140 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1141 		    explen);
1142 		return;
1143 	}
1144 
1145 	ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ);
1146 
1147 	mutex_enter(&ds_svcs.lock);
1148 
1149 	/*
1150 	 * This searches for service based on how we generate handles
1151 	 * and so only works because this is a reg ack.
1152 	 */
1153 	if (DS_HDL_ISCLIENT(ack->svc_handle) ||
1154 	    (svc = ds_get_svc(ack->svc_handle)) == NULL) {
1155 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid handle 0x%llx"
1156 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1157 		goto done;
1158 	}
1159 
1160 	/* make sure the message makes sense */
1161 	if (svc->state != DS_SVC_REG_PENDING) {
1162 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid state (%d)" DS_EOL,
1163 		    PORTID(port), svc->state);
1164 		goto done;
1165 	}
1166 
1167 	ver = &(svc->cap.vers[svc->ver_idx]);
1168 
1169 	/* major version has been agreed upon */
1170 	svc->ver.major = ver->major;
1171 
1172 	if (ack->minor_vers >= ver->minor) {
1173 		/*
1174 		 * Use the minor version specified in the
1175 		 * original request.
1176 		 */
1177 		svc->ver.minor = ver->minor;
1178 	} else {
1179 		/*
1180 		 * Use the lower minor version returned in
1181 		 * the ack. By defninition, all lower minor
1182 		 * versions must be supported.
1183 		 */
1184 		svc->ver.minor = ack->minor_vers;
1185 	}
1186 
1187 	svc->state = DS_SVC_ACTIVE;
1188 	svc->port = port;
1189 
1190 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_ack: '%s' v%d.%d ready, hdl=0x%llx"
1191 	    DS_EOL, PORTID(port), svc->cap.svc_id, svc->ver.major,
1192 	    svc->ver.minor, (u_longlong_t)svc->hdl);
1193 
1194 	/* notify the client that registration is complete */
1195 	if (svc->ops.ds_reg_cb) {
1196 		/*
1197 		 * Use a temporary version structure so that
1198 		 * the copy in the svc structure cannot be
1199 		 * modified by the client.
1200 		 */
1201 		tmpver.major = svc->ver.major;
1202 		tmpver.minor = svc->ver.minor;
1203 
1204 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl);
1205 	}
1206 
1207 done:
1208 	mutex_exit(&ds_svcs.lock);
1209 }
1210 
1211 static void
1212 ds_try_next_port(ds_svc_t *svc, int portid)
1213 {
1214 	ds_port_t *port;
1215 	ds_portset_t totry;
1216 	int i;
1217 
1218 	DS_DBG_LDC(CE_NOTE, "ds@%x %s" DS_EOL, portid, __func__);
1219 	DS_PORTSET_NOT(totry, svc->tried);
1220 	DS_PORTSET_AND(totry, svc->avail);
1221 	if (DS_PORTSET_ISNULL(totry))
1222 		return;
1223 
1224 	for (i = 0; i < DS_MAX_PORTS; i++, portid++) {
1225 		if (portid >= DS_MAX_PORTS) {
1226 			portid = 0;
1227 		}
1228 
1229 		/*
1230 		 * If the port is not in the available list,
1231 		 * it is not a candidate for registration.
1232 		 */
1233 		if (!DS_PORT_IN_SET(totry, portid)) {
1234 			continue;
1235 		}
1236 
1237 		port = &ds_ports[portid];
1238 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s trying ldc.id: %d" DS_EOL,
1239 		    portid, __func__, (uint_t)(port->ldc.id));
1240 		if (ds_send_reg_req(svc, port) == 0) {
1241 			DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send OK" DS_EOL,
1242 			    portid, __func__);
1243 			/* register sent successfully */
1244 			break;
1245 		}
1246 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send FAIL" DS_EOL,
1247 		    portid, __func__);
1248 
1249 		/* reset the service to try the next port */
1250 		ds_reset_svc(svc, port);
1251 	}
1252 }
1253 
1254 static void
1255 ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len)
1256 {
1257 	ds_reg_nack_t	*nack;
1258 	ds_svc_t	*svc;
1259 	int		idx;
1260 	size_t		explen = DS_MSG_LEN(ds_reg_nack_t);
1261 
1262 	/* sanity check the incoming message */
1263 	if (len != explen) {
1264 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid message "
1265 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1266 		    explen);
1267 		return;
1268 	}
1269 
1270 	nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ);
1271 
1272 	mutex_enter(&ds_svcs.lock);
1273 
1274 	/*
1275 	 * We expect a reg_nack for a client ping.
1276 	 */
1277 	if (DS_HDL_ISCLIENT(nack->svc_handle)) {
1278 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: ping hdl: 0x%llx"
1279 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1280 		goto done;
1281 	}
1282 
1283 	/*
1284 	 * This searches for service based on how we generate handles
1285 	 * and so only works because this is a reg nack.
1286 	 */
1287 	if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1288 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid handle 0x%llx"
1289 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1290 		goto done;
1291 	}
1292 
1293 	/* make sure the message makes sense */
1294 	if (svc->state != DS_SVC_REG_PENDING) {
1295 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid state (%d)" DS_EOL,
1296 		    PORTID(port), svc->state);
1297 		goto done;
1298 	}
1299 
1300 	if (nack->result == DS_REG_DUP) {
1301 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: duplicate registration "
1302 		    " for %s" DS_EOL, PORTID(port), svc->cap.svc_id);
1303 		ds_reset_svc(svc, port);
1304 		goto done;
1305 	}
1306 
1307 	/*
1308 	 * A major version of zero indicates that the
1309 	 * service is not supported at all.
1310 	 */
1311 	if (nack->major_vers == 0) {
1312 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' not supported"
1313 		    DS_EOL, PORTID(port), svc->cap.svc_id);
1314 		ds_reset_svc(svc, port);
1315 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1316 			ds_try_next_port(svc, PORTID(port) + 1);
1317 		goto done;
1318 	}
1319 
1320 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' hdl=0x%llx, nack=%d.x"
1321 	    DS_EOL, PORTID(port), svc->cap.svc_id,
1322 	    (u_longlong_t)nack->svc_handle, nack->major_vers);
1323 
1324 	/*
1325 	 * Walk the version list for the service, looking for
1326 	 * a major version that is as close to the requested
1327 	 * major version as possible.
1328 	 */
1329 	for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) {
1330 		if (svc->cap.vers[idx].major <= nack->major_vers) {
1331 			/* found a version to try */
1332 			break;
1333 		}
1334 	}
1335 
1336 	if (idx == svc->cap.nvers) {
1337 		/* no supported version */
1338 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: %s v%d.x not supported"
1339 		    DS_EOL, PORTID(port), svc->cap.svc_id, nack->major_vers);
1340 		ds_reset_svc(svc, port);
1341 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1342 			ds_try_next_port(svc, PORTID(port) + 1);
1343 		goto done;
1344 	}
1345 
1346 	/* start the handshake again */
1347 	svc->state = DS_SVC_INACTIVE;
1348 	svc->ver_idx = idx;
1349 
1350 	(void) ds_svc_register(svc, NULL);
1351 
1352 done:
1353 	mutex_exit(&ds_svcs.lock);
1354 }
1355 
1356 static void
1357 ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len)
1358 {
1359 	ds_hdr_t	*hdr;
1360 	ds_unreg_req_t	*req;
1361 	ds_unreg_ack_t	*ack;
1362 	ds_svc_t	*svc;
1363 	char		*msg;
1364 	size_t		msglen;
1365 	size_t		explen = DS_MSG_LEN(ds_unreg_req_t);
1366 
1367 	/* sanity check the incoming message */
1368 	if (len != explen) {
1369 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid message "
1370 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1371 		    explen);
1372 		return;
1373 	}
1374 
1375 	req = (ds_unreg_req_t *)(buf + DS_HDR_SZ);
1376 
1377 	mutex_enter(&ds_svcs.lock);
1378 
1379 	/* lookup appropriate client or service */
1380 	if (DS_HDL_ISCLIENT(req->svc_handle) ||
1381 	    ((svc = ds_find_clnt_svc_by_hdl_port(req->svc_handle, port))
1382 	    == NULL && ((svc = ds_get_svc(req->svc_handle)) == NULL ||
1383 	    svc->port != port))) {
1384 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid handle 0x%llx"
1385 		    DS_EOL, PORTID(port), (u_longlong_t)req->svc_handle);
1386 		ds_send_unreg_nack(port, req->svc_handle);
1387 		mutex_exit(&ds_svcs.lock);
1388 		return;
1389 	}
1390 
1391 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: '%s' handle 0x%llx" DS_EOL,
1392 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1393 
1394 	(void) ds_svc_unregister(svc, svc->port);
1395 
1396 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_ack>: '%s' hdl=0x%llx" DS_EOL,
1397 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1398 
1399 	ds_check_for_dup_services(svc);
1400 
1401 	mutex_exit(&ds_svcs.lock);
1402 
1403 	msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t);
1404 	msg = DS_MALLOC(msglen);
1405 
1406 	hdr = (ds_hdr_t *)msg;
1407 	hdr->msg_type = DS_UNREG_ACK;
1408 	hdr->payload_len = sizeof (ds_unreg_ack_t);
1409 
1410 	ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ);
1411 	ack->svc_handle = req->svc_handle;
1412 
1413 	/* send message */
1414 	(void) ds_send_msg(port, msg, msglen);
1415 	DS_FREE(msg, msglen);
1416 
1417 }
1418 
1419 static void
1420 ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len)
1421 {
1422 	ds_unreg_ack_t	*ack;
1423 	size_t		explen = DS_MSG_LEN(ds_unreg_ack_t);
1424 
1425 	/* sanity check the incoming message */
1426 	if (len != explen) {
1427 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: invalid message "
1428 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1429 		    explen);
1430 		return;
1431 	}
1432 
1433 	ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ);
1434 
1435 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: hdl=0x%llx" DS_EOL,
1436 	    PORTID(port), (u_longlong_t)ack->svc_handle);
1437 
1438 	mutex_enter(&ds_svcs.lock);
1439 
1440 	/*
1441 	 * Since the unregister request was initiated locally,
1442 	 * the service structure has already been torn down.
1443 	 * Just perform a sanity check to make sure the message
1444 	 * is appropriate.
1445 	 */
1446 	if (ds_get_svc(ack->svc_handle) != NULL) {
1447 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: handle 0x%llx in use"
1448 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1449 	}
1450 
1451 	mutex_exit(&ds_svcs.lock);
1452 }
1453 
1454 static void
1455 ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len)
1456 {
1457 	ds_unreg_nack_t	*nack;
1458 	size_t		explen = DS_MSG_LEN(ds_unreg_nack_t);
1459 
1460 	/* sanity check the incoming message */
1461 	if (len != explen) {
1462 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: invalid message "
1463 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1464 		    explen);
1465 		return;
1466 	}
1467 
1468 	nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ);
1469 
1470 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: hdl=0x%llx" DS_EOL,
1471 	    PORTID(port), (u_longlong_t)nack->svc_handle);
1472 
1473 	mutex_enter(&ds_svcs.lock);
1474 
1475 	/*
1476 	 * Since the unregister request was initiated locally,
1477 	 * the service structure has already been torn down.
1478 	 * Just perform a sanity check to make sure the message
1479 	 * is appropriate.
1480 	 */
1481 	if (ds_get_svc(nack->svc_handle) != NULL) {
1482 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: handle 0x%llx in use"
1483 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1484 	}
1485 
1486 	mutex_exit(&ds_svcs.lock);
1487 }
1488 
1489 static void
1490 ds_handle_data(ds_port_t *port, caddr_t buf, size_t len)
1491 {
1492 	ds_data_handle_t	*data;
1493 	ds_svc_t		*svc;
1494 	char			*msg;
1495 	int			msgsz;
1496 	int			hdrsz;
1497 	size_t			explen = DS_MSG_LEN(ds_data_handle_t);
1498 
1499 	/* sanity check the incoming message */
1500 	if (len < explen) {
1501 		cmn_err(CE_WARN, "ds@%lx: <data: invalid message length "
1502 		    "(%ld), expected at least %ld" DS_EOL, PORTID(port), len,
1503 		    explen);
1504 		return;
1505 	}
1506 
1507 	data = (ds_data_handle_t *)(buf + DS_HDR_SZ);
1508 
1509 	hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t);
1510 	msgsz = len - hdrsz;
1511 
1512 	/* strip off the header for the client */
1513 	msg = (msgsz) ? (buf + hdrsz) : NULL;
1514 
1515 	mutex_enter(&ds_svcs.lock);
1516 
1517 	if ((svc = ds_find_clnt_svc_by_hdl_port(data->svc_handle, port))
1518 	    == NULL) {
1519 		if ((svc = ds_get_svc(data->svc_handle)) == NULL) {
1520 			mutex_exit(&ds_svcs.lock);
1521 			cmn_err(CE_WARN, "ds@%lx: <data: invalid handle 0x%llx"
1522 			    DS_EOL, PORTID(port),
1523 			    (u_longlong_t)data->svc_handle);
1524 			ds_send_data_nack(port, data->svc_handle);
1525 			return;
1526 		}
1527 	}
1528 
1529 	mutex_exit(&ds_svcs.lock);
1530 
1531 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <data: '%s' hdl=0x%llx" DS_EOL,
1532 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)svc->hdl);
1533 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msgsz);
1534 
1535 	/* dispatch this message to the client */
1536 	(*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz);
1537 }
1538 
1539 static void
1540 ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len)
1541 {
1542 	ds_svc_t	*svc;
1543 	ds_data_nack_t	*nack;
1544 	size_t		explen = DS_MSG_LEN(ds_data_nack_t);
1545 
1546 	/* sanity check the incoming message */
1547 	if (len != explen) {
1548 		cmn_err(CE_WARN, "ds@%lx: <data_nack: invalid message "
1549 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1550 		    explen);
1551 		return;
1552 	}
1553 
1554 	nack = (ds_data_nack_t *)(buf + DS_HDR_SZ);
1555 
1556 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack: hdl=0x%llx, result=0x%llx"
1557 	    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle,
1558 	    (u_longlong_t)nack->result);
1559 
1560 	if (nack->result == DS_INV_HDL) {
1561 
1562 		mutex_enter(&ds_svcs.lock);
1563 
1564 		if ((svc = ds_find_clnt_svc_by_hdl_port(nack->svc_handle,
1565 		    port)) == NULL) {
1566 			if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1567 				mutex_exit(&ds_svcs.lock);
1568 				return;
1569 			}
1570 		}
1571 
1572 		cmn_err(CE_WARN, "ds@%lx: <data_nack: handle 0x%llx reported "
1573 		    " as invalid" DS_EOL, PORTID(port),
1574 		    (u_longlong_t)nack->svc_handle);
1575 
1576 		(void) ds_svc_unregister(svc, svc->port);
1577 
1578 		mutex_exit(&ds_svcs.lock);
1579 	}
1580 }
1581 
1582 /* Initialize the port */
1583 void
1584 ds_send_init_req(ds_port_t *port)
1585 {
1586 	ds_hdr_t	*hdr;
1587 	ds_init_req_t	*init_req;
1588 	size_t		msglen;
1589 	ds_ver_t	*vers = &ds_vers[port->ver_idx];
1590 
1591 	mutex_enter(&port->lock);
1592 	if (port->state != DS_PORT_LDC_INIT) {
1593 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: invalid state: %d"
1594 		    DS_EOL, PORTID(port), port->state);
1595 		mutex_exit(&port->lock);
1596 		return;
1597 	}
1598 	mutex_exit(&port->lock);
1599 
1600 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: req=v%d.%d" DS_EOL,
1601 	    PORTID(port), vers->major, vers->minor);
1602 
1603 	msglen = DS_HDR_SZ + sizeof (ds_init_req_t);
1604 	hdr = DS_MALLOC(msglen);
1605 
1606 	hdr->msg_type = DS_INIT_REQ;
1607 	hdr->payload_len = sizeof (ds_init_req_t);
1608 
1609 	init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ);
1610 	init_req->major_vers = vers->major;
1611 	init_req->minor_vers = vers->minor;
1612 
1613 	if (ds_send_msg(port, (caddr_t)hdr, msglen) == 0) {
1614 		/*
1615 		 * We've left the port state unlocked over the malloc/send,
1616 		 * make sure no one has changed the state under us before
1617 		 * we update the state.
1618 		 */
1619 		mutex_enter(&port->lock);
1620 		if (port->state == DS_PORT_LDC_INIT)
1621 			port->state = DS_PORT_INIT_REQ;
1622 		mutex_exit(&port->lock);
1623 	}
1624 	DS_FREE(hdr, msglen);
1625 }
1626 
1627 static int
1628 ds_send_reg_req(ds_svc_t *svc, ds_port_t *port)
1629 {
1630 	ds_ver_t	*ver;
1631 	ds_hdr_t	*hdr;
1632 	caddr_t		msg;
1633 	size_t		msglen;
1634 	ds_reg_req_t	*req;
1635 	size_t		idlen;
1636 	int		rv;
1637 
1638 	ASSERT(svc->state == DS_SVC_INACTIVE ||
1639 	    (svc->flags & DSSF_ISCLIENT) != 0);
1640 
1641 	mutex_enter(&port->lock);
1642 
1643 	/* check on the LDC to Zeus */
1644 	if (port->ldc.state != LDC_UP) {
1645 		/* can not send message */
1646 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: channel %ld is not up"
1647 		    DS_EOL, PORTID(port), port->ldc.id);
1648 		mutex_exit(&port->lock);
1649 		return (-1);
1650 	}
1651 
1652 	/* make sure port is ready */
1653 	if (port->state != DS_PORT_READY) {
1654 		/* can not send message */
1655 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: port is not ready"
1656 		    DS_EOL, PORTID(port));
1657 		mutex_exit(&port->lock);
1658 		return (-1);
1659 	}
1660 
1661 	mutex_exit(&port->lock);
1662 
1663 	/* allocate the message buffer */
1664 	idlen = strlen(svc->cap.svc_id);
1665 	msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen;
1666 	msg = DS_MALLOC(msglen);
1667 
1668 	/* copy in the header data */
1669 	hdr = (ds_hdr_t *)msg;
1670 	hdr->msg_type = DS_REG_REQ;
1671 	hdr->payload_len = sizeof (ds_reg_req_t) + idlen;
1672 
1673 	req = (ds_reg_req_t *)(msg + DS_HDR_SZ);
1674 	req->svc_handle = svc->hdl;
1675 	ver = &(svc->cap.vers[svc->ver_idx]);
1676 	req->major_vers = ver->major;
1677 	req->minor_vers = ver->minor;
1678 
1679 	/* copy in the service id */
1680 	(void) memcpy(req->svc_id, svc->cap.svc_id, idlen + 1);
1681 
1682 	/* send the message */
1683 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: '%s' ver=%d.%d, hdl=0x%llx"
1684 	    DS_EOL, PORTID(port), svc->cap.svc_id, ver->major, ver->minor,
1685 	    (u_longlong_t)svc->hdl);
1686 
1687 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1688 		svc->port = port;
1689 		rv = -1;
1690 	} else if ((svc->flags & DSSF_ISCLIENT) == 0) {
1691 		svc->state = DS_SVC_REG_PENDING;
1692 	}
1693 	DS_FREE(msg, msglen);
1694 
1695 	return (rv);
1696 }
1697 
1698 /*
1699  * Keep around in case we want this later
1700  */
1701 int
1702 ds_send_unreg_req(ds_svc_t *svc)
1703 {
1704 	caddr_t		msg;
1705 	size_t		msglen;
1706 	ds_hdr_t	*hdr;
1707 	ds_unreg_req_t	*req;
1708 	ds_port_t	*port = svc->port;
1709 	int		rv;
1710 
1711 	if (port == NULL) {
1712 		DS_DBG(CE_NOTE, "send_unreg_req: service '%s' not "
1713 		    "associated with a port" DS_EOL, svc->cap.svc_id);
1714 		return (-1);
1715 	}
1716 
1717 	mutex_enter(&port->lock);
1718 
1719 	/* check on the LDC to Zeus */
1720 	if (port->ldc.state != LDC_UP) {
1721 		/* can not send message */
1722 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: channel %ld is not up"
1723 		    DS_EOL, PORTID(port), port->ldc.id);
1724 		mutex_exit(&port->lock);
1725 		return (-1);
1726 	}
1727 
1728 	/* make sure port is ready */
1729 	if (port->state != DS_PORT_READY) {
1730 		/* can not send message */
1731 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: port is not ready" DS_EOL,
1732 		    PORTID(port));
1733 		mutex_exit(&port->lock);
1734 		return (-1);
1735 	}
1736 
1737 	mutex_exit(&port->lock);
1738 
1739 	msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t);
1740 	msg = DS_MALLOC(msglen);
1741 
1742 	/* copy in the header data */
1743 	hdr = (ds_hdr_t *)msg;
1744 	hdr->msg_type = DS_UNREG;
1745 	hdr->payload_len = sizeof (ds_unreg_req_t);
1746 
1747 	req = (ds_unreg_req_t *)(msg + DS_HDR_SZ);
1748 	if (svc->flags & DSSF_ISCLIENT) {
1749 		req->svc_handle = svc->svc_hdl;
1750 	} else {
1751 		req->svc_handle = svc->hdl;
1752 	}
1753 
1754 	/* send the message */
1755 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_req>: '%s' hdl=0x%llx" DS_EOL,
1756 	    PORTID(port), (svc->cap.svc_id) ? svc->cap.svc_id : "NULL",
1757 	    (u_longlong_t)svc->hdl);
1758 
1759 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1760 		rv = -1;
1761 	}
1762 	DS_FREE(msg, msglen);
1763 
1764 	return (rv);
1765 }
1766 
1767 static void
1768 ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1769 {
1770 	caddr_t		msg;
1771 	size_t		msglen;
1772 	ds_hdr_t	*hdr;
1773 	ds_unreg_nack_t	*nack;
1774 
1775 	mutex_enter(&port->lock);
1776 
1777 	/* check on the LDC to Zeus */
1778 	if (port->ldc.state != LDC_UP) {
1779 		/* can not send message */
1780 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: channel %ld is not up"
1781 		    DS_EOL, PORTID(port), port->ldc.id);
1782 		mutex_exit(&port->lock);
1783 		return;
1784 	}
1785 
1786 	/* make sure port is ready */
1787 	if (port->state != DS_PORT_READY) {
1788 		/* can not send message */
1789 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: port is not ready"
1790 		    DS_EOL, PORTID(port));
1791 		mutex_exit(&port->lock);
1792 		return;
1793 	}
1794 
1795 	mutex_exit(&port->lock);
1796 
1797 	msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t);
1798 	msg = DS_MALLOC(msglen);
1799 
1800 	/* copy in the header data */
1801 	hdr = (ds_hdr_t *)msg;
1802 	hdr->msg_type = DS_UNREG_NACK;
1803 	hdr->payload_len = sizeof (ds_unreg_nack_t);
1804 
1805 	nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ);
1806 	nack->svc_handle = bad_hdl;
1807 
1808 	/* send the message */
1809 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_nack>: hdl=0x%llx" DS_EOL,
1810 	    PORTID(port), (u_longlong_t)bad_hdl);
1811 
1812 	(void) ds_send_msg(port, msg, msglen);
1813 	DS_FREE(msg, msglen);
1814 }
1815 
1816 static void
1817 ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1818 {
1819 	caddr_t		msg;
1820 	size_t		msglen;
1821 	ds_hdr_t	*hdr;
1822 	ds_data_nack_t	*nack;
1823 
1824 	mutex_enter(&port->lock);
1825 
1826 	/* check on the LDC to Zeus */
1827 	if (port->ldc.state != LDC_UP) {
1828 		/* can not send message */
1829 		cmn_err(CE_WARN, "ds@%lx: data_nack>: channel %ld is not up"
1830 		    DS_EOL, PORTID(port), port->ldc.id);
1831 		mutex_exit(&port->lock);
1832 		return;
1833 	}
1834 
1835 	/* make sure port is ready */
1836 	if (port->state != DS_PORT_READY) {
1837 		/* can not send message */
1838 		cmn_err(CE_WARN, "ds@%lx: data_nack>: port is not ready" DS_EOL,
1839 		    PORTID(port));
1840 		mutex_exit(&port->lock);
1841 		return;
1842 	}
1843 
1844 	mutex_exit(&port->lock);
1845 
1846 	msglen = DS_HDR_SZ + sizeof (ds_data_nack_t);
1847 	msg = DS_MALLOC(msglen);
1848 
1849 	/* copy in the header data */
1850 	hdr = (ds_hdr_t *)msg;
1851 	hdr->msg_type = DS_NACK;
1852 	hdr->payload_len = sizeof (ds_data_nack_t);
1853 
1854 	nack = (ds_data_nack_t *)(msg + DS_HDR_SZ);
1855 	nack->svc_handle = bad_hdl;
1856 	nack->result = DS_INV_HDL;
1857 
1858 	/* send the message */
1859 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack>: hdl=0x%llx" DS_EOL,
1860 	    PORTID(port), (u_longlong_t)bad_hdl);
1861 
1862 	(void) ds_send_msg(port, msg, msglen);
1863 	DS_FREE(msg, msglen);
1864 }
1865 
1866 /* END DS PROTOCOL SUPPORT FUNCTIONS */
1867 
#ifdef DEBUG

/*
 * Layout of one output line:
 *   BYTESPERLINE groups of " xx" (3 characters each), two spaces,
 *   BYTESPERLINE ASCII characters, and a terminating NUL.
 */
#define	BYTESPERLINE	8
#define	LINEWIDTH	((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1)
#define	ASCIIOFFSET	((BYTESPERLINE * 3) + 2)
/* the argument is parenthesized so that any expression may be passed */
#define	ISPRINT(c)	(((c) >= ' ') && ((c) <= '~'))

/*
 * Output a buffer formatted with a set number of bytes on
 * each line. Append each line with the ASCII equivalent of
 * each byte if it falls within the printable ASCII range,
 * and '.' otherwise.
 *
 * vbuf - start of the data to dump; a NULL pointer dumps nothing
 * len  - number of bytes available at vbuf; at most the first 128
 *        bytes are printed
 */
void
ds_dump_msg(void *vbuf, size_t len)
{
	size_t	i, j;
	char	*curr;
	char	*aoff;
	char	line[LINEWIDTH];
	uint8_t	*buf = vbuf;

	/* nothing to read from */
	if (buf == NULL)
		return;

	/* cap the amount of output produced for one message */
	if (len > 128)
		len = 128;

	/* walk the buffer one line at a time */
	for (i = 0; i < len; i += BYTESPERLINE) {

		/* zero the line so that it is always NUL terminated */
		bzero(line, LINEWIDTH);

		curr = line;
		aoff = line + ASCIIOFFSET;

		/*
		 * Walk the bytes in the current line, storing
		 * the hex value for the byte as well as the
		 * ASCII representation in a temporary buffer.
		 * All ASCII values are placed at the end of
		 * the line.
		 */
		for (j = 0; (j < BYTESPERLINE) && ((i + j) < len); j++) {
			(void) sprintf(curr, " %02x", buf[i + j]);
			*aoff = (ISPRINT(buf[i + j])) ? buf[i + j] : '.';
			curr += 3;
			aoff++;
		}

		/*
		 * Fill in to the start of the ASCII translation
		 * with spaces. This will only be necessary if
		 * this is the last line and there are not enough
		 * bytes to fill the whole line.  It also overwrites
		 * the NUL left behind by the last sprintf().
		 */
		while (curr != (line + ASCIIOFFSET))
			*curr++ = ' ';

		cmn_err(CE_NOTE, "%s" DS_EOL, line);
	}
}
#endif /* DEBUG */
1928 
1929 
1930 /*
1931  * Walk the table of registered services, executing the specified callback
1932  * function for each service on a port. A non-zero return value from the
1933  * callback is used to terminate the walk, not to indicate an error. Returns
1934  * the index of the last service visited.
1935  */
1936 int
1937 ds_walk_svcs(svc_cb_t svc_cb, void *arg)
1938 {
1939 	int		idx;
1940 	ds_svc_t	*svc;
1941 
1942 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
1943 
1944 	/* walk every table entry */
1945 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
1946 		svc = ds_svcs.tbl[idx];
1947 
1948 		/* execute the callback */
1949 		if ((*svc_cb)(svc, arg) != 0)
1950 			break;
1951 	}
1952 
1953 	return (idx);
1954 }
1955 
1956 static int
1957 ds_svc_isfree(ds_svc_t *svc, void *arg)
1958 {
1959 	_NOTE(ARGUNUSED(arg))
1960 
1961 	/*
1962 	 * Looking for a free service. This may be a NULL entry
1963 	 * in the table, or an unused structure that could be
1964 	 * reused.
1965 	 */
1966 
1967 	if (DS_SVC_ISFREE(svc)) {
1968 		/* yes, it is free */
1969 		return (1);
1970 	}
1971 
1972 	/* not a candidate */
1973 	return (0);
1974 }
1975 
1976 int
1977 ds_svc_ismatch(ds_svc_t *svc, void *arg)
1978 {
1979 	if (DS_SVC_ISFREE(svc)) {
1980 		return (0);
1981 	}
1982 
1983 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
1984 	    (svc->flags & DSSF_ISCLIENT) == 0) {
1985 		/* found a match */
1986 		return (1);
1987 	}
1988 
1989 	return (0);
1990 }
1991 
1992 int
1993 ds_svc_clnt_ismatch(ds_svc_t *svc, void *arg)
1994 {
1995 	if (DS_SVC_ISFREE(svc)) {
1996 		return (0);
1997 	}
1998 
1999 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2000 	    (svc->flags & DSSF_ISCLIENT) != 0) {
2001 		/* found a match */
2002 		return (1);
2003 	}
2004 
2005 	return (0);
2006 }
2007 
2008 int
2009 ds_svc_free(ds_svc_t *svc, void *arg)
2010 {
2011 	_NOTE(ARGUNUSED(arg))
2012 
2013 	if (svc == NULL) {
2014 		return (0);
2015 	}
2016 
2017 	if (svc->cap.svc_id) {
2018 		DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
2019 		svc->cap.svc_id = NULL;
2020 	}
2021 
2022 	if (svc->cap.vers) {
2023 		DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
2024 		svc->cap.vers = NULL;
2025 	}
2026 
2027 	DS_FREE(svc, sizeof (ds_svc_t));
2028 
2029 	return (0);
2030 }
2031 
2032 static int
2033 ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port)
2034 {
2035 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2036 
2037 	if (DS_SVC_ISFREE(svc))
2038 		return (0);
2039 
2040 	if (!DS_PORT_IN_SET(svc->avail, PORTID(port)))
2041 		return (0);
2042 
2043 	DS_PORTSET_ADD(svc->tried, PORTID(port));
2044 
2045 	if (ds_send_reg_req(svc, port) == 0) {
2046 		/* register sent successfully */
2047 		return (1);
2048 	}
2049 
2050 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2051 		/* reset the service */
2052 		ds_reset_svc(svc, port);
2053 	}
2054 	return (0);
2055 }
2056 
2057 int
2058 ds_svc_register(ds_svc_t *svc, void *arg)
2059 {
2060 	_NOTE(ARGUNUSED(arg))
2061 	ds_portset_t ports;
2062 	ds_port_t *port;
2063 	int	idx;
2064 
2065 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2066 
2067 	if (DS_SVC_ISFREE(svc))
2068 		return (0);
2069 
2070 	ports = svc->avail;
2071 	if (svc->flags & DSSF_ISCLIENT) {
2072 		ds_portset_del_active_clients(svc->cap.svc_id, &ports);
2073 	} else if (svc->state != DS_SVC_INACTIVE)
2074 		return (0);
2075 
2076 	if (DS_PORTSET_ISNULL(ports))
2077 		return (0);
2078 
2079 	/*
2080 	 * Attempt to register the service. Start with the lowest
2081 	 * numbered port and continue until a registration message
2082 	 * is sent successfully, or there are no ports left to try.
2083 	 */
2084 	for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2085 
2086 		/*
2087 		 * If the port is not in the available list,
2088 		 * it is not a candidate for registration.
2089 		 */
2090 		if (!DS_PORT_IN_SET(ports, idx)) {
2091 			continue;
2092 		}
2093 
2094 		port = &ds_ports[idx];
2095 		if (ds_svc_register_onport(svc, port)) {
2096 			if ((svc->flags & DSSF_ISCLIENT) == 0)
2097 				break;
2098 			DS_PORTSET_DEL(svc->avail, idx);
2099 		}
2100 	}
2101 
2102 	return (0);
2103 }
2104 
2105 static int
2106 ds_svc_unregister(ds_svc_t *svc, void *arg)
2107 {
2108 	ds_port_t *port = (ds_port_t *)arg;
2109 	ds_svc_hdl_t hdl;
2110 
2111 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2112 
2113 	if (DS_SVC_ISFREE(svc)) {
2114 		return (0);
2115 	}
2116 
2117 	/* make sure the service is using this port */
2118 	if (svc->port != port) {
2119 		return (0);
2120 	}
2121 
2122 	if (port) {
2123 		DS_DBG(CE_NOTE, "ds@%lx: svc_unreg: id='%s', ver=%d.%d, "
2124 		    " hdl=0x%09lx" DS_EOL, PORTID(port), svc->cap.svc_id,
2125 		    svc->ver.major, svc->ver.minor, svc->hdl);
2126 	} else {
2127 		DS_DBG(CE_NOTE, "port=NULL: svc_unreg: id='%s', ver=%d.%d, "
2128 		    " hdl=0x%09lx" DS_EOL, svc->cap.svc_id, svc->ver.major,
2129 		    svc->ver.minor, svc->hdl);
2130 	}
2131 
2132 	/* reset the service structure */
2133 	ds_reset_svc(svc, port);
2134 
2135 	/* call the client unregister callback */
2136 	if (svc->ops.ds_unreg_cb) {
2137 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2138 	}
2139 
2140 	/* increment the count in the handle to prevent reuse */
2141 	hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl));
2142 	if (DS_HDL_ISCLIENT(svc->hdl)) {
2143 		DS_HDL_SET_ISCLIENT(hdl);
2144 	}
2145 	svc->hdl = hdl;
2146 
2147 	if (svc->state != DS_SVC_UNREG_PENDING) {
2148 		/* try to initiate a new registration */
2149 		(void) ds_svc_register(svc, NULL);
2150 	}
2151 
2152 	return (0);
2153 }
2154 
2155 static int
2156 ds_svc_port_up(ds_svc_t *svc, void *arg)
2157 {
2158 	ds_port_t *port = (ds_port_t *)arg;
2159 
2160 	if (DS_SVC_ISFREE(svc)) {
2161 		/* nothing to do */
2162 		return (0);
2163 	}
2164 
2165 	DS_PORTSET_ADD(svc->avail, port->id);
2166 	DS_PORTSET_DEL(svc->tried, port->id);
2167 
2168 	return (0);
2169 }
2170 
2171 ds_svc_t *
2172 ds_alloc_svc(void)
2173 {
2174 	int		idx;
2175 	uint_t		newmaxsvcs;
2176 	ds_svc_t	**newtbl;
2177 	ds_svc_t	*newsvc;
2178 
2179 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2180 
2181 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2182 
2183 	if (idx != ds_svcs.maxsvcs) {
2184 		goto found;
2185 	}
2186 
2187 	/*
2188 	 * There was no free space in the table. Grow
2189 	 * the table to double its current size.
2190 	 */
2191 	newmaxsvcs = ds_svcs.maxsvcs * 2;
2192 	newtbl = DS_MALLOC(newmaxsvcs * sizeof (ds_svc_t *));
2193 
2194 	/* copy old table data to the new table */
2195 	(void) memcpy(newtbl, ds_svcs.tbl,
2196 	    ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2197 
2198 	/* clean up the old table */
2199 	DS_FREE(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2200 	ds_svcs.tbl = newtbl;
2201 	ds_svcs.maxsvcs = newmaxsvcs;
2202 
2203 	/* search for a free space again */
2204 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2205 
2206 	/* the table is locked so should find a free slot */
2207 	ASSERT(idx != ds_svcs.maxsvcs);
2208 
2209 found:
2210 	/* allocate a new svc structure if necessary */
2211 	if ((newsvc = ds_svcs.tbl[idx]) == NULL) {
2212 		/* allocate a new service */
2213 		newsvc = DS_MALLOC(sizeof (ds_svc_t));
2214 		ds_svcs.tbl[idx] = newsvc;
2215 	}
2216 
2217 	/* fill in the handle */
2218 	newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl));
2219 	newsvc->state = DS_SVC_FREE;	/* Mark as free temporarily */
2220 
2221 	return (newsvc);
2222 }
2223 
2224 static void
2225 ds_reset_svc(ds_svc_t *svc, ds_port_t *port)
2226 {
2227 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2228 
2229 	if (svc->state != DS_SVC_UNREG_PENDING)
2230 		svc->state = DS_SVC_INACTIVE;
2231 	svc->ver_idx = 0;
2232 	svc->ver.major = 0;
2233 	svc->ver.minor = 0;
2234 	svc->port = NULL;
2235 	if (port) {
2236 		DS_PORTSET_DEL(svc->avail, port->id);
2237 	}
2238 }
2239 
2240 ds_svc_t *
2241 ds_get_svc(ds_svc_hdl_t hdl)
2242 {
2243 	int		idx;
2244 	ds_svc_t	*svc;
2245 
2246 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2247 
2248 	if (hdl == DS_INVALID_HDL)
2249 		return (NULL);
2250 
2251 	idx = DS_HDL2IDX(hdl);
2252 
2253 	/* check if index is out of bounds */
2254 	if ((idx < 0) || (idx >= ds_svcs.maxsvcs))
2255 		return (NULL);
2256 
2257 	svc = ds_svcs.tbl[idx];
2258 
2259 	/* check for a valid service */
2260 	if (DS_SVC_ISFREE(svc))
2261 		return (NULL);
2262 
2263 	/* make sure the handle is an exact match */
2264 	if (svc->hdl != hdl)
2265 		return (NULL);
2266 
2267 	return (svc);
2268 }
2269 
2270 static void
2271 ds_port_reset(ds_port_t *port)
2272 {
2273 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2274 	ASSERT(MUTEX_HELD(&port->lock));
2275 
2276 	/* connection went down, mark everything inactive */
2277 	(void) ds_walk_svcs(ds_svc_unregister, port);
2278 
2279 	port->ver_idx = 0;
2280 	port->ver.major = 0;
2281 	port->ver.minor = 0;
2282 	port->state = DS_PORT_LDC_INIT;
2283 }
2284 
2285 /*
2286  * Verify that a version array is sorted as expected for the
2287  * version negotiation to work correctly.
2288  */
2289 ds_vers_check_t
2290 ds_vers_isvalid(ds_ver_t *vers, int nvers)
2291 {
2292 	uint16_t	curr_major;
2293 	uint16_t	curr_minor;
2294 	int		idx;
2295 
2296 	curr_major = vers[0].major;
2297 	curr_minor = vers[0].minor;
2298 
2299 	/*
2300 	 * Walk the version array, verifying correct ordering.
2301 	 * The array must be sorted from highest supported
2302 	 * version to lowest supported version.
2303 	 */
2304 	for (idx = 0; idx < nvers; idx++) {
2305 		if (vers[idx].major > curr_major) {
2306 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2307 			    " increasing major versions" DS_EOL);
2308 			return (DS_VERS_INCREASING_MAJOR_ERR);
2309 		}
2310 
2311 		if (vers[idx].major < curr_major) {
2312 			curr_major = vers[idx].major;
2313 			curr_minor = vers[idx].minor;
2314 			continue;
2315 		}
2316 
2317 		if (vers[idx].minor > curr_minor) {
2318 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2319 			    " increasing minor versions" DS_EOL);
2320 			return (DS_VERS_INCREASING_MINOR_ERR);
2321 		}
2322 
2323 		curr_minor = vers[idx].minor;
2324 	}
2325 
2326 	return (DS_VERS_OK);
2327 }
2328 
2329 /*
2330  * Extended user capability init.
2331  */
2332 int
2333 ds_ucap_init(ds_capability_t *cap, ds_clnt_ops_t *ops, uint32_t flags,
2334     int instance, ds_svc_hdl_t *hdlp)
2335 {
2336 	ds_vers_check_t	status;
2337 	ds_svc_t	*svc;
2338 	int		rv = 0;
2339 	ds_svc_hdl_t 	lb_hdl, hdl;
2340 	int		is_loopback;
2341 	int		is_client;
2342 
2343 	/* sanity check the args */
2344 	if ((cap == NULL) || (ops == NULL)) {
2345 		cmn_err(CE_NOTE, "%s: invalid arguments" DS_EOL, __func__);
2346 		return (EINVAL);
2347 	}
2348 
2349 	/* sanity check the capability specifier */
2350 	if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) {
2351 		cmn_err(CE_NOTE, "%s: invalid capability specifier" DS_EOL,
2352 		    __func__);
2353 		return (EINVAL);
2354 	}
2355 
2356 	/* sanity check the version array */
2357 	if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) {
2358 		cmn_err(CE_NOTE, "%s: invalid capability version array "
2359 		    "for %s service: %s" DS_EOL, __func__, cap->svc_id,
2360 		    (status == DS_VERS_INCREASING_MAJOR_ERR) ?
2361 		    "increasing major versions" :
2362 		    "increasing minor versions");
2363 		return (EINVAL);
2364 	}
2365 
2366 	/* data and register callbacks are required */
2367 	if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) {
2368 		cmn_err(CE_NOTE, "%s: invalid ops specifier for %s service"
2369 		    DS_EOL, __func__, cap->svc_id);
2370 		return (EINVAL);
2371 	}
2372 
2373 	flags &= DSSF_USERFLAGS;
2374 	is_client = flags & DSSF_ISCLIENT;
2375 
2376 	DS_DBG_USR(CE_NOTE, "%s: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx"
2377 	    DS_EOL, __func__, cap->svc_id, PTR_TO_LONG(ops->ds_data_cb),
2378 	    PTR_TO_LONG(ops->cb_arg));
2379 
2380 	mutex_enter(&ds_svcs.lock);
2381 
2382 	/* check if the service is already registered */
2383 	if (i_ds_hdl_lookup(cap->svc_id, is_client, NULL, 1) == 1) {
2384 		/* already registered */
2385 		cmn_err(CE_NOTE, "Service '%s'/%s already registered" DS_EOL,
2386 		    cap->svc_id,
2387 		    (flags & DSSF_ISCLIENT) ? "client" : "service");
2388 		mutex_exit(&ds_svcs.lock);
2389 		return (EALREADY);
2390 	}
2391 
2392 	svc = ds_alloc_svc();
2393 	if (is_client) {
2394 		DS_HDL_SET_ISCLIENT(svc->hdl);
2395 	}
2396 
2397 	svc->state = DS_SVC_FREE;
2398 	svc->svc_hdl = DS_BADHDL1;
2399 
2400 	svc->flags = flags;
2401 	svc->drvi = instance;
2402 	svc->drv_psp = NULL;
2403 
2404 	/*
2405 	 * Check for loopback.
2406 	 */
2407 	if (i_ds_hdl_lookup(cap->svc_id, is_client == 0, &lb_hdl, 1) == 1) {
2408 		if ((rv = ds_loopback_set_svc(svc, lb_hdl)) != 0) {
2409 			cmn_err(CE_WARN, "%s: ds_loopback_set_svc err (%d)"
2410 			    DS_EOL, __func__, rv);
2411 			mutex_exit(&ds_svcs.lock);
2412 			return (rv);
2413 		}
2414 		is_loopback = 1;
2415 	} else
2416 		is_loopback = 0;
2417 
2418 	/* copy over all the client information */
2419 	(void) memcpy(&svc->cap, cap, sizeof (ds_capability_t));
2420 
2421 	/* make a copy of the service name */
2422 	svc->cap.svc_id = ds_strdup(cap->svc_id);
2423 
2424 	/* make a copy of the version array */
2425 	svc->cap.vers = DS_MALLOC(cap->nvers * sizeof (ds_ver_t));
2426 	(void) memcpy(svc->cap.vers, cap->vers, cap->nvers * sizeof (ds_ver_t));
2427 
2428 	/* copy the client ops vector */
2429 	(void) memcpy(&svc->ops, ops, sizeof (ds_clnt_ops_t));
2430 
2431 	svc->state = DS_SVC_INACTIVE;
2432 	svc->ver_idx = 0;
2433 	DS_PORTSET_DUP(svc->avail, ds_allports);
2434 	DS_PORTSET_SETNULL(svc->tried);
2435 
2436 	ds_svcs.nsvcs++;
2437 
2438 	hdl = svc->hdl;
2439 
2440 	/*
2441 	 * kludge to allow user callback code to get handle and user args.
2442 	 * Make sure the callback arg points to the svc structure.
2443 	 */
2444 	if ((flags & DSSF_ISUSER) != 0) {
2445 		ds_cbarg_set_cookie(svc);
2446 	}
2447 
2448 	if (is_loopback) {
2449 		ds_loopback_register(hdl);
2450 		ds_loopback_register(lb_hdl);
2451 	}
2452 
2453 	/*
2454 	 * If this is a client or a non-loopback service provider, send
2455 	 * out register requests.
2456 	 */
2457 	if (!is_loopback || (flags & DSSF_ISCLIENT) != 0)
2458 		(void) ds_svc_register(svc, NULL);
2459 
2460 	if (hdlp) {
2461 		*hdlp = hdl;
2462 	}
2463 
2464 	mutex_exit(&ds_svcs.lock);
2465 
2466 	DS_DBG_USR(CE_NOTE, "%s: service '%s' assigned handle 0x%09lx" DS_EOL,
2467 	    __func__, svc->cap.svc_id, hdl);
2468 
2469 	return (0);
2470 }
2471 
2472 /*
2473  * ds_cap_init interface for previous revision.
2474  */
2475 int
2476 ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops)
2477 {
2478 	return (ds_ucap_init(cap, ops, 0, DS_INVALID_INSTANCE, NULL));
2479 }
2480 
2481 /*
2482  * Interface for ds_unreg_hdl in lds driver.
2483  */
2484 int
2485 ds_unreg_hdl(ds_svc_hdl_t hdl)
2486 {
2487 	ds_svc_t	*svc;
2488 	int		is_loopback;
2489 	ds_svc_hdl_t	lb_hdl;
2490 
2491 	DS_DBG_USR(CE_NOTE, "%s: hdl=0x%09lx" DS_EOL, __func__, hdl);
2492 
2493 	mutex_enter(&ds_svcs.lock);
2494 	if ((svc = ds_get_svc(hdl)) == NULL) {
2495 		mutex_exit(&ds_svcs.lock);
2496 		DS_DBG_USR(CE_NOTE, "%s: unknown hdl: 0x%llx" DS_EOL, __func__,
2497 		    (u_longlong_t)hdl);
2498 		return (ENXIO);
2499 	}
2500 
2501 	DS_DBG_USR(CE_NOTE, "%s: svcid='%s', hdl=0x%llx" DS_EOL, __func__,
2502 	    svc->cap.svc_id, (u_longlong_t)svc->hdl);
2503 
2504 	svc->state = DS_SVC_UNREG_PENDING;
2505 
2506 	is_loopback = ((svc->flags & DSSF_LOOPBACK) != 0);
2507 	lb_hdl = svc->svc_hdl;
2508 
2509 	if (svc->port) {
2510 		(void) ds_send_unreg_req(svc);
2511 	}
2512 
2513 	(void) ds_svc_unregister(svc, svc->port);
2514 
2515 	ds_delete_svc_entry(svc);
2516 
2517 	if (is_loopback) {
2518 		ds_loopback_unregister(lb_hdl);
2519 	}
2520 
2521 	mutex_exit(&ds_svcs.lock);
2522 
2523 	return (0);
2524 }
2525 
2526 int
2527 ds_cap_fini(ds_capability_t *cap)
2528 {
2529 	ds_svc_hdl_t	hdl;
2530 	int rv;
2531 	uint_t nhdls = 0;
2532 
2533 	DS_DBG(CE_NOTE, "%s: '%s'" DS_EOL, __func__, cap->svc_id);
2534 	if ((rv = ds_hdl_lookup(cap->svc_id, 0, &hdl, 1, &nhdls)) != 0) {
2535 		DS_DBG(CE_NOTE, "%s: ds_hdl_lookup '%s' err (%d)" DS_EOL,
2536 		    __func__, cap->svc_id, rv);
2537 		return (rv);
2538 	}
2539 
2540 	if (nhdls == 0) {
2541 		DS_DBG(CE_NOTE, "%s: no such service '%s'" DS_EOL,
2542 		    __func__, cap->svc_id);
2543 		return (ENXIO);
2544 	}
2545 
2546 	if ((rv = ds_is_my_hdl(hdl, DS_INVALID_INSTANCE)) != 0) {
2547 		DS_DBG(CE_NOTE, "%s: ds_is_my_handle err (%d)" DS_EOL, __func__,
2548 		    rv);
2549 		return (rv);
2550 	}
2551 
2552 	if ((rv = ds_unreg_hdl(hdl)) != 0) {
2553 		DS_DBG(CE_NOTE, "%s: ds_unreg_hdl err (%d)" DS_EOL, __func__,
2554 		    rv);
2555 		return (rv);
2556 	}
2557 
2558 	return (0);
2559 }
2560 
2561 int
2562 ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len)
2563 {
2564 	int		rv;
2565 	ds_hdr_t	*hdr;
2566 	caddr_t		msg;
2567 	size_t		msglen;
2568 	size_t		hdrlen;
2569 	caddr_t		payload;
2570 	ds_svc_t	*svc;
2571 	ds_port_t	*port;
2572 	ds_data_handle_t *data;
2573 	ds_svc_hdl_t	svc_hdl;
2574 	int		is_client = 0;
2575 
2576 	DS_DBG(CE_NOTE, "%s: hdl: 0x%llx, buf: %lx, len: %ld" DS_EOL, __func__,
2577 	    (u_longlong_t)hdl, (ulong_t)buf, len);
2578 
2579 	mutex_enter(&ds_svcs.lock);
2580 
2581 	if ((svc = ds_get_svc(hdl)) == NULL) {
2582 		cmn_err(CE_WARN, "%s: invalid handle 0x%llx" DS_EOL, __func__,
2583 		    (u_longlong_t)hdl);
2584 		mutex_exit(&ds_svcs.lock);
2585 		return (ENXIO);
2586 	}
2587 
2588 	if (svc->state != DS_SVC_ACTIVE) {
2589 		/* channel is up, but svc is not registered */
2590 		DS_DBG(CE_NOTE, "%s: invalid service state 0x%x" DS_EOL,
2591 		    __func__, svc->state);
2592 		mutex_exit(&ds_svcs.lock);
2593 		return (ENOTCONN);
2594 	}
2595 
2596 	if (svc->flags & DSSF_LOOPBACK) {
2597 		hdl = svc->svc_hdl;
2598 		mutex_exit(&ds_svcs.lock);
2599 		ds_loopback_send(hdl, buf, len);
2600 		return (0);
2601 	}
2602 
2603 	if ((port = svc->port) == NULL) {
2604 		DS_DBG(CE_NOTE, "%s: service '%s' not associated with a port"
2605 		    DS_EOL, __func__, svc->cap.svc_id);
2606 		mutex_exit(&ds_svcs.lock);
2607 		return (ECONNRESET);
2608 	}
2609 
2610 	if (svc->flags & DSSF_ISCLIENT) {
2611 		is_client = 1;
2612 		svc_hdl = svc->svc_hdl;
2613 	}
2614 
2615 	mutex_exit(&ds_svcs.lock);
2616 
2617 	/* check that the LDC channel is ready */
2618 	if (port->ldc.state != LDC_UP) {
2619 		DS_DBG(CE_NOTE, "%s: LDC channel is not up" DS_EOL, __func__);
2620 		return (ECONNRESET);
2621 	}
2622 
2623 	hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t);
2624 
2625 	msg = DS_MALLOC(len + hdrlen);
2626 	hdr = (ds_hdr_t *)msg;
2627 	payload = msg + hdrlen;
2628 	msglen = len + hdrlen;
2629 
2630 	hdr->payload_len = len + sizeof (ds_data_handle_t);
2631 	hdr->msg_type = DS_DATA;
2632 
2633 	data = (ds_data_handle_t *)(msg + DS_HDR_SZ);
2634 	if (is_client) {
2635 		data->svc_handle = svc_hdl;
2636 	} else {
2637 		data->svc_handle = hdl;
2638 	}
2639 
2640 	if ((buf != NULL) && (len != 0)) {
2641 		(void) memcpy(payload, buf, len);
2642 	}
2643 
2644 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data>: hdl=0x%llx, len=%ld, "
2645 	    " payload_len=%d" DS_EOL, PORTID(port), (u_longlong_t)svc->hdl,
2646 	    msglen, hdr->payload_len);
2647 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msglen);
2648 
2649 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
2650 		rv = (rv == EIO) ? ECONNRESET : rv;
2651 	}
2652 	DS_FREE(msg, msglen);
2653 
2654 	return (rv);
2655 }
2656 
2657 void
2658 ds_port_common_init(ds_port_t *port)
2659 {
2660 	int rv;
2661 
2662 	if ((port->flags & DS_PORT_MUTEX_INITED) == 0) {
2663 		mutex_init(&port->lock, NULL, MUTEX_DRIVER, NULL);
2664 		mutex_init(&port->tx_lock, NULL, MUTEX_DRIVER, NULL);
2665 		mutex_init(&port->rcv_lock, NULL, MUTEX_DRIVER, NULL);
2666 		port->flags |= DS_PORT_MUTEX_INITED;
2667 	}
2668 
2669 	port->state = DS_PORT_INIT;
2670 	DS_PORTSET_ADD(ds_allports, port->id);
2671 
2672 	ds_sys_port_init(port);
2673 
2674 	mutex_enter(&port->lock);
2675 	rv = ds_ldc_init(port);
2676 	mutex_exit(&port->lock);
2677 
2678 	/*
2679 	 * If LDC successfully init'ed, try to kick off protocol for this port.
2680 	 */
2681 	if (rv == 0) {
2682 		ds_handle_up_event(port);
2683 	}
2684 }
2685 
2686 void
2687 ds_port_common_fini(ds_port_t *port, int is_fini)
2688 {
2689 	port->state = DS_PORT_FREE;
2690 
2691 	if (is_fini && (port->flags & DS_PORT_MUTEX_INITED) != 0) {
2692 		mutex_destroy(&port->lock);
2693 		mutex_destroy(&port->tx_lock);
2694 		mutex_destroy(&port->rcv_lock);
2695 		port->flags &= ~DS_PORT_MUTEX_INITED;
2696 	}
2697 
2698 	DS_PORTSET_DEL(ds_allports, port->id);
2699 
2700 	ds_sys_port_fini(port);
2701 }
2702 
2703 /*
2704  * Initialize table of registered service classes
2705  */
2706 void
2707 ds_init_svcs_tbl(uint_t nentries)
2708 {
2709 	int	tblsz;
2710 
2711 	ds_svcs.maxsvcs = nentries;
2712 
2713 	tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *);
2714 	ds_svcs.tbl = (ds_svc_t **)DS_MALLOC(tblsz);
2715 
2716 	ds_svcs.nsvcs = 0;
2717 }
2718 
2719 /*
2720  * Find the max and min version supported.
2721  * Hacked from zeus workspace, support.c
2722  */
2723 static void
2724 min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
2725     uint16_t *min_major, uint16_t *max_major)
2726 {
2727 	int i;
2728 
2729 	*min_major = sup_versionsp[0].major;
2730 	*max_major = *min_major;
2731 
2732 	for (i = 1; i < num_versions; i++) {
2733 		if (sup_versionsp[i].major < *min_major)
2734 			*min_major = sup_versionsp[i].major;
2735 
2736 		if (sup_versionsp[i].major > *max_major)
2737 			*max_major = sup_versionsp[i].major;
2738 	}
2739 }
2740 
2741 /*
2742  * Check whether the major and minor numbers requested by the peer can be
2743  * satisfied. If the requested major is supported, true is returned, and the
2744  * agreed minor is returned in new_minor. If the requested major is not
2745  * supported, the routine returns false, and the closest major is returned in
2746  * *new_major, upon which the peer should re-negotiate. The closest major is
2747  * the just lower that the requested major number.
2748  *
2749  * Hacked from zeus workspace, support.c
2750  */
2751 boolean_t
2752 negotiate_version(int num_versions, ds_ver_t *sup_versionsp,
2753     uint16_t req_major, uint16_t *new_majorp, uint16_t *new_minorp)
2754 {
2755 	int i;
2756 	uint16_t major, lower_major;
2757 	uint16_t min_major = 0, max_major;
2758 	boolean_t found_match = B_FALSE;
2759 
2760 	min_max_versions(num_versions, sup_versionsp, &min_major, &max_major);
2761 
2762 	DS_DBG(CE_NOTE, "negotiate_version: req_major = %u, min = %u, max = %u"
2763 	    DS_EOL, req_major, min_major, max_major);
2764 
2765 	/*
2766 	 * If the minimum version supported is greater than
2767 	 * the version requested, return the lowest version
2768 	 * supported
2769 	 */
2770 	if (min_major > req_major) {
2771 		*new_majorp = min_major;
2772 		return (B_FALSE);
2773 	}
2774 
2775 	/*
2776 	 * If the largest version supported is lower than
2777 	 * the version requested, return the largest version
2778 	 * supported
2779 	 */
2780 	if (max_major < req_major) {
2781 		*new_majorp = max_major;
2782 		return (B_FALSE);
2783 	}
2784 
2785 	/*
2786 	 * Now we know that the requested version lies between the
2787 	 * min and max versions supported. Check if the requested
2788 	 * major can be found in supported versions.
2789 	 */
2790 	lower_major = min_major;
2791 	for (i = 0; i < num_versions; i++) {
2792 		major = sup_versionsp[i].major;
2793 		if (major == req_major) {
2794 			found_match = B_TRUE;
2795 			*new_majorp = req_major;
2796 			*new_minorp = sup_versionsp[i].minor;
2797 			break;
2798 		} else {
2799 			if ((major < req_major) && (major > lower_major))
2800 				lower_major = major;
2801 		}
2802 	}
2803 
2804 	/*
2805 	 * If no match is found, return the closest available number
2806 	 */
2807 	if (!found_match)
2808 		*new_majorp = lower_major;
2809 
2810 	return (found_match);
2811 }
2812 
/*
 * Specific errno's that are used by ds.c and ldc.c
 *
 * The table is terminated by an entry whose ds_errno is 0.
 */
static struct {
	int ds_errno;
	char *estr;
} ds_errno_to_str_tab[] = {
	{ EIO,		"I/O error" },
	{ ENXIO,	"No such device or address" },
	{ EAGAIN,	"Resource temporarily unavailable" },
	{ ENOMEM,	"Not enough space" },
	{ EACCES,	"Permission denied" },
	{ EFAULT,	"Bad address" },
	{ EBUSY,	"Device busy" },
	{ EINVAL,	"Invalid argument" },
	{ ENOSPC,	"No space left on device" },
	{ ENOMSG,	"No message of desired type" },
#ifdef	ECHRNG
	{ ECHRNG,	"Channel number out of range" },
#endif
	{ ENOTSUP,	"Operation not supported" },
	{ EMSGSIZE,	"Message too long" },
	{ EADDRINUSE,	"Address already in use" },
	{ ECONNRESET,	"Connection reset by peer" },
	{ ENOBUFS,	"No buffer space available" },
	{ ENOTCONN,	"Socket is not connected" },
	{ ECONNREFUSED,	"Connection refused" },
	{ EALREADY,	"Operation already in progress" },
	{ 0,		NULL },
};

/*
 * Write a description of 'ds_errno' into 'ebuf' and return 'ebuf'.
 * Values found in ds_errno_to_str_tab get their table string; anything
 * else is formatted as "ds_errno (<value>)".
 *
 * NOTE(review): no buffer size is passed in and the copy is unbounded.
 * The longest table string is 32 characters plus the terminator, so
 * 'ebuf' must hold at least 33 bytes - confirm every caller's buffer.
 */
char *
ds_errno_to_str(int ds_errno, char *ebuf)
{
	int n = 0;

	while (ds_errno_to_str_tab[n].ds_errno != 0) {
		if (ds_errno_to_str_tab[n].ds_errno == ds_errno) {
			(void) strcpy(ebuf, ds_errno_to_str_tab[n].estr);
			return (ebuf);
		}
		n++;
	}

	/* not a value we have text for */
	(void) sprintf(ebuf, "ds_errno (%d)", ds_errno);
	return (ebuf);
}
2859 
2860 static void
2861 ds_loopback_register(ds_svc_hdl_t hdl)
2862 {
2863 	ds_ver_t ds_ver = { 1, 0};
2864 	ds_svc_t *svc;
2865 
2866 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2867 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2868 	    (u_longlong_t)hdl);
2869 	if ((svc = ds_get_svc(hdl)) == NULL) {
2870 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2871 		    (u_longlong_t)hdl);
2872 		return;
2873 	}
2874 	svc->state = DS_SVC_ACTIVE;
2875 
2876 	if (svc->ops.ds_reg_cb) {
2877 		DS_DBG_LOOP(CE_NOTE, "%s: loopback regcb: hdl: 0x%llx" DS_EOL,
2878 		    __func__, (u_longlong_t)hdl);
2879 		ds_ver.major = svc->ver.major;
2880 		ds_ver.minor = svc->ver.minor;
2881 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &ds_ver, hdl);
2882 	}
2883 }
2884 
2885 static void
2886 ds_loopback_unregister(ds_svc_hdl_t hdl)
2887 {
2888 	ds_svc_t *svc;
2889 
2890 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2891 	if ((svc = ds_get_svc(hdl)) == NULL) {
2892 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2893 		    (u_longlong_t)hdl);
2894 		return;
2895 	}
2896 
2897 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2898 	    (u_longlong_t)hdl);
2899 
2900 	svc->flags &= ~DSSF_LOOPBACK;
2901 	svc->svc_hdl = DS_BADHDL2;
2902 
2903 	if (svc->ops.ds_unreg_cb) {
2904 		DS_DBG_LOOP(CE_NOTE, "%s: loopback unregcb: hdl: 0x%llx" DS_EOL,
2905 		    __func__, (u_longlong_t)hdl);
2906 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2907 	}
2908 }
2909 
2910 static void
2911 ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen)
2912 {
2913 	ds_svc_t *svc;
2914 
2915 	mutex_enter(&ds_svcs.lock);
2916 	if ((svc = ds_get_svc(hdl)) == NULL) {
2917 		mutex_exit(&ds_svcs.lock);
2918 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2919 		    (u_longlong_t)hdl);
2920 		return;
2921 	}
2922 	mutex_exit(&ds_svcs.lock);
2923 
2924 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2925 	    (u_longlong_t)hdl);
2926 
2927 	if (svc->ops.ds_data_cb) {
2928 		DS_DBG_LOOP(CE_NOTE, "%s: loopback datacb hdl: 0x%llx" DS_EOL,
2929 		    __func__, (u_longlong_t)hdl);
2930 		(*svc->ops.ds_data_cb)(svc->ops.cb_arg, buf, buflen);
2931 	}
2932 }
2933 
2934 static int
2935 ds_loopback_set_svc(ds_svc_t *svc, ds_svc_hdl_t lb_hdl)
2936 {
2937 	ds_svc_t *lb_svc;
2938 
2939 	if ((lb_svc = ds_get_svc(lb_hdl)) == NULL) {
2940 		DS_DBG_LOOP(CE_NOTE, "%s: loopback: hdl: 0x%llx invalid" DS_EOL,
2941 		    __func__, (u_longlong_t)lb_hdl);
2942 		return (ENXIO);
2943 	}
2944 	if (lb_svc->state != DS_SVC_INACTIVE) {
2945 		DS_DBG_LOOP(CE_NOTE, "%s: loopback inactive: hdl: 0x%llx"
2946 		    DS_EOL, __func__, (u_longlong_t)lb_hdl);
2947 		if ((lb_svc->flags & DSSF_ISCLIENT) == 0) {
2948 			DS_DBG_LOOP(CE_NOTE, "%s: loopback busy hdl: 0x%llx"
2949 			    DS_EOL, __func__, (u_longlong_t)lb_hdl);
2950 			return (EBUSY);
2951 		}
2952 		lb_svc = ds_svc_clone(lb_svc);
2953 		DS_DBG_LOOP(CE_NOTE, "%s: loopback clone: ohdl: 0x%llx "
2954 		    "nhdl: 0x%llx" DS_EOL, __func__, (u_longlong_t)lb_hdl,
2955 		    (u_longlong_t)lb_svc->hdl);
2956 	}
2957 
2958 	svc->flags |= DSSF_LOOPBACK;
2959 	svc->svc_hdl = lb_svc->hdl;
2960 	svc->port = NULL;
2961 
2962 	lb_svc->flags |= DSSF_LOOPBACK;
2963 	lb_svc->svc_hdl = svc->hdl;
2964 	lb_svc->port = NULL;
2965 
2966 	DS_DBG_LOOP(CE_NOTE, "%s: setting loopback between: 0x%llx and 0x%llx"
2967 	    DS_EOL, __func__, (u_longlong_t)svc->hdl,
2968 	    (u_longlong_t)lb_svc->hdl);
2969 	return (0);
2970 }
2971 
2972 static ds_svc_t *
2973 ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl, ds_port_t *port)
2974 {
2975 	int		idx;
2976 	ds_svc_t	*svc;
2977 
2978 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s looking up clnt hdl: 0x%llx" DS_EOL,
2979 	    PORTID(port), __func__, (u_longlong_t)hdl);
2980 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2981 
2982 	/* walk every table entry */
2983 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
2984 		svc = ds_svcs.tbl[idx];
2985 		if (DS_SVC_ISFREE(svc))
2986 			continue;
2987 		if ((svc->flags & DSSF_ISCLIENT) != 0 &&
2988 		    svc->svc_hdl == hdl && svc->port == port) {
2989 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s found clnt hdl "
2990 			    "0x%llx: svc%d" DS_EOL, PORTID(port), __func__,
2991 			    (u_longlong_t)hdl, (uint_t)DS_HDL2IDX(svc->hdl));
2992 			return (svc);
2993 		}
2994 	}
2995 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s clnt hdl: 0x%llx not found" DS_EOL,
2996 	    PORTID(port), __func__, (u_longlong_t)hdl);
2997 
2998 	return (NULL);
2999 }
3000 
3001 static ds_svc_t *
3002 ds_svc_clone(ds_svc_t *svc)
3003 {
3004 	ds_svc_t *newsvc;
3005 	ds_svc_hdl_t hdl;
3006 
3007 	ASSERT(svc->flags & DSSF_ISCLIENT);
3008 
3009 	newsvc = ds_alloc_svc();
3010 
3011 	/* Can only clone clients for now */
3012 	hdl = newsvc->hdl | DS_HDL_ISCLIENT_BIT;
3013 	DS_DBG_USR(CE_NOTE, "%s: cloning client: old hdl: 0x%llx new hdl: "
3014 	    "0x%llx" DS_EOL, __func__, (u_longlong_t)svc->hdl,
3015 	    (u_longlong_t)hdl);
3016 	(void) memcpy(newsvc, svc, sizeof (ds_svc_t));
3017 	newsvc->hdl = hdl;
3018 	newsvc->flags &= ~DSSF_LOOPBACK;
3019 	newsvc->port = NULL;
3020 	newsvc->svc_hdl = DS_BADHDL2;
3021 	newsvc->cap.svc_id = ds_strdup(svc->cap.svc_id);
3022 	newsvc->cap.vers = DS_MALLOC(svc->cap.nvers * sizeof (ds_ver_t));
3023 	(void) memcpy(newsvc->cap.vers, svc->cap.vers,
3024 	    svc->cap.nvers * sizeof (ds_ver_t));
3025 
3026 	/*
3027 	 * Kludge to allow lds driver user callbacks to get access to current
3028 	 * svc structure.  Arg could be index to svc table or some other piece
3029 	 * of info to get to the svc table entry.
3030 	 */
3031 	if (newsvc->flags & DSSF_ISUSER) {
3032 		newsvc->ops.cb_arg = (ds_cb_arg_t)(newsvc);
3033 	}
3034 	return (newsvc);
3035 }
3036 
3037 /*
3038  * Internal handle lookup function.
3039  */
3040 static int
3041 i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3042     uint_t maxhdls)
3043 {
3044 	int idx;
3045 	int nhdls = 0;
3046 	ds_svc_t *svc;
3047 	uint32_t client_flag = is_client ? DSSF_ISCLIENT : 0;
3048 
3049 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3050 
3051 	for (idx = 0; idx < ds_svcs.maxsvcs && nhdls < maxhdls; idx++) {
3052 		svc = ds_svcs.tbl[idx];
3053 		if (DS_SVC_ISFREE(svc))
3054 			continue;
3055 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3056 		    (svc->flags & DSSF_ISCLIENT) == client_flag) {
3057 			if (hdlp != NULL && nhdls < maxhdls) {
3058 				hdlp[nhdls] = svc->hdl;
3059 				nhdls++;
3060 			} else {
3061 				nhdls++;
3062 			}
3063 		}
3064 	}
3065 	return (nhdls);
3066 }
3067 
3068 /*
3069  * Interface for ds_hdl_lookup in lds driver.
3070  */
3071 int
3072 ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3073     uint_t maxhdls, uint_t *nhdlsp)
3074 {
3075 	mutex_enter(&ds_svcs.lock);
3076 	*nhdlsp = i_ds_hdl_lookup(service, is_client, hdlp, maxhdls);
3077 	mutex_exit(&ds_svcs.lock);
3078 	return (0);
3079 }
3080 
3081 static void
3082 ds_portset_del_active_clients(char *service, ds_portset_t *portsp)
3083 {
3084 	ds_portset_t ports = *portsp;
3085 	int idx;
3086 	ds_svc_t *svc;
3087 
3088 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3089 
3090 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3091 		svc = ds_svcs.tbl[idx];
3092 		if (DS_SVC_ISFREE(svc))
3093 			continue;
3094 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3095 		    (svc->flags & DSSF_ISCLIENT) != 0 &&
3096 		    svc->state != DS_SVC_INACTIVE &&
3097 		    svc->port != NULL) {
3098 			DS_PORTSET_DEL(ports, PORTID(svc->port));
3099 		}
3100 	}
3101 	*portsp = ports;
3102 }
3103 
3104 /*
3105  * After an UNREG REQ, check if this is a client service with multiple
3106  * handles.  If it is, then we can eliminate this entry.
3107  */
3108 static void
3109 ds_check_for_dup_services(ds_svc_t *svc)
3110 {
3111 	if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3112 	    svc->state == DS_SVC_INACTIVE &&
3113 	    i_ds_hdl_lookup(svc->cap.svc_id, 1, NULL, 2) == 2) {
3114 		ds_delete_svc_entry(svc);
3115 	}
3116 }
3117 
3118 static void
3119 ds_delete_svc_entry(ds_svc_t *svc)
3120 {
3121 	ds_svc_hdl_t tmp_hdl;
3122 
3123 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3124 
3125 	/*
3126 	 * Clear out the structure, but do not deallocate the
3127 	 * memory. It can be reused for the next registration.
3128 	 */
3129 	DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
3130 	DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
3131 
3132 	/* save the handle to prevent reuse */
3133 	tmp_hdl = svc->hdl;
3134 	bzero((void *)svc, sizeof (ds_svc_t));
3135 
3136 	/* initialize for next use */
3137 	svc->hdl = tmp_hdl;
3138 	svc->state = DS_SVC_FREE;
3139 
3140 	ds_svcs.nsvcs--;
3141 }
3142