xref: /titanic_52/usr/src/uts/sun4v/io/ds_common.c (revision b695575577bae0337af339d76949713bfe1c9013)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * Domain Services Module Common Code.
29  *
30  * This module is intended to be used by both Solaris and the VBSC
31  * module.
32  */
33 
34 #include <sys/modctl.h>
35 #include <sys/ksynch.h>
36 #include <sys/taskq.h>
37 #include <sys/disp.h>
38 #include <sys/cmn_err.h>
39 #include <sys/note.h>
40 #include <sys/mach_descrip.h>
41 #include <sys/mdesc.h>
42 #include <sys/ldc.h>
43 #include <sys/ds.h>
44 #include <sys/ds_impl.h>
45 
46 #ifndef MIN
47 #define	MIN(a, b)	((a) < (b) ? (a) : (b))
48 #endif
49 
50 #define	DS_DECODE_BUF_LEN		30
51 
52 /*
53  * All DS ports in the system
54  *
55  * The list of DS ports is read in from the MD when the DS module is
56  * initialized and is never modified. This eliminates the need for
57  * locking to access the port array itself. Access to the individual
58  * ports is synchronized at the port level.
59  */
60 ds_port_t	ds_ports[DS_MAX_PORTS];
61 ds_portset_t	ds_allports;	/* all DS ports in the system */
62 
63 /*
64  * Table of registered services
65  *
66  * Locking: Accesses to the table of services are synchronized using
67  *   a mutex lock (ds_svcs.lock). The lock must be held both when
68  *   looking up service information in the table and when any
69  *   service information is being modified.
70  */
71 ds_svcs_t	ds_svcs;
72 
73 /*
74  * Flag to prevent callbacks while in the middle of DS teardown.
75  */
76 boolean_t ds_enabled = B_FALSE;	/* enable/disable taskq processing */
77 
78 /*
79  * Retry count and delay for LDC reads and writes
80  */
81 #ifndef DS_DEFAULT_RETRIES
82 #define	DS_DEFAULT_RETRIES	10000	/* number of times to retry */
83 #endif
84 #ifndef DS_DEFAULT_DELAY
85 #define	DS_DEFAULT_DELAY	1000	/* usecs to wait between retries */
86 #endif
87 
88 static int ds_retries = DS_DEFAULT_RETRIES;
89 static clock_t ds_delay = DS_DEFAULT_DELAY;
90 
91 /*
92  * Supported versions of the DS message protocol
93  *
94  * The version array must be sorted in order from the highest
95  * supported version to the lowest. Support for a particular
96  * <major>.<minor> version implies all lower minor versions of
97  * that same major version are supported as well.
98  */
99 static ds_ver_t ds_vers[] = { { 1, 0 } };
100 
101 #define	DS_NUM_VER	(sizeof (ds_vers) / sizeof (ds_vers[0]))
102 
103 
104 /* incoming message handling functions */
105 typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len);
106 static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len);
107 static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len);
108 static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len);
109 static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len);
110 static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len);
111 static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len);
112 static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len);
113 static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len);
114 static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len);
115 static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len);
116 static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len);
117 
118 /*
119  * DS Message Handler Dispatch Table
120  *
121  * A table used to dispatch all incoming messages. This table
122  * contains handlers for all the fixed message types, as well as
123  * the messages defined in the 1.0 version of the DS protocol.
124  * The handlers are indexed based on the DS header msg_type values
125  */
126 static const ds_msg_handler_t ds_msg_handlers[] = {
127 	ds_handle_init_req,		/* DS_INIT_REQ */
128 	ds_handle_init_ack,		/* DS_INIT_ACK */
129 	ds_handle_init_nack,		/* DS_INIT_NACK */
130 	ds_handle_reg_req,		/* DS_REG_REQ */
131 	ds_handle_reg_ack,		/* DS_REG_ACK */
132 	ds_handle_reg_nack,		/* DS_REG_NACK */
133 	ds_handle_unreg_req,		/* DS_UNREG */
134 	ds_handle_unreg_ack,		/* DS_UNREG_ACK */
135 	ds_handle_unreg_nack,		/* DS_UNREG_NACK */
136 	ds_handle_data,			/* DS_DATA */
137 	ds_handle_nack			/* DS_NACK */
138 };
139 
140 
141 
142 /* initialization functions */
143 static int ds_ldc_init(ds_port_t *port);
144 
145 /* event processing functions */
146 static uint_t ds_ldc_cb(uint64_t event, caddr_t arg);
147 static int ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep);
148 static void ds_handle_up_event(ds_port_t *port);
149 static void ds_handle_down_reset_events(ds_port_t *port);
150 static void ds_handle_recv(void *arg);
151 static void ds_dispatch_event(void *arg);
152 
153 /* message sending functions */
154 static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen);
155 static int ds_send_reg_req(ds_svc_t *svc, ds_port_t *port);
156 static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
157 static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
158 
159 /* walker functions */
160 static int ds_svc_isfree(ds_svc_t *svc, void *arg);
161 static int ds_svc_unregister(ds_svc_t *svc, void *arg);
162 static int ds_svc_port_up(ds_svc_t *svc, void *arg);
163 
164 /* service utilities */
165 static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port);
166 static int ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port);
167 
168 /* port utilities */
169 static void ds_port_reset(ds_port_t *port);
170 static ldc_status_t ds_update_ldc_state(ds_port_t *port);
171 
172 /* misc utilities */
173 static void min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
174     uint16_t *min_major, uint16_t *max_major);
175 
176 /* debug */
177 static char *decode_ldc_events(uint64_t event, char *buf);
178 
179 /* loopback */
180 static void ds_loopback_register(ds_svc_hdl_t hdl);
181 static void ds_loopback_unregister(ds_svc_hdl_t hdl);
182 static void ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen);
183 static int ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap,
184     ds_svc_hdl_t *lb_hdlp);
185 
186 /* client handling */
187 static int i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
188     uint_t maxhdls);
189 static ds_svc_t *ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl,
190     ds_port_t *port);
191 static ds_svc_t *ds_find_svc_by_id_port(char *svc_id, int is_client,
192     ds_port_t *port);
193 static ds_svc_t *ds_svc_clone(ds_svc_t *svc);
194 static void ds_portset_del_active_clients(char *service, ds_portset_t *portsp);
195 static void ds_check_for_dup_services(ds_svc_t *svc);
196 static void ds_delete_svc_entry(ds_svc_t *svc);
197 
/*
 * Return a newly allocated copy of the NUL-terminated string 'str'.
 *
 * The buffer comes from DS_MALLOC and is sized to hold the string
 * plus its terminator.
 */
