xref: /illumos-gate/usr/src/uts/sun4v/io/ds_common.c (revision 17f1e64a433a4ca00ffed7539e10c297580a7002)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * Domain Services Module Common Code.
29  *
30  * This module is intended to be used by both Solaris and the VBSC
31  * module.
32  */
33 
34 #include <sys/modctl.h>
35 #include <sys/ksynch.h>
36 #include <sys/taskq.h>
37 #include <sys/disp.h>
38 #include <sys/cmn_err.h>
39 #include <sys/note.h>
40 #include <sys/mach_descrip.h>
41 #include <sys/mdesc.h>
42 #include <sys/ldc.h>
43 #include <sys/ds.h>
44 #include <sys/ds_impl.h>
45 
46 #ifndef MIN
47 #define	MIN(a, b)	((a) < (b) ? (a) : (b))
48 #endif
49 
50 #define	DS_DECODE_BUF_LEN		30
51 
52 /*
53  * All DS ports in the system
54  *
55  * The list of DS ports is read in from the MD when the DS module is
56  * initialized and is never modified. This eliminates the need for
57  * locking to access the port array itself. Access to the individual
58  * ports are synchronized at the port level.
59  */
60 ds_port_t	ds_ports[DS_MAX_PORTS];
61 ds_portset_t	ds_allports;	/* all DS ports in the system */
62 ds_portset_t	ds_nullport;	/* allows test against null portset */
63 
64 /* DS SP port id */
65 uint64_t ds_sp_port_id = DS_PORTID_INVALID;
66 
67 /*
68  * Table of registered services
69  *
70  * Locking: Accesses to the table of services are synchronized using
71  *   a single mutex (ds_svcs.lock). That mutex must be held both when
72  *   looking up service information in the table and when any
73  *   service information is being modified.
74  */
75 ds_svcs_t	ds_svcs;
76 
77 /*
78  * Flag to prevent callbacks while in the middle of DS teardown.
79  */
80 boolean_t ds_enabled = B_FALSE;	/* enable/disable taskq processing */
81 
82 /*
83  * Retry count and delay for LDC reads and writes
84  */
85 #ifndef DS_DEFAULT_RETRIES
86 #define	DS_DEFAULT_RETRIES	10000	/* number of times to retry */
87 #endif
88 #ifndef DS_DEFAULT_DELAY
89 #define	DS_DEFAULT_DELAY	1000	/* usecs to wait between retries */
90 #endif
91 
92 static int ds_retries = DS_DEFAULT_RETRIES;
93 static clock_t ds_delay = DS_DEFAULT_DELAY;
94 
95 /*
96  * Supported versions of the DS message protocol
97  *
98  * The version array must be sorted in order from the highest
99  * supported version to the lowest. Support for a particular
100  * <major>.<minor> version implies all lower minor versions of
101  * that same major version are supported as well.
102  */
103 static ds_ver_t ds_vers[] = { { 1, 0 } };
104 
105 #define	DS_NUM_VER	(sizeof (ds_vers) / sizeof (ds_vers[0]))
106 
107 
108 /* incoming message handling functions */
109 typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len);
110 static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len);
111 static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len);
112 static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len);
113 static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len);
114 static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len);
115 static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len);
116 static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len);
117 static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len);
118 static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len);
119 static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len);
120 static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len);
121 
122 /*
123  * DS Message Handler Dispatch Table
124  *
125  * A table used to dispatch all incoming messages. This table
126  * contains handlers for all the fixed message types, as well as
127  * the messages defined in the 1.0 version of the DS protocol.
128  * The handlers are indexed based on the DS header msg_type values
129  */
130 static const ds_msg_handler_t ds_msg_handlers[] = {
131 	ds_handle_init_req,		/* DS_INIT_REQ */
132 	ds_handle_init_ack,		/* DS_INIT_ACK */
133 	ds_handle_init_nack,		/* DS_INIT_NACK */
134 	ds_handle_reg_req,		/* DS_REG_REQ */
135 	ds_handle_reg_ack,		/* DS_REG_ACK */
136 	ds_handle_reg_nack,		/* DS_REG_NACK */
137 	ds_handle_unreg_req,		/* DS_UNREG */
138 	ds_handle_unreg_ack,		/* DS_UNREG_ACK */
139 	ds_handle_unreg_nack,		/* DS_UNREG_NACK */
140 	ds_handle_data,			/* DS_DATA */
141 	ds_handle_nack			/* DS_NACK */
142 };
143 
144 
145 
146 /* initialization functions */
147 static int ds_ldc_init(ds_port_t *port);
148 
149 /* event processing functions */
150 static uint_t ds_ldc_cb(uint64_t event, caddr_t arg);
151 static int ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep);
152 static void ds_handle_up_event(ds_port_t *port);
153 static void ds_handle_down_reset_events(ds_port_t *port);
154 static void ds_handle_recv(void *arg);
155 static void ds_dispatch_event(void *arg);
156 
157 /* message sending functions */
158 static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen);
159 static int ds_send_reg_req(ds_svc_t *svc, ds_port_t *port);
160 static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
161 static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
162 
163 /* walker functions */
164 static int ds_svc_isfree(ds_svc_t *svc, void *arg);
165 static int ds_svc_unregister(ds_svc_t *svc, void *arg);
166 static int ds_svc_port_up(ds_svc_t *svc, void *arg);
167 
168 /* service utilities */
169 static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port);
170 static int ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port);
171 
172 /* port utilities */
173 static void ds_port_reset(ds_port_t *port);
174 static ldc_status_t ds_update_ldc_state(ds_port_t *port);
175 
176 /* misc utilities */
177 static void min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
178     uint16_t *min_major, uint16_t *max_major);
179 
180 /* debug */
181 static char *decode_ldc_events(uint64_t event, char *buf);
182 
183 /* loopback */
184 static void ds_loopback_register(ds_svc_hdl_t hdl);
185 static void ds_loopback_unregister(ds_svc_hdl_t hdl);
186 static void ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen);
187 static int ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap,
188     ds_svc_hdl_t *lb_hdlp);
189 
190 /* client handling */
191 static int i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
192     uint_t maxhdls);
193 static ds_svc_t *ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl,
194     ds_port_t *port);
195 static ds_svc_t *ds_find_svc_by_id_port(char *svc_id, int is_client,
196     ds_port_t *port);
197 static ds_svc_t *ds_svc_clone(ds_svc_t *svc);
198 static void ds_portset_del_active_clients(char *service, ds_portset_t *portsp);
199 static void ds_check_for_dup_services(ds_svc_t *svc);
200 static void ds_delete_svc_entry(ds_svc_t *svc);
201 
/*
 * Duplicate the NUL-terminated string 'str'.
 *
 * The copy is placed in a buffer of strlen(str) + 1 bytes obtained from
 * DS_MALLOC and is returned to the caller; this routine keeps no
 * reference to it.
 */
