xref: /titanic_41/usr/src/uts/sun4v/io/ds_common.c (revision 55f338e6ff66b050744472388b2b9ee3d320414f)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * Domain Services Module Common Code.
29  *
30  * This module is intended to be used by both Solaris and the VBSC
31  * module.
32  */
33 
34 #include <sys/modctl.h>
35 #include <sys/ksynch.h>
36 #include <sys/taskq.h>
37 #include <sys/disp.h>
38 #include <sys/cmn_err.h>
39 #include <sys/note.h>
40 #include <sys/mach_descrip.h>
41 #include <sys/mdesc.h>
42 #include <sys/ldc.h>
43 #include <sys/ds.h>
44 #include <sys/ds_impl.h>
45 
46 #ifndef MIN
47 #define	MIN(a, b)	((a) < (b) ? (a) : (b))
48 #endif
49 
50 #define	DS_DECODE_BUF_LEN		30
51 
52 /*
53  * All DS ports in the system
54  *
55  * The list of DS ports is read in from the MD when the DS module is
56  * initialized and is never modified. This eliminates the need for
57  * locking to access the port array itself. Access to the individual
58  * ports are synchronized at the port level.
59  */
60 ds_port_t	ds_ports[DS_MAX_PORTS];
61 ds_portset_t	ds_allports;	/* all DS ports in the system */
62 ds_portset_t	ds_nullport;	/* allows test against null portset */
63 
64 /* DS SP port id */
65 uint64_t ds_sp_port_id = DS_PORTID_INVALID;
66 
67 /*
68  * Table of registered services
69  *
70  * Locking: Accesses to the table of services are synchronized using
71  *   a mutex lock. The reader lock must be held when looking up service
72  *   information in the table. The writer lock must be held when any
73  *   service information is being modified.
74  */
75 ds_svcs_t	ds_svcs;
76 
77 /*
78  * Flag to prevent callbacks while in the middle of DS teardown.
79  */
80 boolean_t ds_enabled = B_FALSE;	/* enable/disable taskq processing */
81 
82 /*
83  * Retry count and delay for LDC reads and writes
84  */
85 #ifndef DS_DEFAULT_RETRIES
86 #define	DS_DEFAULT_RETRIES	10000	/* number of times to retry */
87 #endif
88 #ifndef DS_DEFAULT_DELAY
89 #define	DS_DEFAULT_DELAY	1000	/* usecs to wait between retries */
90 #endif
91 
92 static int ds_retries = DS_DEFAULT_RETRIES;
93 static clock_t ds_delay = DS_DEFAULT_DELAY;
94 
95 /*
96  * Supported versions of the DS message protocol
97  *
98  * The version array must be sorted in order from the highest
99  * supported version to the lowest. Support for a particular
100  * <major>.<minor> version implies all lower minor versions of
101  * that same major version are supported as well.
102  */
103 static ds_ver_t ds_vers[] = { { 1, 0 } };
104 
105 #define	DS_NUM_VER	(sizeof (ds_vers) / sizeof (ds_vers[0]))
106 
107 
108 /* incoming message handling functions */
109 typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len);
110 static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len);
111 static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len);
112 static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len);
113 static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len);
114 static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len);
115 static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len);
116 static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len);
117 static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len);
118 static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len);
119 static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len);
120 static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len);
121 
122 /*
123  * DS Message Handler Dispatch Table
124  *
125  * A table used to dispatch all incoming messages. This table
126  * contains handlers for all the fixed message types, as well as
127  * the the messages defined in the 1.0 version of the DS protocol.
128  * The handlers are indexed based on the DS header msg_type values
129  */
130 static const ds_msg_handler_t ds_msg_handlers[] = {
131 	ds_handle_init_req,		/* DS_INIT_REQ */
132 	ds_handle_init_ack,		/* DS_INIT_ACK */
133 	ds_handle_init_nack,		/* DS_INIT_NACK */
134 	ds_handle_reg_req,		/* DS_REG_REQ */
135 	ds_handle_reg_ack,		/* DS_REG_ACK */
136 	ds_handle_reg_nack,		/* DS_REG_NACK */
137 	ds_handle_unreg_req,		/* DS_UNREG */
138 	ds_handle_unreg_ack,		/* DS_UNREG_ACK */
139 	ds_handle_unreg_nack,		/* DS_UNREG_NACK */
140 	ds_handle_data,			/* DS_DATA */
141 	ds_handle_nack			/* DS_NACK */
142 };
143 
144 
145 
146 /* initialization functions */
147 static int ds_ldc_init(ds_port_t *port);
148 
149 /* event processing functions */
150 static uint_t ds_ldc_cb(uint64_t event, caddr_t arg);
151 static int ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep);
152 static void ds_handle_up_event(ds_port_t *port);
153 static void ds_handle_down_reset_events(ds_port_t *port);
154 static void ds_handle_recv(void *arg);
155 static void ds_dispatch_event(void *arg);
156 
157 /* message sending functions */
158 static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen);
159 static int ds_send_reg_req(ds_svc_t *svc, ds_port_t *port);
160 static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
161 static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
162 
163 /* walker functions */
164 static int ds_svc_isfree(ds_svc_t *svc, void *arg);
165 static int ds_svc_unregister(ds_svc_t *svc, void *arg);
166 static int ds_svc_port_up(ds_svc_t *svc, void *arg);
167 
168 /* service utilities */
169 static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port);
170 static int ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port);
171 
172 /* port utilities */
173 static void ds_port_reset(ds_port_t *port);
174 static ldc_status_t ds_update_ldc_state(ds_port_t *port);
175 
176 /* misc utilities */
177 static void min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
178     uint16_t *min_major, uint16_t *max_major);
179 
180 /* debug */
181 static char *decode_ldc_events(uint64_t event, char *buf);
182 
183 /* loopback */
184 static void ds_loopback_register(ds_svc_hdl_t hdl);
185 static void ds_loopback_unregister(ds_svc_hdl_t hdl);
186 static void ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen);
187 static int ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap,
188     ds_svc_hdl_t *lb_hdlp);
189 
190 /* client handling */
191 static int i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
192     uint_t maxhdls);
193 static ds_svc_t *ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl,
194     ds_port_t *port);
195 static ds_svc_t *ds_find_svc_by_id_port(char *svc_id, int is_client,
196     ds_port_t *port);
197 static ds_svc_t *ds_svc_clone(ds_svc_t *svc);
198 static void ds_portset_del_active_clients(char *service, ds_portset_t *portsp);
199 static void ds_check_for_dup_services(ds_svc_t *svc);
200 static void ds_delete_svc_entry(ds_svc_t *svc);
201 
202 char *
203 ds_strdup(char *str)
204 {
205 	char *newstr;
206 
207 	newstr = DS_MALLOC(strlen(str) + 1);
208 	(void) strcpy(newstr, str);
209 	return (newstr);
210 }
211 
212 void
213 ds_common_init(void)
214 {
215 	/* Validate version table */
216 	ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK);
217 
218 	/* Initialize services table */
219 	ds_init_svcs_tbl(DS_MAXSVCS_INIT);
220 
221 	/* enable callback processing */
222 	ds_enabled = B_TRUE;
223 }
224 
225 /* BEGIN LDC SUPPORT FUNCTIONS */
226 
227 static char *
228 decode_ldc_events(uint64_t event, char *buf)
229 {
230 	buf[0] = 0;
231 	if (event & LDC_EVT_DOWN)	(void) strcat(buf, " DOWN");
232 	if (event & LDC_EVT_RESET)	(void) strcat(buf, " RESET");
233 	if (event & LDC_EVT_UP)		(void) strcat(buf, " UP");
234 	if (event & LDC_EVT_READ)	(void) strcat(buf, " READ");
235 	if (event & LDC_EVT_WRITE)	(void) strcat(buf, " WRITE");
236 	return (buf);
237 }
238 
239 static ldc_status_t
240 ds_update_ldc_state(ds_port_t *port)
241 {
242 	ldc_status_t	ldc_state;
243 	int		rv;
244 	char		ebuf[DS_EBUFSIZE];
245 
246 	ASSERT(MUTEX_HELD(&port->lock));
247 
248 	/*
249 	 * Read status and update ldc state info in port structure.
250 	 */
251 	if ((rv = ldc_status(port->ldc.hdl, &ldc_state)) != 0) {
252 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_status error: %s" DS_EOL,
253 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
254 		ldc_state = port->ldc.state;
255 	} else {
256 		port->ldc.state = ldc_state;
257 	}
258 
259 	return (ldc_state);
260 }
261 
262 static void
263 ds_handle_down_reset_events(ds_port_t *port)
264 {
265 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
266 	    __func__);
267 
268 	mutex_enter(&ds_svcs.lock);
269 	mutex_enter(&port->lock);
270 
271 	ds_sys_drain_events(port);
272 
273 	(void) ds_update_ldc_state(port);
274 
275 	/* reset the port state */
276 	ds_port_reset(port);
277 
278 	/* acknowledge the reset */
279 	(void) ldc_up(port->ldc.hdl);
280 
281 	mutex_exit(&port->lock);
282 	mutex_exit(&ds_svcs.lock);
283 
284 	ds_handle_up_event(port);
285 
286 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
287 }
288 
289 static void
290 ds_handle_up_event(ds_port_t *port)
291 {
292 	ldc_status_t	ldc_state;
293 
294 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
295 	    __func__);
296 
297 	mutex_enter(&port->lock);
298 
299 	ldc_state = ds_update_ldc_state(port);
300 
301 	mutex_exit(&port->lock);
302 
303 	if ((ldc_state == LDC_UP) && IS_DS_PORT(port)) {
304 		/*
305 		 * Initiate the handshake.
306 		 */
307 		ds_send_init_req(port);
308 	}
309 
310 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
311 }
312 
313 static uint_t
314 ds_ldc_cb(uint64_t event, caddr_t arg)
315 {
316 	ds_port_t	*port = (ds_port_t *)arg;
317 	char		evstring[DS_DECODE_BUF_LEN];
318 
319 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: %s event (%llx) received" DS_EOL,
320 	    PORTID(port), __func__, decode_ldc_events(event, evstring),
321 	    (u_longlong_t)event);
322 
323 	if (!ds_enabled) {
324 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: callback handling is disabled"
325 		    DS_EOL, PORTID(port), __func__);
326 		return (LDC_SUCCESS);
327 	}
328 
329 	if (event & (LDC_EVT_DOWN | LDC_EVT_RESET)) {
330 		ds_handle_down_reset_events(port);
331 		goto done;
332 	}
333 
334 	if (event & LDC_EVT_UP) {
335 		ds_handle_up_event(port);
336 	}
337 
338 	if (event & LDC_EVT_READ) {
339 		if (port->ldc.state != LDC_UP) {
340 			cmn_err(CE_WARN, "ds@%lx: %s: LDC READ event while "
341 			    "port not up" DS_EOL, PORTID(port), __func__);
342 			goto done;
343 		}
344 
345 		if (ds_sys_dispatch_func(ds_handle_recv, port)) {
346 			cmn_err(CE_WARN, "ds@%lx: error initiating LDC READ "
347 			    " event", PORTID(port));
348 		}
349 	}
350 
351 	if (event & LDC_EVT_WRITE) {
352 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: LDC WRITE event received, "
353 		    "not supported" DS_EOL, PORTID(port), __func__);
354 	}
355 
356 	if (event & ~(LDC_EVT_UP | LDC_EVT_READ)) {
357 		cmn_err(CE_WARN, "ds@%lx: %s: Unexpected LDC event received: "
358 		    "0x%llx" DS_EOL, PORTID(port), __func__,
359 		    (u_longlong_t)event);
360 	}
361 done:
362 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
363 
364 	return (LDC_SUCCESS);
365 }
366 
367 static int
368 ds_ldc_init(ds_port_t *port)
369 {
370 	int		rv;
371 	ldc_attr_t	ldc_attr;
372 	caddr_t		ldc_cb_arg = (caddr_t)port;
373 	char		ebuf[DS_EBUFSIZE];
374 
375 	ASSERT(MUTEX_HELD(&port->lock));
376 
377 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%lld" DS_EOL,
378 	    PORTID(port), __func__, (u_longlong_t)port->ldc.id);
379 
380 	ldc_attr.devclass = LDC_DEV_GENERIC;
381 	ldc_attr.instance = 0;
382 	ldc_attr.mode = LDC_MODE_RELIABLE;
383 	ldc_attr.mtu = DS_STREAM_MTU;
384 
385 	if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) {
386 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_id: %lx, ldc_init error: %s"
387 		    DS_EOL, PORTID(port), __func__, port->ldc.id,
388 		    ds_errno_to_str(rv, ebuf));
389 		return (rv);
390 	}
391 
392 	rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, ldc_cb_arg);
393 	if (rv != 0) {
394 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_reg_callback error: %s"
395 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
396 		return (rv);
397 	}
398 
399 	ds_sys_ldc_init(port);
400 	return (0);
401 }
402 
403 int
404 ds_ldc_fini(ds_port_t *port)
405 {
406 	int	rv;
407 	char	ebuf[DS_EBUFSIZE];
408 
409 	ASSERT(port->state >= DS_PORT_LDC_INIT);
410 	ASSERT(MUTEX_HELD(&port->lock));
411 
412 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%ld" DS_EOL, PORTID(port),
413 	    __func__, port->ldc.id);
414 
415 	if ((rv = ldc_close(port->ldc.hdl)) != 0) {
416 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_close error: %s" DS_EOL,
417 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
418 		return (rv);
419 	}
420 
421 	if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) {
422 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_unreg_callback error: %s"
423 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
424 		return (rv);
425 	}
426 
427 	if ((rv = ldc_fini(port->ldc.hdl)) != 0) {
428 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_fini error: %s" DS_EOL,
429 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
430 		return (rv);
431 	}
432 
433 	port->ldc.id = (uint64_t)-1;
434 	port->ldc.hdl = NULL;
435 	port->ldc.state = 0;
436 
437 	return (rv);
438 }
439 
440 /*
441  * Attempt to read a specified number of bytes from a particular LDC.
442  * Returns zero for success or the return code from the LDC read on
443  * failure. The actual number of bytes read from the LDC is returned
444  * in the size parameter.
445  */
446 static int
447 ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep)
448 {
449 	int	rv = 0;
450 	size_t	bytes_req = *sizep;
451 	size_t	bytes_left = bytes_req;
452 	size_t	nbytes;
453 	int	retry_count = 0;
454 	char	ebuf[DS_EBUFSIZE];
455 
456 	ASSERT(MUTEX_HELD(&port->rcv_lock));
457 
458 	*sizep = 0;
459 
460 	DS_DBG_LDC(CE_NOTE, "ds@%lx: attempting to read %ld bytes" DS_EOL,
461 	    PORTID(port), bytes_req);
462 
463 	while (bytes_left > 0) {
464 
465 		nbytes = bytes_left;
466 
467 		mutex_enter(&port->lock);
468 		if (port->ldc.state == LDC_UP) {
469 			rv = ldc_read(port->ldc.hdl, msgp, &nbytes);
470 		} else
471 			rv = ENXIO;
472 		mutex_exit(&port->lock);
473 		if (rv != 0) {
474 			if (rv == ECONNRESET) {
475 				break;
476 			} else if (rv != EAGAIN) {
477 				cmn_err(CE_NOTE, "ds@%lx: %s: %s" DS_EOL,
478 				    PORTID(port), __func__,
479 				    ds_errno_to_str(rv, ebuf));
480 				break;
481 			}
482 		} else {
483 			if (nbytes != 0) {
484 				DS_DBG_LDC(CE_NOTE, "ds@%lx: "
485 				    "read %ld bytes, %d retries" DS_EOL,
486 				    PORTID(port), nbytes, retry_count);
487 
488 				*sizep += nbytes;
489 				msgp += nbytes;
490 				bytes_left -= nbytes;
491 
492 				/* reset counter on a successful read */
493 				retry_count = 0;
494 				continue;
495 			}
496 
497 			/*
498 			 * No data was read. Check if this is the
499 			 * first attempt. If so, just return since
500 			 * nothing has been read yet.
501 			 */
502 			if (bytes_left == bytes_req) {
503 				DS_DBG_LDC(CE_NOTE, "ds@%lx: read zero bytes, "
504 				    " no data available" DS_EOL, PORTID(port));
505 				break;
506 			}
507 		}
508 
509 		/*
510 		 * A retry is necessary because the read returned
511 		 * EAGAIN, or a zero length read occurred after
512 		 * reading a partial message.
513 		 */
514 		if (retry_count++ >= ds_retries) {
515 			DS_DBG_LDC(CE_NOTE, "ds@%lx: timed out waiting for "
516 			    "message" DS_EOL, PORTID(port));
517 			break;
518 		}
519 
520 		drv_usecwait(ds_delay);
521 	}
522 
523 	return (rv);
524 }
525 
526 static void
527 ds_handle_recv(void *arg)
528 {
529 	ds_port_t	*port = (ds_port_t *)arg;
530 	char		*hbuf;
531 	size_t		msglen;
532 	size_t		read_size;
533 	boolean_t	hasdata;
534 	ds_hdr_t	hdr;
535 	uint8_t		*msg;
536 	char		*currp;
537 	int		rv;
538 	ds_event_t	*devent;
539 
540 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s..." DS_EOL, PORTID(port), __func__);
541 
542 	/*
543 	 * Read messages from the channel until there are none
544 	 * pending. Valid messages are dispatched to be handled
545 	 * by a separate thread while any malformed messages are
546 	 * dropped.
547 	 */
548 
549 	mutex_enter(&port->rcv_lock);
550 
551 	for (;;) {
552 		mutex_enter(&port->lock);
553 		if (port->ldc.state == LDC_UP) {
554 			rv = ldc_chkq(port->ldc.hdl, &hasdata);
555 		} else
556 			rv = ENXIO;
557 		mutex_exit(&port->lock);
558 		if (rv != 0 || !hasdata)
559 			break;
560 
561 		DS_DBG(CE_NOTE, "ds@%lx: %s: reading next message" DS_EOL,
562 		    PORTID(port), __func__);
563 
564 		/*
565 		 * Read in the next message.
566 		 */
567 		hbuf = (char *)&hdr;
568 		bzero(hbuf, DS_HDR_SZ);
569 		read_size = DS_HDR_SZ;
570 		currp = hbuf;
571 
572 		/* read in the message header */
573 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
574 			break;
575 		}
576 
577 		if (read_size < DS_HDR_SZ) {
578 			/*
579 			 * A zero length read is a valid signal that
580 			 * there is no data left on the channel.
581 			 */
582 			if (read_size != 0) {
583 				cmn_err(CE_WARN, "ds@%lx: invalid message "
584 				    "length, received %ld bytes, expected %ld"
585 				    DS_EOL, PORTID(port), read_size, DS_HDR_SZ);
586 			}
587 			continue;
588 		}
589 
590 		/* get payload size and allocate a buffer */
591 		read_size = ((ds_hdr_t *)hbuf)->payload_len;
592 		msglen = DS_HDR_SZ + read_size;
593 		msg = DS_MALLOC(msglen);
594 		if (!msg) {
595 			cmn_err(CE_WARN, "Memory allocation failed attempting "
596 			    " to allocate %d bytes." DS_EOL, (int)msglen);
597 			continue;
598 		}
599 
600 		DS_DBG(CE_NOTE, "ds@%lx: %s: message payload len %d" DS_EOL,
601 		    PORTID(port), __func__, (int)read_size);
602 
603 		/* move message header into buffer */
604 		(void) memcpy(msg, hbuf, DS_HDR_SZ);
605 		currp = (char *)(msg) + DS_HDR_SZ;
606 
607 		/* read in the message body */
608 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
609 			DS_FREE(msg, msglen);
610 			break;
611 		}
612 
613 		/* validate the size of the message */
614 		if ((DS_HDR_SZ + read_size) != msglen) {
615 			cmn_err(CE_WARN, "ds@%lx: %s: invalid message length, "
616 			    "received %ld bytes, expected %ld" DS_EOL,
617 			    PORTID(port), __func__, (DS_HDR_SZ + read_size),
618 			    msglen);
619 			DS_FREE(msg, msglen);
620 			continue;
621 		}
622 
623 		DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
624 
625 		/*
626 		 * Send the message for processing, and store it
627 		 * in the log. The memory is deallocated only when
628 		 * the message is removed from the log.
629 		 */
630 
631 		devent = DS_MALLOC(sizeof (ds_event_t));
632 		devent->port = port;
633 		devent->buf = (char *)msg;
634 		devent->buflen = msglen;
635 
636 		/* log the message */
637 		(void) ds_log_add_msg(DS_LOG_IN(port->id), msg, msglen);
638 
639 		if (ds_sys_dispatch_func(ds_dispatch_event, devent)) {
640 			cmn_err(CE_WARN, "ds@%lx: error initiating "
641 			    "event handler", PORTID(port));
642 			DS_FREE(devent, sizeof (ds_event_t));
643 		}
644 	}
645 
646 	mutex_exit(&port->rcv_lock);
647 
648 	/* handle connection reset errors returned from ds_recv_msg */
649 	if (rv == ECONNRESET) {
650 		ds_handle_down_reset_events(port);
651 	}
652 
653 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s done" DS_EOL, PORTID(port), __func__);
654 }
655 
656 static void
657 ds_dispatch_event(void *arg)
658 {
659 	ds_event_t	*event = (ds_event_t *)arg;
660 	ds_hdr_t	*hdr;
661 	ds_port_t	*port;
662 
663 	port = event->port;
664 
665 	hdr = (ds_hdr_t *)event->buf;
666 
667 	if (DS_MSG_TYPE_VALID(hdr->msg_type)) {
668 		DS_DBG(CE_NOTE, "ds@%lx: dispatch_event: msg_type=%d" DS_EOL,
669 		    PORTID(port), hdr->msg_type);
670 
671 		(*ds_msg_handlers[hdr->msg_type])(port, event->buf,
672 		    event->buflen);
673 	} else {
674 		cmn_err(CE_WARN, "ds@%lx: dispatch_event: invalid msg "
675 		    "type (%d)" DS_EOL, PORTID(port), hdr->msg_type);
676 	}
677 
678 	DS_FREE(event->buf, event->buflen);
679 	DS_FREE(event, sizeof (ds_event_t));
680 }
681 
682 int
683 ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen)
684 {
685 	int	rv;
686 	caddr_t	currp = msg;
687 	size_t	amt_left = msglen;
688 	int	loopcnt = 0;
689 
690 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s msglen: %ld" DS_EOL, PORTID(port),
691 	    __func__, msglen);
692 	DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
693 
694 	/*
695 	 * Ensure that no other messages can be sent on this port by holding
696 	 * the tx_lock mutex in case the write doesn't get sent with one write.
697 	 * This guarantees that the message doesn't become fragmented.
698 	 */
699 	mutex_enter(&port->tx_lock);
700 
701 	do {
702 		mutex_enter(&port->lock);
703 		if (port->ldc.state == LDC_UP) {
704 			rv = ldc_write(port->ldc.hdl, currp, &msglen);
705 		} else
706 			rv = ENXIO;
707 		mutex_exit(&port->lock);
708 		if (rv != 0) {
709 			if (rv == ECONNRESET) {
710 				mutex_exit(&port->tx_lock);
711 				ds_handle_down_reset_events(port);
712 				return (rv);
713 			} else if ((rv == EWOULDBLOCK) &&
714 			    (loopcnt++ < ds_retries)) {
715 				drv_usecwait(ds_delay);
716 			} else {
717 				cmn_err(CE_WARN, "ds@%lx: send_msg: ldc_write "
718 				    "failed (%d), %d bytes remaining" DS_EOL,
719 				    PORTID(port), rv, (int)amt_left);
720 				goto error;
721 			}
722 		} else {
723 			amt_left -= msglen;
724 			currp += msglen;
725 			msglen = amt_left;
726 			loopcnt = 0;
727 		}
728 	} while (amt_left > 0);
729 error:
730 	mutex_exit(&port->tx_lock);
731 
732 	return (rv);
733 }
734 
735 /* END LDC SUPPORT FUNCTIONS */
736 
737 
738 /* BEGIN DS PROTOCOL SUPPORT FUNCTIONS */
739 
740 static void
741 ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len)
742 {
743 	ds_hdr_t	*hdr;
744 	ds_init_ack_t	*ack;
745 	ds_init_nack_t	*nack;
746 	char		*msg;
747 	size_t		msglen;
748 	ds_init_req_t	*req;
749 	size_t		explen = DS_MSG_LEN(ds_init_req_t);
750 	uint16_t	new_major;
751 	uint16_t	new_minor;
752 	boolean_t	match;
753 
754 	/* sanity check the incoming message */
755 	if (len != explen) {
756 		cmn_err(CE_WARN, "ds@%lx: <init_req: invalid message "
757 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
758 		    explen);
759 		return;
760 	}
761 
762 	req = (ds_init_req_t *)(buf + DS_HDR_SZ);
763 
764 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_req: ver=%d.%d" DS_EOL,
765 	    PORTID(port), req->major_vers, req->minor_vers);
766 
767 	match = negotiate_version(DS_NUM_VER, &ds_vers[0],
768 	    req->major_vers, &new_major, &new_minor);
769 
770 	/*
771 	 * Check version info. ACK only if the major numbers exactly
772 	 * match. The service entity can retry with a new minor
773 	 * based on the response sent as part of the NACK.
774 	 */
775 	if (match) {
776 		msglen = DS_MSG_LEN(ds_init_ack_t);
777 		msg = DS_MALLOC(msglen);
778 
779 		hdr = (ds_hdr_t *)msg;
780 		hdr->msg_type = DS_INIT_ACK;
781 		hdr->payload_len = sizeof (ds_init_ack_t);
782 
783 		ack = (ds_init_ack_t *)(msg + DS_HDR_SZ);
784 		ack->minor_vers = MIN(new_minor, req->minor_vers);
785 
786 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_ack>: minor=0x%04X" DS_EOL,
787 		    PORTID(port), MIN(new_minor, req->minor_vers));
788 	} else {
789 		msglen = DS_MSG_LEN(ds_init_nack_t);
790 		msg = DS_MALLOC(msglen);
791 
792 		hdr = (ds_hdr_t *)msg;
793 		hdr->msg_type = DS_INIT_NACK;
794 		hdr->payload_len = sizeof (ds_init_nack_t);
795 
796 		nack = (ds_init_nack_t *)(msg + DS_HDR_SZ);
797 		nack->major_vers = new_major;
798 
799 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_nack>: major=0x%04X" DS_EOL,
800 		    PORTID(port), new_major);
801 	}
802 
803 	/*
804 	 * Send the response
805 	 */
806 	(void) ds_send_msg(port, msg, msglen);
807 	DS_FREE(msg, msglen);
808 }
809 
810 static void
811 ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len)
812 {
813 	ds_init_ack_t	*ack;
814 	ds_ver_t	*ver;
815 	size_t		explen = DS_MSG_LEN(ds_init_ack_t);
816 
817 	/* sanity check the incoming message */
818 	if (len != explen) {
819 		cmn_err(CE_WARN, "ds@%lx: <init_ack: invalid message "
820 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
821 		    explen);
822 		return;
823 	}
824 
825 	ack = (ds_init_ack_t *)(buf + DS_HDR_SZ);
826 
827 	mutex_enter(&port->lock);
828 
829 	if (port->state != DS_PORT_INIT_REQ) {
830 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: invalid state: %d"
831 		    DS_EOL, PORTID(port), port->state);
832 		mutex_exit(&port->lock);
833 		return;
834 	}
835 
836 	ver = &(ds_vers[port->ver_idx]);
837 
838 	/* agreed upon a major version */
839 	port->ver.major = ver->major;
840 
841 	/*
842 	 * If the returned minor version is larger than
843 	 * the requested minor version, use the lower of
844 	 * the two, i.e. the requested version.
845 	 */
846 	if (ack->minor_vers >= ver->minor) {
847 		/*
848 		 * Use the minor version specified in the
849 		 * original request.
850 		 */
851 		port->ver.minor = ver->minor;
852 	} else {
853 		/*
854 		 * Use the lower minor version returned in
855 		 * the ack. By definition, all lower minor
856 		 * versions must be supported.
857 		 */
858 		port->ver.minor = ack->minor_vers;
859 	}
860 
861 	port->state = DS_PORT_READY;
862 
863 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready v%d.%d" DS_EOL,
864 	    PORTID(port), port->ver.major, port->ver.minor);
865 
866 	mutex_exit(&port->lock);
867 
868 	/*
869 	 * The port came up, so update all the services
870 	 * with this information. Follow that up with an
871 	 * attempt to register any service that is not
872 	 * already registered.
873 	 */
874 	mutex_enter(&ds_svcs.lock);
875 
876 	(void) ds_walk_svcs(ds_svc_port_up, port);
877 	(void) ds_walk_svcs(ds_svc_register, NULL);
878 
879 	mutex_exit(&ds_svcs.lock);
880 }
881 
882 static void
883 ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len)
884 {
885 	int		idx;
886 	ds_init_nack_t	*nack;
887 	ds_ver_t	*ver;
888 	size_t		explen = DS_MSG_LEN(ds_init_nack_t);
889 
890 	/* sanity check the incoming message */
891 	if (len != explen) {
892 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: invalid message "
893 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
894 		    explen);
895 		return;
896 	}
897 
898 	nack = (ds_init_nack_t *)(buf + DS_HDR_SZ);
899 
900 	mutex_enter(&port->lock);
901 
902 	if (port->state != DS_PORT_INIT_REQ) {
903 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: invalid state: %d"
904 		    DS_EOL, PORTID(port), port->state);
905 		mutex_exit(&port->lock);
906 		return;
907 	}
908 
909 	ver = &(ds_vers[port->ver_idx]);
910 
911 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: req=v%d.%d, nack=v%d.x"
912 	    DS_EOL, PORTID(port), ver->major, ver->minor, nack->major_vers);
913 
914 	if (nack->major_vers == 0) {
915 		/* no supported protocol version */
916 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS not supported"
917 		    DS_EOL, PORTID(port));
918 		mutex_exit(&port->lock);
919 		return;
920 	}
921 
922 	/*
923 	 * Walk the version list, looking for a major version
924 	 * that is as close to the requested major version as
925 	 * possible.
926 	 */
927 	for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) {
928 		if (ds_vers[idx].major <= nack->major_vers) {
929 			/* found a version to try */
930 			goto done;
931 		}
932 	}
933 
934 	if (idx == DS_NUM_VER) {
935 		/* no supported version */
936 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS v%d.x not "
937 		    "supported" DS_EOL, PORTID(port), nack->major_vers);
938 
939 		mutex_exit(&port->lock);
940 		return;
941 	}
942 
943 done:
944 	/* start the handshake again */
945 	port->ver_idx = idx;
946 	port->state = DS_PORT_LDC_INIT;
947 	mutex_exit(&port->lock);
948 
949 	ds_send_init_req(port);
950 
951 }
952 
953 static ds_svc_t *
954 ds_find_svc_by_id_port(char *svc_id, int is_client, ds_port_t *port)
955 {
956 	int		idx;
957 	ds_svc_t	*svc, *found_svc = 0;
958 	uint32_t	flag_match = is_client ? DSSF_ISCLIENT : 0;
959 
960 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
961 
962 	/* walk every table entry */
963 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
964 		svc = ds_svcs.tbl[idx];
965 		if (DS_SVC_ISFREE(svc))
966 			continue;
967 		if (strcmp(svc->cap.svc_id, svc_id) != 0)
968 			continue;
969 		if ((svc->flags & DSSF_ISCLIENT) != flag_match)
970 			continue;
971 		if (port != NULL && svc->port == port) {
972 			return (svc);
973 		} else if (svc->state == DS_SVC_INACTIVE) {
974 			found_svc = svc;
975 		} else if (!found_svc) {
976 			found_svc = svc;
977 		}
978 	}
979 
980 	return (found_svc);
981 }
982 
983 static void
984 ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len)
985 {
986 	ds_reg_req_t	*req;
987 	ds_hdr_t	*hdr;
988 	ds_reg_ack_t	*ack;
989 	ds_reg_nack_t	*nack;
990 	char		*msg;
991 	size_t		msglen;
992 	size_t		explen = DS_MSG_LEN(ds_reg_req_t);
993 	ds_svc_t	*svc = NULL;
994 	ds_ver_t	version;
995 	uint16_t	new_major;
996 	uint16_t	new_minor;
997 	boolean_t	match;
998 
999 	/* sanity check the incoming message */
1000 	if (len < explen) {
1001 		cmn_err(CE_WARN, "ds@%lx: <reg_req: invalid message "
1002 		    "length (%ld), expected at least %ld" DS_EOL,
1003 		    PORTID(port), len, explen);
1004 		return;
1005 	}
1006 
1007 	req = (ds_reg_req_t *)(buf + DS_HDR_SZ);
1008 
1009 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' ver=%d.%d, hdl=0x%llx"
1010 	    DS_EOL, PORTID(port), req->svc_id, req->major_vers, req->minor_vers,
1011 	    (u_longlong_t)req->svc_handle);
1012 
1013 	mutex_enter(&ds_svcs.lock);
1014 	svc = ds_find_svc_by_id_port(req->svc_id,
1015 	    DS_HDL_ISCLIENT(req->svc_handle) == 0, port);
1016 	if (svc == NULL) {
1017 
1018 do_reg_nack:
1019 		mutex_exit(&ds_svcs.lock);
1020 
1021 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1022 		msg = DS_MALLOC(msglen);
1023 
1024 		hdr = (ds_hdr_t *)msg;
1025 		hdr->msg_type = DS_REG_NACK;
1026 		hdr->payload_len = sizeof (ds_reg_nack_t);
1027 
1028 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1029 		nack->svc_handle = req->svc_handle;
1030 		nack->result = DS_REG_VER_NACK;
1031 		nack->major_vers = 0;
1032 
1033 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s'" DS_EOL,
1034 		    PORTID(port), req->svc_id);
1035 		/*
1036 		 * Send the response
1037 		 */
1038 		(void) ds_send_msg(port, msg, msglen);
1039 		DS_FREE(msg, msglen);
1040 		return;
1041 	}
1042 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' found, hdl: 0x%llx" DS_EOL,
1043 	    PORTID(port), req->svc_id, (u_longlong_t)svc->hdl);
1044 
1045 	/*
1046 	 * A client sends out a reg req in order to force service providers to
1047 	 * initiate a reg req from their end (limitation in the protocol).  We
1048 	 * expect the service provider to be in the inactive (DS_SVC_INACTIVE)
1049 	 * state.  If the service provider has already sent out a reg req (the
1050 	 * state is DS_SVC_REG_PENDING) or has already handshaken (the
1051 	 * state is DS_SVC_ACTIVE), then we can simply ignore this reg
1052 	 * req.  For any other state, we force an unregister before initiating
1053 	 * a reg req.
1054 	 */
1055 
1056 	if (DS_HDL_ISCLIENT(req->svc_handle)) {
1057 		switch (svc->state) {
1058 
1059 		case DS_SVC_REG_PENDING:
1060 		case DS_SVC_ACTIVE:
1061 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1062 			    "client, state (%x)" DS_EOL, PORTID(port),
1063 			    req->svc_id, svc->state);
1064 			mutex_exit(&ds_svcs.lock);
1065 			return;
1066 
1067 		case DS_SVC_INACTIVE:
1068 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1069 			    "client" DS_EOL, PORTID(port), req->svc_id);
1070 			break;
1071 
1072 		default:
1073 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1074 			    "client forced unreg, state (%x)" DS_EOL,
1075 			    PORTID(port), req->svc_id, svc->state);
1076 			(void) ds_svc_unregister(svc, port);
1077 			break;
1078 		}
1079 		(void) ds_svc_port_up(svc, port);
1080 		(void) ds_svc_register_onport(svc, port);
1081 		mutex_exit(&ds_svcs.lock);
1082 		return;
1083 	}
1084 
1085 	/*
1086 	 * Only remote service providers can initiate a registration.  The
1087 	 * local sevice from here must be a client service.
1088 	 */
1089 
1090 	match = negotiate_version(svc->cap.nvers, svc->cap.vers,
1091 	    req->major_vers, &new_major, &new_minor);
1092 
1093 	/*
1094 	 * Check version info. ACK only if the major numbers exactly
1095 	 * match. The service entity can retry with a new minor
1096 	 * based on the response sent as part of the NACK.
1097 	 */
1098 	if (match) {
1099 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' svc%d: state: %x "
1100 		    "svc_portid: %d" DS_EOL, PORTID(port), req->svc_id,
1101 		    (int)DS_HDL2IDX(svc->hdl), svc->state,
1102 		    (int)(svc->port == NULL ? -1 : PORTID(svc->port)));
1103 		/*
1104 		 * If the current local service is already in use and
1105 		 * it's not on this port, clone it.
1106 		 */
1107 		if (svc->state != DS_SVC_INACTIVE) {
1108 			if (svc->port != NULL && port == svc->port) {
1109 				/*
1110 				 * Someone probably dropped an unreg req
1111 				 * somewhere.  Force a local unreg.
1112 				 */
1113 				(void) ds_svc_unregister(svc, port);
1114 			} else if (!DS_HDL_ISCLIENT(svc->hdl)) {
1115 				/*
1116 				 * Can't clone a non-client (service provider)
1117 				 * handle.  This is because old in-kernel
1118 				 * service providers can't deal with multiple
1119 				 * handles.
1120 				 */
1121 				goto do_reg_nack;
1122 			} else {
1123 				svc = ds_svc_clone(svc);
1124 			}
1125 		}
1126 		svc->port = port;
1127 		svc->svc_hdl = req->svc_handle;
1128 		svc->state = DS_SVC_ACTIVE;
1129 
1130 		msglen = DS_MSG_LEN(ds_reg_ack_t);
1131 		msg = DS_MALLOC(msglen);
1132 
1133 		hdr = (ds_hdr_t *)msg;
1134 		hdr->msg_type = DS_REG_ACK;
1135 		hdr->payload_len = sizeof (ds_reg_ack_t);
1136 
1137 		ack = (ds_reg_ack_t *)(msg + DS_HDR_SZ);
1138 		ack->svc_handle = req->svc_handle;
1139 		ack->minor_vers = MIN(new_minor, req->minor_vers);
1140 
1141 
1142 		if (svc->ops.ds_reg_cb) {
1143 			/* Call the registration callback */
1144 			version.major = req->major_vers;
1145 			version.minor = ack->minor_vers;
1146 			(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &version,
1147 			    svc->hdl);
1148 		}
1149 		mutex_exit(&ds_svcs.lock);
1150 
1151 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_ack>: '%s' minor=0x%04X"
1152 		    DS_EOL, PORTID(port), svc->cap.svc_id,
1153 		    MIN(new_minor, req->minor_vers));
1154 	} else {
1155 		mutex_exit(&ds_svcs.lock);
1156 
1157 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1158 		msg = DS_MALLOC(msglen);
1159 
1160 		hdr = (ds_hdr_t *)msg;
1161 		hdr->msg_type = DS_REG_NACK;
1162 		hdr->payload_len = sizeof (ds_reg_nack_t);
1163 
1164 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1165 		nack->svc_handle = req->svc_handle;
1166 		nack->result = DS_REG_VER_NACK;
1167 		nack->major_vers = new_major;
1168 
1169 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s' major=0x%04X"
1170 		    DS_EOL, PORTID(port), svc->cap.svc_id, new_major);
1171 	}
1172 
1173 	/* send message */
1174 	(void) ds_send_msg(port, msg, msglen);
1175 	DS_FREE(msg, msglen);
1176 }
1177 
1178 static void
1179 ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len)
1180 {
1181 	ds_reg_ack_t	*ack;
1182 	ds_ver_t	*ver;
1183 	ds_ver_t	tmpver;
1184 	ds_svc_t	*svc;
1185 	size_t		explen = DS_MSG_LEN(ds_reg_ack_t);
1186 
1187 	/* sanity check the incoming message */
1188 	if (len != explen) {
1189 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid message "
1190 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1191 		    explen);
1192 		return;
1193 	}
1194 
1195 	ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ);
1196 
1197 	mutex_enter(&ds_svcs.lock);
1198 
1199 	/*
1200 	 * This searches for service based on how we generate handles
1201 	 * and so only works because this is a reg ack.
1202 	 */
1203 	if (DS_HDL_ISCLIENT(ack->svc_handle) ||
1204 	    (svc = ds_get_svc(ack->svc_handle)) == NULL) {
1205 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid handle 0x%llx"
1206 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1207 		goto done;
1208 	}
1209 
1210 	/* make sure the message makes sense */
1211 	if (svc->state != DS_SVC_REG_PENDING) {
1212 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid state (%d)" DS_EOL,
1213 		    PORTID(port), svc->state);
1214 		goto done;
1215 	}
1216 
1217 	ver = &(svc->cap.vers[svc->ver_idx]);
1218 
1219 	/* major version has been agreed upon */
1220 	svc->ver.major = ver->major;
1221 
1222 	if (ack->minor_vers >= ver->minor) {
1223 		/*
1224 		 * Use the minor version specified in the
1225 		 * original request.
1226 		 */
1227 		svc->ver.minor = ver->minor;
1228 	} else {
1229 		/*
1230 		 * Use the lower minor version returned in
1231 		 * the ack. By defninition, all lower minor
1232 		 * versions must be supported.
1233 		 */
1234 		svc->ver.minor = ack->minor_vers;
1235 	}
1236 
1237 	svc->state = DS_SVC_ACTIVE;
1238 	svc->port = port;
1239 
1240 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_ack: '%s' v%d.%d ready, hdl=0x%llx"
1241 	    DS_EOL, PORTID(port), svc->cap.svc_id, svc->ver.major,
1242 	    svc->ver.minor, (u_longlong_t)svc->hdl);
1243 
1244 	/* notify the client that registration is complete */
1245 	if (svc->ops.ds_reg_cb) {
1246 		/*
1247 		 * Use a temporary version structure so that
1248 		 * the copy in the svc structure cannot be
1249 		 * modified by the client.
1250 		 */
1251 		tmpver.major = svc->ver.major;
1252 		tmpver.minor = svc->ver.minor;
1253 
1254 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl);
1255 	}
1256 
1257 done:
1258 	mutex_exit(&ds_svcs.lock);
1259 }
1260 
1261 static void
1262 ds_try_next_port(ds_svc_t *svc, int portid)
1263 {
1264 	ds_port_t *port;
1265 	ds_portset_t totry;
1266 	int i;
1267 
1268 	DS_DBG_LDC(CE_NOTE, "ds@%x %s" DS_EOL, portid, __func__);
1269 
1270 	/*
1271 	 * Get the ports that haven't been tried yet and are available to try.
1272 	 */
1273 	DS_PORTSET_SETNULL(totry);
1274 	for (i = 0; i < DS_MAX_PORTS; i++) {
1275 		if (!DS_PORT_IN_SET(svc->tried, i) &&
1276 		    DS_PORT_IN_SET(svc->avail, i))
1277 			DS_PORTSET_ADD(totry, i);
1278 	}
1279 
1280 	if (DS_PORTSET_ISNULL(totry))
1281 		return;
1282 
1283 	for (i = 0; i < DS_MAX_PORTS; i++, portid++) {
1284 		if (portid >= DS_MAX_PORTS) {
1285 			portid = 0;
1286 		}
1287 
1288 		/*
1289 		 * If the port is not in the available list,
1290 		 * it is not a candidate for registration.
1291 		 */
1292 		if (!DS_PORT_IN_SET(totry, portid)) {
1293 			continue;
1294 		}
1295 
1296 		port = &ds_ports[portid];
1297 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s trying ldc.id: %d" DS_EOL,
1298 		    portid, __func__, (uint_t)(port->ldc.id));
1299 		if (ds_send_reg_req(svc, port) == 0) {
1300 			DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send OK" DS_EOL,
1301 			    portid, __func__);
1302 			/* register sent successfully */
1303 			break;
1304 		}
1305 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send FAIL" DS_EOL,
1306 		    portid, __func__);
1307 
1308 		/* reset the service to try the next port */
1309 		ds_reset_svc(svc, port);
1310 	}
1311 }
1312 
1313 static void
1314 ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len)
1315 {
1316 	ds_reg_nack_t	*nack;
1317 	ds_svc_t	*svc;
1318 	int		idx;
1319 	size_t		explen = DS_MSG_LEN(ds_reg_nack_t);
1320 
1321 	/* sanity check the incoming message */
1322 	if (len != explen) {
1323 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid message "
1324 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1325 		    explen);
1326 		return;
1327 	}
1328 
1329 	nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ);
1330 
1331 	mutex_enter(&ds_svcs.lock);
1332 
1333 	/*
1334 	 * We expect a reg_nack for a client ping.
1335 	 */
1336 	if (DS_HDL_ISCLIENT(nack->svc_handle)) {
1337 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: ping hdl: 0x%llx"
1338 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1339 		goto done;
1340 	}
1341 
1342 	/*
1343 	 * This searches for service based on how we generate handles
1344 	 * and so only works because this is a reg nack.
1345 	 */
1346 	if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1347 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid handle 0x%llx"
1348 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1349 		goto done;
1350 	}
1351 
1352 	/* make sure the message makes sense */
1353 	if (svc->state != DS_SVC_REG_PENDING) {
1354 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid state (%d)" DS_EOL,
1355 		    PORTID(port), svc->state);
1356 		goto done;
1357 	}
1358 
1359 	if (nack->result == DS_REG_DUP) {
1360 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: duplicate registration "
1361 		    " for %s" DS_EOL, PORTID(port), svc->cap.svc_id);
1362 		ds_reset_svc(svc, port);
1363 		goto done;
1364 	}
1365 
1366 	/*
1367 	 * A major version of zero indicates that the
1368 	 * service is not supported at all.
1369 	 */
1370 	if (nack->major_vers == 0) {
1371 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' not supported"
1372 		    DS_EOL, PORTID(port), svc->cap.svc_id);
1373 		ds_reset_svc(svc, port);
1374 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1375 			ds_try_next_port(svc, PORTID(port) + 1);
1376 		goto done;
1377 	}
1378 
1379 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' hdl=0x%llx, nack=%d.x"
1380 	    DS_EOL, PORTID(port), svc->cap.svc_id,
1381 	    (u_longlong_t)nack->svc_handle, nack->major_vers);
1382 
1383 	/*
1384 	 * Walk the version list for the service, looking for
1385 	 * a major version that is as close to the requested
1386 	 * major version as possible.
1387 	 */
1388 	for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) {
1389 		if (svc->cap.vers[idx].major <= nack->major_vers) {
1390 			/* found a version to try */
1391 			break;
1392 		}
1393 	}
1394 
1395 	if (idx == svc->cap.nvers) {
1396 		/* no supported version */
1397 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: %s v%d.x not supported"
1398 		    DS_EOL, PORTID(port), svc->cap.svc_id, nack->major_vers);
1399 		ds_reset_svc(svc, port);
1400 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1401 			ds_try_next_port(svc, PORTID(port) + 1);
1402 		goto done;
1403 	}
1404 
1405 	/* start the handshake again */
1406 	svc->state = DS_SVC_INACTIVE;
1407 	svc->ver_idx = idx;
1408 
1409 	(void) ds_svc_register(svc, NULL);
1410 
1411 done:
1412 	mutex_exit(&ds_svcs.lock);
1413 }
1414 
1415 static void
1416 ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len)
1417 {
1418 	ds_hdr_t	*hdr;
1419 	ds_unreg_req_t	*req;
1420 	ds_unreg_ack_t	*ack;
1421 	ds_svc_t	*svc;
1422 	char		*msg;
1423 	size_t		msglen;
1424 	size_t		explen = DS_MSG_LEN(ds_unreg_req_t);
1425 	boolean_t	is_up;
1426 
1427 	/* sanity check the incoming message */
1428 	if (len != explen) {
1429 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid message "
1430 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1431 		    explen);
1432 		return;
1433 	}
1434 
1435 	req = (ds_unreg_req_t *)(buf + DS_HDR_SZ);
1436 
1437 	mutex_enter(&ds_svcs.lock);
1438 
1439 	/* lookup appropriate client or service */
1440 	if (DS_HDL_ISCLIENT(req->svc_handle) ||
1441 	    ((svc = ds_find_clnt_svc_by_hdl_port(req->svc_handle, port))
1442 	    == NULL && ((svc = ds_get_svc(req->svc_handle)) == NULL ||
1443 	    svc->port != port))) {
1444 		mutex_exit(&ds_svcs.lock);
1445 		mutex_enter(&port->lock);
1446 		is_up = (port->ldc.state == LDC_UP);
1447 		mutex_exit(&port->lock);
1448 		if (!is_up)
1449 			return;
1450 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid handle 0x%llx"
1451 		    DS_EOL, PORTID(port), (u_longlong_t)req->svc_handle);
1452 		ds_send_unreg_nack(port, req->svc_handle);
1453 		return;
1454 	}
1455 
1456 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: '%s' handle 0x%llx" DS_EOL,
1457 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1458 
1459 	(void) ds_svc_unregister(svc, svc->port);
1460 
1461 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_ack>: '%s' hdl=0x%llx" DS_EOL,
1462 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1463 
1464 	ds_check_for_dup_services(svc);
1465 
1466 	mutex_exit(&ds_svcs.lock);
1467 
1468 	msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t);
1469 	msg = DS_MALLOC(msglen);
1470 
1471 	hdr = (ds_hdr_t *)msg;
1472 	hdr->msg_type = DS_UNREG_ACK;
1473 	hdr->payload_len = sizeof (ds_unreg_ack_t);
1474 
1475 	ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ);
1476 	ack->svc_handle = req->svc_handle;
1477 
1478 	/* send message */
1479 	(void) ds_send_msg(port, msg, msglen);
1480 	DS_FREE(msg, msglen);
1481 
1482 }
1483 
1484 static void
1485 ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len)
1486 {
1487 	ds_unreg_ack_t	*ack;
1488 	size_t		explen = DS_MSG_LEN(ds_unreg_ack_t);
1489 
1490 	/* sanity check the incoming message */
1491 	if (len != explen) {
1492 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: invalid message "
1493 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1494 		    explen);
1495 		return;
1496 	}
1497 
1498 	ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ);
1499 
1500 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: hdl=0x%llx" DS_EOL,
1501 	    PORTID(port), (u_longlong_t)ack->svc_handle);
1502 
1503 #ifdef DEBUG
1504 	mutex_enter(&ds_svcs.lock);
1505 
1506 	/*
1507 	 * Since the unregister request was initiated locally,
1508 	 * the service structure has already been torn down.
1509 	 * Just perform a sanity check to make sure the message
1510 	 * is appropriate.
1511 	 */
1512 	if (ds_get_svc(ack->svc_handle) != NULL) {
1513 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: handle 0x%llx in use"
1514 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1515 	}
1516 
1517 	mutex_exit(&ds_svcs.lock);
1518 #endif	/* DEBUG */
1519 }
1520 
1521 static void
1522 ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len)
1523 {
1524 	ds_unreg_nack_t	*nack;
1525 	size_t		explen = DS_MSG_LEN(ds_unreg_nack_t);
1526 
1527 	/* sanity check the incoming message */
1528 	if (len != explen) {
1529 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: invalid message "
1530 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1531 		    explen);
1532 		return;
1533 	}
1534 
1535 	nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ);
1536 
1537 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: hdl=0x%llx" DS_EOL,
1538 	    PORTID(port), (u_longlong_t)nack->svc_handle);
1539 
1540 #ifdef DEBUG
1541 	mutex_enter(&ds_svcs.lock);
1542 
1543 	/*
1544 	 * Since the unregister request was initiated locally,
1545 	 * the service structure has already been torn down.
1546 	 * Just perform a sanity check to make sure the message
1547 	 * is appropriate.
1548 	 */
1549 	if (ds_get_svc(nack->svc_handle) != NULL) {
1550 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: handle 0x%llx in use"
1551 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1552 	}
1553 
1554 	mutex_exit(&ds_svcs.lock);
1555 #endif	/* DEBUG */
1556 }
1557 
1558 static void
1559 ds_handle_data(ds_port_t *port, caddr_t buf, size_t len)
1560 {
1561 	ds_data_handle_t	*data;
1562 	ds_svc_t		*svc;
1563 	char			*msg;
1564 	int			msgsz;
1565 	int			hdrsz;
1566 	size_t			explen = DS_MSG_LEN(ds_data_handle_t);
1567 
1568 	/* sanity check the incoming message */
1569 	if (len < explen) {
1570 		cmn_err(CE_WARN, "ds@%lx: <data: invalid message length "
1571 		    "(%ld), expected at least %ld" DS_EOL, PORTID(port), len,
1572 		    explen);
1573 		return;
1574 	}
1575 
1576 	data = (ds_data_handle_t *)(buf + DS_HDR_SZ);
1577 
1578 	hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t);
1579 	msgsz = len - hdrsz;
1580 
1581 	/* strip off the header for the client */
1582 	msg = (msgsz) ? (buf + hdrsz) : NULL;
1583 
1584 	mutex_enter(&ds_svcs.lock);
1585 
1586 	if ((svc = ds_find_clnt_svc_by_hdl_port(data->svc_handle, port))
1587 	    == NULL) {
1588 		if ((svc = ds_get_svc(data->svc_handle)) == NULL) {
1589 			mutex_exit(&ds_svcs.lock);
1590 			cmn_err(CE_WARN, "ds@%lx: <data: invalid handle 0x%llx"
1591 			    DS_EOL, PORTID(port),
1592 			    (u_longlong_t)data->svc_handle);
1593 			ds_send_data_nack(port, data->svc_handle);
1594 			return;
1595 		}
1596 	}
1597 
1598 	mutex_exit(&ds_svcs.lock);
1599 
1600 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <data: '%s' hdl=0x%llx" DS_EOL,
1601 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)svc->hdl);
1602 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msgsz);
1603 
1604 	/* dispatch this message to the client */
1605 	(*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz);
1606 }
1607 
1608 static void
1609 ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len)
1610 {
1611 	ds_svc_t	*svc;
1612 	ds_data_nack_t	*nack;
1613 	size_t		explen = DS_MSG_LEN(ds_data_nack_t);
1614 
1615 	/* sanity check the incoming message */
1616 	if (len != explen) {
1617 		cmn_err(CE_WARN, "ds@%lx: <data_nack: invalid message "
1618 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1619 		    explen);
1620 		return;
1621 	}
1622 
1623 	nack = (ds_data_nack_t *)(buf + DS_HDR_SZ);
1624 
1625 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack: hdl=0x%llx, result=0x%llx"
1626 	    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle,
1627 	    (u_longlong_t)nack->result);
1628 
1629 	if (nack->result == DS_INV_HDL) {
1630 
1631 		mutex_enter(&ds_svcs.lock);
1632 
1633 		if ((svc = ds_find_clnt_svc_by_hdl_port(nack->svc_handle,
1634 		    port)) == NULL) {
1635 			if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1636 				mutex_exit(&ds_svcs.lock);
1637 				return;
1638 			}
1639 		}
1640 
1641 		cmn_err(CE_WARN, "ds@%lx: <data_nack: handle 0x%llx reported "
1642 		    " as invalid" DS_EOL, PORTID(port),
1643 		    (u_longlong_t)nack->svc_handle);
1644 
1645 		(void) ds_svc_unregister(svc, svc->port);
1646 
1647 		mutex_exit(&ds_svcs.lock);
1648 	}
1649 }
1650 
1651 /* Initialize the port */
1652 void
1653 ds_send_init_req(ds_port_t *port)
1654 {
1655 	ds_hdr_t	*hdr;
1656 	ds_init_req_t	*init_req;
1657 	size_t		msglen;
1658 	ds_ver_t	*vers = &ds_vers[port->ver_idx];
1659 
1660 	mutex_enter(&port->lock);
1661 	if (port->state != DS_PORT_LDC_INIT) {
1662 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: invalid state: %d"
1663 		    DS_EOL, PORTID(port), port->state);
1664 		mutex_exit(&port->lock);
1665 		return;
1666 	}
1667 	mutex_exit(&port->lock);
1668 
1669 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: req=v%d.%d" DS_EOL,
1670 	    PORTID(port), vers->major, vers->minor);
1671 
1672 	msglen = DS_HDR_SZ + sizeof (ds_init_req_t);
1673 	hdr = DS_MALLOC(msglen);
1674 
1675 	hdr->msg_type = DS_INIT_REQ;
1676 	hdr->payload_len = sizeof (ds_init_req_t);
1677 
1678 	init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ);
1679 	init_req->major_vers = vers->major;
1680 	init_req->minor_vers = vers->minor;
1681 
1682 	if (ds_send_msg(port, (caddr_t)hdr, msglen) == 0) {
1683 		/*
1684 		 * We've left the port state unlocked over the malloc/send,
1685 		 * make sure no one has changed the state under us before
1686 		 * we update the state.
1687 		 */
1688 		mutex_enter(&port->lock);
1689 		if (port->state == DS_PORT_LDC_INIT)
1690 			port->state = DS_PORT_INIT_REQ;
1691 		mutex_exit(&port->lock);
1692 	}
1693 	DS_FREE(hdr, msglen);
1694 }
1695 
1696 static int
1697 ds_send_reg_req(ds_svc_t *svc, ds_port_t *port)
1698 {
1699 	ds_ver_t	*ver;
1700 	ds_hdr_t	*hdr;
1701 	caddr_t		msg;
1702 	size_t		msglen;
1703 	ds_reg_req_t	*req;
1704 	size_t		idlen;
1705 	int		rv;
1706 
1707 	if ((svc->state != DS_SVC_INACTIVE) &&
1708 	    ((svc->flags & DSSF_ISCLIENT) == 0)) {
1709 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: invalid svc state (%d) "
1710 		    "for svc '%s'" DS_EOL, PORTID(port), svc->state,
1711 		    svc->cap.svc_id);
1712 		return (-1);
1713 	}
1714 
1715 	mutex_enter(&port->lock);
1716 
1717 	/* check on the LDC to Zeus */
1718 	if (port->ldc.state != LDC_UP) {
1719 		/* can not send message */
1720 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: channel %ld is not up"
1721 		    DS_EOL, PORTID(port), port->ldc.id);
1722 		mutex_exit(&port->lock);
1723 		return (-1);
1724 	}
1725 
1726 	/* make sure port is ready */
1727 	if (port->state != DS_PORT_READY) {
1728 		/* can not send message */
1729 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: port is not ready"
1730 		    DS_EOL, PORTID(port));
1731 		mutex_exit(&port->lock);
1732 		return (-1);
1733 	}
1734 
1735 	mutex_exit(&port->lock);
1736 
1737 	/* allocate the message buffer */
1738 	idlen = strlen(svc->cap.svc_id);
1739 	msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen;
1740 	msg = DS_MALLOC(msglen);
1741 
1742 	/* copy in the header data */
1743 	hdr = (ds_hdr_t *)msg;
1744 	hdr->msg_type = DS_REG_REQ;
1745 	hdr->payload_len = sizeof (ds_reg_req_t) + idlen;
1746 
1747 	req = (ds_reg_req_t *)(msg + DS_HDR_SZ);
1748 	req->svc_handle = svc->hdl;
1749 	ver = &(svc->cap.vers[svc->ver_idx]);
1750 	req->major_vers = ver->major;
1751 	req->minor_vers = ver->minor;
1752 
1753 	/* copy in the service id */
1754 	(void) memcpy(req->svc_id, svc->cap.svc_id, idlen + 1);
1755 
1756 	/* send the message */
1757 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: '%s' ver=%d.%d, hdl=0x%llx"
1758 	    DS_EOL, PORTID(port), svc->cap.svc_id, ver->major, ver->minor,
1759 	    (u_longlong_t)svc->hdl);
1760 
1761 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1762 		svc->port = port;
1763 		rv = -1;
1764 	} else if ((svc->flags & DSSF_ISCLIENT) == 0) {
1765 		svc->state = DS_SVC_REG_PENDING;
1766 	}
1767 	DS_FREE(msg, msglen);
1768 
1769 	return (rv);
1770 }
1771 
1772 /*
1773  * Keep around in case we want this later
1774  */
1775 int
1776 ds_send_unreg_req(ds_svc_t *svc)
1777 {
1778 	caddr_t		msg;
1779 	size_t		msglen;
1780 	ds_hdr_t	*hdr;
1781 	ds_unreg_req_t	*req;
1782 	ds_port_t	*port = svc->port;
1783 	int		rv;
1784 
1785 	if (port == NULL) {
1786 		DS_DBG(CE_NOTE, "send_unreg_req: service '%s' not "
1787 		    "associated with a port" DS_EOL, svc->cap.svc_id);
1788 		return (-1);
1789 	}
1790 
1791 	mutex_enter(&port->lock);
1792 
1793 	/* check on the LDC to Zeus */
1794 	if (port->ldc.state != LDC_UP) {
1795 		/* can not send message */
1796 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: channel %ld is not up"
1797 		    DS_EOL, PORTID(port), port->ldc.id);
1798 		mutex_exit(&port->lock);
1799 		return (-1);
1800 	}
1801 
1802 	/* make sure port is ready */
1803 	if (port->state != DS_PORT_READY) {
1804 		/* can not send message */
1805 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: port is not ready" DS_EOL,
1806 		    PORTID(port));
1807 		mutex_exit(&port->lock);
1808 		return (-1);
1809 	}
1810 
1811 	mutex_exit(&port->lock);
1812 
1813 	msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t);
1814 	msg = DS_MALLOC(msglen);
1815 
1816 	/* copy in the header data */
1817 	hdr = (ds_hdr_t *)msg;
1818 	hdr->msg_type = DS_UNREG;
1819 	hdr->payload_len = sizeof (ds_unreg_req_t);
1820 
1821 	req = (ds_unreg_req_t *)(msg + DS_HDR_SZ);
1822 	if (svc->flags & DSSF_ISCLIENT) {
1823 		req->svc_handle = svc->svc_hdl;
1824 	} else {
1825 		req->svc_handle = svc->hdl;
1826 	}
1827 
1828 	/* send the message */
1829 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_req>: '%s' hdl=0x%llx" DS_EOL,
1830 	    PORTID(port), (svc->cap.svc_id) ? svc->cap.svc_id : "NULL",
1831 	    (u_longlong_t)svc->hdl);
1832 
1833 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1834 		rv = -1;
1835 	}
1836 	DS_FREE(msg, msglen);
1837 
1838 	return (rv);
1839 }
1840 
1841 static void
1842 ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1843 {
1844 	caddr_t		msg;
1845 	size_t		msglen;
1846 	ds_hdr_t	*hdr;
1847 	ds_unreg_nack_t	*nack;
1848 
1849 	mutex_enter(&port->lock);
1850 
1851 	/* check on the LDC to Zeus */
1852 	if (port->ldc.state != LDC_UP) {
1853 		/* can not send message */
1854 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: channel %ld is not up"
1855 		    DS_EOL, PORTID(port), port->ldc.id);
1856 		mutex_exit(&port->lock);
1857 		return;
1858 	}
1859 
1860 	/* make sure port is ready */
1861 	if (port->state != DS_PORT_READY) {
1862 		/* can not send message */
1863 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: port is not ready"
1864 		    DS_EOL, PORTID(port));
1865 		mutex_exit(&port->lock);
1866 		return;
1867 	}
1868 
1869 	mutex_exit(&port->lock);
1870 
1871 	msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t);
1872 	msg = DS_MALLOC(msglen);
1873 
1874 	/* copy in the header data */
1875 	hdr = (ds_hdr_t *)msg;
1876 	hdr->msg_type = DS_UNREG_NACK;
1877 	hdr->payload_len = sizeof (ds_unreg_nack_t);
1878 
1879 	nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ);
1880 	nack->svc_handle = bad_hdl;
1881 
1882 	/* send the message */
1883 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_nack>: hdl=0x%llx" DS_EOL,
1884 	    PORTID(port), (u_longlong_t)bad_hdl);
1885 
1886 	(void) ds_send_msg(port, msg, msglen);
1887 	DS_FREE(msg, msglen);
1888 }
1889 
1890 static void
1891 ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1892 {
1893 	caddr_t		msg;
1894 	size_t		msglen;
1895 	ds_hdr_t	*hdr;
1896 	ds_data_nack_t	*nack;
1897 
1898 	mutex_enter(&port->lock);
1899 
1900 	/* check on the LDC to Zeus */
1901 	if (port->ldc.state != LDC_UP) {
1902 		/* can not send message */
1903 		cmn_err(CE_WARN, "ds@%lx: data_nack>: channel %ld is not up"
1904 		    DS_EOL, PORTID(port), port->ldc.id);
1905 		mutex_exit(&port->lock);
1906 		return;
1907 	}
1908 
1909 	/* make sure port is ready */
1910 	if (port->state != DS_PORT_READY) {
1911 		/* can not send message */
1912 		cmn_err(CE_WARN, "ds@%lx: data_nack>: port is not ready" DS_EOL,
1913 		    PORTID(port));
1914 		mutex_exit(&port->lock);
1915 		return;
1916 	}
1917 
1918 	mutex_exit(&port->lock);
1919 
1920 	msglen = DS_HDR_SZ + sizeof (ds_data_nack_t);
1921 	msg = DS_MALLOC(msglen);
1922 
1923 	/* copy in the header data */
1924 	hdr = (ds_hdr_t *)msg;
1925 	hdr->msg_type = DS_NACK;
1926 	hdr->payload_len = sizeof (ds_data_nack_t);
1927 
1928 	nack = (ds_data_nack_t *)(msg + DS_HDR_SZ);
1929 	nack->svc_handle = bad_hdl;
1930 	nack->result = DS_INV_HDL;
1931 
1932 	/* send the message */
1933 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack>: hdl=0x%llx" DS_EOL,
1934 	    PORTID(port), (u_longlong_t)bad_hdl);
1935 
1936 	(void) ds_send_msg(port, msg, msglen);
1937 	DS_FREE(msg, msglen);
1938 }
1939 
1940 /* END DS PROTOCOL SUPPORT FUNCTIONS */
1941 
1942 #ifdef DEBUG
1943 
1944 #define	BYTESPERLINE	8
1945 #define	LINEWIDTH	((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1)
1946 #define	ASCIIOFFSET	((BYTESPERLINE * 3) + 2)
1947 #define	ISPRINT(c)	((c >= ' ') && (c <= '~'))
1948 
1949 /*
1950  * Output a buffer formatted with a set number of bytes on
1951  * each line. Append each line with the ASCII equivalent of
1952  * each byte if it falls within the printable ASCII range,
1953  * and '.' otherwise.
1954  */
1955 void
1956 ds_dump_msg(void *vbuf, size_t len)
1957 {
1958 	int	i, j;
1959 	char	*curr;
1960 	char	*aoff;
1961 	char	line[LINEWIDTH];
1962 	uint8_t	*buf = vbuf;
1963 
1964 	if (len > 128)
1965 		len = 128;
1966 
1967 	/* walk the buffer one line at a time */
1968 	for (i = 0; i < len; i += BYTESPERLINE) {
1969 
1970 		bzero(line, LINEWIDTH);
1971 
1972 		curr = line;
1973 		aoff = line + ASCIIOFFSET;
1974 
1975 		/*
1976 		 * Walk the bytes in the current line, storing
1977 		 * the hex value for the byte as well as the
1978 		 * ASCII representation in a temporary buffer.
1979 		 * All ASCII values are placed at the end of
1980 		 * the line.
1981 		 */
1982 		for (j = 0; (j < BYTESPERLINE) && ((i + j) < len); j++) {
1983 			(void) sprintf(curr, " %02x", buf[i + j]);
1984 			*aoff = (ISPRINT(buf[i + j])) ? buf[i + j] : '.';
1985 			curr += 3;
1986 			aoff++;
1987 		}
1988 
1989 		/*
1990 		 * Fill in to the start of the ASCII translation
1991 		 * with spaces. This will only be necessary if
1992 		 * this is the last line and there are not enough
1993 		 * bytes to fill the whole line.
1994 		 */
1995 		while (curr != (line + ASCIIOFFSET))
1996 			*curr++ = ' ';
1997 
1998 		cmn_err(CE_NOTE, "%s" DS_EOL, line);
1999 	}
2000 }
2001 #endif /* DEBUG */
2002 
2003 
2004 /*
2005  * Walk the table of registered services, executing the specified callback
2006  * function for each service on a port. A non-zero return value from the
2007  * callback is used to terminate the walk, not to indicate an error. Returns
2008  * the index of the last service visited.
2009  */
2010 int
2011 ds_walk_svcs(svc_cb_t svc_cb, void *arg)
2012 {
2013 	int		idx;
2014 	ds_svc_t	*svc;
2015 
2016 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2017 
2018 	/* walk every table entry */
2019 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
2020 		svc = ds_svcs.tbl[idx];
2021 
2022 		/* execute the callback */
2023 		if ((*svc_cb)(svc, arg) != 0)
2024 			break;
2025 	}
2026 
2027 	return (idx);
2028 }
2029 
2030 static int
2031 ds_svc_isfree(ds_svc_t *svc, void *arg)
2032 {
2033 	_NOTE(ARGUNUSED(arg))
2034 
2035 	/*
2036 	 * Looking for a free service. This may be a NULL entry
2037 	 * in the table, or an unused structure that could be
2038 	 * reused.
2039 	 */
2040 
2041 	if (DS_SVC_ISFREE(svc)) {
2042 		/* yes, it is free */
2043 		return (1);
2044 	}
2045 
2046 	/* not a candidate */
2047 	return (0);
2048 }
2049 
2050 int
2051 ds_svc_ismatch(ds_svc_t *svc, void *arg)
2052 {
2053 	if (DS_SVC_ISFREE(svc)) {
2054 		return (0);
2055 	}
2056 
2057 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2058 	    (svc->flags & DSSF_ISCLIENT) == 0) {
2059 		/* found a match */
2060 		return (1);
2061 	}
2062 
2063 	return (0);
2064 }
2065 
2066 int
2067 ds_svc_clnt_ismatch(ds_svc_t *svc, void *arg)
2068 {
2069 	if (DS_SVC_ISFREE(svc)) {
2070 		return (0);
2071 	}
2072 
2073 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2074 	    (svc->flags & DSSF_ISCLIENT) != 0) {
2075 		/* found a match */
2076 		return (1);
2077 	}
2078 
2079 	return (0);
2080 }
2081 
2082 int
2083 ds_svc_free(ds_svc_t *svc, void *arg)
2084 {
2085 	_NOTE(ARGUNUSED(arg))
2086 
2087 	if (svc == NULL) {
2088 		return (0);
2089 	}
2090 
2091 	if (svc->cap.svc_id) {
2092 		DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
2093 		svc->cap.svc_id = NULL;
2094 	}
2095 
2096 	if (svc->cap.vers) {
2097 		DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
2098 		svc->cap.vers = NULL;
2099 	}
2100 
2101 	DS_FREE(svc, sizeof (ds_svc_t));
2102 
2103 	return (0);
2104 }
2105 
2106 static int
2107 ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port)
2108 {
2109 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2110 
2111 	if (DS_SVC_ISFREE(svc))
2112 		return (0);
2113 
2114 	if (!DS_PORT_IN_SET(svc->avail, PORTID(port)))
2115 		return (0);
2116 
2117 	DS_PORTSET_ADD(svc->tried, PORTID(port));
2118 
2119 	if (ds_send_reg_req(svc, port) == 0) {
2120 		/* register sent successfully */
2121 		return (1);
2122 	}
2123 
2124 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2125 		/* reset the service */
2126 		ds_reset_svc(svc, port);
2127 	}
2128 	return (0);
2129 }
2130 
2131 int
2132 ds_svc_register(ds_svc_t *svc, void *arg)
2133 {
2134 	_NOTE(ARGUNUSED(arg))
2135 	ds_portset_t ports;
2136 	ds_port_t *port;
2137 	int	idx;
2138 
2139 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2140 
2141 	if (DS_SVC_ISFREE(svc))
2142 		return (0);
2143 
2144 	DS_PORTSET_DUP(ports, svc->avail);
2145 	if (svc->flags & DSSF_ISCLIENT) {
2146 		ds_portset_del_active_clients(svc->cap.svc_id, &ports);
2147 	} else if (svc->state != DS_SVC_INACTIVE)
2148 		return (0);
2149 
2150 	if (DS_PORTSET_ISNULL(ports))
2151 		return (0);
2152 
2153 	/*
2154 	 * Attempt to register the service. Start with the lowest
2155 	 * numbered port and continue until a registration message
2156 	 * is sent successfully, or there are no ports left to try.
2157 	 */
2158 	for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2159 
2160 		/*
2161 		 * If the port is not in the available list,
2162 		 * it is not a candidate for registration.
2163 		 */
2164 		if (!DS_PORT_IN_SET(ports, idx)) {
2165 			continue;
2166 		}
2167 
2168 		port = &ds_ports[idx];
2169 		if (ds_svc_register_onport(svc, port)) {
2170 			if ((svc->flags & DSSF_ISCLIENT) == 0)
2171 				break;
2172 			DS_PORTSET_DEL(svc->avail, idx);
2173 		}
2174 	}
2175 
2176 	return (0);
2177 }
2178 
2179 static int
2180 ds_svc_unregister(ds_svc_t *svc, void *arg)
2181 {
2182 	ds_port_t *port = (ds_port_t *)arg;
2183 	ds_svc_hdl_t hdl;
2184 
2185 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2186 
2187 	if (DS_SVC_ISFREE(svc)) {
2188 		return (0);
2189 	}
2190 
2191 	/* make sure the service is using this port */
2192 	if (svc->port != port) {
2193 		return (0);
2194 	}
2195 
2196 	if (port) {
2197 		DS_DBG(CE_NOTE, "ds@%lx: svc_unreg: id='%s', ver=%d.%d, "
2198 		    " hdl=0x%09lx" DS_EOL, PORTID(port), svc->cap.svc_id,
2199 		    svc->ver.major, svc->ver.minor, svc->hdl);
2200 	} else {
2201 		DS_DBG(CE_NOTE, "port=NULL: svc_unreg: id='%s', ver=%d.%d, "
2202 		    " hdl=0x%09lx" DS_EOL, svc->cap.svc_id, svc->ver.major,
2203 		    svc->ver.minor, svc->hdl);
2204 	}
2205 
2206 	/* reset the service structure */
2207 	ds_reset_svc(svc, port);
2208 
2209 	/* call the client unregister callback */
2210 	if (svc->ops.ds_unreg_cb) {
2211 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2212 	}
2213 
2214 	/* increment the count in the handle to prevent reuse */
2215 	hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl));
2216 	if (DS_HDL_ISCLIENT(svc->hdl)) {
2217 		DS_HDL_SET_ISCLIENT(hdl);
2218 	}
2219 	svc->hdl = hdl;
2220 
2221 	if (svc->state != DS_SVC_UNREG_PENDING) {
2222 		/* try to initiate a new registration */
2223 		(void) ds_svc_register(svc, NULL);
2224 	}
2225 
2226 	return (0);
2227 }
2228 
2229 static int
2230 ds_svc_port_up(ds_svc_t *svc, void *arg)
2231 {
2232 	ds_port_t *port = (ds_port_t *)arg;
2233 
2234 	if (DS_SVC_ISFREE(svc)) {
2235 		/* nothing to do */
2236 		return (0);
2237 	}
2238 
2239 	DS_PORTSET_ADD(svc->avail, port->id);
2240 	DS_PORTSET_DEL(svc->tried, port->id);
2241 
2242 	return (0);
2243 }
2244 
2245 ds_svc_t *
2246 ds_alloc_svc(void)
2247 {
2248 	int		idx;
2249 	uint_t		newmaxsvcs;
2250 	ds_svc_t	**newtbl;
2251 	ds_svc_t	*newsvc;
2252 
2253 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2254 
2255 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2256 
2257 	if (idx != ds_svcs.maxsvcs) {
2258 		goto found;
2259 	}
2260 
2261 	/*
2262 	 * There was no free space in the table. Grow
2263 	 * the table to double its current size.
2264 	 */
2265 	newmaxsvcs = ds_svcs.maxsvcs * 2;
2266 	newtbl = DS_MALLOC(newmaxsvcs * sizeof (ds_svc_t *));
2267 
2268 	/* copy old table data to the new table */
2269 	(void) memcpy(newtbl, ds_svcs.tbl,
2270 	    ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2271 
2272 	/* clean up the old table */
2273 	DS_FREE(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2274 	ds_svcs.tbl = newtbl;
2275 	ds_svcs.maxsvcs = newmaxsvcs;
2276 
2277 	/* search for a free space again */
2278 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2279 
2280 	/* the table is locked so should find a free slot */
2281 	ASSERT(idx != ds_svcs.maxsvcs);
2282 
2283 found:
2284 	/* allocate a new svc structure if necessary */
2285 	if ((newsvc = ds_svcs.tbl[idx]) == NULL) {
2286 		/* allocate a new service */
2287 		newsvc = DS_MALLOC(sizeof (ds_svc_t));
2288 		ds_svcs.tbl[idx] = newsvc;
2289 	}
2290 
2291 	/* fill in the handle */
2292 	newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl));
2293 	newsvc->state = DS_SVC_FREE;	/* Mark as free temporarily */
2294 
2295 	return (newsvc);
2296 }
2297 
2298 static void
2299 ds_reset_svc(ds_svc_t *svc, ds_port_t *port)
2300 {
2301 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2302 
2303 	if (svc->state != DS_SVC_UNREG_PENDING)
2304 		svc->state = DS_SVC_INACTIVE;
2305 	svc->ver_idx = 0;
2306 	svc->ver.major = 0;
2307 	svc->ver.minor = 0;
2308 	svc->port = NULL;
2309 	if (port) {
2310 		DS_PORTSET_DEL(svc->avail, port->id);
2311 	}
2312 }
2313 
2314 ds_svc_t *
2315 ds_get_svc(ds_svc_hdl_t hdl)
2316 {
2317 	int		idx;
2318 	ds_svc_t	*svc;
2319 
2320 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2321 
2322 	if (hdl == DS_INVALID_HDL)
2323 		return (NULL);
2324 
2325 	idx = DS_HDL2IDX(hdl);
2326 
2327 	/* check if index is out of bounds */
2328 	if ((idx < 0) || (idx >= ds_svcs.maxsvcs))
2329 		return (NULL);
2330 
2331 	svc = ds_svcs.tbl[idx];
2332 
2333 	/* check for a valid service */
2334 	if (DS_SVC_ISFREE(svc))
2335 		return (NULL);
2336 
2337 	/* make sure the handle is an exact match */
2338 	if (svc->hdl != hdl)
2339 		return (NULL);
2340 
2341 	return (svc);
2342 }
2343 
2344 static void
2345 ds_port_reset(ds_port_t *port)
2346 {
2347 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2348 	ASSERT(MUTEX_HELD(&port->lock));
2349 
2350 	/* connection went down, mark everything inactive */
2351 	(void) ds_walk_svcs(ds_svc_unregister, port);
2352 
2353 	port->ver_idx = 0;
2354 	port->ver.major = 0;
2355 	port->ver.minor = 0;
2356 	port->state = DS_PORT_LDC_INIT;
2357 }
2358 
2359 /*
2360  * Verify that a version array is sorted as expected for the
2361  * version negotiation to work correctly.
2362  */
2363 ds_vers_check_t
2364 ds_vers_isvalid(ds_ver_t *vers, int nvers)
2365 {
2366 	uint16_t	curr_major;
2367 	uint16_t	curr_minor;
2368 	int		idx;
2369 
2370 	curr_major = vers[0].major;
2371 	curr_minor = vers[0].minor;
2372 
2373 	/*
2374 	 * Walk the version array, verifying correct ordering.
2375 	 * The array must be sorted from highest supported
2376 	 * version to lowest supported version.
2377 	 */
2378 	for (idx = 0; idx < nvers; idx++) {
2379 		if (vers[idx].major > curr_major) {
2380 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2381 			    " increasing major versions" DS_EOL);
2382 			return (DS_VERS_INCREASING_MAJOR_ERR);
2383 		}
2384 
2385 		if (vers[idx].major < curr_major) {
2386 			curr_major = vers[idx].major;
2387 			curr_minor = vers[idx].minor;
2388 			continue;
2389 		}
2390 
2391 		if (vers[idx].minor > curr_minor) {
2392 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2393 			    " increasing minor versions" DS_EOL);
2394 			return (DS_VERS_INCREASING_MINOR_ERR);
2395 		}
2396 
2397 		curr_minor = vers[idx].minor;
2398 	}
2399 
2400 	return (DS_VERS_OK);
2401 }
2402 
2403 /*
2404  * Extended user capability init.
2405  */
2406 int
2407 ds_ucap_init(ds_capability_t *cap, ds_clnt_ops_t *ops, uint32_t flags,
2408     int instance, ds_svc_hdl_t *hdlp)
2409 {
2410 	ds_vers_check_t	status;
2411 	ds_svc_t	*svc;
2412 	int		rv = 0;
2413 	ds_svc_hdl_t 	lb_hdl, hdl;
2414 	int		is_loopback;
2415 	int		is_client;
2416 
2417 	/* sanity check the args */
2418 	if ((cap == NULL) || (ops == NULL)) {
2419 		cmn_err(CE_NOTE, "%s: invalid arguments" DS_EOL, __func__);
2420 		return (EINVAL);
2421 	}
2422 
2423 	/* sanity check the capability specifier */
2424 	if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) {
2425 		cmn_err(CE_NOTE, "%s: invalid capability specifier" DS_EOL,
2426 		    __func__);
2427 		return (EINVAL);
2428 	}
2429 
2430 	/* sanity check the version array */
2431 	if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) {
2432 		cmn_err(CE_NOTE, "%s: invalid capability version array "
2433 		    "for %s service: %s" DS_EOL, __func__, cap->svc_id,
2434 		    (status == DS_VERS_INCREASING_MAJOR_ERR) ?
2435 		    "increasing major versions" :
2436 		    "increasing minor versions");
2437 		return (EINVAL);
2438 	}
2439 
2440 	/* data and register callbacks are required */
2441 	if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) {
2442 		cmn_err(CE_NOTE, "%s: invalid ops specifier for %s service"
2443 		    DS_EOL, __func__, cap->svc_id);
2444 		return (EINVAL);
2445 	}
2446 
2447 	flags &= DSSF_USERFLAGS;
2448 	is_client = flags & DSSF_ISCLIENT;
2449 
2450 	DS_DBG_USR(CE_NOTE, "%s: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx"
2451 	    DS_EOL, __func__, cap->svc_id, PTR_TO_LONG(ops->ds_data_cb),
2452 	    PTR_TO_LONG(ops->cb_arg));
2453 
2454 	mutex_enter(&ds_svcs.lock);
2455 
2456 	/* check if the service is already registered */
2457 	if (i_ds_hdl_lookup(cap->svc_id, is_client, NULL, 1) == 1) {
2458 		/* already registered */
2459 		cmn_err(CE_NOTE, "Service '%s'/%s already registered" DS_EOL,
2460 		    cap->svc_id,
2461 		    (flags & DSSF_ISCLIENT) ? "client" : "service");
2462 		mutex_exit(&ds_svcs.lock);
2463 		return (EALREADY);
2464 	}
2465 
2466 	svc = ds_alloc_svc();
2467 	if (is_client) {
2468 		DS_HDL_SET_ISCLIENT(svc->hdl);
2469 	}
2470 
2471 	svc->state = DS_SVC_FREE;
2472 	svc->svc_hdl = DS_BADHDL1;
2473 
2474 	svc->flags = flags;
2475 	svc->drvi = instance;
2476 	svc->drv_psp = NULL;
2477 
2478 	/*
2479 	 * Check for loopback.  "pri" is a legacy service that assumes it
2480 	 * will never use loopback mode.
2481 	 */
2482 	if (strcmp(cap->svc_id, "pri") == 0) {
2483 		is_loopback = 0;
2484 	} else if (i_ds_hdl_lookup(cap->svc_id, is_client == 0, &lb_hdl, 1)
2485 	    == 1) {
2486 		if ((rv = ds_loopback_set_svc(svc, cap, &lb_hdl)) != 0) {
2487 			DS_DBG_USR(CE_NOTE, "%s: ds_loopback_set_svc '%s' err "
2488 			    " (%d)" DS_EOL, __func__, cap->svc_id, rv);
2489 			mutex_exit(&ds_svcs.lock);
2490 			return (rv);
2491 		}
2492 		is_loopback = 1;
2493 	} else
2494 		is_loopback = 0;
2495 
2496 	/* copy over all the client information */
2497 	(void) memcpy(&svc->cap, cap, sizeof (ds_capability_t));
2498 
2499 	/* make a copy of the service name */
2500 	svc->cap.svc_id = ds_strdup(cap->svc_id);
2501 
2502 	/* make a copy of the version array */
2503 	svc->cap.vers = DS_MALLOC(cap->nvers * sizeof (ds_ver_t));
2504 	(void) memcpy(svc->cap.vers, cap->vers, cap->nvers * sizeof (ds_ver_t));
2505 
2506 	/* copy the client ops vector */
2507 	(void) memcpy(&svc->ops, ops, sizeof (ds_clnt_ops_t));
2508 
2509 	svc->state = DS_SVC_INACTIVE;
2510 	svc->ver_idx = 0;
2511 	DS_PORTSET_DUP(svc->avail, ds_allports);
2512 	DS_PORTSET_SETNULL(svc->tried);
2513 
2514 	ds_svcs.nsvcs++;
2515 
2516 	hdl = svc->hdl;
2517 
2518 	/*
2519 	 * kludge to allow user callback code to get handle and user args.
2520 	 * Make sure the callback arg points to the svc structure.
2521 	 */
2522 	if ((flags & DSSF_ISUSER) != 0) {
2523 		ds_cbarg_set_cookie(svc);
2524 	}
2525 
2526 	if (is_loopback) {
2527 		ds_loopback_register(hdl);
2528 		ds_loopback_register(lb_hdl);
2529 	}
2530 
2531 	/*
2532 	 * If this is a client or a non-loopback service provider, send
2533 	 * out register requests.
2534 	 */
2535 	if (!is_loopback || (flags & DSSF_ISCLIENT) != 0)
2536 		(void) ds_svc_register(svc, NULL);
2537 
2538 	if (hdlp) {
2539 		*hdlp = hdl;
2540 	}
2541 
2542 	mutex_exit(&ds_svcs.lock);
2543 
2544 	DS_DBG_USR(CE_NOTE, "%s: service '%s' assigned handle 0x%09lx" DS_EOL,
2545 	    __func__, svc->cap.svc_id, hdl);
2546 
2547 	return (0);
2548 }
2549 
2550 /*
2551  * ds_cap_init interface for previous revision.
2552  */
2553 int
2554 ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops)
2555 {
2556 	return (ds_ucap_init(cap, ops, 0, DS_INVALID_INSTANCE, NULL));
2557 }
2558 
2559 /*
2560  * Interface for ds_unreg_hdl in lds driver.
2561  */
2562 int
2563 ds_unreg_hdl(ds_svc_hdl_t hdl)
2564 {
2565 	ds_svc_t	*svc;
2566 	int		is_loopback;
2567 	ds_svc_hdl_t	lb_hdl;
2568 
2569 	DS_DBG_USR(CE_NOTE, "%s: hdl=0x%09lx" DS_EOL, __func__, hdl);
2570 
2571 	mutex_enter(&ds_svcs.lock);
2572 	if ((svc = ds_get_svc(hdl)) == NULL) {
2573 		mutex_exit(&ds_svcs.lock);
2574 		DS_DBG_USR(CE_NOTE, "%s: unknown hdl: 0x%llx" DS_EOL, __func__,
2575 		    (u_longlong_t)hdl);
2576 		return (ENXIO);
2577 	}
2578 
2579 	DS_DBG_USR(CE_NOTE, "%s: svcid='%s', hdl=0x%llx" DS_EOL, __func__,
2580 	    svc->cap.svc_id, (u_longlong_t)svc->hdl);
2581 
2582 	svc->state = DS_SVC_UNREG_PENDING;
2583 
2584 	is_loopback = ((svc->flags & DSSF_LOOPBACK) != 0);
2585 	lb_hdl = svc->svc_hdl;
2586 
2587 	if (svc->port) {
2588 		(void) ds_send_unreg_req(svc);
2589 	}
2590 
2591 	(void) ds_svc_unregister(svc, svc->port);
2592 
2593 	ds_delete_svc_entry(svc);
2594 
2595 	if (is_loopback) {
2596 		ds_loopback_unregister(lb_hdl);
2597 	}
2598 
2599 	mutex_exit(&ds_svcs.lock);
2600 
2601 	return (0);
2602 }
2603 
2604 int
2605 ds_cap_fini(ds_capability_t *cap)
2606 {
2607 	ds_svc_hdl_t	hdl;
2608 	int rv;
2609 	uint_t nhdls = 0;
2610 
2611 	DS_DBG(CE_NOTE, "%s: '%s'" DS_EOL, __func__, cap->svc_id);
2612 	if ((rv = ds_hdl_lookup(cap->svc_id, 0, &hdl, 1, &nhdls)) != 0) {
2613 		DS_DBG(CE_NOTE, "%s: ds_hdl_lookup '%s' err (%d)" DS_EOL,
2614 		    __func__, cap->svc_id, rv);
2615 		return (rv);
2616 	}
2617 
2618 	if (nhdls == 0) {
2619 		DS_DBG(CE_NOTE, "%s: no such service '%s'" DS_EOL,
2620 		    __func__, cap->svc_id);
2621 		return (ENXIO);
2622 	}
2623 
2624 	if ((rv = ds_is_my_hdl(hdl, DS_INVALID_INSTANCE)) != 0) {
2625 		DS_DBG(CE_NOTE, "%s: ds_is_my_handle err (%d)" DS_EOL, __func__,
2626 		    rv);
2627 		return (rv);
2628 	}
2629 
2630 	if ((rv = ds_unreg_hdl(hdl)) != 0) {
2631 		DS_DBG(CE_NOTE, "%s: ds_unreg_hdl err (%d)" DS_EOL, __func__,
2632 		    rv);
2633 		return (rv);
2634 	}
2635 
2636 	return (0);
2637 }
2638 
2639 int
2640 ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len)
2641 {
2642 	int		rv;
2643 	ds_hdr_t	*hdr;
2644 	caddr_t		msg;
2645 	size_t		msglen;
2646 	size_t		hdrlen;
2647 	caddr_t		payload;
2648 	ds_svc_t	*svc;
2649 	ds_port_t	*port;
2650 	ds_data_handle_t *data;
2651 	ds_svc_hdl_t	svc_hdl;
2652 	int		is_client = 0;
2653 
2654 	DS_DBG(CE_NOTE, "%s: hdl: 0x%llx, buf: %lx, len: %ld" DS_EOL, __func__,
2655 	    (u_longlong_t)hdl, (ulong_t)buf, len);
2656 
2657 	mutex_enter(&ds_svcs.lock);
2658 
2659 	if ((svc = ds_get_svc(hdl)) == NULL) {
2660 		cmn_err(CE_WARN, "%s: invalid handle 0x%llx" DS_EOL, __func__,
2661 		    (u_longlong_t)hdl);
2662 		mutex_exit(&ds_svcs.lock);
2663 		return (ENXIO);
2664 	}
2665 
2666 	if (svc->state != DS_SVC_ACTIVE) {
2667 		/* channel is up, but svc is not registered */
2668 		DS_DBG(CE_NOTE, "%s: invalid service state 0x%x" DS_EOL,
2669 		    __func__, svc->state);
2670 		mutex_exit(&ds_svcs.lock);
2671 		return (ENOTCONN);
2672 	}
2673 
2674 	if (svc->flags & DSSF_LOOPBACK) {
2675 		hdl = svc->svc_hdl;
2676 		mutex_exit(&ds_svcs.lock);
2677 		ds_loopback_send(hdl, buf, len);
2678 		return (0);
2679 	}
2680 
2681 	if ((port = svc->port) == NULL) {
2682 		DS_DBG(CE_NOTE, "%s: service '%s' not associated with a port"
2683 		    DS_EOL, __func__, svc->cap.svc_id);
2684 		mutex_exit(&ds_svcs.lock);
2685 		return (ECONNRESET);
2686 	}
2687 
2688 	if (svc->flags & DSSF_ISCLIENT) {
2689 		is_client = 1;
2690 		svc_hdl = svc->svc_hdl;
2691 	}
2692 
2693 	mutex_exit(&ds_svcs.lock);
2694 
2695 	/* check that the LDC channel is ready */
2696 	if (port->ldc.state != LDC_UP) {
2697 		DS_DBG(CE_NOTE, "%s: LDC channel is not up" DS_EOL, __func__);
2698 		return (ECONNRESET);
2699 	}
2700 
2701 	hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t);
2702 
2703 	msg = DS_MALLOC(len + hdrlen);
2704 	hdr = (ds_hdr_t *)msg;
2705 	payload = msg + hdrlen;
2706 	msglen = len + hdrlen;
2707 
2708 	hdr->payload_len = len + sizeof (ds_data_handle_t);
2709 	hdr->msg_type = DS_DATA;
2710 
2711 	data = (ds_data_handle_t *)(msg + DS_HDR_SZ);
2712 	if (is_client) {
2713 		data->svc_handle = svc_hdl;
2714 	} else {
2715 		data->svc_handle = hdl;
2716 	}
2717 
2718 	if ((buf != NULL) && (len != 0)) {
2719 		(void) memcpy(payload, buf, len);
2720 	}
2721 
2722 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data>: hdl=0x%llx, len=%ld, "
2723 	    " payload_len=%d" DS_EOL, PORTID(port), (u_longlong_t)svc->hdl,
2724 	    msglen, hdr->payload_len);
2725 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msglen);
2726 
2727 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
2728 		rv = (rv == EIO) ? ECONNRESET : rv;
2729 	}
2730 	DS_FREE(msg, msglen);
2731 
2732 	return (rv);
2733 }
2734 
2735 void
2736 ds_port_common_init(ds_port_t *port)
2737 {
2738 	int rv;
2739 
2740 	if ((port->flags & DS_PORT_MUTEX_INITED) == 0) {
2741 		mutex_init(&port->lock, NULL, MUTEX_DRIVER, NULL);
2742 		mutex_init(&port->tx_lock, NULL, MUTEX_DRIVER, NULL);
2743 		mutex_init(&port->rcv_lock, NULL, MUTEX_DRIVER, NULL);
2744 		port->flags |= DS_PORT_MUTEX_INITED;
2745 	}
2746 
2747 	port->state = DS_PORT_INIT;
2748 	DS_PORTSET_ADD(ds_allports, port->id);
2749 
2750 	ds_sys_port_init(port);
2751 
2752 	mutex_enter(&port->lock);
2753 	rv = ds_ldc_init(port);
2754 	mutex_exit(&port->lock);
2755 
2756 	/*
2757 	 * If LDC successfully init'ed, try to kick off protocol for this port.
2758 	 */
2759 	if (rv == 0) {
2760 		ds_handle_up_event(port);
2761 	}
2762 }
2763 
2764 void
2765 ds_port_common_fini(ds_port_t *port)
2766 {
2767 	ASSERT(MUTEX_HELD(&port->lock));
2768 
2769 	port->state = DS_PORT_FREE;
2770 
2771 	DS_PORTSET_DEL(ds_allports, port->id);
2772 
2773 	ds_sys_port_fini(port);
2774 }
2775 
2776 /*
2777  * Initialize table of registered service classes
2778  */
2779 void
2780 ds_init_svcs_tbl(uint_t nentries)
2781 {
2782 	int	tblsz;
2783 
2784 	ds_svcs.maxsvcs = nentries;
2785 
2786 	tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *);
2787 	ds_svcs.tbl = (ds_svc_t **)DS_MALLOC(tblsz);
2788 
2789 	ds_svcs.nsvcs = 0;
2790 }
2791 
2792 /*
2793  * Find the max and min version supported.
2794  * Hacked from zeus workspace, support.c
2795  */
2796 static void
2797 min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
2798     uint16_t *min_major, uint16_t *max_major)
2799 {
2800 	int i;
2801 
2802 	*min_major = sup_versionsp[0].major;
2803 	*max_major = *min_major;
2804 
2805 	for (i = 1; i < num_versions; i++) {
2806 		if (sup_versionsp[i].major < *min_major)
2807 			*min_major = sup_versionsp[i].major;
2808 
2809 		if (sup_versionsp[i].major > *max_major)
2810 			*max_major = sup_versionsp[i].major;
2811 	}
2812 }
2813 
2814 /*
2815  * Check whether the major and minor numbers requested by the peer can be
2816  * satisfied. If the requested major is supported, true is returned, and the
2817  * agreed minor is returned in new_minor. If the requested major is not
2818  * supported, the routine returns false, and the closest major is returned in
2819  * *new_major, upon which the peer should re-negotiate. The closest major is
2820  * the just lower that the requested major number.
2821  *
2822  * Hacked from zeus workspace, support.c
2823  */
2824 boolean_t
2825 negotiate_version(int num_versions, ds_ver_t *sup_versionsp,
2826     uint16_t req_major, uint16_t *new_majorp, uint16_t *new_minorp)
2827 {
2828 	int i;
2829 	uint16_t major, lower_major;
2830 	uint16_t min_major = 0, max_major;
2831 	boolean_t found_match = B_FALSE;
2832 
2833 	min_max_versions(num_versions, sup_versionsp, &min_major, &max_major);
2834 
2835 	DS_DBG(CE_NOTE, "negotiate_version: req_major = %u, min = %u, max = %u"
2836 	    DS_EOL, req_major, min_major, max_major);
2837 
2838 	/*
2839 	 * If the minimum version supported is greater than
2840 	 * the version requested, return the lowest version
2841 	 * supported
2842 	 */
2843 	if (min_major > req_major) {
2844 		*new_majorp = min_major;
2845 		return (B_FALSE);
2846 	}
2847 
2848 	/*
2849 	 * If the largest version supported is lower than
2850 	 * the version requested, return the largest version
2851 	 * supported
2852 	 */
2853 	if (max_major < req_major) {
2854 		*new_majorp = max_major;
2855 		return (B_FALSE);
2856 	}
2857 
2858 	/*
2859 	 * Now we know that the requested version lies between the
2860 	 * min and max versions supported. Check if the requested
2861 	 * major can be found in supported versions.
2862 	 */
2863 	lower_major = min_major;
2864 	for (i = 0; i < num_versions; i++) {
2865 		major = sup_versionsp[i].major;
2866 		if (major == req_major) {
2867 			found_match = B_TRUE;
2868 			*new_majorp = req_major;
2869 			*new_minorp = sup_versionsp[i].minor;
2870 			break;
2871 		} else {
2872 			if ((major < req_major) && (major > lower_major))
2873 				lower_major = major;
2874 		}
2875 	}
2876 
2877 	/*
2878 	 * If no match is found, return the closest available number
2879 	 */
2880 	if (!found_match)
2881 		*new_majorp = lower_major;
2882 
2883 	return (found_match);
2884 }
2885 
2886 /*
2887  * Specific errno's that are used by ds.c and ldc.c
2888  */
2889 static struct {
2890 	int ds_errno;
2891 	char *estr;
2892 } ds_errno_to_str_tab[] = {
2893 	{ EIO,		"I/O error" },
2894 	{ ENXIO,	"No such device or address" },
2895 	{ EAGAIN,	"Resource temporarily unavailable" },
2896 	{ ENOMEM,	"Not enough space" },
2897 	{ EACCES,	"Permission denied" },
2898 	{ EFAULT,	"Bad address" },
2899 	{ EBUSY,	"Device busy" },
2900 	{ EINVAL,	"Invalid argument" },
2901 	{ ENOSPC,	"No space left on device" },
2902 	{ ENOMSG,	"No message of desired type" },
2903 #ifdef	ECHRNG
2904 	{ ECHRNG,	"Channel number out of range" },
2905 #endif
2906 	{ ENOTSUP,	"Operation not supported" },
2907 	{ EMSGSIZE,	"Message too long" },
2908 	{ EADDRINUSE,	"Address already in use" },
2909 	{ ECONNRESET,	"Connection reset by peer" },
2910 	{ ENOBUFS,	"No buffer space available" },
2911 	{ ENOTCONN,	"Socket is not connected" },
2912 	{ ECONNREFUSED,	"Connection refused" },
2913 	{ EALREADY,	"Operation already in progress" },
2914 	{ 0,		NULL },
2915 };
2916 
2917 char *
2918 ds_errno_to_str(int ds_errno, char *ebuf)
2919 {
2920 	int i, en;
2921 
2922 	for (i = 0; (en = ds_errno_to_str_tab[i].ds_errno) != 0; i++) {
2923 		if (en == ds_errno) {
2924 			(void) strcpy(ebuf, ds_errno_to_str_tab[i].estr);
2925 			return (ebuf);
2926 		}
2927 	}
2928 
2929 	(void) sprintf(ebuf, "ds_errno (%d)", ds_errno);
2930 	return (ebuf);
2931 }
2932 
2933 static void
2934 ds_loopback_register(ds_svc_hdl_t hdl)
2935 {
2936 	ds_ver_t ds_ver;
2937 	ds_svc_t *svc;
2938 
2939 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2940 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2941 	    (u_longlong_t)hdl);
2942 	if ((svc = ds_get_svc(hdl)) == NULL) {
2943 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2944 		    (u_longlong_t)hdl);
2945 		return;
2946 	}
2947 
2948 	svc->state = DS_SVC_ACTIVE;
2949 
2950 	if (svc->ops.ds_reg_cb) {
2951 		DS_DBG_LOOP(CE_NOTE, "%s: loopback regcb: hdl: 0x%llx" DS_EOL,
2952 		    __func__, (u_longlong_t)hdl);
2953 		ds_ver.major = svc->ver.major;
2954 		ds_ver.minor = svc->ver.minor;
2955 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &ds_ver, hdl);
2956 	}
2957 }
2958 
2959 static void
2960 ds_loopback_unregister(ds_svc_hdl_t hdl)
2961 {
2962 	ds_svc_t *svc;
2963 
2964 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2965 	if ((svc = ds_get_svc(hdl)) == NULL) {
2966 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2967 		    (u_longlong_t)hdl);
2968 		return;
2969 	}
2970 
2971 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2972 	    (u_longlong_t)hdl);
2973 
2974 	svc->flags &= ~DSSF_LOOPBACK;
2975 	svc->svc_hdl = DS_BADHDL2;
2976 	svc->state = DS_SVC_INACTIVE;
2977 
2978 	if (svc->ops.ds_unreg_cb) {
2979 		DS_DBG_LOOP(CE_NOTE, "%s: loopback unregcb: hdl: 0x%llx" DS_EOL,
2980 		    __func__, (u_longlong_t)hdl);
2981 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2982 	}
2983 }
2984 
2985 static void
2986 ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen)
2987 {
2988 	ds_svc_t *svc;
2989 
2990 	mutex_enter(&ds_svcs.lock);
2991 	if ((svc = ds_get_svc(hdl)) == NULL) {
2992 		mutex_exit(&ds_svcs.lock);
2993 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2994 		    (u_longlong_t)hdl);
2995 		return;
2996 	}
2997 	mutex_exit(&ds_svcs.lock);
2998 
2999 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
3000 	    (u_longlong_t)hdl);
3001 
3002 	if (svc->ops.ds_data_cb) {
3003 		DS_DBG_LOOP(CE_NOTE, "%s: loopback datacb hdl: 0x%llx" DS_EOL,
3004 		    __func__, (u_longlong_t)hdl);
3005 		(*svc->ops.ds_data_cb)(svc->ops.cb_arg, buf, buflen);
3006 	}
3007 }
3008 
3009 static int
3010 ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap, ds_svc_hdl_t *lb_hdlp)
3011 {
3012 	ds_svc_t *lb_svc;
3013 	ds_svc_hdl_t lb_hdl = *lb_hdlp;
3014 	int i;
3015 	int match = 0;
3016 	uint16_t new_major;
3017 	uint16_t new_minor;
3018 
3019 	if ((lb_svc = ds_get_svc(lb_hdl)) == NULL) {
3020 		DS_DBG_LOOP(CE_NOTE, "%s: loopback: hdl: 0x%llx invalid" DS_EOL,
3021 		    __func__, (u_longlong_t)lb_hdl);
3022 		return (ENXIO);
3023 	}
3024 
3025 	/* negotiate a version between loopback services, if possible */
3026 	for (i = 0; i < lb_svc->cap.nvers && match == 0; i++) {
3027 		match = negotiate_version(cap->nvers, cap->vers,
3028 		    lb_svc->cap.vers[i].major, &new_major, &new_minor);
3029 	}
3030 	if (!match) {
3031 		DS_DBG_LOOP(CE_NOTE, "%s: loopback version negotiate failed"
3032 		    DS_EOL, __func__);
3033 		return (ENOTSUP);
3034 	}
3035 	if (lb_svc->state != DS_SVC_INACTIVE) {
3036 		DS_DBG_LOOP(CE_NOTE, "%s: loopback active: hdl: 0x%llx"
3037 		    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3038 		if ((lb_svc->flags & DSSF_ISCLIENT) == 0) {
3039 			DS_DBG_LOOP(CE_NOTE, "%s: loopback busy hdl: 0x%llx"
3040 			    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3041 			return (EBUSY);
3042 		}
3043 		svc->state = DS_SVC_INACTIVE;	/* prevent alloc'ing svc */
3044 		lb_svc = ds_svc_clone(lb_svc);
3045 		DS_DBG_LOOP(CE_NOTE, "%s: loopback clone: ohdl: 0x%llx "
3046 		    "nhdl: 0x%llx" DS_EOL, __func__, (u_longlong_t)lb_hdl,
3047 		    (u_longlong_t)lb_svc->hdl);
3048 		*lb_hdlp = lb_svc->hdl;
3049 	}
3050 
3051 	svc->flags |= DSSF_LOOPBACK;
3052 	svc->svc_hdl = lb_svc->hdl;
3053 	svc->port = NULL;
3054 	svc->ver.major = new_major;
3055 	svc->ver.minor = new_minor;
3056 
3057 	lb_svc->flags |= DSSF_LOOPBACK;
3058 	lb_svc->svc_hdl = svc->hdl;
3059 	lb_svc->port = NULL;
3060 	lb_svc->ver.major = new_major;
3061 	lb_svc->ver.minor = new_minor;
3062 
3063 	DS_DBG_LOOP(CE_NOTE, "%s: setting loopback between: 0x%llx and 0x%llx"
3064 	    DS_EOL, __func__, (u_longlong_t)svc->hdl,
3065 	    (u_longlong_t)lb_svc->hdl);
3066 	return (0);
3067 }
3068 
3069 static ds_svc_t *
3070 ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl, ds_port_t *port)
3071 {
3072 	int		idx;
3073 	ds_svc_t	*svc;
3074 
3075 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s looking up clnt hdl: 0x%llx" DS_EOL,
3076 	    PORTID(port), __func__, (u_longlong_t)hdl);
3077 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3078 
3079 	/* walk every table entry */
3080 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3081 		svc = ds_svcs.tbl[idx];
3082 		if (DS_SVC_ISFREE(svc))
3083 			continue;
3084 		if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3085 		    svc->svc_hdl == hdl && svc->port == port) {
3086 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s found clnt hdl "
3087 			    "0x%llx: svc%d" DS_EOL, PORTID(port), __func__,
3088 			    (u_longlong_t)hdl, (uint_t)DS_HDL2IDX(svc->hdl));
3089 			return (svc);
3090 		}
3091 	}
3092 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s clnt hdl: 0x%llx not found" DS_EOL,
3093 	    PORTID(port), __func__, (u_longlong_t)hdl);
3094 
3095 	return (NULL);
3096 }
3097 
3098 static ds_svc_t *
3099 ds_svc_clone(ds_svc_t *svc)
3100 {
3101 	ds_svc_t *newsvc;
3102 	ds_svc_hdl_t hdl;
3103 
3104 	ASSERT(svc->flags & DSSF_ISCLIENT);
3105 
3106 	newsvc = ds_alloc_svc();
3107 
3108 	/* Can only clone clients for now */
3109 	hdl = newsvc->hdl | DS_HDL_ISCLIENT_BIT;
3110 	DS_DBG_USR(CE_NOTE, "%s: cloning client: old hdl: 0x%llx new hdl: "
3111 	    "0x%llx" DS_EOL, __func__, (u_longlong_t)svc->hdl,
3112 	    (u_longlong_t)hdl);
3113 	(void) memcpy(newsvc, svc, sizeof (ds_svc_t));
3114 	newsvc->hdl = hdl;
3115 	newsvc->flags &= ~DSSF_LOOPBACK;
3116 	newsvc->port = NULL;
3117 	newsvc->svc_hdl = DS_BADHDL2;
3118 	newsvc->cap.svc_id = ds_strdup(svc->cap.svc_id);
3119 	newsvc->cap.vers = DS_MALLOC(svc->cap.nvers * sizeof (ds_ver_t));
3120 	(void) memcpy(newsvc->cap.vers, svc->cap.vers,
3121 	    svc->cap.nvers * sizeof (ds_ver_t));
3122 
3123 	/*
3124 	 * Kludge to allow lds driver user callbacks to get access to current
3125 	 * svc structure.  Arg could be index to svc table or some other piece
3126 	 * of info to get to the svc table entry.
3127 	 */
3128 	if (newsvc->flags & DSSF_ISUSER) {
3129 		newsvc->ops.cb_arg = (ds_cb_arg_t)(newsvc);
3130 	}
3131 	return (newsvc);
3132 }
3133 
3134 /*
3135  * Internal handle lookup function.
3136  */
3137 static int
3138 i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3139     uint_t maxhdls)
3140 {
3141 	int idx;
3142 	int nhdls = 0;
3143 	ds_svc_t *svc;
3144 	uint32_t client_flag = is_client ? DSSF_ISCLIENT : 0;
3145 
3146 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3147 
3148 	for (idx = 0; idx < ds_svcs.maxsvcs && nhdls < maxhdls; idx++) {
3149 		svc = ds_svcs.tbl[idx];
3150 		if (DS_SVC_ISFREE(svc))
3151 			continue;
3152 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3153 		    (svc->flags & DSSF_ISCLIENT) == client_flag) {
3154 			if (hdlp != NULL && nhdls < maxhdls) {
3155 				hdlp[nhdls] = svc->hdl;
3156 				nhdls++;
3157 			} else {
3158 				nhdls++;
3159 			}
3160 		}
3161 	}
3162 	return (nhdls);
3163 }
3164 
3165 /*
3166  * Interface for ds_hdl_lookup in lds driver.
3167  */
3168 int
3169 ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3170     uint_t maxhdls, uint_t *nhdlsp)
3171 {
3172 	mutex_enter(&ds_svcs.lock);
3173 	*nhdlsp = i_ds_hdl_lookup(service, is_client, hdlp, maxhdls);
3174 	mutex_exit(&ds_svcs.lock);
3175 	return (0);
3176 }
3177 
3178 static void
3179 ds_portset_del_active_clients(char *service, ds_portset_t *portsp)
3180 {
3181 	ds_portset_t ports;
3182 	int idx;
3183 	ds_svc_t *svc;
3184 
3185 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3186 
3187 	DS_PORTSET_DUP(ports, *portsp);
3188 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3189 		svc = ds_svcs.tbl[idx];
3190 		if (DS_SVC_ISFREE(svc))
3191 			continue;
3192 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3193 		    (svc->flags & DSSF_ISCLIENT) != 0 &&
3194 		    svc->state != DS_SVC_INACTIVE &&
3195 		    svc->port != NULL) {
3196 			DS_PORTSET_DEL(ports, PORTID(svc->port));
3197 		}
3198 	}
3199 
3200 	/*
3201 	 * Never send a client reg req to the SP.
3202 	 */
3203 	if (ds_sp_port_id != DS_PORTID_INVALID) {
3204 		DS_PORTSET_DEL(ports, ds_sp_port_id);
3205 	}
3206 	DS_PORTSET_DUP(*portsp, ports);
3207 }
3208 
3209 /*
3210  * After an UNREG REQ, check if this is a client service with multiple
3211  * handles.  If it is, then we can eliminate this entry.
3212  */
3213 static void
3214 ds_check_for_dup_services(ds_svc_t *svc)
3215 {
3216 	if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3217 	    svc->state == DS_SVC_INACTIVE &&
3218 	    i_ds_hdl_lookup(svc->cap.svc_id, 1, NULL, 2) == 2) {
3219 		ds_delete_svc_entry(svc);
3220 	}
3221 }
3222 
3223 static void
3224 ds_delete_svc_entry(ds_svc_t *svc)
3225 {
3226 	ds_svc_hdl_t tmp_hdl;
3227 
3228 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3229 
3230 	/*
3231 	 * Clear out the structure, but do not deallocate the
3232 	 * memory. It can be reused for the next registration.
3233 	 */
3234 	DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
3235 	DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
3236 
3237 	/* save the handle to prevent reuse */
3238 	tmp_hdl = svc->hdl;
3239 	bzero((void *)svc, sizeof (ds_svc_t));
3240 
3241 	/* initialize for next use */
3242 	svc->hdl = tmp_hdl;
3243 	svc->state = DS_SVC_FREE;
3244 
3245 	ds_svcs.nsvcs--;
3246 }
3247