char *
ds_strdup(char *str)
{
	size_t	bufsz = strlen(str) + 1;
	char	*copy = DS_MALLOC(bufsz);

	(void) strcpy(copy, str);
	return (copy);
}
207 
208 void
209 ds_common_init(void)
210 {
211 	/* Validate version table */
212 	ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK);
213 
214 	/* Initialize services table */
215 	ds_init_svcs_tbl(DS_MAXSVCS_INIT);
216 
217 	/* enable callback processing */
218 	ds_enabled = B_TRUE;
219 }
220 
221 /* BEGIN LDC SUPPORT FUNCTIONS */
222 
223 static char *
224 decode_ldc_events(uint64_t event, char *buf)
225 {
226 	buf[0] = 0;
227 	if (event & LDC_EVT_DOWN)	(void) strcat(buf, " DOWN");
228 	if (event & LDC_EVT_RESET)	(void) strcat(buf, " RESET");
229 	if (event & LDC_EVT_UP)		(void) strcat(buf, " UP");
230 	if (event & LDC_EVT_READ)	(void) strcat(buf, " READ");
231 	if (event & LDC_EVT_WRITE)	(void) strcat(buf, " WRITE");
232 	return (buf);
233 }
234 
235 static ldc_status_t
236 ds_update_ldc_state(ds_port_t *port)
237 {
238 	ldc_status_t	ldc_state;
239 	int		rv;
240 	char		ebuf[DS_EBUFSIZE];
241 
242 	ASSERT(MUTEX_HELD(&port->lock));
243 
244 	/*
245 	 * Read status and update ldc state info in port structure.
246 	 */
247 	if ((rv = ldc_status(port->ldc.hdl, &ldc_state)) != 0) {
248 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_status error: %s" DS_EOL,
249 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
250 		ldc_state = port->ldc.state;
251 	} else {
252 		port->ldc.state = ldc_state;
253 	}
254 
255 	return (ldc_state);
256 }
257 
258 static void
259 ds_handle_down_reset_events(ds_port_t *port)
260 {
261 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
262 	    __func__);
263 
264 	mutex_enter(&ds_svcs.lock);
265 	mutex_enter(&port->lock);
266 
267 	ds_sys_drain_events(port);
268 
269 	(void) ds_update_ldc_state(port);
270 
271 	/* reset the port state */
272 	ds_port_reset(port);
273 
274 	/* acknowledge the reset */
275 	(void) ldc_up(port->ldc.hdl);
276 
277 	mutex_exit(&port->lock);
278 	mutex_exit(&ds_svcs.lock);
279 
280 	ds_handle_up_event(port);
281 
282 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
283 }
284 
285 static void
286 ds_handle_up_event(ds_port_t *port)
287 {
288 	ldc_status_t	ldc_state;
289 
290 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
291 	    __func__);
292 
293 	mutex_enter(&port->lock);
294 
295 	ldc_state = ds_update_ldc_state(port);
296 
297 	mutex_exit(&port->lock);
298 
299 	if ((ldc_state == LDC_UP) && IS_DS_PORT(port)) {
300 		/*
301 		 * Initiate the handshake.
302 		 */
303 		ds_send_init_req(port);
304 	}
305 
306 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
307 }
308 
309 static uint_t
310 ds_ldc_cb(uint64_t event, caddr_t arg)
311 {
312 	ds_port_t	*port = (ds_port_t *)arg;
313 	char		evstring[DS_DECODE_BUF_LEN];
314 
315 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: %s event (%llx) received" DS_EOL,
316 	    PORTID(port), __func__, decode_ldc_events(event, evstring),
317 	    (u_longlong_t)event);
318 
319 	if (!ds_enabled) {
320 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: callback handling is disabled"
321 		    DS_EOL, PORTID(port), __func__);
322 		return (LDC_SUCCESS);
323 	}
324 
325 	if (event & (LDC_EVT_DOWN | LDC_EVT_RESET)) {
326 		ds_handle_down_reset_events(port);
327 		goto done;
328 	}
329 
330 	if (event & LDC_EVT_UP) {
331 		ds_handle_up_event(port);
332 	}
333 
334 	if (event & LDC_EVT_READ) {
335 		if (port->ldc.state != LDC_UP) {
336 			cmn_err(CE_WARN, "ds@%lx: %s: LDC READ event while "
337 			    "port not up" DS_EOL, PORTID(port), __func__);
338 			goto done;
339 		}
340 
341 		if (ds_sys_dispatch_func(ds_handle_recv, port)) {
342 			cmn_err(CE_WARN, "ds@%lx: error initiating LDC READ "
343 			    " event", PORTID(port));
344 		}
345 	}
346 
347 	if (event & LDC_EVT_WRITE) {
348 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: LDC WRITE event received, "
349 		    "not supported" DS_EOL, PORTID(port), __func__);
350 	}
351 
352 	if (event & ~(LDC_EVT_UP | LDC_EVT_READ)) {
353 		cmn_err(CE_WARN, "ds@%lx: %s: Unexpected LDC event received: "
354 		    "0x%llx" DS_EOL, PORTID(port), __func__,
355 		    (u_longlong_t)event);
356 	}
357 done:
358 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
359 
360 	return (LDC_SUCCESS);
361 }
362 
363 static int
364 ds_ldc_init(ds_port_t *port)
365 {
366 	int		rv;
367 	ldc_attr_t	ldc_attr;
368 	caddr_t		ldc_cb_arg = (caddr_t)port;
369 	char		ebuf[DS_EBUFSIZE];
370 
371 	ASSERT(MUTEX_HELD(&port->lock));
372 
373 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%lld" DS_EOL,
374 	    PORTID(port), __func__, (u_longlong_t)port->ldc.id);
375 
376 	ldc_attr.devclass = LDC_DEV_GENERIC;
377 	ldc_attr.instance = 0;
378 	ldc_attr.mode = LDC_MODE_RELIABLE;
379 	ldc_attr.mtu = DS_STREAM_MTU;
380 
381 	if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) {
382 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_id: %lx, ldc_init error: %s"
383 		    DS_EOL, PORTID(port), __func__, port->ldc.id,
384 		    ds_errno_to_str(rv, ebuf));
385 		return (rv);
386 	}
387 
388 	rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, ldc_cb_arg);
389 	if (rv != 0) {
390 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_reg_callback error: %s"
391 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
392 		return (rv);
393 	}
394 
395 	ds_sys_ldc_init(port);
396 	return (0);
397 }
398 
399 int
400 ds_ldc_fini(ds_port_t *port)
401 {
402 	int	rv;
403 	char	ebuf[DS_EBUFSIZE];
404 
405 	ASSERT(port->state >= DS_PORT_LDC_INIT);
406 
407 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%ld" DS_EOL, PORTID(port),
408 	    __func__, port->ldc.id);
409 
410 	if ((rv = ldc_close(port->ldc.hdl)) != 0) {
411 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_close error: %s" DS_EOL,
412 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
413 		return (rv);
414 	}
415 
416 	if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) {
417 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_unreg_callback error: %s"
418 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
419 		return (rv);
420 	}
421 
422 	if ((rv = ldc_fini(port->ldc.hdl)) != 0) {
423 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_fini error: %s" DS_EOL,
424 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
425 		return (rv);
426 	}
427 
428 	return (rv);
429 }
430 
431 /*
432  * Attempt to read a specified number of bytes from a particular LDC.
433  * Returns zero for success or the return code from the LDC read on
434  * failure. The actual number of bytes read from the LDC is returned
435  * in the size parameter.
436  */
437 static int
438 ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep)
439 {
440 	int	rv = 0;
441 	size_t	bytes_req = *sizep;
442 	size_t	bytes_left = bytes_req;
443 	size_t	nbytes;
444 	int	retry_count = 0;
445 	char	ebuf[DS_EBUFSIZE];
446 
447 	ASSERT(MUTEX_HELD(&port->rcv_lock));
448 
449 	*sizep = 0;
450 
451 	DS_DBG_LDC(CE_NOTE, "ds@%lx: attempting to read %ld bytes" DS_EOL,
452 	    PORTID(port), bytes_req);
453 
454 	while (bytes_left > 0) {
455 
456 		nbytes = bytes_left;
457 
458 		if ((rv = ldc_read(port->ldc.hdl, msgp, &nbytes)) != 0) {
459 			if (rv == ECONNRESET) {
460 				break;
461 			} else if (rv != EAGAIN) {
462 				cmn_err(CE_NOTE, "ds@%lx: %s: %s" DS_EOL,
463 				    PORTID(port), __func__,
464 				    ds_errno_to_str(rv, ebuf));
465 				break;
466 			}
467 		} else {
468 			if (nbytes != 0) {
469 				DS_DBG_LDC(CE_NOTE, "ds@%lx: "
470 				    "read %ld bytes, %d retries" DS_EOL,
471 				    PORTID(port), nbytes, retry_count);
472 
473 				*sizep += nbytes;
474 				msgp += nbytes;
475 				bytes_left -= nbytes;
476 
477 				/* reset counter on a successful read */
478 				retry_count = 0;
479 				continue;
480 			}
481 
482 			/*
483 			 * No data was read. Check if this is the
484 			 * first attempt. If so, just return since
485 			 * nothing has been read yet.
486 			 */
487 			if (bytes_left == bytes_req) {
488 				DS_DBG_LDC(CE_NOTE, "ds@%lx: read zero bytes, "
489 				    " no data available" DS_EOL, PORTID(port));
490 				break;
491 			}
492 		}
493 
494 		/*
495 		 * A retry is necessary because the read returned
496 		 * EAGAIN, or a zero length read occurred after
497 		 * reading a partial message.
498 		 */
499 		if (retry_count++ >= ds_retries) {
500 			DS_DBG_LDC(CE_NOTE, "ds@%lx: timed out waiting for "
501 			    "message" DS_EOL, PORTID(port));
502 			break;
503 		}
504 
505 		drv_usecwait(ds_delay);
506 	}
507 
508 	return (rv);
509 }
510 
511 static void
512 ds_handle_recv(void *arg)
513 {
514 	ds_port_t	*port = (ds_port_t *)arg;
515 	char		*hbuf;
516 	size_t		msglen;
517 	size_t		read_size;
518 	boolean_t	hasdata;
519 	ds_hdr_t	hdr;
520 	uint8_t		*msg;
521 	char		*currp;
522 	int		rv;
523 	ds_event_t	*devent;
524 
525 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s..." DS_EOL, PORTID(port), __func__);
526 
527 	/*
528 	 * Read messages from the channel until there are none
529 	 * pending. Valid messages are dispatched to be handled
530 	 * by a separate thread while any malformed messages are
531 	 * dropped.
532 	 */
533 
534 	mutex_enter(&port->rcv_lock);
535 
536 	while (((rv = ldc_chkq(port->ldc.hdl, &hasdata)) == 0) && hasdata) {
537 
538 		DS_DBG(CE_NOTE, "ds@%lx: %s: reading next message" DS_EOL,
539 		    PORTID(port), __func__);
540 
541 		/*
542 		 * Read in the next message.
543 		 */
544 		hbuf = (char *)&hdr;
545 		bzero(hbuf, DS_HDR_SZ);
546 		read_size = DS_HDR_SZ;
547 		currp = hbuf;
548 
549 		/* read in the message header */
550 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
551 			break;
552 		}
553 
554 		if (read_size < DS_HDR_SZ) {
555 			/*
556 			 * A zero length read is a valid signal that
557 			 * there is no data left on the channel.
558 			 */
559 			if (read_size != 0) {
560 				cmn_err(CE_WARN, "ds@%lx: invalid message "
561 				    "length, received %ld bytes, expected %ld"
562 				    DS_EOL, PORTID(port), read_size, DS_HDR_SZ);
563 			}
564 			continue;
565 		}
566 
567 		/* get payload size and allocate a buffer */
568 		read_size = ((ds_hdr_t *)hbuf)->payload_len;
569 		msglen = DS_HDR_SZ + read_size;
570 		msg = DS_MALLOC(msglen);
571 		if (!msg) {
572 			cmn_err(CE_WARN, "Memory allocation failed attempting "
573 			    " to allocate %d bytes." DS_EOL, (int)msglen);
574 			continue;
575 		}
576 
577 		DS_DBG(CE_NOTE, "ds@%lx: %s: message payload len %d" DS_EOL,
578 		    PORTID(port), __func__, (int)read_size);
579 
580 		/* move message header into buffer */
581 		(void) memcpy(msg, hbuf, DS_HDR_SZ);
582 		currp = (char *)(msg) + DS_HDR_SZ;
583 
584 		/* read in the message body */
585 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
586 			DS_FREE(msg, msglen);
587 			break;
588 		}
589 
590 		/* validate the size of the message */
591 		if ((DS_HDR_SZ + read_size) != msglen) {
592 			cmn_err(CE_WARN, "ds@%lx: %s: invalid message length, "
593 			    "received %ld bytes, expected %ld" DS_EOL,
594 			    PORTID(port), __func__, (DS_HDR_SZ + read_size),
595 			    msglen);
596 			DS_FREE(msg, msglen);
597 			continue;
598 		}
599 
600 		DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
601 
602 		/*
603 		 * Send the message for processing, and store it
604 		 * in the log. The memory is deallocated only when
605 		 * the message is removed from the log.
606 		 */
607 
608 		devent = DS_MALLOC(sizeof (ds_event_t));
609 		devent->port = port;
610 		devent->buf = (char *)msg;
611 		devent->buflen = msglen;
612 
613 		/* log the message */
614 		(void) ds_log_add_msg(DS_LOG_IN(port->id), msg, msglen);
615 
616 		if (ds_sys_dispatch_func(ds_dispatch_event, devent)) {
617 			cmn_err(CE_WARN, "ds@%lx: error initiating "
618 			    "event handler", PORTID(port));
619 			DS_FREE(devent, sizeof (ds_event_t));
620 		}
621 	}
622 
623 	mutex_exit(&port->rcv_lock);
624 
625 	/* handle connection reset errors returned from ds_recv_msg */
626 	if (rv == ECONNRESET) {
627 		ds_handle_down_reset_events(port);
628 	}
629 
630 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s done" DS_EOL, PORTID(port), __func__);
631 }
632 
633 static void
634 ds_dispatch_event(void *arg)
635 {
636 	ds_event_t	*event = (ds_event_t *)arg;
637 	ds_hdr_t	*hdr;
638 	ds_port_t	*port;
639 
640 	port = event->port;
641 
642 	hdr = (ds_hdr_t *)event->buf;
643 
644 	if (DS_MSG_TYPE_VALID(hdr->msg_type)) {
645 		DS_DBG(CE_NOTE, "ds@%lx: dispatch_event: msg_type=%d" DS_EOL,
646 		    PORTID(port), hdr->msg_type);
647 
648 		(*ds_msg_handlers[hdr->msg_type])(port, event->buf,
649 		    event->buflen);
650 	} else {
651 		cmn_err(CE_WARN, "ds@%lx: dispatch_event: invalid msg "
652 		    "type (%d)" DS_EOL, PORTID(port), hdr->msg_type);
653 	}
654 
655 	DS_FREE(event->buf, event->buflen);
656 	DS_FREE(event, sizeof (ds_event_t));
657 }
658 
659 int
660 ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen)
661 {
662 	int	rv;
663 	caddr_t	currp = msg;
664 	size_t	amt_left = msglen;
665 	int	loopcnt = 0;
666 
667 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s msglen: %ld" DS_EOL, PORTID(port),
668 	    __func__, msglen);
669 	DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
670 
671 	/*
672 	 * Ensure that no other messages can be sent on this port by holding
673 	 * the tx_lock mutex in case the write doesn't get sent with one write.
674 	 * This guarantees that the message doesn't become fragmented.
675 	 */
676 	mutex_enter(&port->tx_lock);
677 
678 	do {
679 		if ((rv = ldc_write(port->ldc.hdl, currp, &msglen)) != 0) {
680 			if (rv == ECONNRESET) {
681 				mutex_exit(&port->tx_lock);
682 				ds_handle_down_reset_events(port);
683 				return (rv);
684 			} else if ((rv == EWOULDBLOCK) &&
685 			    (loopcnt++ < ds_retries)) {
686 				drv_usecwait(ds_delay);
687 			} else {
688 				cmn_err(CE_WARN, "ds@%lx: send_msg: ldc_write "
689 				    "failed (%d), %d bytes remaining" DS_EOL,
690 				    PORTID(port), rv, (int)amt_left);
691 				goto error;
692 			}
693 		} else {
694 			amt_left -= msglen;
695 			currp += msglen;
696 			msglen = amt_left;
697 			loopcnt = 0;
698 		}
699 	} while (amt_left > 0);
700 error:
701 	mutex_exit(&port->tx_lock);
702 
703 	return (rv);
704 }
705 
706 /* END LDC SUPPORT FUNCTIONS */
707 
708 
709 /* BEGIN DS PROTOCOL SUPPORT FUNCTIONS */
710 
711 static void
712 ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len)
713 {
714 	ds_hdr_t	*hdr;
715 	ds_init_ack_t	*ack;
716 	ds_init_nack_t	*nack;
717 	char		*msg;
718 	size_t		msglen;
719 	ds_init_req_t	*req;
720 	size_t		explen = DS_MSG_LEN(ds_init_req_t);
721 	uint16_t	new_major;
722 	uint16_t	new_minor;
723 	boolean_t	match;
724 
725 	/* sanity check the incoming message */
726 	if (len != explen) {
727 		cmn_err(CE_WARN, "ds@%lx: <init_req: invalid message "
728 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
729 		    explen);
730 		return;
731 	}
732 
733 	req = (ds_init_req_t *)(buf + DS_HDR_SZ);
734 
735 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_req: ver=%d.%d" DS_EOL,
736 	    PORTID(port), req->major_vers, req->minor_vers);
737 
738 	match = negotiate_version(DS_NUM_VER, &ds_vers[0],
739 	    req->major_vers, &new_major, &new_minor);
740 
741 	/*
742 	 * Check version info. ACK only if the major numbers exactly
743 	 * match. The service entity can retry with a new minor
744 	 * based on the response sent as part of the NACK.
745 	 */
746 	if (match) {
747 		msglen = DS_MSG_LEN(ds_init_ack_t);
748 		msg = DS_MALLOC(msglen);
749 
750 		hdr = (ds_hdr_t *)msg;
751 		hdr->msg_type = DS_INIT_ACK;
752 		hdr->payload_len = sizeof (ds_init_ack_t);
753 
754 		ack = (ds_init_ack_t *)(msg + DS_HDR_SZ);
755 		ack->minor_vers = MIN(new_minor, req->minor_vers);
756 
757 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_ack>: minor=0x%04X" DS_EOL,
758 		    PORTID(port), MIN(new_minor, req->minor_vers));
759 	} else {
760 		msglen = DS_MSG_LEN(ds_init_nack_t);
761 		msg = DS_MALLOC(msglen);
762 
763 		hdr = (ds_hdr_t *)msg;
764 		hdr->msg_type = DS_INIT_NACK;
765 		hdr->payload_len = sizeof (ds_init_nack_t);
766 
767 		nack = (ds_init_nack_t *)(msg + DS_HDR_SZ);
768 		nack->major_vers = new_major;
769 
770 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_nack>: major=0x%04X" DS_EOL,
771 		    PORTID(port), new_major);
772 	}
773 
774 	/*
775 	 * Send the response
776 	 */
777 	(void) ds_send_msg(port, msg, msglen);
778 	DS_FREE(msg, msglen);
779 }
780 
781 static void
782 ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len)
783 {
784 	ds_init_ack_t	*ack;
785 	ds_ver_t	*ver;
786 	size_t		explen = DS_MSG_LEN(ds_init_ack_t);
787 
788 	/* sanity check the incoming message */
789 	if (len != explen) {
790 		cmn_err(CE_WARN, "ds@%lx: <init_ack: invalid message "
791 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
792 		    explen);
793 		return;
794 	}
795 
796 	ack = (ds_init_ack_t *)(buf + DS_HDR_SZ);
797 
798 	mutex_enter(&port->lock);
799 
800 	if (port->state != DS_PORT_INIT_REQ) {
801 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: invalid state: %d"
802 		    DS_EOL, PORTID(port), port->state);
803 		mutex_exit(&port->lock);
804 		return;
805 	}
806 
807 	ver = &(ds_vers[port->ver_idx]);
808 
809 	/* agreed upon a major version */
810 	port->ver.major = ver->major;
811 
812 	/*
813 	 * If the returned minor version is larger than
814 	 * the requested minor version, use the lower of
815 	 * the two, i.e. the requested version.
816 	 */
817 	if (ack->minor_vers >= ver->minor) {
818 		/*
819 		 * Use the minor version specified in the
820 		 * original request.
821 		 */
822 		port->ver.minor = ver->minor;
823 	} else {
824 		/*
825 		 * Use the lower minor version returned in
826 		 * the ack. By definition, all lower minor
827 		 * versions must be supported.
828 		 */
829 		port->ver.minor = ack->minor_vers;
830 	}
831 
832 	port->state = DS_PORT_READY;
833 
834 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready v%d.%d" DS_EOL,
835 	    PORTID(port), port->ver.major, port->ver.minor);
836 
837 	mutex_exit(&port->lock);
838 
839 	/*
840 	 * The port came up, so update all the services
841 	 * with this information. Follow that up with an
842 	 * attempt to register any service that is not
843 	 * already registered.
844 	 */
845 	mutex_enter(&ds_svcs.lock);
846 
847 	(void) ds_walk_svcs(ds_svc_port_up, port);
848 	(void) ds_walk_svcs(ds_svc_register, NULL);
849 
850 	mutex_exit(&ds_svcs.lock);
851 }
852 
853 static void
854 ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len)
855 {
856 	int		idx;
857 	ds_init_nack_t	*nack;
858 	ds_ver_t	*ver;
859 	size_t		explen = DS_MSG_LEN(ds_init_nack_t);
860 
861 	/* sanity check the incoming message */
862 	if (len != explen) {
863 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: invalid message "
864 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
865 		    explen);
866 		return;
867 	}
868 
869 	nack = (ds_init_nack_t *)(buf + DS_HDR_SZ);
870 
871 	mutex_enter(&port->lock);
872 
873 	if (port->state != DS_PORT_INIT_REQ) {
874 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: invalid state: %d"
875 		    DS_EOL, PORTID(port), port->state);
876 		mutex_exit(&port->lock);
877 		return;
878 	}
879 
880 	ver = &(ds_vers[port->ver_idx]);
881 
882 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: req=v%d.%d, nack=v%d.x"
883 	    DS_EOL, PORTID(port), ver->major, ver->minor, nack->major_vers);
884 
885 	if (nack->major_vers == 0) {
886 		/* no supported protocol version */
887 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS not supported"
888 		    DS_EOL, PORTID(port));
889 		mutex_exit(&port->lock);
890 		return;
891 	}
892 
893 	/*
894 	 * Walk the version list, looking for a major version
895 	 * that is as close to the requested major version as
896 	 * possible.
897 	 */
898 	for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) {
899 		if (ds_vers[idx].major <= nack->major_vers) {
900 			/* found a version to try */
901 			goto done;
902 		}
903 	}
904 
905 	if (idx == DS_NUM_VER) {
906 		/* no supported version */
907 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS v%d.x not "
908 		    "supported" DS_EOL, PORTID(port), nack->major_vers);
909 
910 		mutex_exit(&port->lock);
911 		return;
912 	}
913 
914 done:
915 	/* start the handshake again */
916 	port->ver_idx = idx;
917 	port->state = DS_PORT_LDC_INIT;
918 	mutex_exit(&port->lock);
919 
920 	ds_send_init_req(port);
921 
922 }
923 
924 static ds_svc_t *
925 ds_find_svc_by_id_port(char *svc_id, int is_client, ds_port_t *port)
926 {
927 	int		idx;
928 	ds_svc_t	*svc, *found_svc = 0;
929 	uint32_t	flag_match = is_client ? DSSF_ISCLIENT : 0;
930 
931 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
932 
933 	/* walk every table entry */
934 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
935 		svc = ds_svcs.tbl[idx];
936 		if (DS_SVC_ISFREE(svc))
937 			continue;
938 		if (strcmp(svc->cap.svc_id, svc_id) != 0)
939 			continue;
940 		if ((svc->flags & DSSF_ISCLIENT) != flag_match)
941 			continue;
942 		if (port != NULL && svc->port == port) {
943 			return (svc);
944 		} else if (svc->state == DS_SVC_INACTIVE) {
945 			found_svc = svc;
946 		} else if (!found_svc) {
947 			found_svc = svc;
948 		}
949 	}
950 
951 	return (found_svc);
952 }
953 
954 static void
955 ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len)
956 {
957 	ds_reg_req_t	*req;
958 	ds_hdr_t	*hdr;
959 	ds_reg_ack_t	*ack;
960 	ds_reg_nack_t	*nack;
961 	char		*msg;
962 	size_t		msglen;
963 	size_t		explen = DS_MSG_LEN(ds_reg_req_t);
964 	ds_svc_t	*svc = NULL;
965 	ds_ver_t	version;
966 	uint16_t	new_major;
967 	uint16_t	new_minor;
968 	boolean_t	match;
969 
970 	/* sanity check the incoming message */
971 	if (len < explen) {
972 		cmn_err(CE_WARN, "ds@%lx: <reg_req: invalid message "
973 		    "length (%ld), expected at least %ld" DS_EOL,
974 		    PORTID(port), len, explen);
975 		return;
976 	}
977 
978 	req = (ds_reg_req_t *)(buf + DS_HDR_SZ);
979 
980 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' ver=%d.%d, hdl=0x%llx"
981 	    DS_EOL, PORTID(port), req->svc_id, req->major_vers, req->minor_vers,
982 	    (u_longlong_t)req->svc_handle);
983 
984 	mutex_enter(&ds_svcs.lock);
985 	svc = ds_find_svc_by_id_port(req->svc_id,
986 	    DS_HDL_ISCLIENT(req->svc_handle) == 0, port);
987 	if (svc == NULL) {
988 
989 do_reg_nack:
990 		mutex_exit(&ds_svcs.lock);
991 
992 		msglen = DS_MSG_LEN(ds_reg_nack_t);
993 		msg = DS_MALLOC(msglen);
994 
995 		hdr = (ds_hdr_t *)msg;
996 		hdr->msg_type = DS_REG_NACK;
997 		hdr->payload_len = sizeof (ds_reg_nack_t);
998 
999 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1000 		nack->svc_handle = req->svc_handle;
1001 		nack->result = DS_REG_VER_NACK;
1002 		nack->major_vers = 0;
1003 
1004 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s'" DS_EOL,
1005 		    PORTID(port), req->svc_id);
1006 		/*
1007 		 * Send the response
1008 		 */
1009 		(void) ds_send_msg(port, msg, msglen);
1010 		DS_FREE(msg, msglen);
1011 		return;
1012 	}
1013 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' found, hdl: 0x%llx" DS_EOL,
1014 	    PORTID(port), req->svc_id, (u_longlong_t)svc->hdl);
1015 
1016 	/*
1017 	 * A client sends out a reg req in order to force service providers to
1018 	 * initiate a reg req from their end (limitation in the protocol).  We
1019 	 * expect the service provider to be in the inactive (DS_SVC_INACTIVE)
1020 	 * state.  If the service provider has already sent out a reg req (the
1021 	 * state is DS_SVC_REG_PENDING) or has already handshaken (the
1022 	 * state is DS_SVC_ACTIVE), then we can simply ignore this reg
1023 	 * req.  For any other state, we force an unregister before initiating
1024 	 * a reg req.
1025 	 */
1026 
1027 	if (DS_HDL_ISCLIENT(req->svc_handle)) {
1028 		switch (svc->state) {
1029 
1030 		case DS_SVC_REG_PENDING:
1031 		case DS_SVC_ACTIVE:
1032 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1033 			    "client, state (%x)" DS_EOL, PORTID(port),
1034 			    req->svc_id, svc->state);
1035 			mutex_exit(&ds_svcs.lock);
1036 			return;
1037 
1038 		case DS_SVC_INACTIVE:
1039 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1040 			    "client" DS_EOL, PORTID(port), req->svc_id);
1041 			break;
1042 
1043 		default:
1044 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1045 			    "client forced unreg, state (%x)" DS_EOL,
1046 			    PORTID(port), req->svc_id, svc->state);
1047 			(void) ds_svc_unregister(svc, port);
1048 			break;
1049 		}
1050 		(void) ds_svc_port_up(svc, port);
1051 		(void) ds_svc_register_onport(svc, port);
1052 		mutex_exit(&ds_svcs.lock);
1053 		return;
1054 	}
1055 
1056 	/*
1057 	 * Only remote service providers can initiate a registration.  The
1058 	 * local sevice from here must be a client service.
1059 	 */
1060 
1061 	match = negotiate_version(svc->cap.nvers, svc->cap.vers,
1062 	    req->major_vers, &new_major, &new_minor);
1063 
1064 	/*
1065 	 * Check version info. ACK only if the major numbers exactly
1066 	 * match. The service entity can retry with a new minor
1067 	 * based on the response sent as part of the NACK.
1068 	 */
1069 	if (match) {
1070 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' svc%d: state: %x "
1071 		    "svc_portid: %d" DS_EOL, PORTID(port), req->svc_id,
1072 		    (int)DS_HDL2IDX(svc->hdl), svc->state,
1073 		    (int)(svc->port == NULL ? -1 : PORTID(svc->port)));
1074 		/*
1075 		 * If the current local service is already in use and
1076 		 * it's not on this port, clone it.
1077 		 */
1078 		if (svc->state != DS_SVC_INACTIVE) {
1079 			if (svc->port != NULL && port == svc->port) {
1080 				/*
1081 				 * Someone probably dropped an unreg req
1082 				 * somewhere.  Force a local unreg.
1083 				 */
1084 				(void) ds_svc_unregister(svc, port);
1085 			} else if (!DS_HDL_ISCLIENT(svc->hdl)) {
1086 				/*
1087 				 * Can't clone a non-client (service provider)
1088 				 * handle.  This is because old in-kernel
1089 				 * service providers can't deal with multiple
1090 				 * handles.
1091 				 */
1092 				goto do_reg_nack;
1093 			} else {
1094 				svc = ds_svc_clone(svc);
1095 			}
1096 		}
1097 		svc->port = port;
1098 		svc->svc_hdl = req->svc_handle;
1099 		svc->state = DS_SVC_ACTIVE;
1100 
1101 		msglen = DS_MSG_LEN(ds_reg_ack_t);
1102 		msg = DS_MALLOC(msglen);
1103 
1104 		hdr = (ds_hdr_t *)msg;
1105 		hdr->msg_type = DS_REG_ACK;
1106 		hdr->payload_len = sizeof (ds_reg_ack_t);
1107 
1108 		ack = (ds_reg_ack_t *)(msg + DS_HDR_SZ);
1109 		ack->svc_handle = req->svc_handle;
1110 		ack->minor_vers = MIN(new_minor, req->minor_vers);
1111 
1112 
1113 		if (svc->ops.ds_reg_cb) {
1114 			/* Call the registration callback */
1115 			version.major = req->major_vers;
1116 			version.minor = ack->minor_vers;
1117 			(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &version,
1118 			    svc->hdl);
1119 		}
1120 		mutex_exit(&ds_svcs.lock);
1121 
1122 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_ack>: '%s' minor=0x%04X"
1123 		    DS_EOL, PORTID(port), svc->cap.svc_id,
1124 		    MIN(new_minor, req->minor_vers));
1125 	} else {
1126 		mutex_exit(&ds_svcs.lock);
1127 
1128 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1129 		msg = DS_MALLOC(msglen);
1130 
1131 		hdr = (ds_hdr_t *)msg;
1132 		hdr->msg_type = DS_REG_NACK;
1133 		hdr->payload_len = sizeof (ds_reg_nack_t);
1134 
1135 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1136 		nack->svc_handle = req->svc_handle;
1137 		nack->result = DS_REG_VER_NACK;
1138 		nack->major_vers = new_major;
1139 
1140 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s' major=0x%04X"
1141 		    DS_EOL, PORTID(port), svc->cap.svc_id, new_major);
1142 	}
1143 
1144 	/* send message */
1145 	(void) ds_send_msg(port, msg, msglen);
1146 	DS_FREE(msg, msglen);
1147 }
1148 
1149 static void
1150 ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len)
1151 {
1152 	ds_reg_ack_t	*ack;
1153 	ds_ver_t	*ver;
1154 	ds_ver_t	tmpver;
1155 	ds_svc_t	*svc;
1156 	size_t		explen = DS_MSG_LEN(ds_reg_ack_t);
1157 
1158 	/* sanity check the incoming message */
1159 	if (len != explen) {
1160 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid message "
1161 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1162 		    explen);
1163 		return;
1164 	}
1165 
1166 	ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ);
1167 
1168 	mutex_enter(&ds_svcs.lock);
1169 
1170 	/*
1171 	 * This searches for service based on how we generate handles
1172 	 * and so only works because this is a reg ack.
1173 	 */
1174 	if (DS_HDL_ISCLIENT(ack->svc_handle) ||
1175 	    (svc = ds_get_svc(ack->svc_handle)) == NULL) {
1176 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid handle 0x%llx"
1177 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1178 		goto done;
1179 	}
1180 
1181 	/* make sure the message makes sense */
1182 	if (svc->state != DS_SVC_REG_PENDING) {
1183 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid state (%d)" DS_EOL,
1184 		    PORTID(port), svc->state);
1185 		goto done;
1186 	}
1187 
1188 	ver = &(svc->cap.vers[svc->ver_idx]);
1189 
1190 	/* major version has been agreed upon */
1191 	svc->ver.major = ver->major;
1192 
1193 	if (ack->minor_vers >= ver->minor) {
1194 		/*
1195 		 * Use the minor version specified in the
1196 		 * original request.
1197 		 */
1198 		svc->ver.minor = ver->minor;
1199 	} else {
1200 		/*
1201 		 * Use the lower minor version returned in
1202 		 * the ack. By defninition, all lower minor
1203 		 * versions must be supported.
1204 		 */
1205 		svc->ver.minor = ack->minor_vers;
1206 	}
1207 
1208 	svc->state = DS_SVC_ACTIVE;
1209 	svc->port = port;
1210 
1211 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_ack: '%s' v%d.%d ready, hdl=0x%llx"
1212 	    DS_EOL, PORTID(port), svc->cap.svc_id, svc->ver.major,
1213 	    svc->ver.minor, (u_longlong_t)svc->hdl);
1214 
1215 	/* notify the client that registration is complete */
1216 	if (svc->ops.ds_reg_cb) {
1217 		/*
1218 		 * Use a temporary version structure so that
1219 		 * the copy in the svc structure cannot be
1220 		 * modified by the client.
1221 		 */
1222 		tmpver.major = svc->ver.major;
1223 		tmpver.minor = svc->ver.minor;
1224 
1225 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl);
1226 	}
1227 
1228 done:
1229 	mutex_exit(&ds_svcs.lock);
1230 }
1231 
1232 static void
1233 ds_try_next_port(ds_svc_t *svc, int portid)
1234 {
1235 	ds_port_t *port;
1236 	ds_portset_t totry;
1237 	int i;
1238 
1239 	DS_DBG_LDC(CE_NOTE, "ds@%x %s" DS_EOL, portid, __func__);
1240 	DS_PORTSET_NOT(totry, svc->tried);
1241 	DS_PORTSET_AND(totry, svc->avail);
1242 	if (DS_PORTSET_ISNULL(totry))
1243 		return;
1244 
1245 	for (i = 0; i < DS_MAX_PORTS; i++, portid++) {
1246 		if (portid >= DS_MAX_PORTS) {
1247 			portid = 0;
1248 		}
1249 
1250 		/*
1251 		 * If the port is not in the available list,
1252 		 * it is not a candidate for registration.
1253 		 */
1254 		if (!DS_PORT_IN_SET(totry, portid)) {
1255 			continue;
1256 		}
1257 
1258 		port = &ds_ports[portid];
1259 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s trying ldc.id: %d" DS_EOL,
1260 		    portid, __func__, (uint_t)(port->ldc.id));
1261 		if (ds_send_reg_req(svc, port) == 0) {
1262 			DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send OK" DS_EOL,
1263 			    portid, __func__);
1264 			/* register sent successfully */
1265 			break;
1266 		}
1267 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send FAIL" DS_EOL,
1268 		    portid, __func__);
1269 
1270 		/* reset the service to try the next port */
1271 		ds_reset_svc(svc, port);
1272 	}
1273 }
1274 
1275 static void
1276 ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len)
1277 {
1278 	ds_reg_nack_t	*nack;
1279 	ds_svc_t	*svc;
1280 	int		idx;
1281 	size_t		explen = DS_MSG_LEN(ds_reg_nack_t);
1282 
1283 	/* sanity check the incoming message */
1284 	if (len != explen) {
1285 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid message "
1286 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1287 		    explen);
1288 		return;
1289 	}
1290 
1291 	nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ);
1292 
1293 	mutex_enter(&ds_svcs.lock);
1294 
1295 	/*
1296 	 * We expect a reg_nack for a client ping.
1297 	 */
1298 	if (DS_HDL_ISCLIENT(nack->svc_handle)) {
1299 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: ping hdl: 0x%llx"
1300 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1301 		goto done;
1302 	}
1303 
1304 	/*
1305 	 * This searches for service based on how we generate handles
1306 	 * and so only works because this is a reg nack.
1307 	 */
1308 	if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1309 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid handle 0x%llx"
1310 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1311 		goto done;
1312 	}
1313 
1314 	/* make sure the message makes sense */
1315 	if (svc->state != DS_SVC_REG_PENDING) {
1316 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid state (%d)" DS_EOL,
1317 		    PORTID(port), svc->state);
1318 		goto done;
1319 	}
1320 
1321 	if (nack->result == DS_REG_DUP) {
1322 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: duplicate registration "
1323 		    " for %s" DS_EOL, PORTID(port), svc->cap.svc_id);
1324 		ds_reset_svc(svc, port);
1325 		goto done;
1326 	}
1327 
1328 	/*
1329 	 * A major version of zero indicates that the
1330 	 * service is not supported at all.
1331 	 */
1332 	if (nack->major_vers == 0) {
1333 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' not supported"
1334 		    DS_EOL, PORTID(port), svc->cap.svc_id);
1335 		ds_reset_svc(svc, port);
1336 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1337 			ds_try_next_port(svc, PORTID(port) + 1);
1338 		goto done;
1339 	}
1340 
1341 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' hdl=0x%llx, nack=%d.x"
1342 	    DS_EOL, PORTID(port), svc->cap.svc_id,
1343 	    (u_longlong_t)nack->svc_handle, nack->major_vers);
1344 
1345 	/*
1346 	 * Walk the version list for the service, looking for
1347 	 * a major version that is as close to the requested
1348 	 * major version as possible.
1349 	 */
1350 	for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) {
1351 		if (svc->cap.vers[idx].major <= nack->major_vers) {
1352 			/* found a version to try */
1353 			break;
1354 		}
1355 	}
1356 
1357 	if (idx == svc->cap.nvers) {
1358 		/* no supported version */
1359 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: %s v%d.x not supported"
1360 		    DS_EOL, PORTID(port), svc->cap.svc_id, nack->major_vers);
1361 		ds_reset_svc(svc, port);
1362 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1363 			ds_try_next_port(svc, PORTID(port) + 1);
1364 		goto done;
1365 	}
1366 
1367 	/* start the handshake again */
1368 	svc->state = DS_SVC_INACTIVE;
1369 	svc->ver_idx = idx;
1370 
1371 	(void) ds_svc_register(svc, NULL);
1372 
1373 done:
1374 	mutex_exit(&ds_svcs.lock);
1375 }
1376 
1377 static void
1378 ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len)
1379 {
1380 	ds_hdr_t	*hdr;
1381 	ds_unreg_req_t	*req;
1382 	ds_unreg_ack_t	*ack;
1383 	ds_svc_t	*svc;
1384 	char		*msg;
1385 	size_t		msglen;
1386 	size_t		explen = DS_MSG_LEN(ds_unreg_req_t);
1387 
1388 	/* sanity check the incoming message */
1389 	if (len != explen) {
1390 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid message "
1391 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1392 		    explen);
1393 		return;
1394 	}
1395 
1396 	req = (ds_unreg_req_t *)(buf + DS_HDR_SZ);
1397 
1398 	mutex_enter(&ds_svcs.lock);
1399 
1400 	/* lookup appropriate client or service */
1401 	if (DS_HDL_ISCLIENT(req->svc_handle) ||
1402 	    ((svc = ds_find_clnt_svc_by_hdl_port(req->svc_handle, port))
1403 	    == NULL && ((svc = ds_get_svc(req->svc_handle)) == NULL ||
1404 	    svc->port != port))) {
1405 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid handle 0x%llx"
1406 		    DS_EOL, PORTID(port), (u_longlong_t)req->svc_handle);
1407 		ds_send_unreg_nack(port, req->svc_handle);
1408 		mutex_exit(&ds_svcs.lock);
1409 		return;
1410 	}
1411 
1412 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: '%s' handle 0x%llx" DS_EOL,
1413 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1414 
1415 	(void) ds_svc_unregister(svc, svc->port);
1416 
1417 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_ack>: '%s' hdl=0x%llx" DS_EOL,
1418 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1419 
1420 	ds_check_for_dup_services(svc);
1421 
1422 	mutex_exit(&ds_svcs.lock);
1423 
1424 	msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t);
1425 	msg = DS_MALLOC(msglen);
1426 
1427 	hdr = (ds_hdr_t *)msg;
1428 	hdr->msg_type = DS_UNREG_ACK;
1429 	hdr->payload_len = sizeof (ds_unreg_ack_t);
1430 
1431 	ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ);
1432 	ack->svc_handle = req->svc_handle;
1433 
1434 	/* send message */
1435 	(void) ds_send_msg(port, msg, msglen);
1436 	DS_FREE(msg, msglen);
1437 
1438 }
1439 
1440 static void
1441 ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len)
1442 {
1443 	ds_unreg_ack_t	*ack;
1444 	size_t		explen = DS_MSG_LEN(ds_unreg_ack_t);
1445 
1446 	/* sanity check the incoming message */
1447 	if (len != explen) {
1448 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: invalid message "
1449 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1450 		    explen);
1451 		return;
1452 	}
1453 
1454 	ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ);
1455 
1456 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: hdl=0x%llx" DS_EOL,
1457 	    PORTID(port), (u_longlong_t)ack->svc_handle);
1458 
1459 #ifdef DEBUG
1460 	mutex_enter(&ds_svcs.lock);
1461 
1462 	/*
1463 	 * Since the unregister request was initiated locally,
1464 	 * the service structure has already been torn down.
1465 	 * Just perform a sanity check to make sure the message
1466 	 * is appropriate.
1467 	 */
1468 	if (ds_get_svc(ack->svc_handle) != NULL) {
1469 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: handle 0x%llx in use"
1470 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1471 	}
1472 
1473 	mutex_exit(&ds_svcs.lock);
1474 #endif	/* DEBUG */
1475 }
1476 
1477 static void
1478 ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len)
1479 {
1480 	ds_unreg_nack_t	*nack;
1481 	size_t		explen = DS_MSG_LEN(ds_unreg_nack_t);
1482 
1483 	/* sanity check the incoming message */
1484 	if (len != explen) {
1485 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: invalid message "
1486 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1487 		    explen);
1488 		return;
1489 	}
1490 
1491 	nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ);
1492 
1493 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: hdl=0x%llx" DS_EOL,
1494 	    PORTID(port), (u_longlong_t)nack->svc_handle);
1495 
1496 #ifdef DEBUG
1497 	mutex_enter(&ds_svcs.lock);
1498 
1499 	/*
1500 	 * Since the unregister request was initiated locally,
1501 	 * the service structure has already been torn down.
1502 	 * Just perform a sanity check to make sure the message
1503 	 * is appropriate.
1504 	 */
1505 	if (ds_get_svc(nack->svc_handle) != NULL) {
1506 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: handle 0x%llx in use"
1507 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1508 	}
1509 
1510 	mutex_exit(&ds_svcs.lock);
1511 #endif	/* DEBUG */
1512 }
1513 
1514 static void
1515 ds_handle_data(ds_port_t *port, caddr_t buf, size_t len)
1516 {
1517 	ds_data_handle_t	*data;
1518 	ds_svc_t		*svc;
1519 	char			*msg;
1520 	int			msgsz;
1521 	int			hdrsz;
1522 	size_t			explen = DS_MSG_LEN(ds_data_handle_t);
1523 
1524 	/* sanity check the incoming message */
1525 	if (len < explen) {
1526 		cmn_err(CE_WARN, "ds@%lx: <data: invalid message length "
1527 		    "(%ld), expected at least %ld" DS_EOL, PORTID(port), len,
1528 		    explen);
1529 		return;
1530 	}
1531 
1532 	data = (ds_data_handle_t *)(buf + DS_HDR_SZ);
1533 
1534 	hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t);
1535 	msgsz = len - hdrsz;
1536 
1537 	/* strip off the header for the client */
1538 	msg = (msgsz) ? (buf + hdrsz) : NULL;
1539 
1540 	mutex_enter(&ds_svcs.lock);
1541 
1542 	if ((svc = ds_find_clnt_svc_by_hdl_port(data->svc_handle, port))
1543 	    == NULL) {
1544 		if ((svc = ds_get_svc(data->svc_handle)) == NULL) {
1545 			mutex_exit(&ds_svcs.lock);
1546 			cmn_err(CE_WARN, "ds@%lx: <data: invalid handle 0x%llx"
1547 			    DS_EOL, PORTID(port),
1548 			    (u_longlong_t)data->svc_handle);
1549 			ds_send_data_nack(port, data->svc_handle);
1550 			return;
1551 		}
1552 	}
1553 
1554 	mutex_exit(&ds_svcs.lock);
1555 
1556 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <data: '%s' hdl=0x%llx" DS_EOL,
1557 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)svc->hdl);
1558 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msgsz);
1559 
1560 	/* dispatch this message to the client */
1561 	(*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz);
1562 }
1563 
1564 static void
1565 ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len)
1566 {
1567 	ds_svc_t	*svc;
1568 	ds_data_nack_t	*nack;
1569 	size_t		explen = DS_MSG_LEN(ds_data_nack_t);
1570 
1571 	/* sanity check the incoming message */
1572 	if (len != explen) {
1573 		cmn_err(CE_WARN, "ds@%lx: <data_nack: invalid message "
1574 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1575 		    explen);
1576 		return;
1577 	}
1578 
1579 	nack = (ds_data_nack_t *)(buf + DS_HDR_SZ);
1580 
1581 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack: hdl=0x%llx, result=0x%llx"
1582 	    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle,
1583 	    (u_longlong_t)nack->result);
1584 
1585 	if (nack->result == DS_INV_HDL) {
1586 
1587 		mutex_enter(&ds_svcs.lock);
1588 
1589 		if ((svc = ds_find_clnt_svc_by_hdl_port(nack->svc_handle,
1590 		    port)) == NULL) {
1591 			if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1592 				mutex_exit(&ds_svcs.lock);
1593 				return;
1594 			}
1595 		}
1596 
1597 		cmn_err(CE_WARN, "ds@%lx: <data_nack: handle 0x%llx reported "
1598 		    " as invalid" DS_EOL, PORTID(port),
1599 		    (u_longlong_t)nack->svc_handle);
1600 
1601 		(void) ds_svc_unregister(svc, svc->port);
1602 
1603 		mutex_exit(&ds_svcs.lock);
1604 	}
1605 }
1606 
1607 /* Initialize the port */
1608 void
1609 ds_send_init_req(ds_port_t *port)
1610 {
1611 	ds_hdr_t	*hdr;
1612 	ds_init_req_t	*init_req;
1613 	size_t		msglen;
1614 	ds_ver_t	*vers = &ds_vers[port->ver_idx];
1615 
1616 	mutex_enter(&port->lock);
1617 	if (port->state != DS_PORT_LDC_INIT) {
1618 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: invalid state: %d"
1619 		    DS_EOL, PORTID(port), port->state);
1620 		mutex_exit(&port->lock);
1621 		return;
1622 	}
1623 	mutex_exit(&port->lock);
1624 
1625 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: req=v%d.%d" DS_EOL,
1626 	    PORTID(port), vers->major, vers->minor);
1627 
1628 	msglen = DS_HDR_SZ + sizeof (ds_init_req_t);
1629 	hdr = DS_MALLOC(msglen);
1630 
1631 	hdr->msg_type = DS_INIT_REQ;
1632 	hdr->payload_len = sizeof (ds_init_req_t);
1633 
1634 	init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ);
1635 	init_req->major_vers = vers->major;
1636 	init_req->minor_vers = vers->minor;
1637 
1638 	if (ds_send_msg(port, (caddr_t)hdr, msglen) == 0) {
1639 		/*
1640 		 * We've left the port state unlocked over the malloc/send,
1641 		 * make sure no one has changed the state under us before
1642 		 * we update the state.
1643 		 */
1644 		mutex_enter(&port->lock);
1645 		if (port->state == DS_PORT_LDC_INIT)
1646 			port->state = DS_PORT_INIT_REQ;
1647 		mutex_exit(&port->lock);
1648 	}
1649 	DS_FREE(hdr, msglen);
1650 }
1651 
1652 static int
1653 ds_send_reg_req(ds_svc_t *svc, ds_port_t *port)
1654 {
1655 	ds_ver_t	*ver;
1656 	ds_hdr_t	*hdr;
1657 	caddr_t		msg;
1658 	size_t		msglen;
1659 	ds_reg_req_t	*req;
1660 	size_t		idlen;
1661 	int		rv;
1662 
1663 	if ((svc->state != DS_SVC_INACTIVE) &&
1664 	    ((svc->flags & DSSF_ISCLIENT) == 0)) {
1665 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: invalid svc state (%d) "
1666 		    "for svc '%s'" DS_EOL, PORTID(port), svc->state,
1667 		    svc->cap.svc_id);
1668 		return (-1);
1669 	}
1670 
1671 	mutex_enter(&port->lock);
1672 
1673 	/* check on the LDC to Zeus */
1674 	if (port->ldc.state != LDC_UP) {
1675 		/* can not send message */
1676 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: channel %ld is not up"
1677 		    DS_EOL, PORTID(port), port->ldc.id);
1678 		mutex_exit(&port->lock);
1679 		return (-1);
1680 	}
1681 
1682 	/* make sure port is ready */
1683 	if (port->state != DS_PORT_READY) {
1684 		/* can not send message */
1685 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: port is not ready"
1686 		    DS_EOL, PORTID(port));
1687 		mutex_exit(&port->lock);
1688 		return (-1);
1689 	}
1690 
1691 	mutex_exit(&port->lock);
1692 
1693 	/* allocate the message buffer */
1694 	idlen = strlen(svc->cap.svc_id);
1695 	msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen;
1696 	msg = DS_MALLOC(msglen);
1697 
1698 	/* copy in the header data */
1699 	hdr = (ds_hdr_t *)msg;
1700 	hdr->msg_type = DS_REG_REQ;
1701 	hdr->payload_len = sizeof (ds_reg_req_t) + idlen;
1702 
1703 	req = (ds_reg_req_t *)(msg + DS_HDR_SZ);
1704 	req->svc_handle = svc->hdl;
1705 	ver = &(svc->cap.vers[svc->ver_idx]);
1706 	req->major_vers = ver->major;
1707 	req->minor_vers = ver->minor;
1708 
1709 	/* copy in the service id */
1710 	(void) memcpy(req->svc_id, svc->cap.svc_id, idlen + 1);
1711 
1712 	/* send the message */
1713 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: '%s' ver=%d.%d, hdl=0x%llx"
1714 	    DS_EOL, PORTID(port), svc->cap.svc_id, ver->major, ver->minor,
1715 	    (u_longlong_t)svc->hdl);
1716 
1717 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1718 		svc->port = port;
1719 		rv = -1;
1720 	} else if ((svc->flags & DSSF_ISCLIENT) == 0) {
1721 		svc->state = DS_SVC_REG_PENDING;
1722 	}
1723 	DS_FREE(msg, msglen);
1724 
1725 	return (rv);
1726 }
1727 
1728 /*
1729  * Keep around in case we want this later
1730  */
1731 int
1732 ds_send_unreg_req(ds_svc_t *svc)
1733 {
1734 	caddr_t		msg;
1735 	size_t		msglen;
1736 	ds_hdr_t	*hdr;
1737 	ds_unreg_req_t	*req;
1738 	ds_port_t	*port = svc->port;
1739 	int		rv;
1740 
1741 	if (port == NULL) {
1742 		DS_DBG(CE_NOTE, "send_unreg_req: service '%s' not "
1743 		    "associated with a port" DS_EOL, svc->cap.svc_id);
1744 		return (-1);
1745 	}
1746 
1747 	mutex_enter(&port->lock);
1748 
1749 	/* check on the LDC to Zeus */
1750 	if (port->ldc.state != LDC_UP) {
1751 		/* can not send message */
1752 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: channel %ld is not up"
1753 		    DS_EOL, PORTID(port), port->ldc.id);
1754 		mutex_exit(&port->lock);
1755 		return (-1);
1756 	}
1757 
1758 	/* make sure port is ready */
1759 	if (port->state != DS_PORT_READY) {
1760 		/* can not send message */
1761 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: port is not ready" DS_EOL,
1762 		    PORTID(port));
1763 		mutex_exit(&port->lock);
1764 		return (-1);
1765 	}
1766 
1767 	mutex_exit(&port->lock);
1768 
1769 	msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t);
1770 	msg = DS_MALLOC(msglen);
1771 
1772 	/* copy in the header data */
1773 	hdr = (ds_hdr_t *)msg;
1774 	hdr->msg_type = DS_UNREG;
1775 	hdr->payload_len = sizeof (ds_unreg_req_t);
1776 
1777 	req = (ds_unreg_req_t *)(msg + DS_HDR_SZ);
1778 	if (svc->flags & DSSF_ISCLIENT) {
1779 		req->svc_handle = svc->svc_hdl;
1780 	} else {
1781 		req->svc_handle = svc->hdl;
1782 	}
1783 
1784 	/* send the message */
1785 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_req>: '%s' hdl=0x%llx" DS_EOL,
1786 	    PORTID(port), (svc->cap.svc_id) ? svc->cap.svc_id : "NULL",
1787 	    (u_longlong_t)svc->hdl);
1788 
1789 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1790 		rv = -1;
1791 	}
1792 	DS_FREE(msg, msglen);
1793 
1794 	return (rv);
1795 }
1796 
1797 static void
1798 ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1799 {
1800 	caddr_t		msg;
1801 	size_t		msglen;
1802 	ds_hdr_t	*hdr;
1803 	ds_unreg_nack_t	*nack;
1804 
1805 	mutex_enter(&port->lock);
1806 
1807 	/* check on the LDC to Zeus */
1808 	if (port->ldc.state != LDC_UP) {
1809 		/* can not send message */
1810 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: channel %ld is not up"
1811 		    DS_EOL, PORTID(port), port->ldc.id);
1812 		mutex_exit(&port->lock);
1813 		return;
1814 	}
1815 
1816 	/* make sure port is ready */
1817 	if (port->state != DS_PORT_READY) {
1818 		/* can not send message */
1819 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: port is not ready"
1820 		    DS_EOL, PORTID(port));
1821 		mutex_exit(&port->lock);
1822 		return;
1823 	}
1824 
1825 	mutex_exit(&port->lock);
1826 
1827 	msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t);
1828 	msg = DS_MALLOC(msglen);
1829 
1830 	/* copy in the header data */
1831 	hdr = (ds_hdr_t *)msg;
1832 	hdr->msg_type = DS_UNREG_NACK;
1833 	hdr->payload_len = sizeof (ds_unreg_nack_t);
1834 
1835 	nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ);
1836 	nack->svc_handle = bad_hdl;
1837 
1838 	/* send the message */
1839 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_nack>: hdl=0x%llx" DS_EOL,
1840 	    PORTID(port), (u_longlong_t)bad_hdl);
1841 
1842 	(void) ds_send_msg(port, msg, msglen);
1843 	DS_FREE(msg, msglen);
1844 }
1845 
1846 static void
1847 ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1848 {
1849 	caddr_t		msg;
1850 	size_t		msglen;
1851 	ds_hdr_t	*hdr;
1852 	ds_data_nack_t	*nack;
1853 
1854 	mutex_enter(&port->lock);
1855 
1856 	/* check on the LDC to Zeus */
1857 	if (port->ldc.state != LDC_UP) {
1858 		/* can not send message */
1859 		cmn_err(CE_WARN, "ds@%lx: data_nack>: channel %ld is not up"
1860 		    DS_EOL, PORTID(port), port->ldc.id);
1861 		mutex_exit(&port->lock);
1862 		return;
1863 	}
1864 
1865 	/* make sure port is ready */
1866 	if (port->state != DS_PORT_READY) {
1867 		/* can not send message */
1868 		cmn_err(CE_WARN, "ds@%lx: data_nack>: port is not ready" DS_EOL,
1869 		    PORTID(port));
1870 		mutex_exit(&port->lock);
1871 		return;
1872 	}
1873 
1874 	mutex_exit(&port->lock);
1875 
1876 	msglen = DS_HDR_SZ + sizeof (ds_data_nack_t);
1877 	msg = DS_MALLOC(msglen);
1878 
1879 	/* copy in the header data */
1880 	hdr = (ds_hdr_t *)msg;
1881 	hdr->msg_type = DS_NACK;
1882 	hdr->payload_len = sizeof (ds_data_nack_t);
1883 
1884 	nack = (ds_data_nack_t *)(msg + DS_HDR_SZ);
1885 	nack->svc_handle = bad_hdl;
1886 	nack->result = DS_INV_HDL;
1887 
1888 	/* send the message */
1889 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack>: hdl=0x%llx" DS_EOL,
1890 	    PORTID(port), (u_longlong_t)bad_hdl);
1891 
1892 	(void) ds_send_msg(port, msg, msglen);
1893 	DS_FREE(msg, msglen);
1894 }
1895 
1896 /* END DS PROTOCOL SUPPORT FUNCTIONS */
1897 
#ifdef DEBUG