char *
ds_strdup(char *str)
{
	size_t	nbytes = strlen(str) + 1;	/* room for the terminator */
	char	*copy = DS_MALLOC(nbytes);

	(void) strcpy(copy, str);

	return (copy);
}
211 
212 void
213 ds_common_init(void)
214 {
215 	/* Validate version table */
216 	ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK);
217 
218 	/* Initialize services table */
219 	ds_init_svcs_tbl(DS_MAXSVCS_INIT);
220 
221 	/* enable callback processing */
222 	ds_enabled = B_TRUE;
223 }
224 
225 /* BEGIN LDC SUPPORT FUNCTIONS */
226 
227 static char *
228 decode_ldc_events(uint64_t event, char *buf)
229 {
230 	buf[0] = 0;
231 	if (event & LDC_EVT_DOWN)	(void) strcat(buf, " DOWN");
232 	if (event & LDC_EVT_RESET)	(void) strcat(buf, " RESET");
233 	if (event & LDC_EVT_UP)		(void) strcat(buf, " UP");
234 	if (event & LDC_EVT_READ)	(void) strcat(buf, " READ");
235 	if (event & LDC_EVT_WRITE)	(void) strcat(buf, " WRITE");
236 	return (buf);
237 }
238 
239 static ldc_status_t
240 ds_update_ldc_state(ds_port_t *port)
241 {
242 	ldc_status_t	ldc_state;
243 	int		rv;
244 	char		ebuf[DS_EBUFSIZE];
245 
246 	ASSERT(MUTEX_HELD(&port->lock));
247 
248 	/*
249 	 * Read status and update ldc state info in port structure.
250 	 */
251 	if ((rv = ldc_status(port->ldc.hdl, &ldc_state)) != 0) {
252 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_status error: %s" DS_EOL,
253 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
254 		ldc_state = port->ldc.state;
255 	} else {
256 		port->ldc.state = ldc_state;
257 	}
258 
259 	return (ldc_state);
260 }
261 
262 static void
263 ds_handle_down_reset_events(ds_port_t *port)
264 {
265 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
266 	    __func__);
267 
268 	mutex_enter(&ds_svcs.lock);
269 	mutex_enter(&port->lock);
270 
271 	ds_sys_drain_events(port);
272 
273 	(void) ds_update_ldc_state(port);
274 
275 	/* reset the port state */
276 	ds_port_reset(port);
277 
278 	/* acknowledge the reset */
279 	(void) ldc_up(port->ldc.hdl);
280 
281 	mutex_exit(&port->lock);
282 	mutex_exit(&ds_svcs.lock);
283 
284 	ds_handle_up_event(port);
285 
286 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
287 }
288 
289 static void
290 ds_handle_up_event(ds_port_t *port)
291 {
292 	ldc_status_t	ldc_state;
293 
294 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
295 	    __func__);
296 
297 	mutex_enter(&port->lock);
298 
299 	ldc_state = ds_update_ldc_state(port);
300 
301 	mutex_exit(&port->lock);
302 
303 	if ((ldc_state == LDC_UP) && IS_DS_PORT(port)) {
304 		/*
305 		 * Initiate the handshake.
306 		 */
307 		ds_send_init_req(port);
308 	}
309 
310 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
311 }
312 
313 static uint_t
314 ds_ldc_cb(uint64_t event, caddr_t arg)
315 {
316 	ds_port_t	*port = (ds_port_t *)arg;
317 	char		evstring[DS_DECODE_BUF_LEN];
318 
319 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: %s event (%llx) received" DS_EOL,
320 	    PORTID(port), __func__, decode_ldc_events(event, evstring),
321 	    (u_longlong_t)event);
322 
323 	if (!ds_enabled) {
324 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: callback handling is disabled"
325 		    DS_EOL, PORTID(port), __func__);
326 		return (LDC_SUCCESS);
327 	}
328 
329 	if (event & (LDC_EVT_DOWN | LDC_EVT_RESET)) {
330 		ds_handle_down_reset_events(port);
331 		goto done;
332 	}
333 
334 	if (event & LDC_EVT_UP) {
335 		ds_handle_up_event(port);
336 	}
337 
338 	if (event & LDC_EVT_READ) {
339 		if (port->ldc.state != LDC_UP) {
340 			cmn_err(CE_WARN, "ds@%lx: %s: LDC READ event while "
341 			    "port not up" DS_EOL, PORTID(port), __func__);
342 			goto done;
343 		}
344 
345 		if (ds_sys_dispatch_func(ds_handle_recv, port)) {
346 			cmn_err(CE_WARN, "ds@%lx: error initiating LDC READ "
347 			    " event", PORTID(port));
348 		}
349 	}
350 
351 	if (event & LDC_EVT_WRITE) {
352 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: LDC WRITE event received, "
353 		    "not supported" DS_EOL, PORTID(port), __func__);
354 	}
355 
356 	if (event & ~(LDC_EVT_UP | LDC_EVT_READ)) {
357 		cmn_err(CE_WARN, "ds@%lx: %s: Unexpected LDC event received: "
358 		    "0x%llx" DS_EOL, PORTID(port), __func__,
359 		    (u_longlong_t)event);
360 	}
361 done:
362 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
363 
364 	return (LDC_SUCCESS);
365 }
366 
367 static int
368 ds_ldc_init(ds_port_t *port)
369 {
370 	int		rv;
371 	ldc_attr_t	ldc_attr;
372 	caddr_t		ldc_cb_arg = (caddr_t)port;
373 	char		ebuf[DS_EBUFSIZE];
374 
375 	ASSERT(MUTEX_HELD(&port->lock));
376 
377 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%lld" DS_EOL,
378 	    PORTID(port), __func__, (u_longlong_t)port->ldc.id);
379 
380 	ldc_attr.devclass = LDC_DEV_GENERIC;
381 	ldc_attr.instance = 0;
382 	ldc_attr.mode = LDC_MODE_RELIABLE;
383 	ldc_attr.mtu = DS_STREAM_MTU;
384 
385 	if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) {
386 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_id: %lx, ldc_init error: %s"
387 		    DS_EOL, PORTID(port), __func__, port->ldc.id,
388 		    ds_errno_to_str(rv, ebuf));
389 		return (rv);
390 	}
391 
392 	rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, ldc_cb_arg);
393 	if (rv != 0) {
394 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_reg_callback error: %s"
395 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
396 		return (rv);
397 	}
398 
399 	ds_sys_ldc_init(port);
400 	return (0);
401 }
402 
403 int
404 ds_ldc_fini(ds_port_t *port)
405 {
406 	int	rv;
407 	char	ebuf[DS_EBUFSIZE];
408 
409 	ASSERT(port->state >= DS_PORT_LDC_INIT);
410 
411 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%ld" DS_EOL, PORTID(port),
412 	    __func__, port->ldc.id);
413 
414 	if ((rv = ldc_close(port->ldc.hdl)) != 0) {
415 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_close error: %s" DS_EOL,
416 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
417 		return (rv);
418 	}
419 
420 	if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) {
421 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_unreg_callback error: %s"
422 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
423 		return (rv);
424 	}
425 
426 	if ((rv = ldc_fini(port->ldc.hdl)) != 0) {
427 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_fini error: %s" DS_EOL,
428 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
429 		return (rv);
430 	}
431 
432 	return (rv);
433 }
434 
435 /*
436  * Attempt to read a specified number of bytes from a particular LDC.
437  * Returns zero for success or the return code from the LDC read on
438  * failure. The actual number of bytes read from the LDC is returned
439  * in the size parameter.
440  */
441 static int
442 ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep)
443 {
444 	int	rv = 0;
445 	size_t	bytes_req = *sizep;
446 	size_t	bytes_left = bytes_req;
447 	size_t	nbytes;
448 	int	retry_count = 0;
449 	char	ebuf[DS_EBUFSIZE];
450 
451 	ASSERT(MUTEX_HELD(&port->rcv_lock));
452 
453 	*sizep = 0;
454 
455 	DS_DBG_LDC(CE_NOTE, "ds@%lx: attempting to read %ld bytes" DS_EOL,
456 	    PORTID(port), bytes_req);
457 
458 	while (bytes_left > 0) {
459 
460 		nbytes = bytes_left;
461 
462 		if ((rv = ldc_read(port->ldc.hdl, msgp, &nbytes)) != 0) {
463 			if (rv == ECONNRESET) {
464 				break;
465 			} else if (rv != EAGAIN) {
466 				cmn_err(CE_NOTE, "ds@%lx: %s: %s" DS_EOL,
467 				    PORTID(port), __func__,
468 				    ds_errno_to_str(rv, ebuf));
469 				break;
470 			}
471 		} else {
472 			if (nbytes != 0) {
473 				DS_DBG_LDC(CE_NOTE, "ds@%lx: "
474 				    "read %ld bytes, %d retries" DS_EOL,
475 				    PORTID(port), nbytes, retry_count);
476 
477 				*sizep += nbytes;
478 				msgp += nbytes;
479 				bytes_left -= nbytes;
480 
481 				/* reset counter on a successful read */
482 				retry_count = 0;
483 				continue;
484 			}
485 
486 			/*
487 			 * No data was read. Check if this is the
488 			 * first attempt. If so, just return since
489 			 * nothing has been read yet.
490 			 */
491 			if (bytes_left == bytes_req) {
492 				DS_DBG_LDC(CE_NOTE, "ds@%lx: read zero bytes, "
493 				    " no data available" DS_EOL, PORTID(port));
494 				break;
495 			}
496 		}
497 
498 		/*
499 		 * A retry is necessary because the read returned
500 		 * EAGAIN, or a zero length read occurred after
501 		 * reading a partial message.
502 		 */
503 		if (retry_count++ >= ds_retries) {
504 			DS_DBG_LDC(CE_NOTE, "ds@%lx: timed out waiting for "
505 			    "message" DS_EOL, PORTID(port));
506 			break;
507 		}
508 
509 		drv_usecwait(ds_delay);
510 	}
511 
512 	return (rv);
513 }
514 
515 static void
516 ds_handle_recv(void *arg)
517 {
518 	ds_port_t	*port = (ds_port_t *)arg;
519 	char		*hbuf;
520 	size_t		msglen;
521 	size_t		read_size;
522 	boolean_t	hasdata;
523 	ds_hdr_t	hdr;
524 	uint8_t		*msg;
525 	char		*currp;
526 	int		rv;
527 	ds_event_t	*devent;
528 
529 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s..." DS_EOL, PORTID(port), __func__);
530 
531 	/*
532 	 * Read messages from the channel until there are none
533 	 * pending. Valid messages are dispatched to be handled
534 	 * by a separate thread while any malformed messages are
535 	 * dropped.
536 	 */
537 
538 	mutex_enter(&port->rcv_lock);
539 
540 	while (((rv = ldc_chkq(port->ldc.hdl, &hasdata)) == 0) && hasdata) {
541 
542 		DS_DBG(CE_NOTE, "ds@%lx: %s: reading next message" DS_EOL,
543 		    PORTID(port), __func__);
544 
545 		/*
546 		 * Read in the next message.
547 		 */
548 		hbuf = (char *)&hdr;
549 		bzero(hbuf, DS_HDR_SZ);
550 		read_size = DS_HDR_SZ;
551 		currp = hbuf;
552 
553 		/* read in the message header */
554 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
555 			break;
556 		}
557 
558 		if (read_size < DS_HDR_SZ) {
559 			/*
560 			 * A zero length read is a valid signal that
561 			 * there is no data left on the channel.
562 			 */
563 			if (read_size != 0) {
564 				cmn_err(CE_WARN, "ds@%lx: invalid message "
565 				    "length, received %ld bytes, expected %ld"
566 				    DS_EOL, PORTID(port), read_size, DS_HDR_SZ);
567 			}
568 			continue;
569 		}
570 
571 		/* get payload size and allocate a buffer */
572 		read_size = ((ds_hdr_t *)hbuf)->payload_len;
573 		msglen = DS_HDR_SZ + read_size;
574 		msg = DS_MALLOC(msglen);
575 		if (!msg) {
576 			cmn_err(CE_WARN, "Memory allocation failed attempting "
577 			    " to allocate %d bytes." DS_EOL, (int)msglen);
578 			continue;
579 		}
580 
581 		DS_DBG(CE_NOTE, "ds@%lx: %s: message payload len %d" DS_EOL,
582 		    PORTID(port), __func__, (int)read_size);
583 
584 		/* move message header into buffer */
585 		(void) memcpy(msg, hbuf, DS_HDR_SZ);
586 		currp = (char *)(msg) + DS_HDR_SZ;
587 
588 		/* read in the message body */
589 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
590 			DS_FREE(msg, msglen);
591 			break;
592 		}
593 
594 		/* validate the size of the message */
595 		if ((DS_HDR_SZ + read_size) != msglen) {
596 			cmn_err(CE_WARN, "ds@%lx: %s: invalid message length, "
597 			    "received %ld bytes, expected %ld" DS_EOL,
598 			    PORTID(port), __func__, (DS_HDR_SZ + read_size),
599 			    msglen);
600 			DS_FREE(msg, msglen);
601 			continue;
602 		}
603 
604 		DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
605 
606 		/*
607 		 * Send the message for processing, and store it
608 		 * in the log. The memory is deallocated only when
609 		 * the message is removed from the log.
610 		 */
611 
612 		devent = DS_MALLOC(sizeof (ds_event_t));
613 		devent->port = port;
614 		devent->buf = (char *)msg;
615 		devent->buflen = msglen;
616 
617 		/* log the message */
618 		(void) ds_log_add_msg(DS_LOG_IN(port->id), msg, msglen);
619 
620 		if (ds_sys_dispatch_func(ds_dispatch_event, devent)) {
621 			cmn_err(CE_WARN, "ds@%lx: error initiating "
622 			    "event handler", PORTID(port));
623 			DS_FREE(devent, sizeof (ds_event_t));
624 		}
625 	}
626 
627 	mutex_exit(&port->rcv_lock);
628 
629 	/* handle connection reset errors returned from ds_recv_msg */
630 	if (rv == ECONNRESET) {
631 		ds_handle_down_reset_events(port);
632 	}
633 
634 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s done" DS_EOL, PORTID(port), __func__);
635 }
636 
637 static void
638 ds_dispatch_event(void *arg)
639 {
640 	ds_event_t	*event = (ds_event_t *)arg;
641 	ds_hdr_t	*hdr;
642 	ds_port_t	*port;
643 
644 	port = event->port;
645 
646 	hdr = (ds_hdr_t *)event->buf;
647 
648 	if (DS_MSG_TYPE_VALID(hdr->msg_type)) {
649 		DS_DBG(CE_NOTE, "ds@%lx: dispatch_event: msg_type=%d" DS_EOL,
650 		    PORTID(port), hdr->msg_type);
651 
652 		(*ds_msg_handlers[hdr->msg_type])(port, event->buf,
653 		    event->buflen);
654 	} else {
655 		cmn_err(CE_WARN, "ds@%lx: dispatch_event: invalid msg "
656 		    "type (%d)" DS_EOL, PORTID(port), hdr->msg_type);
657 	}
658 
659 	DS_FREE(event->buf, event->buflen);
660 	DS_FREE(event, sizeof (ds_event_t));
661 }
662 
663 int
664 ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen)
665 {
666 	int	rv;
667 	caddr_t	currp = msg;
668 	size_t	amt_left = msglen;
669 	int	loopcnt = 0;
670 
671 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s msglen: %ld" DS_EOL, PORTID(port),
672 	    __func__, msglen);
673 	DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
674 
675 	/*
676 	 * Ensure that no other messages can be sent on this port by holding
677 	 * the tx_lock mutex in case the write doesn't get sent with one write.
678 	 * This guarantees that the message doesn't become fragmented.
679 	 */
680 	mutex_enter(&port->tx_lock);
681 
682 	do {
683 		if ((rv = ldc_write(port->ldc.hdl, currp, &msglen)) != 0) {
684 			if (rv == ECONNRESET) {
685 				mutex_exit(&port->tx_lock);
686 				ds_handle_down_reset_events(port);
687 				return (rv);
688 			} else if ((rv == EWOULDBLOCK) &&
689 			    (loopcnt++ < ds_retries)) {
690 				drv_usecwait(ds_delay);
691 			} else {
692 				cmn_err(CE_WARN, "ds@%lx: send_msg: ldc_write "
693 				    "failed (%d), %d bytes remaining" DS_EOL,
694 				    PORTID(port), rv, (int)amt_left);
695 				goto error;
696 			}
697 		} else {
698 			amt_left -= msglen;
699 			currp += msglen;
700 			msglen = amt_left;
701 			loopcnt = 0;
702 		}
703 	} while (amt_left > 0);
704 error:
705 	mutex_exit(&port->tx_lock);
706 
707 	return (rv);
708 }
709 
710 /* END LDC SUPPORT FUNCTIONS */
711 
712 
713 /* BEGIN DS PROTOCOL SUPPORT FUNCTIONS */
714 
715 static void
716 ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len)
717 {
718 	ds_hdr_t	*hdr;
719 	ds_init_ack_t	*ack;
720 	ds_init_nack_t	*nack;
721 	char		*msg;
722 	size_t		msglen;
723 	ds_init_req_t	*req;
724 	size_t		explen = DS_MSG_LEN(ds_init_req_t);
725 	uint16_t	new_major;
726 	uint16_t	new_minor;
727 	boolean_t	match;
728 
729 	/* sanity check the incoming message */
730 	if (len != explen) {
731 		cmn_err(CE_WARN, "ds@%lx: <init_req: invalid message "
732 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
733 		    explen);
734 		return;
735 	}
736 
737 	req = (ds_init_req_t *)(buf + DS_HDR_SZ);
738 
739 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_req: ver=%d.%d" DS_EOL,
740 	    PORTID(port), req->major_vers, req->minor_vers);
741 
742 	match = negotiate_version(DS_NUM_VER, &ds_vers[0],
743 	    req->major_vers, &new_major, &new_minor);
744 
745 	/*
746 	 * Check version info. ACK only if the major numbers exactly
747 	 * match. The service entity can retry with a new minor
748 	 * based on the response sent as part of the NACK.
749 	 */
750 	if (match) {
751 		msglen = DS_MSG_LEN(ds_init_ack_t);
752 		msg = DS_MALLOC(msglen);
753 
754 		hdr = (ds_hdr_t *)msg;
755 		hdr->msg_type = DS_INIT_ACK;
756 		hdr->payload_len = sizeof (ds_init_ack_t);
757 
758 		ack = (ds_init_ack_t *)(msg + DS_HDR_SZ);
759 		ack->minor_vers = MIN(new_minor, req->minor_vers);
760 
761 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_ack>: minor=0x%04X" DS_EOL,
762 		    PORTID(port), MIN(new_minor, req->minor_vers));
763 	} else {
764 		msglen = DS_MSG_LEN(ds_init_nack_t);
765 		msg = DS_MALLOC(msglen);
766 
767 		hdr = (ds_hdr_t *)msg;
768 		hdr->msg_type = DS_INIT_NACK;
769 		hdr->payload_len = sizeof (ds_init_nack_t);
770 
771 		nack = (ds_init_nack_t *)(msg + DS_HDR_SZ);
772 		nack->major_vers = new_major;
773 
774 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_nack>: major=0x%04X" DS_EOL,
775 		    PORTID(port), new_major);
776 	}
777 
778 	/*
779 	 * Send the response
780 	 */
781 	(void) ds_send_msg(port, msg, msglen);
782 	DS_FREE(msg, msglen);
783 }
784 
785 static void
786 ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len)
787 {
788 	ds_init_ack_t	*ack;
789 	ds_ver_t	*ver;
790 	size_t		explen = DS_MSG_LEN(ds_init_ack_t);
791 
792 	/* sanity check the incoming message */
793 	if (len != explen) {
794 		cmn_err(CE_WARN, "ds@%lx: <init_ack: invalid message "
795 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
796 		    explen);
797 		return;
798 	}
799 
800 	ack = (ds_init_ack_t *)(buf + DS_HDR_SZ);
801 
802 	mutex_enter(&port->lock);
803 
804 	if (port->state != DS_PORT_INIT_REQ) {
805 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: invalid state: %d"
806 		    DS_EOL, PORTID(port), port->state);
807 		mutex_exit(&port->lock);
808 		return;
809 	}
810 
811 	ver = &(ds_vers[port->ver_idx]);
812 
813 	/* agreed upon a major version */
814 	port->ver.major = ver->major;
815 
816 	/*
817 	 * If the returned minor version is larger than
818 	 * the requested minor version, use the lower of
819 	 * the two, i.e. the requested version.
820 	 */
821 	if (ack->minor_vers >= ver->minor) {
822 		/*
823 		 * Use the minor version specified in the
824 		 * original request.
825 		 */
826 		port->ver.minor = ver->minor;
827 	} else {
828 		/*
829 		 * Use the lower minor version returned in
830 		 * the ack. By definition, all lower minor
831 		 * versions must be supported.
832 		 */
833 		port->ver.minor = ack->minor_vers;
834 	}
835 
836 	port->state = DS_PORT_READY;
837 
838 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready v%d.%d" DS_EOL,
839 	    PORTID(port), port->ver.major, port->ver.minor);
840 
841 	mutex_exit(&port->lock);
842 
843 	/*
844 	 * The port came up, so update all the services
845 	 * with this information. Follow that up with an
846 	 * attempt to register any service that is not
847 	 * already registered.
848 	 */
849 	mutex_enter(&ds_svcs.lock);
850 
851 	(void) ds_walk_svcs(ds_svc_port_up, port);
852 	(void) ds_walk_svcs(ds_svc_register, NULL);
853 
854 	mutex_exit(&ds_svcs.lock);
855 }
856 
857 static void
858 ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len)
859 {
860 	int		idx;
861 	ds_init_nack_t	*nack;
862 	ds_ver_t	*ver;
863 	size_t		explen = DS_MSG_LEN(ds_init_nack_t);
864 
865 	/* sanity check the incoming message */
866 	if (len != explen) {
867 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: invalid message "
868 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
869 		    explen);
870 		return;
871 	}
872 
873 	nack = (ds_init_nack_t *)(buf + DS_HDR_SZ);
874 
875 	mutex_enter(&port->lock);
876 
877 	if (port->state != DS_PORT_INIT_REQ) {
878 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: invalid state: %d"
879 		    DS_EOL, PORTID(port), port->state);
880 		mutex_exit(&port->lock);
881 		return;
882 	}
883 
884 	ver = &(ds_vers[port->ver_idx]);
885 
886 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: req=v%d.%d, nack=v%d.x"
887 	    DS_EOL, PORTID(port), ver->major, ver->minor, nack->major_vers);
888 
889 	if (nack->major_vers == 0) {
890 		/* no supported protocol version */
891 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS not supported"
892 		    DS_EOL, PORTID(port));
893 		mutex_exit(&port->lock);
894 		return;
895 	}
896 
897 	/*
898 	 * Walk the version list, looking for a major version
899 	 * that is as close to the requested major version as
900 	 * possible.
901 	 */
902 	for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) {
903 		if (ds_vers[idx].major <= nack->major_vers) {
904 			/* found a version to try */
905 			goto done;
906 		}
907 	}
908 
909 	if (idx == DS_NUM_VER) {
910 		/* no supported version */
911 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS v%d.x not "
912 		    "supported" DS_EOL, PORTID(port), nack->major_vers);
913 
914 		mutex_exit(&port->lock);
915 		return;
916 	}
917 
918 done:
919 	/* start the handshake again */
920 	port->ver_idx = idx;
921 	port->state = DS_PORT_LDC_INIT;
922 	mutex_exit(&port->lock);
923 
924 	ds_send_init_req(port);
925 
926 }
927 
928 static ds_svc_t *
929 ds_find_svc_by_id_port(char *svc_id, int is_client, ds_port_t *port)
930 {
931 	int		idx;
932 	ds_svc_t	*svc, *found_svc = 0;
933 	uint32_t	flag_match = is_client ? DSSF_ISCLIENT : 0;
934 
935 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
936 
937 	/* walk every table entry */
938 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
939 		svc = ds_svcs.tbl[idx];
940 		if (DS_SVC_ISFREE(svc))
941 			continue;
942 		if (strcmp(svc->cap.svc_id, svc_id) != 0)
943 			continue;
944 		if ((svc->flags & DSSF_ISCLIENT) != flag_match)
945 			continue;
946 		if (port != NULL && svc->port == port) {
947 			return (svc);
948 		} else if (svc->state == DS_SVC_INACTIVE) {
949 			found_svc = svc;
950 		} else if (!found_svc) {
951 			found_svc = svc;
952 		}
953 	}
954 
955 	return (found_svc);
956 }
957 
958 static void
959 ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len)
960 {
961 	ds_reg_req_t	*req;
962 	ds_hdr_t	*hdr;
963 	ds_reg_ack_t	*ack;
964 	ds_reg_nack_t	*nack;
965 	char		*msg;
966 	size_t		msglen;
967 	size_t		explen = DS_MSG_LEN(ds_reg_req_t);
968 	ds_svc_t	*svc = NULL;
969 	ds_ver_t	version;
970 	uint16_t	new_major;
971 	uint16_t	new_minor;
972 	boolean_t	match;
973 
974 	/* sanity check the incoming message */
975 	if (len < explen) {
976 		cmn_err(CE_WARN, "ds@%lx: <reg_req: invalid message "
977 		    "length (%ld), expected at least %ld" DS_EOL,
978 		    PORTID(port), len, explen);
979 		return;
980 	}
981 
982 	req = (ds_reg_req_t *)(buf + DS_HDR_SZ);
983 
984 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' ver=%d.%d, hdl=0x%llx"
985 	    DS_EOL, PORTID(port), req->svc_id, req->major_vers, req->minor_vers,
986 	    (u_longlong_t)req->svc_handle);
987 
988 	mutex_enter(&ds_svcs.lock);
989 	svc = ds_find_svc_by_id_port(req->svc_id,
990 	    DS_HDL_ISCLIENT(req->svc_handle) == 0, port);
991 	if (svc == NULL) {
992 
993 do_reg_nack:
994 		mutex_exit(&ds_svcs.lock);
995 
996 		msglen = DS_MSG_LEN(ds_reg_nack_t);
997 		msg = DS_MALLOC(msglen);
998 
999 		hdr = (ds_hdr_t *)msg;
1000 		hdr->msg_type = DS_REG_NACK;
1001 		hdr->payload_len = sizeof (ds_reg_nack_t);
1002 
1003 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1004 		nack->svc_handle = req->svc_handle;
1005 		nack->result = DS_REG_VER_NACK;
1006 		nack->major_vers = 0;
1007 
1008 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s'" DS_EOL,
1009 		    PORTID(port), req->svc_id);
1010 		/*
1011 		 * Send the response
1012 		 */
1013 		(void) ds_send_msg(port, msg, msglen);
1014 		DS_FREE(msg, msglen);
1015 		return;
1016 	}
1017 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' found, hdl: 0x%llx" DS_EOL,
1018 	    PORTID(port), req->svc_id, (u_longlong_t)svc->hdl);
1019 
1020 	/*
1021 	 * A client sends out a reg req in order to force service providers to
1022 	 * initiate a reg req from their end (limitation in the protocol).  We
1023 	 * expect the service provider to be in the inactive (DS_SVC_INACTIVE)
1024 	 * state.  If the service provider has already sent out a reg req (the
1025 	 * state is DS_SVC_REG_PENDING) or has already handshaken (the
1026 	 * state is DS_SVC_ACTIVE), then we can simply ignore this reg
1027 	 * req.  For any other state, we force an unregister before initiating
1028 	 * a reg req.
1029 	 */
1030 
1031 	if (DS_HDL_ISCLIENT(req->svc_handle)) {
1032 		switch (svc->state) {
1033 
1034 		case DS_SVC_REG_PENDING:
1035 		case DS_SVC_ACTIVE:
1036 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1037 			    "client, state (%x)" DS_EOL, PORTID(port),
1038 			    req->svc_id, svc->state);
1039 			mutex_exit(&ds_svcs.lock);
1040 			return;
1041 
1042 		case DS_SVC_INACTIVE:
1043 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1044 			    "client" DS_EOL, PORTID(port), req->svc_id);
1045 			break;
1046 
1047 		default:
1048 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1049 			    "client forced unreg, state (%x)" DS_EOL,
1050 			    PORTID(port), req->svc_id, svc->state);
1051 			(void) ds_svc_unregister(svc, port);
1052 			break;
1053 		}
1054 		(void) ds_svc_port_up(svc, port);
1055 		(void) ds_svc_register_onport(svc, port);
1056 		mutex_exit(&ds_svcs.lock);
1057 		return;
1058 	}
1059 
1060 	/*
1061 	 * Only remote service providers can initiate a registration.  The
1062 	 * local sevice from here must be a client service.
1063 	 */
1064 
1065 	match = negotiate_version(svc->cap.nvers, svc->cap.vers,
1066 	    req->major_vers, &new_major, &new_minor);
1067 
1068 	/*
1069 	 * Check version info. ACK only if the major numbers exactly
1070 	 * match. The service entity can retry with a new minor
1071 	 * based on the response sent as part of the NACK.
1072 	 */
1073 	if (match) {
1074 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' svc%d: state: %x "
1075 		    "svc_portid: %d" DS_EOL, PORTID(port), req->svc_id,
1076 		    (int)DS_HDL2IDX(svc->hdl), svc->state,
1077 		    (int)(svc->port == NULL ? -1 : PORTID(svc->port)));
1078 		/*
1079 		 * If the current local service is already in use and
1080 		 * it's not on this port, clone it.
1081 		 */
1082 		if (svc->state != DS_SVC_INACTIVE) {
1083 			if (svc->port != NULL && port == svc->port) {
1084 				/*
1085 				 * Someone probably dropped an unreg req
1086 				 * somewhere.  Force a local unreg.
1087 				 */
1088 				(void) ds_svc_unregister(svc, port);
1089 			} else if (!DS_HDL_ISCLIENT(svc->hdl)) {
1090 				/*
1091 				 * Can't clone a non-client (service provider)
1092 				 * handle.  This is because old in-kernel
1093 				 * service providers can't deal with multiple
1094 				 * handles.
1095 				 */
1096 				goto do_reg_nack;
1097 			} else {
1098 				svc = ds_svc_clone(svc);
1099 			}
1100 		}
1101 		svc->port = port;
1102 		svc->svc_hdl = req->svc_handle;
1103 		svc->state = DS_SVC_ACTIVE;
1104 
1105 		msglen = DS_MSG_LEN(ds_reg_ack_t);
1106 		msg = DS_MALLOC(msglen);
1107 
1108 		hdr = (ds_hdr_t *)msg;
1109 		hdr->msg_type = DS_REG_ACK;
1110 		hdr->payload_len = sizeof (ds_reg_ack_t);
1111 
1112 		ack = (ds_reg_ack_t *)(msg + DS_HDR_SZ);
1113 		ack->svc_handle = req->svc_handle;
1114 		ack->minor_vers = MIN(new_minor, req->minor_vers);
1115 
1116 
1117 		if (svc->ops.ds_reg_cb) {
1118 			/* Call the registration callback */
1119 			version.major = req->major_vers;
1120 			version.minor = ack->minor_vers;
1121 			(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &version,
1122 			    svc->hdl);
1123 		}
1124 		mutex_exit(&ds_svcs.lock);
1125 
1126 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_ack>: '%s' minor=0x%04X"
1127 		    DS_EOL, PORTID(port), svc->cap.svc_id,
1128 		    MIN(new_minor, req->minor_vers));
1129 	} else {
1130 		mutex_exit(&ds_svcs.lock);
1131 
1132 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1133 		msg = DS_MALLOC(msglen);
1134 
1135 		hdr = (ds_hdr_t *)msg;
1136 		hdr->msg_type = DS_REG_NACK;
1137 		hdr->payload_len = sizeof (ds_reg_nack_t);
1138 
1139 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1140 		nack->svc_handle = req->svc_handle;
1141 		nack->result = DS_REG_VER_NACK;
1142 		nack->major_vers = new_major;
1143 
1144 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s' major=0x%04X"
1145 		    DS_EOL, PORTID(port), svc->cap.svc_id, new_major);
1146 	}
1147 
1148 	/* send message */
1149 	(void) ds_send_msg(port, msg, msglen);
1150 	DS_FREE(msg, msglen);
1151 }
1152 
1153 static void
1154 ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len)
1155 {
1156 	ds_reg_ack_t	*ack;
1157 	ds_ver_t	*ver;
1158 	ds_ver_t	tmpver;
1159 	ds_svc_t	*svc;
1160 	size_t		explen = DS_MSG_LEN(ds_reg_ack_t);
1161 
1162 	/* sanity check the incoming message */
1163 	if (len != explen) {
1164 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid message "
1165 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1166 		    explen);
1167 		return;
1168 	}
1169 
1170 	ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ);
1171 
1172 	mutex_enter(&ds_svcs.lock);
1173 
1174 	/*
1175 	 * This searches for service based on how we generate handles
1176 	 * and so only works because this is a reg ack.
1177 	 */
1178 	if (DS_HDL_ISCLIENT(ack->svc_handle) ||
1179 	    (svc = ds_get_svc(ack->svc_handle)) == NULL) {
1180 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid handle 0x%llx"
1181 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1182 		goto done;
1183 	}
1184 
1185 	/* make sure the message makes sense */
1186 	if (svc->state != DS_SVC_REG_PENDING) {
1187 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid state (%d)" DS_EOL,
1188 		    PORTID(port), svc->state);
1189 		goto done;
1190 	}
1191 
1192 	ver = &(svc->cap.vers[svc->ver_idx]);
1193 
1194 	/* major version has been agreed upon */
1195 	svc->ver.major = ver->major;
1196 
1197 	if (ack->minor_vers >= ver->minor) {
1198 		/*
1199 		 * Use the minor version specified in the
1200 		 * original request.
1201 		 */
1202 		svc->ver.minor = ver->minor;
1203 	} else {
1204 		/*
1205 		 * Use the lower minor version returned in
1206 		 * the ack. By defninition, all lower minor
1207 		 * versions must be supported.
1208 		 */
1209 		svc->ver.minor = ack->minor_vers;
1210 	}
1211 
1212 	svc->state = DS_SVC_ACTIVE;
1213 	svc->port = port;
1214 
1215 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_ack: '%s' v%d.%d ready, hdl=0x%llx"
1216 	    DS_EOL, PORTID(port), svc->cap.svc_id, svc->ver.major,
1217 	    svc->ver.minor, (u_longlong_t)svc->hdl);
1218 
1219 	/* notify the client that registration is complete */
1220 	if (svc->ops.ds_reg_cb) {
1221 		/*
1222 		 * Use a temporary version structure so that
1223 		 * the copy in the svc structure cannot be
1224 		 * modified by the client.
1225 		 */
1226 		tmpver.major = svc->ver.major;
1227 		tmpver.minor = svc->ver.minor;
1228 
1229 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl);
1230 	}
1231 
1232 done:
1233 	mutex_exit(&ds_svcs.lock);
1234 }
1235 
1236 static void
1237 ds_try_next_port(ds_svc_t *svc, int portid)
1238 {
1239 	ds_port_t *port;
1240 	ds_portset_t totry;
1241 	int i;
1242 
1243 	DS_DBG_LDC(CE_NOTE, "ds@%x %s" DS_EOL, portid, __func__);
1244 
1245 	/*
1246 	 * Get the ports that haven't been tried yet and are available to try.
1247 	 */
1248 	DS_PORTSET_SETNULL(totry);
1249 	for (i = 0; i < DS_MAX_PORTS; i++) {
1250 		if (!DS_PORT_IN_SET(svc->tried, i) &&
1251 		    DS_PORT_IN_SET(svc->avail, i))
1252 			DS_PORTSET_ADD(totry, i);
1253 	}
1254 
1255 	if (DS_PORTSET_ISNULL(totry))
1256 		return;
1257 
1258 	for (i = 0; i < DS_MAX_PORTS; i++, portid++) {
1259 		if (portid >= DS_MAX_PORTS) {
1260 			portid = 0;
1261 		}
1262 
1263 		/*
1264 		 * If the port is not in the available list,
1265 		 * it is not a candidate for registration.
1266 		 */
1267 		if (!DS_PORT_IN_SET(totry, portid)) {
1268 			continue;
1269 		}
1270 
1271 		port = &ds_ports[portid];
1272 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s trying ldc.id: %d" DS_EOL,
1273 		    portid, __func__, (uint_t)(port->ldc.id));
1274 		if (ds_send_reg_req(svc, port) == 0) {
1275 			DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send OK" DS_EOL,
1276 			    portid, __func__);
1277 			/* register sent successfully */
1278 			break;
1279 		}
1280 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send FAIL" DS_EOL,
1281 		    portid, __func__);
1282 
1283 		/* reset the service to try the next port */
1284 		ds_reset_svc(svc, port);
1285 	}
1286 }
1287 
1288 static void
1289 ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len)
1290 {
1291 	ds_reg_nack_t	*nack;
1292 	ds_svc_t	*svc;
1293 	int		idx;
1294 	size_t		explen = DS_MSG_LEN(ds_reg_nack_t);
1295 
1296 	/* sanity check the incoming message */
1297 	if (len != explen) {
1298 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid message "
1299 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1300 		    explen);
1301 		return;
1302 	}
1303 
1304 	nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ);
1305 
1306 	mutex_enter(&ds_svcs.lock);
1307 
1308 	/*
1309 	 * We expect a reg_nack for a client ping.
1310 	 */
1311 	if (DS_HDL_ISCLIENT(nack->svc_handle)) {
1312 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: ping hdl: 0x%llx"
1313 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1314 		goto done;
1315 	}
1316 
1317 	/*
1318 	 * This searches for service based on how we generate handles
1319 	 * and so only works because this is a reg nack.
1320 	 */
1321 	if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1322 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid handle 0x%llx"
1323 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1324 		goto done;
1325 	}
1326 
1327 	/* make sure the message makes sense */
1328 	if (svc->state != DS_SVC_REG_PENDING) {
1329 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid state (%d)" DS_EOL,
1330 		    PORTID(port), svc->state);
1331 		goto done;
1332 	}
1333 
1334 	if (nack->result == DS_REG_DUP) {
1335 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: duplicate registration "
1336 		    " for %s" DS_EOL, PORTID(port), svc->cap.svc_id);
1337 		ds_reset_svc(svc, port);
1338 		goto done;
1339 	}
1340 
1341 	/*
1342 	 * A major version of zero indicates that the
1343 	 * service is not supported at all.
1344 	 */
1345 	if (nack->major_vers == 0) {
1346 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' not supported"
1347 		    DS_EOL, PORTID(port), svc->cap.svc_id);
1348 		ds_reset_svc(svc, port);
1349 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1350 			ds_try_next_port(svc, PORTID(port) + 1);
1351 		goto done;
1352 	}
1353 
1354 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' hdl=0x%llx, nack=%d.x"
1355 	    DS_EOL, PORTID(port), svc->cap.svc_id,
1356 	    (u_longlong_t)nack->svc_handle, nack->major_vers);
1357 
1358 	/*
1359 	 * Walk the version list for the service, looking for
1360 	 * a major version that is as close to the requested
1361 	 * major version as possible.
1362 	 */
1363 	for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) {
1364 		if (svc->cap.vers[idx].major <= nack->major_vers) {
1365 			/* found a version to try */
1366 			break;
1367 		}
1368 	}
1369 
1370 	if (idx == svc->cap.nvers) {
1371 		/* no supported version */
1372 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: %s v%d.x not supported"
1373 		    DS_EOL, PORTID(port), svc->cap.svc_id, nack->major_vers);
1374 		ds_reset_svc(svc, port);
1375 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1376 			ds_try_next_port(svc, PORTID(port) + 1);
1377 		goto done;
1378 	}
1379 
1380 	/* start the handshake again */
1381 	svc->state = DS_SVC_INACTIVE;
1382 	svc->ver_idx = idx;
1383 
1384 	(void) ds_svc_register(svc, NULL);
1385 
1386 done:
1387 	mutex_exit(&ds_svcs.lock);
1388 }
1389 
1390 static void
1391 ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len)
1392 {
1393 	ds_hdr_t	*hdr;
1394 	ds_unreg_req_t	*req;
1395 	ds_unreg_ack_t	*ack;
1396 	ds_svc_t	*svc;
1397 	char		*msg;
1398 	size_t		msglen;
1399 	size_t		explen = DS_MSG_LEN(ds_unreg_req_t);
1400 
1401 	/* sanity check the incoming message */
1402 	if (len != explen) {
1403 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid message "
1404 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1405 		    explen);
1406 		return;
1407 	}
1408 
1409 	req = (ds_unreg_req_t *)(buf + DS_HDR_SZ);
1410 
1411 	mutex_enter(&ds_svcs.lock);
1412 
1413 	/* lookup appropriate client or service */
1414 	if (DS_HDL_ISCLIENT(req->svc_handle) ||
1415 	    ((svc = ds_find_clnt_svc_by_hdl_port(req->svc_handle, port))
1416 	    == NULL && ((svc = ds_get_svc(req->svc_handle)) == NULL ||
1417 	    svc->port != port))) {
1418 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid handle 0x%llx"
1419 		    DS_EOL, PORTID(port), (u_longlong_t)req->svc_handle);
1420 		ds_send_unreg_nack(port, req->svc_handle);
1421 		mutex_exit(&ds_svcs.lock);
1422 		return;
1423 	}
1424 
1425 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: '%s' handle 0x%llx" DS_EOL,
1426 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1427 
1428 	(void) ds_svc_unregister(svc, svc->port);
1429 
1430 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_ack>: '%s' hdl=0x%llx" DS_EOL,
1431 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1432 
1433 	ds_check_for_dup_services(svc);
1434 
1435 	mutex_exit(&ds_svcs.lock);
1436 
1437 	msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t);
1438 	msg = DS_MALLOC(msglen);
1439 
1440 	hdr = (ds_hdr_t *)msg;
1441 	hdr->msg_type = DS_UNREG_ACK;
1442 	hdr->payload_len = sizeof (ds_unreg_ack_t);
1443 
1444 	ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ);
1445 	ack->svc_handle = req->svc_handle;
1446 
1447 	/* send message */
1448 	(void) ds_send_msg(port, msg, msglen);
1449 	DS_FREE(msg, msglen);
1450 
1451 }
1452 
1453 static void
1454 ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len)
1455 {
1456 	ds_unreg_ack_t	*ack;
1457 	size_t		explen = DS_MSG_LEN(ds_unreg_ack_t);
1458 
1459 	/* sanity check the incoming message */
1460 	if (len != explen) {
1461 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: invalid message "
1462 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1463 		    explen);
1464 		return;
1465 	}
1466 
1467 	ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ);
1468 
1469 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: hdl=0x%llx" DS_EOL,
1470 	    PORTID(port), (u_longlong_t)ack->svc_handle);
1471 
1472 #ifdef DEBUG
1473 	mutex_enter(&ds_svcs.lock);
1474 
1475 	/*
1476 	 * Since the unregister request was initiated locally,
1477 	 * the service structure has already been torn down.
1478 	 * Just perform a sanity check to make sure the message
1479 	 * is appropriate.
1480 	 */
1481 	if (ds_get_svc(ack->svc_handle) != NULL) {
1482 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: handle 0x%llx in use"
1483 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1484 	}
1485 
1486 	mutex_exit(&ds_svcs.lock);
1487 #endif	/* DEBUG */
1488 }
1489 
1490 static void
1491 ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len)
1492 {
1493 	ds_unreg_nack_t	*nack;
1494 	size_t		explen = DS_MSG_LEN(ds_unreg_nack_t);
1495 
1496 	/* sanity check the incoming message */
1497 	if (len != explen) {
1498 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: invalid message "
1499 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1500 		    explen);
1501 		return;
1502 	}
1503 
1504 	nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ);
1505 
1506 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: hdl=0x%llx" DS_EOL,
1507 	    PORTID(port), (u_longlong_t)nack->svc_handle);
1508 
1509 #ifdef DEBUG
1510 	mutex_enter(&ds_svcs.lock);
1511 
1512 	/*
1513 	 * Since the unregister request was initiated locally,
1514 	 * the service structure has already been torn down.
1515 	 * Just perform a sanity check to make sure the message
1516 	 * is appropriate.
1517 	 */
1518 	if (ds_get_svc(nack->svc_handle) != NULL) {
1519 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: handle 0x%llx in use"
1520 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1521 	}
1522 
1523 	mutex_exit(&ds_svcs.lock);
1524 #endif	/* DEBUG */
1525 }
1526 
1527 static void
1528 ds_handle_data(ds_port_t *port, caddr_t buf, size_t len)
1529 {
1530 	ds_data_handle_t	*data;
1531 	ds_svc_t		*svc;
1532 	char			*msg;
1533 	int			msgsz;
1534 	int			hdrsz;
1535 	size_t			explen = DS_MSG_LEN(ds_data_handle_t);
1536 
1537 	/* sanity check the incoming message */
1538 	if (len < explen) {
1539 		cmn_err(CE_WARN, "ds@%lx: <data: invalid message length "
1540 		    "(%ld), expected at least %ld" DS_EOL, PORTID(port), len,
1541 		    explen);
1542 		return;
1543 	}
1544 
1545 	data = (ds_data_handle_t *)(buf + DS_HDR_SZ);
1546 
1547 	hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t);
1548 	msgsz = len - hdrsz;
1549 
1550 	/* strip off the header for the client */
1551 	msg = (msgsz) ? (buf + hdrsz) : NULL;
1552 
1553 	mutex_enter(&ds_svcs.lock);
1554 
1555 	if ((svc = ds_find_clnt_svc_by_hdl_port(data->svc_handle, port))
1556 	    == NULL) {
1557 		if ((svc = ds_get_svc(data->svc_handle)) == NULL) {
1558 			mutex_exit(&ds_svcs.lock);
1559 			cmn_err(CE_WARN, "ds@%lx: <data: invalid handle 0x%llx"
1560 			    DS_EOL, PORTID(port),
1561 			    (u_longlong_t)data->svc_handle);
1562 			ds_send_data_nack(port, data->svc_handle);
1563 			return;
1564 		}
1565 	}
1566 
1567 	mutex_exit(&ds_svcs.lock);
1568 
1569 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <data: '%s' hdl=0x%llx" DS_EOL,
1570 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)svc->hdl);
1571 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msgsz);
1572 
1573 	/* dispatch this message to the client */
1574 	(*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz);
1575 }
1576 
1577 static void
1578 ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len)
1579 {
1580 	ds_svc_t	*svc;
1581 	ds_data_nack_t	*nack;
1582 	size_t		explen = DS_MSG_LEN(ds_data_nack_t);
1583 
1584 	/* sanity check the incoming message */
1585 	if (len != explen) {
1586 		cmn_err(CE_WARN, "ds@%lx: <data_nack: invalid message "
1587 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1588 		    explen);
1589 		return;
1590 	}
1591 
1592 	nack = (ds_data_nack_t *)(buf + DS_HDR_SZ);
1593 
1594 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack: hdl=0x%llx, result=0x%llx"
1595 	    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle,
1596 	    (u_longlong_t)nack->result);
1597 
1598 	if (nack->result == DS_INV_HDL) {
1599 
1600 		mutex_enter(&ds_svcs.lock);
1601 
1602 		if ((svc = ds_find_clnt_svc_by_hdl_port(nack->svc_handle,
1603 		    port)) == NULL) {
1604 			if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1605 				mutex_exit(&ds_svcs.lock);
1606 				return;
1607 			}
1608 		}
1609 
1610 		cmn_err(CE_WARN, "ds@%lx: <data_nack: handle 0x%llx reported "
1611 		    " as invalid" DS_EOL, PORTID(port),
1612 		    (u_longlong_t)nack->svc_handle);
1613 
1614 		(void) ds_svc_unregister(svc, svc->port);
1615 
1616 		mutex_exit(&ds_svcs.lock);
1617 	}
1618 }
1619 
1620 /* Initialize the port */
1621 void
1622 ds_send_init_req(ds_port_t *port)
1623 {
1624 	ds_hdr_t	*hdr;
1625 	ds_init_req_t	*init_req;
1626 	size_t		msglen;
1627 	ds_ver_t	*vers = &ds_vers[port->ver_idx];
1628 
1629 	mutex_enter(&port->lock);
1630 	if (port->state != DS_PORT_LDC_INIT) {
1631 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: invalid state: %d"
1632 		    DS_EOL, PORTID(port), port->state);
1633 		mutex_exit(&port->lock);
1634 		return;
1635 	}
1636 	mutex_exit(&port->lock);
1637 
1638 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: req=v%d.%d" DS_EOL,
1639 	    PORTID(port), vers->major, vers->minor);
1640 
1641 	msglen = DS_HDR_SZ + sizeof (ds_init_req_t);
1642 	hdr = DS_MALLOC(msglen);
1643 
1644 	hdr->msg_type = DS_INIT_REQ;
1645 	hdr->payload_len = sizeof (ds_init_req_t);
1646 
1647 	init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ);
1648 	init_req->major_vers = vers->major;
1649 	init_req->minor_vers = vers->minor;
1650 
1651 	if (ds_send_msg(port, (caddr_t)hdr, msglen) == 0) {
1652 		/*
1653 		 * We've left the port state unlocked over the malloc/send,
1654 		 * make sure no one has changed the state under us before
1655 		 * we update the state.
1656 		 */
1657 		mutex_enter(&port->lock);
1658 		if (port->state == DS_PORT_LDC_INIT)
1659 			port->state = DS_PORT_INIT_REQ;
1660 		mutex_exit(&port->lock);
1661 	}
1662 	DS_FREE(hdr, msglen);
1663 }
1664 
1665 static int
1666 ds_send_reg_req(ds_svc_t *svc, ds_port_t *port)
1667 {
1668 	ds_ver_t	*ver;
1669 	ds_hdr_t	*hdr;
1670 	caddr_t		msg;
1671 	size_t		msglen;
1672 	ds_reg_req_t	*req;
1673 	size_t		idlen;
1674 	int		rv;
1675 
1676 	if ((svc->state != DS_SVC_INACTIVE) &&
1677 	    ((svc->flags & DSSF_ISCLIENT) == 0)) {
1678 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: invalid svc state (%d) "
1679 		    "for svc '%s'" DS_EOL, PORTID(port), svc->state,
1680 		    svc->cap.svc_id);
1681 		return (-1);
1682 	}
1683 
1684 	mutex_enter(&port->lock);
1685 
1686 	/* check on the LDC to Zeus */
1687 	if (port->ldc.state != LDC_UP) {
1688 		/* can not send message */
1689 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: channel %ld is not up"
1690 		    DS_EOL, PORTID(port), port->ldc.id);
1691 		mutex_exit(&port->lock);
1692 		return (-1);
1693 	}
1694 
1695 	/* make sure port is ready */
1696 	if (port->state != DS_PORT_READY) {
1697 		/* can not send message */
1698 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: port is not ready"
1699 		    DS_EOL, PORTID(port));
1700 		mutex_exit(&port->lock);
1701 		return (-1);
1702 	}
1703 
1704 	mutex_exit(&port->lock);
1705 
1706 	/* allocate the message buffer */
1707 	idlen = strlen(svc->cap.svc_id);
1708 	msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen;
1709 	msg = DS_MALLOC(msglen);
1710 
1711 	/* copy in the header data */
1712 	hdr = (ds_hdr_t *)msg;
1713 	hdr->msg_type = DS_REG_REQ;
1714 	hdr->payload_len = sizeof (ds_reg_req_t) + idlen;
1715 
1716 	req = (ds_reg_req_t *)(msg + DS_HDR_SZ);
1717 	req->svc_handle = svc->hdl;
1718 	ver = &(svc->cap.vers[svc->ver_idx]);
1719 	req->major_vers = ver->major;
1720 	req->minor_vers = ver->minor;
1721 
1722 	/* copy in the service id */
1723 	(void) memcpy(req->svc_id, svc->cap.svc_id, idlen + 1);
1724 
1725 	/* send the message */
1726 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: '%s' ver=%d.%d, hdl=0x%llx"
1727 	    DS_EOL, PORTID(port), svc->cap.svc_id, ver->major, ver->minor,
1728 	    (u_longlong_t)svc->hdl);
1729 
1730 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1731 		svc->port = port;
1732 		rv = -1;
1733 	} else if ((svc->flags & DSSF_ISCLIENT) == 0) {
1734 		svc->state = DS_SVC_REG_PENDING;
1735 	}
1736 	DS_FREE(msg, msglen);
1737 
1738 	return (rv);
1739 }
1740 
1741 /*
1742  * Keep around in case we want this later
1743  */
1744 int
1745 ds_send_unreg_req(ds_svc_t *svc)
1746 {
1747 	caddr_t		msg;
1748 	size_t		msglen;
1749 	ds_hdr_t	*hdr;
1750 	ds_unreg_req_t	*req;
1751 	ds_port_t	*port = svc->port;
1752 	int		rv;
1753 
1754 	if (port == NULL) {
1755 		DS_DBG(CE_NOTE, "send_unreg_req: service '%s' not "
1756 		    "associated with a port" DS_EOL, svc->cap.svc_id);
1757 		return (-1);
1758 	}
1759 
1760 	mutex_enter(&port->lock);
1761 
1762 	/* check on the LDC to Zeus */
1763 	if (port->ldc.state != LDC_UP) {
1764 		/* can not send message */
1765 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: channel %ld is not up"
1766 		    DS_EOL, PORTID(port), port->ldc.id);
1767 		mutex_exit(&port->lock);
1768 		return (-1);
1769 	}
1770 
1771 	/* make sure port is ready */
1772 	if (port->state != DS_PORT_READY) {
1773 		/* can not send message */
1774 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: port is not ready" DS_EOL,
1775 		    PORTID(port));
1776 		mutex_exit(&port->lock);
1777 		return (-1);
1778 	}
1779 
1780 	mutex_exit(&port->lock);
1781 
1782 	msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t);
1783 	msg = DS_MALLOC(msglen);
1784 
1785 	/* copy in the header data */
1786 	hdr = (ds_hdr_t *)msg;
1787 	hdr->msg_type = DS_UNREG;
1788 	hdr->payload_len = sizeof (ds_unreg_req_t);
1789 
1790 	req = (ds_unreg_req_t *)(msg + DS_HDR_SZ);
1791 	if (svc->flags & DSSF_ISCLIENT) {
1792 		req->svc_handle = svc->svc_hdl;
1793 	} else {
1794 		req->svc_handle = svc->hdl;
1795 	}
1796 
1797 	/* send the message */
1798 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_req>: '%s' hdl=0x%llx" DS_EOL,
1799 	    PORTID(port), (svc->cap.svc_id) ? svc->cap.svc_id : "NULL",
1800 	    (u_longlong_t)svc->hdl);
1801 
1802 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1803 		rv = -1;
1804 	}
1805 	DS_FREE(msg, msglen);
1806 
1807 	return (rv);
1808 }
1809 
1810 static void
1811 ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1812 {
1813 	caddr_t		msg;
1814 	size_t		msglen;
1815 	ds_hdr_t	*hdr;
1816 	ds_unreg_nack_t	*nack;
1817 
1818 	mutex_enter(&port->lock);
1819 
1820 	/* check on the LDC to Zeus */
1821 	if (port->ldc.state != LDC_UP) {
1822 		/* can not send message */
1823 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: channel %ld is not up"
1824 		    DS_EOL, PORTID(port), port->ldc.id);
1825 		mutex_exit(&port->lock);
1826 		return;
1827 	}
1828 
1829 	/* make sure port is ready */
1830 	if (port->state != DS_PORT_READY) {
1831 		/* can not send message */
1832 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: port is not ready"
1833 		    DS_EOL, PORTID(port));
1834 		mutex_exit(&port->lock);
1835 		return;
1836 	}
1837 
1838 	mutex_exit(&port->lock);
1839 
1840 	msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t);
1841 	msg = DS_MALLOC(msglen);
1842 
1843 	/* copy in the header data */
1844 	hdr = (ds_hdr_t *)msg;
1845 	hdr->msg_type = DS_UNREG_NACK;
1846 	hdr->payload_len = sizeof (ds_unreg_nack_t);
1847 
1848 	nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ);
1849 	nack->svc_handle = bad_hdl;
1850 
1851 	/* send the message */
1852 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_nack>: hdl=0x%llx" DS_EOL,
1853 	    PORTID(port), (u_longlong_t)bad_hdl);
1854 
1855 	(void) ds_send_msg(port, msg, msglen);
1856 	DS_FREE(msg, msglen);
1857 }
1858 
1859 static void
1860 ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1861 {
1862 	caddr_t		msg;
1863 	size_t		msglen;
1864 	ds_hdr_t	*hdr;
1865 	ds_data_nack_t	*nack;
1866 
1867 	mutex_enter(&port->lock);
1868 
1869 	/* check on the LDC to Zeus */
1870 	if (port->ldc.state != LDC_UP) {
1871 		/* can not send message */
1872 		cmn_err(CE_WARN, "ds@%lx: data_nack>: channel %ld is not up"
1873 		    DS_EOL, PORTID(port), port->ldc.id);
1874 		mutex_exit(&port->lock);
1875 		return;
1876 	}
1877 
1878 	/* make sure port is ready */
1879 	if (port->state != DS_PORT_READY) {
1880 		/* can not send message */
1881 		cmn_err(CE_WARN, "ds@%lx: data_nack>: port is not ready" DS_EOL,
1882 		    PORTID(port));
1883 		mutex_exit(&port->lock);
1884 		return;
1885 	}
1886 
1887 	mutex_exit(&port->lock);
1888 
1889 	msglen = DS_HDR_SZ + sizeof (ds_data_nack_t);
1890 	msg = DS_MALLOC(msglen);
1891 
1892 	/* copy in the header data */
1893 	hdr = (ds_hdr_t *)msg;
1894 	hdr->msg_type = DS_NACK;
1895 	hdr->payload_len = sizeof (ds_data_nack_t);
1896 
1897 	nack = (ds_data_nack_t *)(msg + DS_HDR_SZ);
1898 	nack->svc_handle = bad_hdl;
1899 	nack->result = DS_INV_HDL;
1900 
1901 	/* send the message */
1902 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack>: hdl=0x%llx" DS_EOL,
1903 	    PORTID(port), (u_longlong_t)bad_hdl);
1904 
1905 	(void) ds_send_msg(port, msg, msglen);
1906 	DS_FREE(msg, msglen);
1907 }
1908 
1909 /* END DS PROTOCOL SUPPORT FUNCTIONS */
1910 
#ifdef DEBUG