/*
 * Layout of one output line: BYTESPERLINE hex columns of three characters
 * each (" xx"), a two character gap, BYTESPERLINE ASCII characters, and a
 * terminating NUL.
 */
#define	BYTESPERLINE	8
#define	LINEWIDTH	((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1)
#define	ASCIIOFFSET	((BYTESPERLINE * 3) + 2)
#define	ISPRINT(c)	(((c) >= ' ') && ((c) <= '~'))

/*
 * Dump a buffer through cmn_err(), BYTESPERLINE bytes per line.  Each
 * line shows the bytes in hex followed by their ASCII rendering, with
 * '.' standing in for anything outside the printable range.  At most
 * the first 128 bytes of the buffer are shown.
 */
void
ds_dump_msg(void *vbuf, size_t len)
{
	uint8_t	*bytes = vbuf;
	char	line[LINEWIDTH];
	char	*hexp;
	char	*ascp;
	char	*ascstart = line + ASCIIOFFSET;
	int	base, col;

	/* cap the amount of output */
	if (len > 128) {
		len = 128;
	}

	/* emit one line per BYTESPERLINE bytes */
	for (base = 0; base < len; base += BYTESPERLINE) {

		bzero(line, LINEWIDTH);

		hexp = line;
		ascp = ascstart;

		/*
		 * Render each byte of this line twice: as hex at the
		 * front of the line and as ASCII in the trailing
		 * column.
		 */
		for (col = 0; col < BYTESPERLINE; col++) {
			if ((base + col) >= len) {
				break;
			}
			(void) sprintf(hexp, " %02x", bytes[base + col]);
			hexp += 3;

			if (ISPRINT(bytes[base + col])) {
				*ascp = bytes[base + col];
			} else {
				*ascp = '.';
			}
			ascp++;
		}

		/*
		 * Pad with spaces up to the ASCII column.  Besides
		 * filling a short final line, this also overwrites the
		 * NUL left by the last sprintf so the ASCII column is
		 * part of the printed string.
		 */
		for (; hexp != ascstart; hexp++) {
			*hexp = ' ';
		}

		cmn_err(CE_NOTE, "%s" DS_EOL, line);
	}
}
#endif /* DEBUG */
1958 
1959 
1960 /*
1961  * Walk the table of registered services, executing the specified callback
1962  * function for each service on a port. A non-zero return value from the
1963  * callback is used to terminate the walk, not to indicate an error. Returns
1964  * the index of the last service visited.
1965  */
1966 int
1967 ds_walk_svcs(svc_cb_t svc_cb, void *arg)
1968 {
1969 	int		idx;
1970 	ds_svc_t	*svc;
1971 
1972 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
1973 
1974 	/* walk every table entry */
1975 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
1976 		svc = ds_svcs.tbl[idx];
1977 
1978 		/* execute the callback */
1979 		if ((*svc_cb)(svc, arg) != 0)
1980 			break;
1981 	}
1982 
1983 	return (idx);
1984 }
1985 
1986 static int
1987 ds_svc_isfree(ds_svc_t *svc, void *arg)
1988 {
1989 	_NOTE(ARGUNUSED(arg))
1990 
1991 	/*
1992 	 * Looking for a free service. This may be a NULL entry
1993 	 * in the table, or an unused structure that could be
1994 	 * reused.
1995 	 */
1996 
1997 	if (DS_SVC_ISFREE(svc)) {
1998 		/* yes, it is free */
1999 		return (1);
2000 	}
2001 
2002 	/* not a candidate */
2003 	return (0);
2004 }
2005 
2006 int
2007 ds_svc_ismatch(ds_svc_t *svc, void *arg)
2008 {
2009 	if (DS_SVC_ISFREE(svc)) {
2010 		return (0);
2011 	}
2012 
2013 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2014 	    (svc->flags & DSSF_ISCLIENT) == 0) {
2015 		/* found a match */
2016 		return (1);
2017 	}
2018 
2019 	return (0);
2020 }
2021 
2022 int
2023 ds_svc_clnt_ismatch(ds_svc_t *svc, void *arg)
2024 {
2025 	if (DS_SVC_ISFREE(svc)) {
2026 		return (0);
2027 	}
2028 
2029 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2030 	    (svc->flags & DSSF_ISCLIENT) != 0) {
2031 		/* found a match */
2032 		return (1);
2033 	}
2034 
2035 	return (0);
2036 }
2037 
2038 int
2039 ds_svc_free(ds_svc_t *svc, void *arg)
2040 {
2041 	_NOTE(ARGUNUSED(arg))
2042 
2043 	if (svc == NULL) {
2044 		return (0);
2045 	}
2046 
2047 	if (svc->cap.svc_id) {
2048 		DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
2049 		svc->cap.svc_id = NULL;
2050 	}
2051 
2052 	if (svc->cap.vers) {
2053 		DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
2054 		svc->cap.vers = NULL;
2055 	}
2056 
2057 	DS_FREE(svc, sizeof (ds_svc_t));
2058 
2059 	return (0);
2060 }
2061 
2062 static int
2063 ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port)
2064 {
2065 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2066 
2067 	if (DS_SVC_ISFREE(svc))
2068 		return (0);
2069 
2070 	if (!DS_PORT_IN_SET(svc->avail, PORTID(port)))
2071 		return (0);
2072 
2073 	DS_PORTSET_ADD(svc->tried, PORTID(port));
2074 
2075 	if (ds_send_reg_req(svc, port) == 0) {
2076 		/* register sent successfully */
2077 		return (1);
2078 	}
2079 
2080 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2081 		/* reset the service */
2082 		ds_reset_svc(svc, port);
2083 	}
2084 	return (0);
2085 }
2086 
2087 int
2088 ds_svc_register(ds_svc_t *svc, void *arg)
2089 {
2090 	_NOTE(ARGUNUSED(arg))
2091 	ds_portset_t ports;
2092 	ds_port_t *port;
2093 	int	idx;
2094 
2095 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2096 
2097 	if (DS_SVC_ISFREE(svc))
2098 		return (0);
2099 
2100 	ports = svc->avail;
2101 	if (svc->flags & DSSF_ISCLIENT) {
2102 		ds_portset_del_active_clients(svc->cap.svc_id, &ports);
2103 	} else if (svc->state != DS_SVC_INACTIVE)
2104 		return (0);
2105 
2106 	if (DS_PORTSET_ISNULL(ports))
2107 		return (0);
2108 
2109 	/*
2110 	 * Attempt to register the service. Start with the lowest
2111 	 * numbered port and continue until a registration message
2112 	 * is sent successfully, or there are no ports left to try.
2113 	 */
2114 	for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2115 
2116 		/*
2117 		 * If the port is not in the available list,
2118 		 * it is not a candidate for registration.
2119 		 */
2120 		if (!DS_PORT_IN_SET(ports, idx)) {
2121 			continue;
2122 		}
2123 
2124 		port = &ds_ports[idx];
2125 		if (ds_svc_register_onport(svc, port)) {
2126 			if ((svc->flags & DSSF_ISCLIENT) == 0)
2127 				break;
2128 			DS_PORTSET_DEL(svc->avail, idx);
2129 		}
2130 	}
2131 
2132 	return (0);
2133 }
2134 
2135 static int
2136 ds_svc_unregister(ds_svc_t *svc, void *arg)
2137 {
2138 	ds_port_t *port = (ds_port_t *)arg;
2139 	ds_svc_hdl_t hdl;
2140 
2141 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2142 
2143 	if (DS_SVC_ISFREE(svc)) {
2144 		return (0);
2145 	}
2146 
2147 	/* make sure the service is using this port */
2148 	if (svc->port != port) {
2149 		return (0);
2150 	}
2151 
2152 	if (port) {
2153 		DS_DBG(CE_NOTE, "ds@%lx: svc_unreg: id='%s', ver=%d.%d, "
2154 		    " hdl=0x%09lx" DS_EOL, PORTID(port), svc->cap.svc_id,
2155 		    svc->ver.major, svc->ver.minor, svc->hdl);
2156 	} else {
2157 		DS_DBG(CE_NOTE, "port=NULL: svc_unreg: id='%s', ver=%d.%d, "
2158 		    " hdl=0x%09lx" DS_EOL, svc->cap.svc_id, svc->ver.major,
2159 		    svc->ver.minor, svc->hdl);
2160 	}
2161 
2162 	/* reset the service structure */
2163 	ds_reset_svc(svc, port);
2164 
2165 	/* call the client unregister callback */
2166 	if (svc->ops.ds_unreg_cb) {
2167 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2168 	}
2169 
2170 	/* increment the count in the handle to prevent reuse */
2171 	hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl));
2172 	if (DS_HDL_ISCLIENT(svc->hdl)) {
2173 		DS_HDL_SET_ISCLIENT(hdl);
2174 	}
2175 	svc->hdl = hdl;
2176 
2177 	if (svc->state != DS_SVC_UNREG_PENDING) {
2178 		/* try to initiate a new registration */
2179 		(void) ds_svc_register(svc, NULL);
2180 	}
2181 
2182 	return (0);
2183 }
2184 
2185 static int
2186 ds_svc_port_up(ds_svc_t *svc, void *arg)
2187 {
2188 	ds_port_t *port = (ds_port_t *)arg;
2189 
2190 	if (DS_SVC_ISFREE(svc)) {
2191 		/* nothing to do */
2192 		return (0);
2193 	}
2194 
2195 	DS_PORTSET_ADD(svc->avail, port->id);
2196 	DS_PORTSET_DEL(svc->tried, port->id);
2197 
2198 	return (0);
2199 }
2200 
2201 ds_svc_t *
2202 ds_alloc_svc(void)
2203 {
2204 	int		idx;
2205 	uint_t		newmaxsvcs;
2206 	ds_svc_t	**newtbl;
2207 	ds_svc_t	*newsvc;
2208 
2209 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2210 
2211 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2212 
2213 	if (idx != ds_svcs.maxsvcs) {
2214 		goto found;
2215 	}
2216 
2217 	/*
2218 	 * There was no free space in the table. Grow
2219 	 * the table to double its current size.
2220 	 */
2221 	newmaxsvcs = ds_svcs.maxsvcs * 2;
2222 	newtbl = DS_MALLOC(newmaxsvcs * sizeof (ds_svc_t *));
2223 
2224 	/* copy old table data to the new table */
2225 	(void) memcpy(newtbl, ds_svcs.tbl,
2226 	    ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2227 
2228 	/* clean up the old table */
2229 	DS_FREE(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2230 	ds_svcs.tbl = newtbl;
2231 	ds_svcs.maxsvcs = newmaxsvcs;
2232 
2233 	/* search for a free space again */
2234 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2235 
2236 	/* the table is locked so should find a free slot */
2237 	ASSERT(idx != ds_svcs.maxsvcs);
2238 
2239 found:
2240 	/* allocate a new svc structure if necessary */
2241 	if ((newsvc = ds_svcs.tbl[idx]) == NULL) {
2242 		/* allocate a new service */
2243 		newsvc = DS_MALLOC(sizeof (ds_svc_t));
2244 		ds_svcs.tbl[idx] = newsvc;
2245 	}
2246 
2247 	/* fill in the handle */
2248 	newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl));
2249 	newsvc->state = DS_SVC_FREE;	/* Mark as free temporarily */
2250 
2251 	return (newsvc);
2252 }
2253 
2254 static void
2255 ds_reset_svc(ds_svc_t *svc, ds_port_t *port)
2256 {
2257 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2258 
2259 	if (svc->state != DS_SVC_UNREG_PENDING)
2260 		svc->state = DS_SVC_INACTIVE;
2261 	svc->ver_idx = 0;
2262 	svc->ver.major = 0;
2263 	svc->ver.minor = 0;
2264 	svc->port = NULL;
2265 	if (port) {
2266 		DS_PORTSET_DEL(svc->avail, port->id);
2267 	}
2268 }
2269 
2270 ds_svc_t *
2271 ds_get_svc(ds_svc_hdl_t hdl)
2272 {
2273 	int		idx;
2274 	ds_svc_t	*svc;
2275 
2276 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2277 
2278 	if (hdl == DS_INVALID_HDL)
2279 		return (NULL);
2280 
2281 	idx = DS_HDL2IDX(hdl);
2282 
2283 	/* check if index is out of bounds */
2284 	if ((idx < 0) || (idx >= ds_svcs.maxsvcs))
2285 		return (NULL);
2286 
2287 	svc = ds_svcs.tbl[idx];
2288 
2289 	/* check for a valid service */
2290 	if (DS_SVC_ISFREE(svc))
2291 		return (NULL);
2292 
2293 	/* make sure the handle is an exact match */
2294 	if (svc->hdl != hdl)
2295 		return (NULL);
2296 
2297 	return (svc);
2298 }
2299 
2300 static void
2301 ds_port_reset(ds_port_t *port)
2302 {
2303 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2304 	ASSERT(MUTEX_HELD(&port->lock));
2305 
2306 	/* connection went down, mark everything inactive */
2307 	(void) ds_walk_svcs(ds_svc_unregister, port);
2308 
2309 	port->ver_idx = 0;
2310 	port->ver.major = 0;
2311 	port->ver.minor = 0;
2312 	port->state = DS_PORT_LDC_INIT;
2313 }
2314 
2315 /*
2316  * Verify that a version array is sorted as expected for the
2317  * version negotiation to work correctly.
2318  */
2319 ds_vers_check_t
2320 ds_vers_isvalid(ds_ver_t *vers, int nvers)
2321 {
2322 	uint16_t	curr_major;
2323 	uint16_t	curr_minor;
2324 	int		idx;
2325 
2326 	curr_major = vers[0].major;
2327 	curr_minor = vers[0].minor;
2328 
2329 	/*
2330 	 * Walk the version array, verifying correct ordering.
2331 	 * The array must be sorted from highest supported
2332 	 * version to lowest supported version.
2333 	 */
2334 	for (idx = 0; idx < nvers; idx++) {
2335 		if (vers[idx].major > curr_major) {
2336 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2337 			    " increasing major versions" DS_EOL);
2338 			return (DS_VERS_INCREASING_MAJOR_ERR);
2339 		}
2340 
2341 		if (vers[idx].major < curr_major) {
2342 			curr_major = vers[idx].major;
2343 			curr_minor = vers[idx].minor;
2344 			continue;
2345 		}
2346 
2347 		if (vers[idx].minor > curr_minor) {
2348 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2349 			    " increasing minor versions" DS_EOL);
2350 			return (DS_VERS_INCREASING_MINOR_ERR);
2351 		}
2352 
2353 		curr_minor = vers[idx].minor;
2354 	}
2355 
2356 	return (DS_VERS_OK);
2357 }
2358 
2359 /*
2360  * Extended user capability init.
2361  */
2362 int
2363 ds_ucap_init(ds_capability_t *cap, ds_clnt_ops_t *ops, uint32_t flags,
2364     int instance, ds_svc_hdl_t *hdlp)
2365 {
2366 	ds_vers_check_t	status;
2367 	ds_svc_t	*svc;
2368 	int		rv = 0;
2369 	ds_svc_hdl_t 	lb_hdl, hdl;
2370 	int		is_loopback;
2371 	int		is_client;
2372 
2373 	/* sanity check the args */
2374 	if ((cap == NULL) || (ops == NULL)) {
2375 		cmn_err(CE_NOTE, "%s: invalid arguments" DS_EOL, __func__);
2376 		return (EINVAL);
2377 	}
2378 
2379 	/* sanity check the capability specifier */
2380 	if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) {
2381 		cmn_err(CE_NOTE, "%s: invalid capability specifier" DS_EOL,
2382 		    __func__);
2383 		return (EINVAL);
2384 	}
2385 
2386 	/* sanity check the version array */
2387 	if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) {
2388 		cmn_err(CE_NOTE, "%s: invalid capability version array "
2389 		    "for %s service: %s" DS_EOL, __func__, cap->svc_id,
2390 		    (status == DS_VERS_INCREASING_MAJOR_ERR) ?
2391 		    "increasing major versions" :
2392 		    "increasing minor versions");
2393 		return (EINVAL);
2394 	}
2395 
2396 	/* data and register callbacks are required */
2397 	if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) {
2398 		cmn_err(CE_NOTE, "%s: invalid ops specifier for %s service"
2399 		    DS_EOL, __func__, cap->svc_id);
2400 		return (EINVAL);
2401 	}
2402 
2403 	flags &= DSSF_USERFLAGS;
2404 	is_client = flags & DSSF_ISCLIENT;
2405 
2406 	DS_DBG_USR(CE_NOTE, "%s: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx"
2407 	    DS_EOL, __func__, cap->svc_id, PTR_TO_LONG(ops->ds_data_cb),
2408 	    PTR_TO_LONG(ops->cb_arg));
2409 
2410 	mutex_enter(&ds_svcs.lock);
2411 
2412 	/* check if the service is already registered */
2413 	if (i_ds_hdl_lookup(cap->svc_id, is_client, NULL, 1) == 1) {
2414 		/* already registered */
2415 		cmn_err(CE_NOTE, "Service '%s'/%s already registered" DS_EOL,
2416 		    cap->svc_id,
2417 		    (flags & DSSF_ISCLIENT) ? "client" : "service");
2418 		mutex_exit(&ds_svcs.lock);
2419 		return (EALREADY);
2420 	}
2421 
2422 	svc = ds_alloc_svc();
2423 	if (is_client) {
2424 		DS_HDL_SET_ISCLIENT(svc->hdl);
2425 	}
2426 
2427 	svc->state = DS_SVC_FREE;
2428 	svc->svc_hdl = DS_BADHDL1;
2429 
2430 	svc->flags = flags;
2431 	svc->drvi = instance;
2432 	svc->drv_psp = NULL;
2433 
2434 	/*
2435 	 * Check for loopback.  "pri" is a legacy service that assumes it
2436 	 * will never use loopback mode.
2437 	 */
2438 	if (strcmp(cap->svc_id, "pri") == 0) {
2439 		is_loopback = 0;
2440 	} else if (i_ds_hdl_lookup(cap->svc_id, is_client == 0, &lb_hdl, 1)
2441 	    == 1) {
2442 		if ((rv = ds_loopback_set_svc(svc, cap, &lb_hdl)) != 0) {
2443 			DS_DBG_USR(CE_NOTE, "%s: ds_loopback_set_svc '%s' err "
2444 			    " (%d)" DS_EOL, __func__, cap->svc_id, rv);
2445 			mutex_exit(&ds_svcs.lock);
2446 			return (rv);
2447 		}
2448 		is_loopback = 1;
2449 	} else
2450 		is_loopback = 0;
2451 
2452 	/* copy over all the client information */
2453 	(void) memcpy(&svc->cap, cap, sizeof (ds_capability_t));
2454 
2455 	/* make a copy of the service name */
2456 	svc->cap.svc_id = ds_strdup(cap->svc_id);
2457 
2458 	/* make a copy of the version array */
2459 	svc->cap.vers = DS_MALLOC(cap->nvers * sizeof (ds_ver_t));
2460 	(void) memcpy(svc->cap.vers, cap->vers, cap->nvers * sizeof (ds_ver_t));
2461 
2462 	/* copy the client ops vector */
2463 	(void) memcpy(&svc->ops, ops, sizeof (ds_clnt_ops_t));
2464 
2465 	svc->state = DS_SVC_INACTIVE;
2466 	svc->ver_idx = 0;
2467 	DS_PORTSET_DUP(svc->avail, ds_allports);
2468 	DS_PORTSET_SETNULL(svc->tried);
2469 
2470 	ds_svcs.nsvcs++;
2471 
2472 	hdl = svc->hdl;
2473 
2474 	/*
2475 	 * kludge to allow user callback code to get handle and user args.
2476 	 * Make sure the callback arg points to the svc structure.
2477 	 */
2478 	if ((flags & DSSF_ISUSER) != 0) {
2479 		ds_cbarg_set_cookie(svc);
2480 	}
2481 
2482 	if (is_loopback) {
2483 		ds_loopback_register(hdl);
2484 		ds_loopback_register(lb_hdl);
2485 	}
2486 
2487 	/*
2488 	 * If this is a client or a non-loopback service provider, send
2489 	 * out register requests.
2490 	 */
2491 	if (!is_loopback || (flags & DSSF_ISCLIENT) != 0)
2492 		(void) ds_svc_register(svc, NULL);
2493 
2494 	if (hdlp) {
2495 		*hdlp = hdl;
2496 	}
2497 
2498 	mutex_exit(&ds_svcs.lock);
2499 
2500 	DS_DBG_USR(CE_NOTE, "%s: service '%s' assigned handle 0x%09lx" DS_EOL,
2501 	    __func__, svc->cap.svc_id, hdl);
2502 
2503 	return (0);
2504 }
2505 
2506 /*
2507  * ds_cap_init interface for previous revision.
2508  */
2509 int
2510 ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops)
2511 {
2512 	return (ds_ucap_init(cap, ops, 0, DS_INVALID_INSTANCE, NULL));
2513 }
2514 
2515 /*
2516  * Interface for ds_unreg_hdl in lds driver.
2517  */
2518 int
2519 ds_unreg_hdl(ds_svc_hdl_t hdl)
2520 {
2521 	ds_svc_t	*svc;
2522 	int		is_loopback;
2523 	ds_svc_hdl_t	lb_hdl;
2524 
2525 	DS_DBG_USR(CE_NOTE, "%s: hdl=0x%09lx" DS_EOL, __func__, hdl);
2526 
2527 	mutex_enter(&ds_svcs.lock);
2528 	if ((svc = ds_get_svc(hdl)) == NULL) {
2529 		mutex_exit(&ds_svcs.lock);
2530 		DS_DBG_USR(CE_NOTE, "%s: unknown hdl: 0x%llx" DS_EOL, __func__,
2531 		    (u_longlong_t)hdl);
2532 		return (ENXIO);
2533 	}
2534 
2535 	DS_DBG_USR(CE_NOTE, "%s: svcid='%s', hdl=0x%llx" DS_EOL, __func__,
2536 	    svc->cap.svc_id, (u_longlong_t)svc->hdl);
2537 
2538 	svc->state = DS_SVC_UNREG_PENDING;
2539 
2540 	is_loopback = ((svc->flags & DSSF_LOOPBACK) != 0);
2541 	lb_hdl = svc->svc_hdl;
2542 
2543 	if (svc->port) {
2544 		(void) ds_send_unreg_req(svc);
2545 	}
2546 
2547 	(void) ds_svc_unregister(svc, svc->port);
2548 
2549 	ds_delete_svc_entry(svc);
2550 
2551 	if (is_loopback) {
2552 		ds_loopback_unregister(lb_hdl);
2553 	}
2554 
2555 	mutex_exit(&ds_svcs.lock);
2556 
2557 	return (0);
2558 }
2559 
2560 int
2561 ds_cap_fini(ds_capability_t *cap)
2562 {
2563 	ds_svc_hdl_t	hdl;
2564 	int rv;
2565 	uint_t nhdls = 0;
2566 
2567 	DS_DBG(CE_NOTE, "%s: '%s'" DS_EOL, __func__, cap->svc_id);
2568 	if ((rv = ds_hdl_lookup(cap->svc_id, 0, &hdl, 1, &nhdls)) != 0) {
2569 		DS_DBG(CE_NOTE, "%s: ds_hdl_lookup '%s' err (%d)" DS_EOL,
2570 		    __func__, cap->svc_id, rv);
2571 		return (rv);
2572 	}
2573 
2574 	if (nhdls == 0) {
2575 		DS_DBG(CE_NOTE, "%s: no such service '%s'" DS_EOL,
2576 		    __func__, cap->svc_id);
2577 		return (ENXIO);
2578 	}
2579 
2580 	if ((rv = ds_is_my_hdl(hdl, DS_INVALID_INSTANCE)) != 0) {
2581 		DS_DBG(CE_NOTE, "%s: ds_is_my_handle err (%d)" DS_EOL, __func__,
2582 		    rv);
2583 		return (rv);
2584 	}
2585 
2586 	if ((rv = ds_unreg_hdl(hdl)) != 0) {
2587 		DS_DBG(CE_NOTE, "%s: ds_unreg_hdl err (%d)" DS_EOL, __func__,
2588 		    rv);
2589 		return (rv);
2590 	}
2591 
2592 	return (0);
2593 }
2594 
2595 int
2596 ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len)
2597 {
2598 	int		rv;
2599 	ds_hdr_t	*hdr;
2600 	caddr_t		msg;
2601 	size_t		msglen;
2602 	size_t		hdrlen;
2603 	caddr_t		payload;
2604 	ds_svc_t	*svc;
2605 	ds_port_t	*port;
2606 	ds_data_handle_t *data;
2607 	ds_svc_hdl_t	svc_hdl;
2608 	int		is_client = 0;
2609 
2610 	DS_DBG(CE_NOTE, "%s: hdl: 0x%llx, buf: %lx, len: %ld" DS_EOL, __func__,
2611 	    (u_longlong_t)hdl, (ulong_t)buf, len);
2612 
2613 	mutex_enter(&ds_svcs.lock);
2614 
2615 	if ((svc = ds_get_svc(hdl)) == NULL) {
2616 		cmn_err(CE_WARN, "%s: invalid handle 0x%llx" DS_EOL, __func__,
2617 		    (u_longlong_t)hdl);
2618 		mutex_exit(&ds_svcs.lock);
2619 		return (ENXIO);
2620 	}
2621 
2622 	if (svc->state != DS_SVC_ACTIVE) {
2623 		/* channel is up, but svc is not registered */
2624 		DS_DBG(CE_NOTE, "%s: invalid service state 0x%x" DS_EOL,
2625 		    __func__, svc->state);
2626 		mutex_exit(&ds_svcs.lock);
2627 		return (ENOTCONN);
2628 	}
2629 
2630 	if (svc->flags & DSSF_LOOPBACK) {
2631 		hdl = svc->svc_hdl;
2632 		mutex_exit(&ds_svcs.lock);
2633 		ds_loopback_send(hdl, buf, len);
2634 		return (0);
2635 	}
2636 
2637 	if ((port = svc->port) == NULL) {
2638 		DS_DBG(CE_NOTE, "%s: service '%s' not associated with a port"
2639 		    DS_EOL, __func__, svc->cap.svc_id);
2640 		mutex_exit(&ds_svcs.lock);
2641 		return (ECONNRESET);
2642 	}
2643 
2644 	if (svc->flags & DSSF_ISCLIENT) {
2645 		is_client = 1;
2646 		svc_hdl = svc->svc_hdl;
2647 	}
2648 
2649 	mutex_exit(&ds_svcs.lock);
2650 
2651 	/* check that the LDC channel is ready */
2652 	if (port->ldc.state != LDC_UP) {
2653 		DS_DBG(CE_NOTE, "%s: LDC channel is not up" DS_EOL, __func__);
2654 		return (ECONNRESET);
2655 	}
2656 
2657 	hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t);
2658 
2659 	msg = DS_MALLOC(len + hdrlen);
2660 	hdr = (ds_hdr_t *)msg;
2661 	payload = msg + hdrlen;
2662 	msglen = len + hdrlen;
2663 
2664 	hdr->payload_len = len + sizeof (ds_data_handle_t);
2665 	hdr->msg_type = DS_DATA;
2666 
2667 	data = (ds_data_handle_t *)(msg + DS_HDR_SZ);
2668 	if (is_client) {
2669 		data->svc_handle = svc_hdl;
2670 	} else {
2671 		data->svc_handle = hdl;
2672 	}
2673 
2674 	if ((buf != NULL) && (len != 0)) {
2675 		(void) memcpy(payload, buf, len);
2676 	}
2677 
2678 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data>: hdl=0x%llx, len=%ld, "
2679 	    " payload_len=%d" DS_EOL, PORTID(port), (u_longlong_t)svc->hdl,
2680 	    msglen, hdr->payload_len);
2681 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msglen);
2682 
2683 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
2684 		rv = (rv == EIO) ? ECONNRESET : rv;
2685 	}
2686 	DS_FREE(msg, msglen);
2687 
2688 	return (rv);
2689 }
2690 
2691 void
2692 ds_port_common_init(ds_port_t *port)
2693 {
2694 	int rv;
2695 
2696 	if ((port->flags & DS_PORT_MUTEX_INITED) == 0) {
2697 		mutex_init(&port->lock, NULL, MUTEX_DRIVER, NULL);
2698 		mutex_init(&port->tx_lock, NULL, MUTEX_DRIVER, NULL);
2699 		mutex_init(&port->rcv_lock, NULL, MUTEX_DRIVER, NULL);
2700 		port->flags |= DS_PORT_MUTEX_INITED;
2701 	}
2702 
2703 	port->state = DS_PORT_INIT;
2704 	DS_PORTSET_ADD(ds_allports, port->id);
2705 
2706 	ds_sys_port_init(port);
2707 
2708 	mutex_enter(&port->lock);
2709 	rv = ds_ldc_init(port);
2710 	mutex_exit(&port->lock);
2711 
2712 	/*
2713 	 * If LDC successfully init'ed, try to kick off protocol for this port.
2714 	 */
2715 	if (rv == 0) {
2716 		ds_handle_up_event(port);
2717 	}
2718 }
2719 
2720 void
2721 ds_port_common_fini(ds_port_t *port, int is_fini)
2722 {
2723 	port->state = DS_PORT_FREE;
2724 
2725 	if (is_fini && (port->flags & DS_PORT_MUTEX_INITED) != 0) {
2726 		mutex_destroy(&port->lock);
2727 		mutex_destroy(&port->tx_lock);
2728 		mutex_destroy(&port->rcv_lock);
2729 		port->flags &= ~DS_PORT_MUTEX_INITED;
2730 	}
2731 
2732 	DS_PORTSET_DEL(ds_allports, port->id);
2733 
2734 	ds_sys_port_fini(port);
2735 }
2736 
2737 /*
2738  * Initialize table of registered service classes
2739  */
2740 void
2741 ds_init_svcs_tbl(uint_t nentries)
2742 {
2743 	int	tblsz;
2744 
2745 	ds_svcs.maxsvcs = nentries;
2746 
2747 	tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *);
2748 	ds_svcs.tbl = (ds_svc_t **)DS_MALLOC(tblsz);
2749 
2750 	ds_svcs.nsvcs = 0;
2751 }
2752 
2753 /*
2754  * Find the max and min version supported.
2755  * Hacked from zeus workspace, support.c
2756  */
2757 static void
2758 min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
2759     uint16_t *min_major, uint16_t *max_major)
2760 {
2761 	int i;
2762 
2763 	*min_major = sup_versionsp[0].major;
2764 	*max_major = *min_major;
2765 
2766 	for (i = 1; i < num_versions; i++) {
2767 		if (sup_versionsp[i].major < *min_major)
2768 			*min_major = sup_versionsp[i].major;
2769 
2770 		if (sup_versionsp[i].major > *max_major)
2771 			*max_major = sup_versionsp[i].major;
2772 	}
2773 }
2774 
2775 /*
2776  * Check whether the major and minor numbers requested by the peer can be
2777  * satisfied. If the requested major is supported, true is returned, and the
2778  * agreed minor is returned in new_minor. If the requested major is not
2779  * supported, the routine returns false, and the closest major is returned in
2780  * *new_major, upon which the peer should re-negotiate. The closest major is
2781  * the just lower that the requested major number.
2782  *
2783  * Hacked from zeus workspace, support.c
2784  */
2785 boolean_t
2786 negotiate_version(int num_versions, ds_ver_t *sup_versionsp,
2787     uint16_t req_major, uint16_t *new_majorp, uint16_t *new_minorp)
2788 {
2789 	int i;
2790 	uint16_t major, lower_major;
2791 	uint16_t min_major = 0, max_major;
2792 	boolean_t found_match = B_FALSE;
2793 
2794 	min_max_versions(num_versions, sup_versionsp, &min_major, &max_major);
2795 
2796 	DS_DBG(CE_NOTE, "negotiate_version: req_major = %u, min = %u, max = %u"
2797 	    DS_EOL, req_major, min_major, max_major);
2798 
2799 	/*
2800 	 * If the minimum version supported is greater than
2801 	 * the version requested, return the lowest version
2802 	 * supported
2803 	 */
2804 	if (min_major > req_major) {
2805 		*new_majorp = min_major;
2806 		return (B_FALSE);
2807 	}
2808 
2809 	/*
2810 	 * If the largest version supported is lower than
2811 	 * the version requested, return the largest version
2812 	 * supported
2813 	 */
2814 	if (max_major < req_major) {
2815 		*new_majorp = max_major;
2816 		return (B_FALSE);
2817 	}
2818 
2819 	/*
2820 	 * Now we know that the requested version lies between the
2821 	 * min and max versions supported. Check if the requested
2822 	 * major can be found in supported versions.
2823 	 */
2824 	lower_major = min_major;
2825 	for (i = 0; i < num_versions; i++) {
2826 		major = sup_versionsp[i].major;
2827 		if (major == req_major) {
2828 			found_match = B_TRUE;
2829 			*new_majorp = req_major;
2830 			*new_minorp = sup_versionsp[i].minor;
2831 			break;
2832 		} else {
2833 			if ((major < req_major) && (major > lower_major))
2834 				lower_major = major;
2835 		}
2836 	}
2837 
2838 	/*
2839 	 * If no match is found, return the closest available number
2840 	 */
2841 	if (!found_match)
2842 		*new_majorp = lower_major;
2843 
2844 	return (found_match);
2845 }
2846 
/*
 * Specific errno's that are used by ds.c and ldc.c
 *
 * The table is terminated by an entry whose ds_errno is 0.
 */
static struct {
	int ds_errno;
	char *estr;
} ds_errno_to_str_tab[] = {
	{ EIO,		"I/O error" },
	{ ENXIO,	"No such device or address" },
	{ EAGAIN,	"Resource temporarily unavailable" },
	{ ENOMEM,	"Not enough space" },
	{ EACCES,	"Permission denied" },
	{ EFAULT,	"Bad address" },
	{ EBUSY,	"Device busy" },
	{ EINVAL,	"Invalid argument" },
	{ ENOSPC,	"No space left on device" },
	{ ENOMSG,	"No message of desired type" },
#ifdef	ECHRNG
	{ ECHRNG,	"Channel number out of range" },
#endif
	{ ENOTSUP,	"Operation not supported" },
	{ EMSGSIZE,	"Message too long" },
	{ EADDRINUSE,	"Address already in use" },
	{ ECONNRESET,	"Connection reset by peer" },
	{ ENOBUFS,	"No buffer space available" },
	{ ENOTCONN,	"Socket is not connected" },
	{ ECONNREFUSED,	"Connection refused" },
	{ EALREADY,	"Operation already in progress" },
	{ 0,		NULL },
};

/*
 * Write a textual description of ds_errno into ebuf and return ebuf.
 *
 * Values listed in ds_errno_to_str_tab get their table string; any
 * other value (including 0) is formatted as "ds_errno (<value>)".  The
 * text is copied without a length limit, so ebuf must be large enough
 * for the longest table string plus its terminator.
 */