#define	BYTESPERLINE	8
#define	LINEWIDTH	((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1)
#define	ASCIIOFFSET	((BYTESPERLINE * 3) + 2)
/* the argument is parenthesized so that any expression can be passed */
#define	ISPRINT(c)	(((c) >= ' ') && ((c) <= '~'))

/*
 * Output a buffer formatted with a set number of bytes on
 * each line. Append each line with the ASCII equivalent of
 * each byte if it falls within the printable ASCII range,
 * and '.' otherwise.
 *
 * At most the first 128 bytes of the buffer are dumped. A length of
 * zero produces no output and the buffer is not touched.
 */
void
ds_dump_msg(void *vbuf, size_t len)
{
	size_t	i, j;
	char	*curr;
	char	*aoff;
	char	line[LINEWIDTH];
	uint8_t	*buf = vbuf;

	/* cap the amount of output per message */
	if (len > 128)
		len = 128;

	/* walk the buffer one line at a time */
	for (i = 0; i < len; i += BYTESPERLINE) {

		bzero(line, LINEWIDTH);

		curr = line;
		aoff = line + ASCIIOFFSET;

		/*
		 * Walk the bytes in the current line, storing
		 * the hex value for the byte as well as the
		 * ASCII representation in a temporary buffer.
		 * All ASCII values are placed at the end of
		 * the line.
		 */
		for (j = 0; (j < BYTESPERLINE) && ((i + j) < len); j++) {
			uint8_t	byte = buf[i + j];

			/*
			 * Each byte takes three characters (" xx"). The
			 * terminator written by sprintf is overwritten
			 * by the next byte or by the space fill below.
			 */
			(void) sprintf(curr, " %02x", byte);
			*aoff = ISPRINT(byte) ? byte : '.';
			curr += 3;
			aoff++;
		}

		/*
		 * Fill in to the start of the ASCII translation
		 * with spaces. Two columns always separate the hex
		 * and ASCII parts; a short last line needs more.
		 */
		while (curr != (line + ASCIIOFFSET))
			*curr++ = ' ';

		cmn_err(CE_NOTE, "%s" DS_EOL, line);
	}
}
#endif /* DEBUG */
1971 
1972 
1973 /*
1974  * Walk the table of registered services, executing the specified callback
1975  * function for each service on a port. A non-zero return value from the
1976  * callback is used to terminate the walk, not to indicate an error. Returns
1977  * the index of the last service visited.
1978  */
1979 int
1980 ds_walk_svcs(svc_cb_t svc_cb, void *arg)
1981 {
1982 	int		idx;
1983 	ds_svc_t	*svc;
1984 
1985 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
1986 
1987 	/* walk every table entry */
1988 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
1989 		svc = ds_svcs.tbl[idx];
1990 
1991 		/* execute the callback */
1992 		if ((*svc_cb)(svc, arg) != 0)
1993 			break;
1994 	}
1995 
1996 	return (idx);
1997 }
1998 
1999 static int
2000 ds_svc_isfree(ds_svc_t *svc, void *arg)
2001 {
2002 	_NOTE(ARGUNUSED(arg))
2003 
2004 	/*
2005 	 * Looking for a free service. This may be a NULL entry
2006 	 * in the table, or an unused structure that could be
2007 	 * reused.
2008 	 */
2009 
2010 	if (DS_SVC_ISFREE(svc)) {
2011 		/* yes, it is free */
2012 		return (1);
2013 	}
2014 
2015 	/* not a candidate */
2016 	return (0);
2017 }
2018 
2019 int
2020 ds_svc_ismatch(ds_svc_t *svc, void *arg)
2021 {
2022 	if (DS_SVC_ISFREE(svc)) {
2023 		return (0);
2024 	}
2025 
2026 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2027 	    (svc->flags & DSSF_ISCLIENT) == 0) {
2028 		/* found a match */
2029 		return (1);
2030 	}
2031 
2032 	return (0);
2033 }
2034 
2035 int
2036 ds_svc_clnt_ismatch(ds_svc_t *svc, void *arg)
2037 {
2038 	if (DS_SVC_ISFREE(svc)) {
2039 		return (0);
2040 	}
2041 
2042 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2043 	    (svc->flags & DSSF_ISCLIENT) != 0) {
2044 		/* found a match */
2045 		return (1);
2046 	}
2047 
2048 	return (0);
2049 }
2050 
2051 int
2052 ds_svc_free(ds_svc_t *svc, void *arg)
2053 {
2054 	_NOTE(ARGUNUSED(arg))
2055 
2056 	if (svc == NULL) {
2057 		return (0);
2058 	}
2059 
2060 	if (svc->cap.svc_id) {
2061 		DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
2062 		svc->cap.svc_id = NULL;
2063 	}
2064 
2065 	if (svc->cap.vers) {
2066 		DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
2067 		svc->cap.vers = NULL;
2068 	}
2069 
2070 	DS_FREE(svc, sizeof (ds_svc_t));
2071 
2072 	return (0);
2073 }
2074 
2075 static int
2076 ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port)
2077 {
2078 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2079 
2080 	if (DS_SVC_ISFREE(svc))
2081 		return (0);
2082 
2083 	if (!DS_PORT_IN_SET(svc->avail, PORTID(port)))
2084 		return (0);
2085 
2086 	DS_PORTSET_ADD(svc->tried, PORTID(port));
2087 
2088 	if (ds_send_reg_req(svc, port) == 0) {
2089 		/* register sent successfully */
2090 		return (1);
2091 	}
2092 
2093 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2094 		/* reset the service */
2095 		ds_reset_svc(svc, port);
2096 	}
2097 	return (0);
2098 }
2099 
2100 int
2101 ds_svc_register(ds_svc_t *svc, void *arg)
2102 {
2103 	_NOTE(ARGUNUSED(arg))
2104 	ds_portset_t ports;
2105 	ds_port_t *port;
2106 	int	idx;
2107 
2108 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2109 
2110 	if (DS_SVC_ISFREE(svc))
2111 		return (0);
2112 
2113 	DS_PORTSET_DUP(ports, svc->avail);
2114 	if (svc->flags & DSSF_ISCLIENT) {
2115 		ds_portset_del_active_clients(svc->cap.svc_id, &ports);
2116 	} else if (svc->state != DS_SVC_INACTIVE)
2117 		return (0);
2118 
2119 	if (DS_PORTSET_ISNULL(ports))
2120 		return (0);
2121 
2122 	/*
2123 	 * Attempt to register the service. Start with the lowest
2124 	 * numbered port and continue until a registration message
2125 	 * is sent successfully, or there are no ports left to try.
2126 	 */
2127 	for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2128 
2129 		/*
2130 		 * If the port is not in the available list,
2131 		 * it is not a candidate for registration.
2132 		 */
2133 		if (!DS_PORT_IN_SET(ports, idx)) {
2134 			continue;
2135 		}
2136 
2137 		port = &ds_ports[idx];
2138 		if (ds_svc_register_onport(svc, port)) {
2139 			if ((svc->flags & DSSF_ISCLIENT) == 0)
2140 				break;
2141 			DS_PORTSET_DEL(svc->avail, idx);
2142 		}
2143 	}
2144 
2145 	return (0);
2146 }
2147 
2148 static int
2149 ds_svc_unregister(ds_svc_t *svc, void *arg)
2150 {
2151 	ds_port_t *port = (ds_port_t *)arg;
2152 	ds_svc_hdl_t hdl;
2153 
2154 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2155 
2156 	if (DS_SVC_ISFREE(svc)) {
2157 		return (0);
2158 	}
2159 
2160 	/* make sure the service is using this port */
2161 	if (svc->port != port) {
2162 		return (0);
2163 	}
2164 
2165 	if (port) {
2166 		DS_DBG(CE_NOTE, "ds@%lx: svc_unreg: id='%s', ver=%d.%d, "
2167 		    " hdl=0x%09lx" DS_EOL, PORTID(port), svc->cap.svc_id,
2168 		    svc->ver.major, svc->ver.minor, svc->hdl);
2169 	} else {
2170 		DS_DBG(CE_NOTE, "port=NULL: svc_unreg: id='%s', ver=%d.%d, "
2171 		    " hdl=0x%09lx" DS_EOL, svc->cap.svc_id, svc->ver.major,
2172 		    svc->ver.minor, svc->hdl);
2173 	}
2174 
2175 	/* reset the service structure */
2176 	ds_reset_svc(svc, port);
2177 
2178 	/* call the client unregister callback */
2179 	if (svc->ops.ds_unreg_cb) {
2180 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2181 	}
2182 
2183 	/* increment the count in the handle to prevent reuse */
2184 	hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl));
2185 	if (DS_HDL_ISCLIENT(svc->hdl)) {
2186 		DS_HDL_SET_ISCLIENT(hdl);
2187 	}
2188 	svc->hdl = hdl;
2189 
2190 	if (svc->state != DS_SVC_UNREG_PENDING) {
2191 		/* try to initiate a new registration */
2192 		(void) ds_svc_register(svc, NULL);
2193 	}
2194 
2195 	return (0);
2196 }
2197 
2198 static int
2199 ds_svc_port_up(ds_svc_t *svc, void *arg)
2200 {
2201 	ds_port_t *port = (ds_port_t *)arg;
2202 
2203 	if (DS_SVC_ISFREE(svc)) {
2204 		/* nothing to do */
2205 		return (0);
2206 	}
2207 
2208 	DS_PORTSET_ADD(svc->avail, port->id);
2209 	DS_PORTSET_DEL(svc->tried, port->id);
2210 
2211 	return (0);
2212 }
2213 
2214 ds_svc_t *
2215 ds_alloc_svc(void)
2216 {
2217 	int		idx;
2218 	uint_t		newmaxsvcs;
2219 	ds_svc_t	**newtbl;
2220 	ds_svc_t	*newsvc;
2221 
2222 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2223 
2224 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2225 
2226 	if (idx != ds_svcs.maxsvcs) {
2227 		goto found;
2228 	}
2229 
2230 	/*
2231 	 * There was no free space in the table. Grow
2232 	 * the table to double its current size.
2233 	 */
2234 	newmaxsvcs = ds_svcs.maxsvcs * 2;
2235 	newtbl = DS_MALLOC(newmaxsvcs * sizeof (ds_svc_t *));
2236 
2237 	/* copy old table data to the new table */
2238 	(void) memcpy(newtbl, ds_svcs.tbl,
2239 	    ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2240 
2241 	/* clean up the old table */
2242 	DS_FREE(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2243 	ds_svcs.tbl = newtbl;
2244 	ds_svcs.maxsvcs = newmaxsvcs;
2245 
2246 	/* search for a free space again */
2247 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2248 
2249 	/* the table is locked so should find a free slot */
2250 	ASSERT(idx != ds_svcs.maxsvcs);
2251 
2252 found:
2253 	/* allocate a new svc structure if necessary */
2254 	if ((newsvc = ds_svcs.tbl[idx]) == NULL) {
2255 		/* allocate a new service */
2256 		newsvc = DS_MALLOC(sizeof (ds_svc_t));
2257 		ds_svcs.tbl[idx] = newsvc;
2258 	}
2259 
2260 	/* fill in the handle */
2261 	newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl));
2262 	newsvc->state = DS_SVC_FREE;	/* Mark as free temporarily */
2263 
2264 	return (newsvc);
2265 }
2266 
2267 static void
2268 ds_reset_svc(ds_svc_t *svc, ds_port_t *port)
2269 {
2270 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2271 
2272 	if (svc->state != DS_SVC_UNREG_PENDING)
2273 		svc->state = DS_SVC_INACTIVE;
2274 	svc->ver_idx = 0;
2275 	svc->ver.major = 0;
2276 	svc->ver.minor = 0;
2277 	svc->port = NULL;
2278 	if (port) {
2279 		DS_PORTSET_DEL(svc->avail, port->id);
2280 	}
2281 }
2282 
2283 ds_svc_t *
2284 ds_get_svc(ds_svc_hdl_t hdl)
2285 {
2286 	int		idx;
2287 	ds_svc_t	*svc;
2288 
2289 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2290 
2291 	if (hdl == DS_INVALID_HDL)
2292 		return (NULL);
2293 
2294 	idx = DS_HDL2IDX(hdl);
2295 
2296 	/* check if index is out of bounds */
2297 	if ((idx < 0) || (idx >= ds_svcs.maxsvcs))
2298 		return (NULL);
2299 
2300 	svc = ds_svcs.tbl[idx];
2301 
2302 	/* check for a valid service */
2303 	if (DS_SVC_ISFREE(svc))
2304 		return (NULL);
2305 
2306 	/* make sure the handle is an exact match */
2307 	if (svc->hdl != hdl)
2308 		return (NULL);
2309 
2310 	return (svc);
2311 }
2312 
2313 static void
2314 ds_port_reset(ds_port_t *port)
2315 {
2316 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2317 	ASSERT(MUTEX_HELD(&port->lock));
2318 
2319 	/* connection went down, mark everything inactive */
2320 	(void) ds_walk_svcs(ds_svc_unregister, port);
2321 
2322 	port->ver_idx = 0;
2323 	port->ver.major = 0;
2324 	port->ver.minor = 0;
2325 	port->state = DS_PORT_LDC_INIT;
2326 }
2327 
2328 /*
2329  * Verify that a version array is sorted as expected for the
2330  * version negotiation to work correctly.
2331  */
2332 ds_vers_check_t
2333 ds_vers_isvalid(ds_ver_t *vers, int nvers)
2334 {
2335 	uint16_t	curr_major;
2336 	uint16_t	curr_minor;
2337 	int		idx;
2338 
2339 	curr_major = vers[0].major;
2340 	curr_minor = vers[0].minor;
2341 
2342 	/*
2343 	 * Walk the version array, verifying correct ordering.
2344 	 * The array must be sorted from highest supported
2345 	 * version to lowest supported version.
2346 	 */
2347 	for (idx = 0; idx < nvers; idx++) {
2348 		if (vers[idx].major > curr_major) {
2349 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2350 			    " increasing major versions" DS_EOL);
2351 			return (DS_VERS_INCREASING_MAJOR_ERR);
2352 		}
2353 
2354 		if (vers[idx].major < curr_major) {
2355 			curr_major = vers[idx].major;
2356 			curr_minor = vers[idx].minor;
2357 			continue;
2358 		}
2359 
2360 		if (vers[idx].minor > curr_minor) {
2361 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2362 			    " increasing minor versions" DS_EOL);
2363 			return (DS_VERS_INCREASING_MINOR_ERR);
2364 		}
2365 
2366 		curr_minor = vers[idx].minor;
2367 	}
2368 
2369 	return (DS_VERS_OK);
2370 }
2371 
2372 /*
2373  * Extended user capability init.
2374  */
2375 int
2376 ds_ucap_init(ds_capability_t *cap, ds_clnt_ops_t *ops, uint32_t flags,
2377     int instance, ds_svc_hdl_t *hdlp)
2378 {
2379 	ds_vers_check_t	status;
2380 	ds_svc_t	*svc;
2381 	int		rv = 0;
2382 	ds_svc_hdl_t 	lb_hdl, hdl;
2383 	int		is_loopback;
2384 	int		is_client;
2385 
2386 	/* sanity check the args */
2387 	if ((cap == NULL) || (ops == NULL)) {
2388 		cmn_err(CE_NOTE, "%s: invalid arguments" DS_EOL, __func__);
2389 		return (EINVAL);
2390 	}
2391 
2392 	/* sanity check the capability specifier */
2393 	if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) {
2394 		cmn_err(CE_NOTE, "%s: invalid capability specifier" DS_EOL,
2395 		    __func__);
2396 		return (EINVAL);
2397 	}
2398 
2399 	/* sanity check the version array */
2400 	if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) {
2401 		cmn_err(CE_NOTE, "%s: invalid capability version array "
2402 		    "for %s service: %s" DS_EOL, __func__, cap->svc_id,
2403 		    (status == DS_VERS_INCREASING_MAJOR_ERR) ?
2404 		    "increasing major versions" :
2405 		    "increasing minor versions");
2406 		return (EINVAL);
2407 	}
2408 
2409 	/* data and register callbacks are required */
2410 	if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) {
2411 		cmn_err(CE_NOTE, "%s: invalid ops specifier for %s service"
2412 		    DS_EOL, __func__, cap->svc_id);
2413 		return (EINVAL);
2414 	}
2415 
2416 	flags &= DSSF_USERFLAGS;
2417 	is_client = flags & DSSF_ISCLIENT;
2418 
2419 	DS_DBG_USR(CE_NOTE, "%s: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx"
2420 	    DS_EOL, __func__, cap->svc_id, PTR_TO_LONG(ops->ds_data_cb),
2421 	    PTR_TO_LONG(ops->cb_arg));
2422 
2423 	mutex_enter(&ds_svcs.lock);
2424 
2425 	/* check if the service is already registered */
2426 	if (i_ds_hdl_lookup(cap->svc_id, is_client, NULL, 1) == 1) {
2427 		/* already registered */
2428 		cmn_err(CE_NOTE, "Service '%s'/%s already registered" DS_EOL,
2429 		    cap->svc_id,
2430 		    (flags & DSSF_ISCLIENT) ? "client" : "service");
2431 		mutex_exit(&ds_svcs.lock);
2432 		return (EALREADY);
2433 	}
2434 
2435 	svc = ds_alloc_svc();
2436 	if (is_client) {
2437 		DS_HDL_SET_ISCLIENT(svc->hdl);
2438 	}
2439 
2440 	svc->state = DS_SVC_FREE;
2441 	svc->svc_hdl = DS_BADHDL1;
2442 
2443 	svc->flags = flags;
2444 	svc->drvi = instance;
2445 	svc->drv_psp = NULL;
2446 
2447 	/*
2448 	 * Check for loopback.  "pri" is a legacy service that assumes it
2449 	 * will never use loopback mode.
2450 	 */
2451 	if (strcmp(cap->svc_id, "pri") == 0) {
2452 		is_loopback = 0;
2453 	} else if (i_ds_hdl_lookup(cap->svc_id, is_client == 0, &lb_hdl, 1)
2454 	    == 1) {
2455 		if ((rv = ds_loopback_set_svc(svc, cap, &lb_hdl)) != 0) {
2456 			DS_DBG_USR(CE_NOTE, "%s: ds_loopback_set_svc '%s' err "
2457 			    " (%d)" DS_EOL, __func__, cap->svc_id, rv);
2458 			mutex_exit(&ds_svcs.lock);
2459 			return (rv);
2460 		}
2461 		is_loopback = 1;
2462 	} else
2463 		is_loopback = 0;
2464 
2465 	/* copy over all the client information */
2466 	(void) memcpy(&svc->cap, cap, sizeof (ds_capability_t));
2467 
2468 	/* make a copy of the service name */
2469 	svc->cap.svc_id = ds_strdup(cap->svc_id);
2470 
2471 	/* make a copy of the version array */
2472 	svc->cap.vers = DS_MALLOC(cap->nvers * sizeof (ds_ver_t));
2473 	(void) memcpy(svc->cap.vers, cap->vers, cap->nvers * sizeof (ds_ver_t));
2474 
2475 	/* copy the client ops vector */
2476 	(void) memcpy(&svc->ops, ops, sizeof (ds_clnt_ops_t));
2477 
2478 	svc->state = DS_SVC_INACTIVE;
2479 	svc->ver_idx = 0;
2480 	DS_PORTSET_DUP(svc->avail, ds_allports);
2481 	DS_PORTSET_SETNULL(svc->tried);
2482 
2483 	ds_svcs.nsvcs++;
2484 
2485 	hdl = svc->hdl;
2486 
2487 	/*
2488 	 * kludge to allow user callback code to get handle and user args.
2489 	 * Make sure the callback arg points to the svc structure.
2490 	 */
2491 	if ((flags & DSSF_ISUSER) != 0) {
2492 		ds_cbarg_set_cookie(svc);
2493 	}
2494 
2495 	if (is_loopback) {
2496 		ds_loopback_register(hdl);
2497 		ds_loopback_register(lb_hdl);
2498 	}
2499 
2500 	/*
2501 	 * If this is a client or a non-loopback service provider, send
2502 	 * out register requests.
2503 	 */
2504 	if (!is_loopback || (flags & DSSF_ISCLIENT) != 0)
2505 		(void) ds_svc_register(svc, NULL);
2506 
2507 	if (hdlp) {
2508 		*hdlp = hdl;
2509 	}
2510 
2511 	mutex_exit(&ds_svcs.lock);
2512 
2513 	DS_DBG_USR(CE_NOTE, "%s: service '%s' assigned handle 0x%09lx" DS_EOL,
2514 	    __func__, svc->cap.svc_id, hdl);
2515 
2516 	return (0);
2517 }
2518 
2519 /*
2520  * ds_cap_init interface for previous revision.
2521  */
2522 int
2523 ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops)
2524 {
2525 	return (ds_ucap_init(cap, ops, 0, DS_INVALID_INSTANCE, NULL));
2526 }
2527 
2528 /*
2529  * Interface for ds_unreg_hdl in lds driver.
2530  */
2531 int
2532 ds_unreg_hdl(ds_svc_hdl_t hdl)
2533 {
2534 	ds_svc_t	*svc;
2535 	int		is_loopback;
2536 	ds_svc_hdl_t	lb_hdl;
2537 
2538 	DS_DBG_USR(CE_NOTE, "%s: hdl=0x%09lx" DS_EOL, __func__, hdl);
2539 
2540 	mutex_enter(&ds_svcs.lock);
2541 	if ((svc = ds_get_svc(hdl)) == NULL) {
2542 		mutex_exit(&ds_svcs.lock);
2543 		DS_DBG_USR(CE_NOTE, "%s: unknown hdl: 0x%llx" DS_EOL, __func__,
2544 		    (u_longlong_t)hdl);
2545 		return (ENXIO);
2546 	}
2547 
2548 	DS_DBG_USR(CE_NOTE, "%s: svcid='%s', hdl=0x%llx" DS_EOL, __func__,
2549 	    svc->cap.svc_id, (u_longlong_t)svc->hdl);
2550 
2551 	svc->state = DS_SVC_UNREG_PENDING;
2552 
2553 	is_loopback = ((svc->flags & DSSF_LOOPBACK) != 0);
2554 	lb_hdl = svc->svc_hdl;
2555 
2556 	if (svc->port) {
2557 		(void) ds_send_unreg_req(svc);
2558 	}
2559 
2560 	(void) ds_svc_unregister(svc, svc->port);
2561 
2562 	ds_delete_svc_entry(svc);
2563 
2564 	if (is_loopback) {
2565 		ds_loopback_unregister(lb_hdl);
2566 	}
2567 
2568 	mutex_exit(&ds_svcs.lock);
2569 
2570 	return (0);
2571 }
2572 
2573 int
2574 ds_cap_fini(ds_capability_t *cap)
2575 {
2576 	ds_svc_hdl_t	hdl;
2577 	int rv;
2578 	uint_t nhdls = 0;
2579 
2580 	DS_DBG(CE_NOTE, "%s: '%s'" DS_EOL, __func__, cap->svc_id);
2581 	if ((rv = ds_hdl_lookup(cap->svc_id, 0, &hdl, 1, &nhdls)) != 0) {
2582 		DS_DBG(CE_NOTE, "%s: ds_hdl_lookup '%s' err (%d)" DS_EOL,
2583 		    __func__, cap->svc_id, rv);
2584 		return (rv);
2585 	}
2586 
2587 	if (nhdls == 0) {
2588 		DS_DBG(CE_NOTE, "%s: no such service '%s'" DS_EOL,
2589 		    __func__, cap->svc_id);
2590 		return (ENXIO);
2591 	}
2592 
2593 	if ((rv = ds_is_my_hdl(hdl, DS_INVALID_INSTANCE)) != 0) {
2594 		DS_DBG(CE_NOTE, "%s: ds_is_my_handle err (%d)" DS_EOL, __func__,
2595 		    rv);
2596 		return (rv);
2597 	}
2598 
2599 	if ((rv = ds_unreg_hdl(hdl)) != 0) {
2600 		DS_DBG(CE_NOTE, "%s: ds_unreg_hdl err (%d)" DS_EOL, __func__,
2601 		    rv);
2602 		return (rv);
2603 	}
2604 
2605 	return (0);
2606 }
2607 
2608 int
2609 ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len)
2610 {
2611 	int		rv;
2612 	ds_hdr_t	*hdr;
2613 	caddr_t		msg;
2614 	size_t		msglen;
2615 	size_t		hdrlen;
2616 	caddr_t		payload;
2617 	ds_svc_t	*svc;
2618 	ds_port_t	*port;
2619 	ds_data_handle_t *data;
2620 	ds_svc_hdl_t	svc_hdl;
2621 	int		is_client = 0;
2622 
2623 	DS_DBG(CE_NOTE, "%s: hdl: 0x%llx, buf: %lx, len: %ld" DS_EOL, __func__,
2624 	    (u_longlong_t)hdl, (ulong_t)buf, len);
2625 
2626 	mutex_enter(&ds_svcs.lock);
2627 
2628 	if ((svc = ds_get_svc(hdl)) == NULL) {
2629 		cmn_err(CE_WARN, "%s: invalid handle 0x%llx" DS_EOL, __func__,
2630 		    (u_longlong_t)hdl);
2631 		mutex_exit(&ds_svcs.lock);
2632 		return (ENXIO);
2633 	}
2634 
2635 	if (svc->state != DS_SVC_ACTIVE) {
2636 		/* channel is up, but svc is not registered */
2637 		DS_DBG(CE_NOTE, "%s: invalid service state 0x%x" DS_EOL,
2638 		    __func__, svc->state);
2639 		mutex_exit(&ds_svcs.lock);
2640 		return (ENOTCONN);
2641 	}
2642 
2643 	if (svc->flags & DSSF_LOOPBACK) {
2644 		hdl = svc->svc_hdl;
2645 		mutex_exit(&ds_svcs.lock);
2646 		ds_loopback_send(hdl, buf, len);
2647 		return (0);
2648 	}
2649 
2650 	if ((port = svc->port) == NULL) {
2651 		DS_DBG(CE_NOTE, "%s: service '%s' not associated with a port"
2652 		    DS_EOL, __func__, svc->cap.svc_id);
2653 		mutex_exit(&ds_svcs.lock);
2654 		return (ECONNRESET);
2655 	}
2656 
2657 	if (svc->flags & DSSF_ISCLIENT) {
2658 		is_client = 1;
2659 		svc_hdl = svc->svc_hdl;
2660 	}
2661 
2662 	mutex_exit(&ds_svcs.lock);
2663 
2664 	/* check that the LDC channel is ready */
2665 	if (port->ldc.state != LDC_UP) {
2666 		DS_DBG(CE_NOTE, "%s: LDC channel is not up" DS_EOL, __func__);
2667 		return (ECONNRESET);
2668 	}
2669 
2670 	hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t);
2671 
2672 	msg = DS_MALLOC(len + hdrlen);
2673 	hdr = (ds_hdr_t *)msg;
2674 	payload = msg + hdrlen;
2675 	msglen = len + hdrlen;
2676 
2677 	hdr->payload_len = len + sizeof (ds_data_handle_t);
2678 	hdr->msg_type = DS_DATA;
2679 
2680 	data = (ds_data_handle_t *)(msg + DS_HDR_SZ);
2681 	if (is_client) {
2682 		data->svc_handle = svc_hdl;
2683 	} else {
2684 		data->svc_handle = hdl;
2685 	}
2686 
2687 	if ((buf != NULL) && (len != 0)) {
2688 		(void) memcpy(payload, buf, len);
2689 	}
2690 
2691 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data>: hdl=0x%llx, len=%ld, "
2692 	    " payload_len=%d" DS_EOL, PORTID(port), (u_longlong_t)svc->hdl,
2693 	    msglen, hdr->payload_len);
2694 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msglen);
2695 
2696 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
2697 		rv = (rv == EIO) ? ECONNRESET : rv;
2698 	}
2699 	DS_FREE(msg, msglen);
2700 
2701 	return (rv);
2702 }
2703 
2704 void
2705 ds_port_common_init(ds_port_t *port)
2706 {
2707 	int rv;
2708 
2709 	if ((port->flags & DS_PORT_MUTEX_INITED) == 0) {
2710 		mutex_init(&port->lock, NULL, MUTEX_DRIVER, NULL);
2711 		mutex_init(&port->tx_lock, NULL, MUTEX_DRIVER, NULL);
2712 		mutex_init(&port->rcv_lock, NULL, MUTEX_DRIVER, NULL);
2713 		port->flags |= DS_PORT_MUTEX_INITED;
2714 	}
2715 
2716 	port->state = DS_PORT_INIT;
2717 	DS_PORTSET_ADD(ds_allports, port->id);
2718 
2719 	ds_sys_port_init(port);
2720 
2721 	mutex_enter(&port->lock);
2722 	rv = ds_ldc_init(port);
2723 	mutex_exit(&port->lock);
2724 
2725 	/*
2726 	 * If LDC successfully init'ed, try to kick off protocol for this port.
2727 	 */
2728 	if (rv == 0) {
2729 		ds_handle_up_event(port);
2730 	}
2731 }
2732 
2733 void
2734 ds_port_common_fini(ds_port_t *port, int is_fini)
2735 {
2736 	port->state = DS_PORT_FREE;
2737 
2738 	if (is_fini && (port->flags & DS_PORT_MUTEX_INITED) != 0) {
2739 		mutex_destroy(&port->lock);
2740 		mutex_destroy(&port->tx_lock);
2741 		mutex_destroy(&port->rcv_lock);
2742 		port->flags &= ~DS_PORT_MUTEX_INITED;
2743 	}
2744 
2745 	DS_PORTSET_DEL(ds_allports, port->id);
2746 
2747 	ds_sys_port_fini(port);
2748 }
2749 
2750 /*
2751  * Initialize table of registered service classes
2752  */
2753 void
2754 ds_init_svcs_tbl(uint_t nentries)
2755 {
2756 	int	tblsz;
2757 
2758 	ds_svcs.maxsvcs = nentries;
2759 
2760 	tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *);
2761 	ds_svcs.tbl = (ds_svc_t **)DS_MALLOC(tblsz);
2762 
2763 	ds_svcs.nsvcs = 0;
2764 }
2765 
2766 /*
2767  * Find the max and min version supported.
2768  * Hacked from zeus workspace, support.c
2769  */
2770 static void
2771 min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
2772     uint16_t *min_major, uint16_t *max_major)
2773 {
2774 	int i;
2775 
2776 	*min_major = sup_versionsp[0].major;
2777 	*max_major = *min_major;
2778 
2779 	for (i = 1; i < num_versions; i++) {
2780 		if (sup_versionsp[i].major < *min_major)
2781 			*min_major = sup_versionsp[i].major;
2782 
2783 		if (sup_versionsp[i].major > *max_major)
2784 			*max_major = sup_versionsp[i].major;
2785 	}
2786 }
2787 
2788 /*
2789  * Check whether the major and minor numbers requested by the peer can be
2790  * satisfied. If the requested major is supported, true is returned, and the
2791  * agreed minor is returned in new_minor. If the requested major is not
2792  * supported, the routine returns false, and the closest major is returned in
2793  * *new_major, upon which the peer should re-negotiate. The closest major is
2794  * the just lower that the requested major number.
2795  *
2796  * Hacked from zeus workspace, support.c
2797  */
2798 boolean_t
2799 negotiate_version(int num_versions, ds_ver_t *sup_versionsp,
2800     uint16_t req_major, uint16_t *new_majorp, uint16_t *new_minorp)
2801 {
2802 	int i;
2803 	uint16_t major, lower_major;
2804 	uint16_t min_major = 0, max_major;
2805 	boolean_t found_match = B_FALSE;
2806 
2807 	min_max_versions(num_versions, sup_versionsp, &min_major, &max_major);
2808 
2809 	DS_DBG(CE_NOTE, "negotiate_version: req_major = %u, min = %u, max = %u"
2810 	    DS_EOL, req_major, min_major, max_major);
2811 
2812 	/*
2813 	 * If the minimum version supported is greater than
2814 	 * the version requested, return the lowest version
2815 	 * supported
2816 	 */
2817 	if (min_major > req_major) {
2818 		*new_majorp = min_major;
2819 		return (B_FALSE);
2820 	}
2821 
2822 	/*
2823 	 * If the largest version supported is lower than
2824 	 * the version requested, return the largest version
2825 	 * supported
2826 	 */
2827 	if (max_major < req_major) {
2828 		*new_majorp = max_major;
2829 		return (B_FALSE);
2830 	}
2831 
2832 	/*
2833 	 * Now we know that the requested version lies between the
2834 	 * min and max versions supported. Check if the requested
2835 	 * major can be found in supported versions.
2836 	 */
2837 	lower_major = min_major;
2838 	for (i = 0; i < num_versions; i++) {
2839 		major = sup_versionsp[i].major;
2840 		if (major == req_major) {
2841 			found_match = B_TRUE;
2842 			*new_majorp = req_major;
2843 			*new_minorp = sup_versionsp[i].minor;
2844 			break;
2845 		} else {
2846 			if ((major < req_major) && (major > lower_major))
2847 				lower_major = major;
2848 		}
2849 	}
2850 
2851 	/*
2852 	 * If no match is found, return the closest available number
2853 	 */
2854 	if (!found_match)
2855 		*new_majorp = lower_major;
2856 
2857 	return (found_match);
2858 }
2859 
/*
 * Specific errno's that are used by ds.c and ldc.c, with the text
 * reported for each.  The table ends with an entry whose errno is 0.
 */
static const struct {
	int ds_errno;
	const char *estr;
} ds_errno_to_str_tab[] = {
	{ EIO,		"I/O error" },
	{ ENXIO,	"No such device or address" },
	{ EAGAIN,	"Resource temporarily unavailable" },
	{ ENOMEM,	"Not enough space" },
	{ EACCES,	"Permission denied" },
	{ EFAULT,	"Bad address" },
	{ EBUSY,	"Device busy" },
	{ EINVAL,	"Invalid argument" },
	{ ENOSPC,	"No space left on device" },
	{ ENOMSG,	"No message of desired type" },
#ifdef	ECHRNG
	{ ECHRNG,	"Channel number out of range" },
#endif
	{ ENOTSUP,	"Operation not supported" },
	{ EMSGSIZE,	"Message too long" },
	{ EADDRINUSE,	"Address already in use" },
	{ ECONNRESET,	"Connection reset by peer" },
	{ ENOBUFS,	"No buffer space available" },
	{ ENOTCONN,	"Socket is not connected" },
	{ ECONNREFUSED,	"Connection refused" },
	{ EALREADY,	"Operation already in progress" },
	{ 0,		NULL },
};

/*
 * Write a description of 'ds_errno' into 'ebuf' and return 'ebuf'.
 * Values found in the table get their fixed text; any other value
 * (including 0) is formatted as "ds_errno (<value>)".  No length is
 * passed in, so 'ebuf' must be large enough for the longest string.
 */