char *
ds_errno_to_str(int ds_errno, char *ebuf)
{
	int slot;

	for (slot = 0; ds_errno_to_str_tab[slot].ds_errno != 0; slot++) {
		if (ds_errno_to_str_tab[slot].ds_errno != ds_errno)
			continue;

		(void) strcpy(ebuf, ds_errno_to_str_tab[slot].estr);
		return (ebuf);
	}

	/* not in the table: fall back to the numeric form */
	(void) sprintf(ebuf, "ds_errno (%d)", ds_errno);
	return (ebuf);
}
2893 
2894 static void
2895 ds_loopback_register(ds_svc_hdl_t hdl)
2896 {
2897 	ds_ver_t ds_ver;
2898 	ds_svc_t *svc;
2899 
2900 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2901 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2902 	    (u_longlong_t)hdl);
2903 	if ((svc = ds_get_svc(hdl)) == NULL) {
2904 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2905 		    (u_longlong_t)hdl);
2906 		return;
2907 	}
2908 
2909 	svc->state = DS_SVC_ACTIVE;
2910 
2911 	if (svc->ops.ds_reg_cb) {
2912 		DS_DBG_LOOP(CE_NOTE, "%s: loopback regcb: hdl: 0x%llx" DS_EOL,
2913 		    __func__, (u_longlong_t)hdl);
2914 		ds_ver.major = svc->ver.major;
2915 		ds_ver.minor = svc->ver.minor;
2916 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &ds_ver, hdl);
2917 	}
2918 }
2919 
2920 static void
2921 ds_loopback_unregister(ds_svc_hdl_t hdl)
2922 {
2923 	ds_svc_t *svc;
2924 
2925 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2926 	if ((svc = ds_get_svc(hdl)) == NULL) {
2927 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2928 		    (u_longlong_t)hdl);
2929 		return;
2930 	}
2931 
2932 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2933 	    (u_longlong_t)hdl);
2934 
2935 	svc->flags &= ~DSSF_LOOPBACK;
2936 	svc->svc_hdl = DS_BADHDL2;
2937 	svc->state = DS_SVC_INACTIVE;
2938 
2939 	if (svc->ops.ds_unreg_cb) {
2940 		DS_DBG_LOOP(CE_NOTE, "%s: loopback unregcb: hdl: 0x%llx" DS_EOL,
2941 		    __func__, (u_longlong_t)hdl);
2942 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2943 	}
2944 }
2945 
2946 static void
2947 ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen)
2948 {
2949 	ds_svc_t *svc;
2950 
2951 	mutex_enter(&ds_svcs.lock);
2952 	if ((svc = ds_get_svc(hdl)) == NULL) {
2953 		mutex_exit(&ds_svcs.lock);
2954 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2955 		    (u_longlong_t)hdl);
2956 		return;
2957 	}
2958 	mutex_exit(&ds_svcs.lock);
2959 
2960 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2961 	    (u_longlong_t)hdl);
2962 
2963 	if (svc->ops.ds_data_cb) {
2964 		DS_DBG_LOOP(CE_NOTE, "%s: loopback datacb hdl: 0x%llx" DS_EOL,
2965 		    __func__, (u_longlong_t)hdl);
2966 		(*svc->ops.ds_data_cb)(svc->ops.cb_arg, buf, buflen);
2967 	}
2968 }
2969 
2970 static int
2971 ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap, ds_svc_hdl_t *lb_hdlp)
2972 {
2973 	ds_svc_t *lb_svc;
2974 	ds_svc_hdl_t lb_hdl = *lb_hdlp;
2975 	int i;
2976 	int match = 0;
2977 	uint16_t new_major;
2978 	uint16_t new_minor;
2979 
2980 	if ((lb_svc = ds_get_svc(lb_hdl)) == NULL) {
2981 		DS_DBG_LOOP(CE_NOTE, "%s: loopback: hdl: 0x%llx invalid" DS_EOL,
2982 		    __func__, (u_longlong_t)lb_hdl);
2983 		return (ENXIO);
2984 	}
2985 
2986 	/* negotiate a version between loopback services, if possible */
2987 	for (i = 0; i < lb_svc->cap.nvers && match == 0; i++) {
2988 		match = negotiate_version(cap->nvers, cap->vers,
2989 		    lb_svc->cap.vers[i].major, &new_major, &new_minor);
2990 	}
2991 	if (!match) {
2992 		DS_DBG_LOOP(CE_NOTE, "%s: loopback version negotiate failed"
2993 		    DS_EOL, __func__);
2994 		return (ENOTSUP);
2995 	}
2996 	if (lb_svc->state != DS_SVC_INACTIVE) {
2997 		DS_DBG_LOOP(CE_NOTE, "%s: loopback active: hdl: 0x%llx"
2998 		    DS_EOL, __func__, (u_longlong_t)lb_hdl);
2999 		if ((lb_svc->flags & DSSF_ISCLIENT) == 0) {
3000 			DS_DBG_LOOP(CE_NOTE, "%s: loopback busy hdl: 0x%llx"
3001 			    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3002 			return (EBUSY);
3003 		}
3004 		svc->state = DS_SVC_INACTIVE;	/* prevent alloc'ing svc */
3005 		lb_svc = ds_svc_clone(lb_svc);
3006 		DS_DBG_LOOP(CE_NOTE, "%s: loopback clone: ohdl: 0x%llx "
3007 		    "nhdl: 0x%llx" DS_EOL, __func__, (u_longlong_t)lb_hdl,
3008 		    (u_longlong_t)lb_svc->hdl);
3009 		*lb_hdlp = lb_svc->hdl;
3010 	}
3011 
3012 	svc->flags |= DSSF_LOOPBACK;
3013 	svc->svc_hdl = lb_svc->hdl;
3014 	svc->port = NULL;
3015 	svc->ver.major = new_major;
3016 	svc->ver.minor = new_minor;
3017 
3018 	lb_svc->flags |= DSSF_LOOPBACK;
3019 	lb_svc->svc_hdl = svc->hdl;
3020 	lb_svc->port = NULL;
3021 	lb_svc->ver.major = new_major;
3022 	lb_svc->ver.minor = new_minor;
3023 
3024 	DS_DBG_LOOP(CE_NOTE, "%s: setting loopback between: 0x%llx and 0x%llx"
3025 	    DS_EOL, __func__, (u_longlong_t)svc->hdl,
3026 	    (u_longlong_t)lb_svc->hdl);
3027 	return (0);
3028 }
3029 
3030 static ds_svc_t *
3031 ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl, ds_port_t *port)
3032 {
3033 	int		idx;
3034 	ds_svc_t	*svc;
3035 
3036 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s looking up clnt hdl: 0x%llx" DS_EOL,
3037 	    PORTID(port), __func__, (u_longlong_t)hdl);
3038 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3039 
3040 	/* walk every table entry */
3041 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3042 		svc = ds_svcs.tbl[idx];
3043 		if (DS_SVC_ISFREE(svc))
3044 			continue;
3045 		if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3046 		    svc->svc_hdl == hdl && svc->port == port) {
3047 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s found clnt hdl "
3048 			    "0x%llx: svc%d" DS_EOL, PORTID(port), __func__,
3049 			    (u_longlong_t)hdl, (uint_t)DS_HDL2IDX(svc->hdl));
3050 			return (svc);
3051 		}
3052 	}
3053 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s clnt hdl: 0x%llx not found" DS_EOL,
3054 	    PORTID(port), __func__, (u_longlong_t)hdl);
3055 
3056 	return (NULL);
3057 }
3058 
3059 static ds_svc_t *
3060 ds_svc_clone(ds_svc_t *svc)
3061 {
3062 	ds_svc_t *newsvc;
3063 	ds_svc_hdl_t hdl;
3064 
3065 	ASSERT(svc->flags & DSSF_ISCLIENT);
3066 
3067 	newsvc = ds_alloc_svc();
3068 
3069 	/* Can only clone clients for now */
3070 	hdl = newsvc->hdl | DS_HDL_ISCLIENT_BIT;
3071 	DS_DBG_USR(CE_NOTE, "%s: cloning client: old hdl: 0x%llx new hdl: "
3072 	    "0x%llx" DS_EOL, __func__, (u_longlong_t)svc->hdl,
3073 	    (u_longlong_t)hdl);
3074 	(void) memcpy(newsvc, svc, sizeof (ds_svc_t));
3075 	newsvc->hdl = hdl;
3076 	newsvc->flags &= ~DSSF_LOOPBACK;
3077 	newsvc->port = NULL;
3078 	newsvc->svc_hdl = DS_BADHDL2;
3079 	newsvc->cap.svc_id = ds_strdup(svc->cap.svc_id);
3080 	newsvc->cap.vers = DS_MALLOC(svc->cap.nvers * sizeof (ds_ver_t));
3081 	(void) memcpy(newsvc->cap.vers, svc->cap.vers,
3082 	    svc->cap.nvers * sizeof (ds_ver_t));
3083 
3084 	/*
3085 	 * Kludge to allow lds driver user callbacks to get access to current
3086 	 * svc structure.  Arg could be index to svc table or some other piece
3087 	 * of info to get to the svc table entry.
3088 	 */
3089 	if (newsvc->flags & DSSF_ISUSER) {
3090 		newsvc->ops.cb_arg = (ds_cb_arg_t)(newsvc);
3091 	}
3092 	return (newsvc);
3093 }
3094 
3095 /*
3096  * Internal handle lookup function.
3097  */
3098 static int
3099 i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3100     uint_t maxhdls)
3101 {
3102 	int idx;
3103 	int nhdls = 0;
3104 	ds_svc_t *svc;
3105 	uint32_t client_flag = is_client ? DSSF_ISCLIENT : 0;
3106 
3107 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3108 
3109 	for (idx = 0; idx < ds_svcs.maxsvcs && nhdls < maxhdls; idx++) {
3110 		svc = ds_svcs.tbl[idx];
3111 		if (DS_SVC_ISFREE(svc))
3112 			continue;
3113 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3114 		    (svc->flags & DSSF_ISCLIENT) == client_flag) {
3115 			if (hdlp != NULL && nhdls < maxhdls) {
3116 				hdlp[nhdls] = svc->hdl;
3117 				nhdls++;
3118 			} else {
3119 				nhdls++;
3120 			}
3121 		}
3122 	}
3123 	return (nhdls);
3124 }
3125 
3126 /*
3127  * Interface for ds_hdl_lookup in lds driver.
3128  */
3129 int
3130 ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3131     uint_t maxhdls, uint_t *nhdlsp)
3132 {
3133 	mutex_enter(&ds_svcs.lock);
3134 	*nhdlsp = i_ds_hdl_lookup(service, is_client, hdlp, maxhdls);
3135 	mutex_exit(&ds_svcs.lock);
3136 	return (0);
3137 }
3138 
3139 static void
3140 ds_portset_del_active_clients(char *service, ds_portset_t *portsp)
3141 {
3142 	ds_portset_t ports = *portsp;
3143 	int idx;
3144 	ds_svc_t *svc;
3145 	ds_svc_hdl_t hdl;
3146 
3147 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3148 
3149 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3150 		svc = ds_svcs.tbl[idx];
3151 		if (DS_SVC_ISFREE(svc))
3152 			continue;
3153 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3154 		    (svc->flags & DSSF_ISCLIENT) != 0 &&
3155 		    svc->state != DS_SVC_INACTIVE &&
3156 		    svc->port != NULL) {
3157 			DS_PORTSET_DEL(ports, PORTID(svc->port));
3158 		}
3159 	}
3160 
3161 	/*
3162 	 * Legacy "pri" client service should not try to make a
3163 	 * connection to the SP if the existing "pri" provider
3164 	 * service has a connection.
3165 	 */
3166 	if (strcmp(service, "pri") == 0 &&
3167 	    i_ds_hdl_lookup(service, 0, &hdl, 1) == 1 &&
3168 	    (svc = ds_get_svc(hdl)) != NULL && svc->state == DS_SVC_ACTIVE &&
3169 	    svc->port != NULL) {
3170 		DS_PORTSET_DEL(ports, PORTID(svc->port));
3171 	}
3172 	*portsp = ports;
3173 }
3174 
3175 /*
3176  * After an UNREG REQ, check if this is a client service with multiple
3177  * handles.  If it is, then we can eliminate this entry.
3178  */
3179 static void
3180 ds_check_for_dup_services(ds_svc_t *svc)
3181 {
3182 	if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3183 	    svc->state == DS_SVC_INACTIVE &&
3184 	    i_ds_hdl_lookup(svc->cap.svc_id, 1, NULL, 2) == 2) {
3185 		ds_delete_svc_entry(svc);
3186 	}
3187 }
3188 
3189 static void
3190 ds_delete_svc_entry(ds_svc_t *svc)
3191 {
3192 	ds_svc_hdl_t tmp_hdl;
3193 
3194 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3195 
3196 	/*
3197 	 * Clear out the structure, but do not deallocate the
3198 	 * memory. It can be reused for the next registration.
3199 	 */
3200 	DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
3201 	DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
3202 
3203 	/* save the handle to prevent reuse */
3204 	tmp_hdl = svc->hdl;
3205 	bzero((void *)svc, sizeof (ds_svc_t));
3206 
3207 	/* initialize for next use */
3208 	svc->hdl = tmp_hdl;
3209 	svc->state = DS_SVC_FREE;
3210 
3211 	ds_svcs.nsvcs--;
3212 }
3213