char *
ds_errno_to_str(int ds_errno, char *ebuf)
{
	int slot = 0;

	/* scan until the terminating entry, whose errno is 0 */
	while (ds_errno_to_str_tab[slot].ds_errno != 0) {
		if (ds_errno_to_str_tab[slot].ds_errno == ds_errno) {
			(void) strcpy(ebuf, ds_errno_to_str_tab[slot].estr);
			return (ebuf);
		}
		slot++;
	}

	/* not a value the table knows about */
	(void) sprintf(ebuf, "ds_errno (%d)", ds_errno);
	return (ebuf);
}
2906 
2907 static void
2908 ds_loopback_register(ds_svc_hdl_t hdl)
2909 {
2910 	ds_ver_t ds_ver;
2911 	ds_svc_t *svc;
2912 
2913 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2914 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2915 	    (u_longlong_t)hdl);
2916 	if ((svc = ds_get_svc(hdl)) == NULL) {
2917 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2918 		    (u_longlong_t)hdl);
2919 		return;
2920 	}
2921 
2922 	svc->state = DS_SVC_ACTIVE;
2923 
2924 	if (svc->ops.ds_reg_cb) {
2925 		DS_DBG_LOOP(CE_NOTE, "%s: loopback regcb: hdl: 0x%llx" DS_EOL,
2926 		    __func__, (u_longlong_t)hdl);
2927 		ds_ver.major = svc->ver.major;
2928 		ds_ver.minor = svc->ver.minor;
2929 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &ds_ver, hdl);
2930 	}
2931 }
2932 
2933 static void
2934 ds_loopback_unregister(ds_svc_hdl_t hdl)
2935 {
2936 	ds_svc_t *svc;
2937 
2938 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2939 	if ((svc = ds_get_svc(hdl)) == NULL) {
2940 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2941 		    (u_longlong_t)hdl);
2942 		return;
2943 	}
2944 
2945 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2946 	    (u_longlong_t)hdl);
2947 
2948 	svc->flags &= ~DSSF_LOOPBACK;
2949 	svc->svc_hdl = DS_BADHDL2;
2950 	svc->state = DS_SVC_INACTIVE;
2951 
2952 	if (svc->ops.ds_unreg_cb) {
2953 		DS_DBG_LOOP(CE_NOTE, "%s: loopback unregcb: hdl: 0x%llx" DS_EOL,
2954 		    __func__, (u_longlong_t)hdl);
2955 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2956 	}
2957 }
2958 
2959 static void
2960 ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen)
2961 {
2962 	ds_svc_t *svc;
2963 
2964 	mutex_enter(&ds_svcs.lock);
2965 	if ((svc = ds_get_svc(hdl)) == NULL) {
2966 		mutex_exit(&ds_svcs.lock);
2967 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2968 		    (u_longlong_t)hdl);
2969 		return;
2970 	}
2971 	mutex_exit(&ds_svcs.lock);
2972 
2973 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2974 	    (u_longlong_t)hdl);
2975 
2976 	if (svc->ops.ds_data_cb) {
2977 		DS_DBG_LOOP(CE_NOTE, "%s: loopback datacb hdl: 0x%llx" DS_EOL,
2978 		    __func__, (u_longlong_t)hdl);
2979 		(*svc->ops.ds_data_cb)(svc->ops.cb_arg, buf, buflen);
2980 	}
2981 }
2982 
2983 static int
2984 ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap, ds_svc_hdl_t *lb_hdlp)
2985 {
2986 	ds_svc_t *lb_svc;
2987 	ds_svc_hdl_t lb_hdl = *lb_hdlp;
2988 	int i;
2989 	int match = 0;
2990 	uint16_t new_major;
2991 	uint16_t new_minor;
2992 
2993 	if ((lb_svc = ds_get_svc(lb_hdl)) == NULL) {
2994 		DS_DBG_LOOP(CE_NOTE, "%s: loopback: hdl: 0x%llx invalid" DS_EOL,
2995 		    __func__, (u_longlong_t)lb_hdl);
2996 		return (ENXIO);
2997 	}
2998 
2999 	/* negotiate a version between loopback services, if possible */
3000 	for (i = 0; i < lb_svc->cap.nvers && match == 0; i++) {
3001 		match = negotiate_version(cap->nvers, cap->vers,
3002 		    lb_svc->cap.vers[i].major, &new_major, &new_minor);
3003 	}
3004 	if (!match) {
3005 		DS_DBG_LOOP(CE_NOTE, "%s: loopback version negotiate failed"
3006 		    DS_EOL, __func__);
3007 		return (ENOTSUP);
3008 	}
3009 	if (lb_svc->state != DS_SVC_INACTIVE) {
3010 		DS_DBG_LOOP(CE_NOTE, "%s: loopback active: hdl: 0x%llx"
3011 		    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3012 		if ((lb_svc->flags & DSSF_ISCLIENT) == 0) {
3013 			DS_DBG_LOOP(CE_NOTE, "%s: loopback busy hdl: 0x%llx"
3014 			    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3015 			return (EBUSY);
3016 		}
3017 		svc->state = DS_SVC_INACTIVE;	/* prevent alloc'ing svc */
3018 		lb_svc = ds_svc_clone(lb_svc);
3019 		DS_DBG_LOOP(CE_NOTE, "%s: loopback clone: ohdl: 0x%llx "
3020 		    "nhdl: 0x%llx" DS_EOL, __func__, (u_longlong_t)lb_hdl,
3021 		    (u_longlong_t)lb_svc->hdl);
3022 		*lb_hdlp = lb_svc->hdl;
3023 	}
3024 
3025 	svc->flags |= DSSF_LOOPBACK;
3026 	svc->svc_hdl = lb_svc->hdl;
3027 	svc->port = NULL;
3028 	svc->ver.major = new_major;
3029 	svc->ver.minor = new_minor;
3030 
3031 	lb_svc->flags |= DSSF_LOOPBACK;
3032 	lb_svc->svc_hdl = svc->hdl;
3033 	lb_svc->port = NULL;
3034 	lb_svc->ver.major = new_major;
3035 	lb_svc->ver.minor = new_minor;
3036 
3037 	DS_DBG_LOOP(CE_NOTE, "%s: setting loopback between: 0x%llx and 0x%llx"
3038 	    DS_EOL, __func__, (u_longlong_t)svc->hdl,
3039 	    (u_longlong_t)lb_svc->hdl);
3040 	return (0);
3041 }
3042 
3043 static ds_svc_t *
3044 ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl, ds_port_t *port)
3045 {
3046 	int		idx;
3047 	ds_svc_t	*svc;
3048 
3049 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s looking up clnt hdl: 0x%llx" DS_EOL,
3050 	    PORTID(port), __func__, (u_longlong_t)hdl);
3051 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3052 
3053 	/* walk every table entry */
3054 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3055 		svc = ds_svcs.tbl[idx];
3056 		if (DS_SVC_ISFREE(svc))
3057 			continue;
3058 		if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3059 		    svc->svc_hdl == hdl && svc->port == port) {
3060 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s found clnt hdl "
3061 			    "0x%llx: svc%d" DS_EOL, PORTID(port), __func__,
3062 			    (u_longlong_t)hdl, (uint_t)DS_HDL2IDX(svc->hdl));
3063 			return (svc);
3064 		}
3065 	}
3066 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s clnt hdl: 0x%llx not found" DS_EOL,
3067 	    PORTID(port), __func__, (u_longlong_t)hdl);
3068 
3069 	return (NULL);
3070 }
3071 
3072 static ds_svc_t *
3073 ds_svc_clone(ds_svc_t *svc)
3074 {
3075 	ds_svc_t *newsvc;
3076 	ds_svc_hdl_t hdl;
3077 
3078 	ASSERT(svc->flags & DSSF_ISCLIENT);
3079 
3080 	newsvc = ds_alloc_svc();
3081 
3082 	/* Can only clone clients for now */
3083 	hdl = newsvc->hdl | DS_HDL_ISCLIENT_BIT;
3084 	DS_DBG_USR(CE_NOTE, "%s: cloning client: old hdl: 0x%llx new hdl: "
3085 	    "0x%llx" DS_EOL, __func__, (u_longlong_t)svc->hdl,
3086 	    (u_longlong_t)hdl);
3087 	(void) memcpy(newsvc, svc, sizeof (ds_svc_t));
3088 	newsvc->hdl = hdl;
3089 	newsvc->flags &= ~DSSF_LOOPBACK;
3090 	newsvc->port = NULL;
3091 	newsvc->svc_hdl = DS_BADHDL2;
3092 	newsvc->cap.svc_id = ds_strdup(svc->cap.svc_id);
3093 	newsvc->cap.vers = DS_MALLOC(svc->cap.nvers * sizeof (ds_ver_t));
3094 	(void) memcpy(newsvc->cap.vers, svc->cap.vers,
3095 	    svc->cap.nvers * sizeof (ds_ver_t));
3096 
3097 	/*
3098 	 * Kludge to allow lds driver user callbacks to get access to current
3099 	 * svc structure.  Arg could be index to svc table or some other piece
3100 	 * of info to get to the svc table entry.
3101 	 */
3102 	if (newsvc->flags & DSSF_ISUSER) {
3103 		newsvc->ops.cb_arg = (ds_cb_arg_t)(newsvc);
3104 	}
3105 	return (newsvc);
3106 }
3107 
3108 /*
3109  * Internal handle lookup function.
3110  */
3111 static int
3112 i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3113     uint_t maxhdls)
3114 {
3115 	int idx;
3116 	int nhdls = 0;
3117 	ds_svc_t *svc;
3118 	uint32_t client_flag = is_client ? DSSF_ISCLIENT : 0;
3119 
3120 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3121 
3122 	for (idx = 0; idx < ds_svcs.maxsvcs && nhdls < maxhdls; idx++) {
3123 		svc = ds_svcs.tbl[idx];
3124 		if (DS_SVC_ISFREE(svc))
3125 			continue;
3126 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3127 		    (svc->flags & DSSF_ISCLIENT) == client_flag) {
3128 			if (hdlp != NULL && nhdls < maxhdls) {
3129 				hdlp[nhdls] = svc->hdl;
3130 				nhdls++;
3131 			} else {
3132 				nhdls++;
3133 			}
3134 		}
3135 	}
3136 	return (nhdls);
3137 }
3138 
3139 /*
3140  * Interface for ds_hdl_lookup in lds driver.
3141  */
3142 int
3143 ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3144     uint_t maxhdls, uint_t *nhdlsp)
3145 {
3146 	mutex_enter(&ds_svcs.lock);
3147 	*nhdlsp = i_ds_hdl_lookup(service, is_client, hdlp, maxhdls);
3148 	mutex_exit(&ds_svcs.lock);
3149 	return (0);
3150 }
3151 
3152 static void
3153 ds_portset_del_active_clients(char *service, ds_portset_t *portsp)
3154 {
3155 	ds_portset_t ports;
3156 	int idx;
3157 	ds_svc_t *svc;
3158 
3159 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3160 
3161 	DS_PORTSET_DUP(ports, *portsp);
3162 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3163 		svc = ds_svcs.tbl[idx];
3164 		if (DS_SVC_ISFREE(svc))
3165 			continue;
3166 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3167 		    (svc->flags & DSSF_ISCLIENT) != 0 &&
3168 		    svc->state != DS_SVC_INACTIVE &&
3169 		    svc->port != NULL) {
3170 			DS_PORTSET_DEL(ports, PORTID(svc->port));
3171 		}
3172 	}
3173 
3174 	/*
3175 	 * Never send a client reg req to the SP.
3176 	 */
3177 	if (ds_sp_port_id != DS_PORTID_INVALID) {
3178 		DS_PORTSET_DEL(ports, ds_sp_port_id);
3179 	}
3180 	DS_PORTSET_DUP(*portsp, ports);
3181 }
3182 
3183 /*
3184  * After an UNREG REQ, check if this is a client service with multiple
3185  * handles.  If it is, then we can eliminate this entry.
3186  */
3187 static void
3188 ds_check_for_dup_services(ds_svc_t *svc)
3189 {
3190 	if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3191 	    svc->state == DS_SVC_INACTIVE &&
3192 	    i_ds_hdl_lookup(svc->cap.svc_id, 1, NULL, 2) == 2) {
3193 		ds_delete_svc_entry(svc);
3194 	}
3195 }
3196 
3197 static void
3198 ds_delete_svc_entry(ds_svc_t *svc)
3199 {
3200 	ds_svc_hdl_t tmp_hdl;
3201 
3202 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3203 
3204 	/*
3205 	 * Clear out the structure, but do not deallocate the
3206 	 * memory. It can be reused for the next registration.
3207 	 */
3208 	DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
3209 	DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
3210 
3211 	/* save the handle to prevent reuse */
3212 	tmp_hdl = svc->hdl;
3213 	bzero((void *)svc, sizeof (ds_svc_t));
3214 
3215 	/* initialize for next use */
3216 	svc->hdl = tmp_hdl;
3217 	svc->state = DS_SVC_FREE;
3218 
3219 	ds_svcs.nsvcs--;
3220 }
3221