xref: /illumos-gate/usr/src/uts/sun4v/io/ds_common.c (revision 635216b673cf196ac523ff2a7ab715717e553292)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * Domain Services Module Common Code.
29  *
30  * This module is intended to be used by both Solaris and the VBSC
31  * module.
32  */
33 
34 #include <sys/modctl.h>
35 #include <sys/ksynch.h>
36 #include <sys/taskq.h>
37 #include <sys/disp.h>
38 #include <sys/cmn_err.h>
39 #include <sys/note.h>
40 #include <sys/mach_descrip.h>
41 #include <sys/mdesc.h>
42 #include <sys/ldc.h>
43 #include <sys/ds.h>
44 #include <sys/ds_impl.h>
45 
46 #ifndef MIN
47 #define	MIN(a, b)	((a) < (b) ? (a) : (b))
48 #endif
49 
50 #define	DS_DECODE_BUF_LEN		30
51 
52 /*
53  * All DS ports in the system
54  *
55  * The list of DS ports is read in from the MD when the DS module is
56  * initialized and is never modified. This eliminates the need for
57  * locking to access the port array itself. Access to the individual
58  * ports are synchronized at the port level.
59  */
60 ds_port_t	ds_ports[DS_MAX_PORTS];
61 ds_portset_t	ds_allports;	/* all DS ports in the system */
62 ds_portset_t	ds_nullport;	/* allows test against null portset */
63 
64 /* DS SP port id */
65 uint64_t ds_sp_port_id = DS_PORTID_INVALID;
66 
67 /*
68  * Table of registered services
69  *
70  * Locking: Accesses to the table of services are synchronized using
71  *   a mutex lock. The reader lock must be held when looking up service
72  *   information in the table. The writer lock must be held when any
73  *   service information is being modified.
74  */
75 ds_svcs_t	ds_svcs;
76 
77 /*
78  * Flag to prevent callbacks while in the middle of DS teardown.
79  */
80 boolean_t ds_enabled = B_FALSE;	/* enable/disable taskq processing */
81 
82 /*
83  * Retry count and delay for LDC reads and writes
84  */
85 #ifndef DS_DEFAULT_RETRIES
86 #define	DS_DEFAULT_RETRIES	10000	/* number of times to retry */
87 #endif
88 #ifndef DS_DEFAULT_DELAY
89 #define	DS_DEFAULT_DELAY	1000	/* usecs to wait between retries */
90 #endif
91 
92 static int ds_retries = DS_DEFAULT_RETRIES;
93 static clock_t ds_delay = DS_DEFAULT_DELAY;
94 
95 /*
96  * Supported versions of the DS message protocol
97  *
98  * The version array must be sorted in order from the highest
99  * supported version to the lowest. Support for a particular
100  * <major>.<minor> version implies all lower minor versions of
101  * that same major version are supported as well.
102  */
103 static ds_ver_t ds_vers[] = { { 1, 0 } };
104 
105 #define	DS_NUM_VER	(sizeof (ds_vers) / sizeof (ds_vers[0]))
106 
107 
108 /* incoming message handling functions */
109 typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len);
110 static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len);
111 static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len);
112 static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len);
113 static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len);
114 static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len);
115 static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len);
116 static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len);
117 static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len);
118 static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len);
119 static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len);
120 static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len);
121 
122 /*
123  * DS Message Handler Dispatch Table
124  *
125  * A table used to dispatch all incoming messages. This table
126  * contains handlers for all the fixed message types, as well as
127  * the the messages defined in the 1.0 version of the DS protocol.
128  * The handlers are indexed based on the DS header msg_type values
129  */
130 static const ds_msg_handler_t ds_msg_handlers[] = {
131 	ds_handle_init_req,		/* DS_INIT_REQ */
132 	ds_handle_init_ack,		/* DS_INIT_ACK */
133 	ds_handle_init_nack,		/* DS_INIT_NACK */
134 	ds_handle_reg_req,		/* DS_REG_REQ */
135 	ds_handle_reg_ack,		/* DS_REG_ACK */
136 	ds_handle_reg_nack,		/* DS_REG_NACK */
137 	ds_handle_unreg_req,		/* DS_UNREG */
138 	ds_handle_unreg_ack,		/* DS_UNREG_ACK */
139 	ds_handle_unreg_nack,		/* DS_UNREG_NACK */
140 	ds_handle_data,			/* DS_DATA */
141 	ds_handle_nack			/* DS_NACK */
142 };
143 
144 
145 
146 /* initialization functions */
147 static int ds_ldc_init(ds_port_t *port);
148 
149 /* event processing functions */
150 static uint_t ds_ldc_cb(uint64_t event, caddr_t arg);
151 static int ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep);
152 static void ds_handle_up_event(ds_port_t *port);
153 static void ds_handle_down_reset_events(ds_port_t *port);
154 static void ds_handle_recv(void *arg);
155 static void ds_dispatch_event(void *arg);
156 
157 /* message sending functions */
158 static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen);
159 static int ds_send_reg_req(ds_svc_t *svc, ds_port_t *port);
160 static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
161 static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
162 
163 /* walker functions */
164 static int ds_svc_isfree(ds_svc_t *svc, void *arg);
165 static int ds_svc_unregister(ds_svc_t *svc, void *arg);
166 static int ds_svc_port_up(ds_svc_t *svc, void *arg);
167 
168 /* service utilities */
169 static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port);
170 static int ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port);
171 static int ds_svc_register_onport_walker(ds_svc_t *svc, void *arg);
172 static void ds_set_port_ready(ds_port_t *port, uint16_t major, uint16_t minor);
173 
174 /* port utilities */
175 static void ds_port_reset(ds_port_t *port);
176 static ldc_status_t ds_update_ldc_state(ds_port_t *port);
177 
178 /* misc utilities */
179 static void min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
180     uint16_t *min_major, uint16_t *max_major);
181 
182 /* debug */
183 static char *decode_ldc_events(uint64_t event, char *buf);
184 
185 /* loopback */
186 static void ds_loopback_register(ds_svc_hdl_t hdl);
187 static void ds_loopback_unregister(ds_svc_hdl_t hdl);
188 static void ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen);
189 static int ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap,
190     ds_svc_hdl_t *lb_hdlp);
191 
192 /* client handling */
193 static int i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
194     uint_t maxhdls);
195 static ds_svc_t *ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl,
196     ds_port_t *port);
197 static ds_svc_t *ds_find_svc_by_id_port(char *svc_id, int is_client,
198     ds_port_t *port);
199 static ds_svc_t *ds_svc_clone(ds_svc_t *svc);
200 static void ds_check_for_dup_services(ds_svc_t *svc);
201 static void ds_delete_svc_entry(ds_svc_t *svc);
202 
203 char *
204 ds_strdup(char *str)
205 {
206 	char *newstr;
207 
208 	newstr = DS_MALLOC(strlen(str) + 1);
209 	(void) strcpy(newstr, str);
210 	return (newstr);
211 }
212 
213 void
214 ds_common_init(void)
215 {
216 	/* Validate version table */
217 	ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK);
218 
219 	/* Initialize services table */
220 	ds_init_svcs_tbl(DS_MAXSVCS_INIT);
221 
222 	/* enable callback processing */
223 	ds_enabled = B_TRUE;
224 }
225 
226 /* BEGIN LDC SUPPORT FUNCTIONS */
227 
228 static char *
229 decode_ldc_events(uint64_t event, char *buf)
230 {
231 	buf[0] = 0;
232 	if (event & LDC_EVT_DOWN)	(void) strcat(buf, " DOWN");
233 	if (event & LDC_EVT_RESET)	(void) strcat(buf, " RESET");
234 	if (event & LDC_EVT_UP)		(void) strcat(buf, " UP");
235 	if (event & LDC_EVT_READ)	(void) strcat(buf, " READ");
236 	if (event & LDC_EVT_WRITE)	(void) strcat(buf, " WRITE");
237 	return (buf);
238 }
239 
240 static ldc_status_t
241 ds_update_ldc_state(ds_port_t *port)
242 {
243 	ldc_status_t	ldc_state;
244 	int		rv;
245 	char		ebuf[DS_EBUFSIZE];
246 
247 	ASSERT(MUTEX_HELD(&port->lock));
248 
249 	/*
250 	 * Read status and update ldc state info in port structure.
251 	 */
252 	if ((rv = ldc_status(port->ldc.hdl, &ldc_state)) != 0) {
253 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_status error: %s" DS_EOL,
254 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
255 		ldc_state = port->ldc.state;
256 	} else {
257 		port->ldc.state = ldc_state;
258 	}
259 
260 	return (ldc_state);
261 }
262 
263 static void
264 ds_handle_down_reset_events(ds_port_t *port)
265 {
266 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
267 	    __func__);
268 
269 	mutex_enter(&ds_svcs.lock);
270 	mutex_enter(&port->lock);
271 
272 	ds_sys_drain_events(port);
273 
274 	(void) ds_update_ldc_state(port);
275 
276 	/* reset the port state */
277 	ds_port_reset(port);
278 
279 	/* acknowledge the reset */
280 	(void) ldc_up(port->ldc.hdl);
281 
282 	mutex_exit(&port->lock);
283 	mutex_exit(&ds_svcs.lock);
284 
285 	ds_handle_up_event(port);
286 
287 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
288 }
289 
290 static void
291 ds_handle_up_event(ds_port_t *port)
292 {
293 	ldc_status_t	ldc_state;
294 
295 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
296 	    __func__);
297 
298 	mutex_enter(&port->lock);
299 
300 	ldc_state = ds_update_ldc_state(port);
301 
302 	mutex_exit(&port->lock);
303 
304 	if ((ldc_state == LDC_UP) && IS_DS_PORT(port)) {
305 		/*
306 		 * Initiate the handshake.
307 		 */
308 		ds_send_init_req(port);
309 	}
310 
311 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
312 }
313 
314 static uint_t
315 ds_ldc_cb(uint64_t event, caddr_t arg)
316 {
317 	ds_port_t	*port = (ds_port_t *)arg;
318 	char		evstring[DS_DECODE_BUF_LEN];
319 
320 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: %s event (%llx) received" DS_EOL,
321 	    PORTID(port), __func__, decode_ldc_events(event, evstring),
322 	    (u_longlong_t)event);
323 
324 	if (!ds_enabled) {
325 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: callback handling is disabled"
326 		    DS_EOL, PORTID(port), __func__);
327 		return (LDC_SUCCESS);
328 	}
329 
330 	if (event & (LDC_EVT_DOWN | LDC_EVT_RESET)) {
331 		ds_handle_down_reset_events(port);
332 		goto done;
333 	}
334 
335 	if (event & LDC_EVT_UP) {
336 		ds_handle_up_event(port);
337 	}
338 
339 	if (event & LDC_EVT_READ) {
340 		if (port->ldc.state != LDC_UP) {
341 			cmn_err(CE_WARN, "ds@%lx: %s: LDC READ event while "
342 			    "port not up" DS_EOL, PORTID(port), __func__);
343 			goto done;
344 		}
345 
346 		if (ds_sys_dispatch_func(ds_handle_recv, port)) {
347 			cmn_err(CE_WARN, "ds@%lx: error initiating LDC READ "
348 			    " event", PORTID(port));
349 		}
350 	}
351 
352 	if (event & LDC_EVT_WRITE) {
353 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: LDC WRITE event received, "
354 		    "not supported" DS_EOL, PORTID(port), __func__);
355 	}
356 
357 	if (event & ~(LDC_EVT_UP | LDC_EVT_READ)) {
358 		cmn_err(CE_WARN, "ds@%lx: %s: Unexpected LDC event received: "
359 		    "0x%llx" DS_EOL, PORTID(port), __func__,
360 		    (u_longlong_t)event);
361 	}
362 done:
363 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
364 
365 	return (LDC_SUCCESS);
366 }
367 
368 static int
369 ds_ldc_init(ds_port_t *port)
370 {
371 	int		rv;
372 	ldc_attr_t	ldc_attr;
373 	caddr_t		ldc_cb_arg = (caddr_t)port;
374 	char		ebuf[DS_EBUFSIZE];
375 
376 	ASSERT(MUTEX_HELD(&port->lock));
377 
378 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%lld" DS_EOL,
379 	    PORTID(port), __func__, (u_longlong_t)port->ldc.id);
380 
381 	ldc_attr.devclass = LDC_DEV_GENERIC;
382 	ldc_attr.instance = 0;
383 	ldc_attr.mode = LDC_MODE_RELIABLE;
384 	ldc_attr.mtu = DS_STREAM_MTU;
385 
386 	if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) {
387 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_id: %lx, ldc_init error: %s"
388 		    DS_EOL, PORTID(port), __func__, port->ldc.id,
389 		    ds_errno_to_str(rv, ebuf));
390 		return (rv);
391 	}
392 
393 	rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, ldc_cb_arg);
394 	if (rv != 0) {
395 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_reg_callback error: %s"
396 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
397 		return (rv);
398 	}
399 
400 	ds_sys_ldc_init(port);
401 	return (0);
402 }
403 
404 int
405 ds_ldc_fini(ds_port_t *port)
406 {
407 	int	rv;
408 	char	ebuf[DS_EBUFSIZE];
409 
410 	ASSERT(port->state >= DS_PORT_LDC_INIT);
411 	ASSERT(MUTEX_HELD(&port->lock));
412 
413 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%ld" DS_EOL, PORTID(port),
414 	    __func__, port->ldc.id);
415 
416 	if ((rv = ldc_close(port->ldc.hdl)) != 0) {
417 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_close error: %s" DS_EOL,
418 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
419 		return (rv);
420 	}
421 
422 	if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) {
423 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_unreg_callback error: %s"
424 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
425 		return (rv);
426 	}
427 
428 	if ((rv = ldc_fini(port->ldc.hdl)) != 0) {
429 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_fini error: %s" DS_EOL,
430 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
431 		return (rv);
432 	}
433 
434 	port->ldc.id = (uint64_t)-1;
435 	port->ldc.hdl = NULL;
436 	port->ldc.state = 0;
437 
438 	return (rv);
439 }
440 
441 /*
442  * Attempt to read a specified number of bytes from a particular LDC.
443  * Returns zero for success or the return code from the LDC read on
444  * failure. The actual number of bytes read from the LDC is returned
445  * in the size parameter.
446  */
447 static int
448 ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep)
449 {
450 	int	rv = 0;
451 	size_t	bytes_req = *sizep;
452 	size_t	bytes_left = bytes_req;
453 	size_t	nbytes;
454 	int	retry_count = 0;
455 	char	ebuf[DS_EBUFSIZE];
456 
457 	ASSERT(MUTEX_HELD(&port->rcv_lock));
458 
459 	*sizep = 0;
460 
461 	DS_DBG_LDC(CE_NOTE, "ds@%lx: attempting to read %ld bytes" DS_EOL,
462 	    PORTID(port), bytes_req);
463 
464 	while (bytes_left > 0) {
465 
466 		nbytes = bytes_left;
467 
468 		mutex_enter(&port->lock);
469 		if (port->ldc.state == LDC_UP) {
470 			rv = ldc_read(port->ldc.hdl, msgp, &nbytes);
471 		} else
472 			rv = ENXIO;
473 		mutex_exit(&port->lock);
474 		if (rv != 0) {
475 			if (rv == ECONNRESET) {
476 				break;
477 			} else if (rv != EAGAIN) {
478 				cmn_err(CE_NOTE, "ds@%lx: %s: %s" DS_EOL,
479 				    PORTID(port), __func__,
480 				    ds_errno_to_str(rv, ebuf));
481 				break;
482 			}
483 		} else {
484 			if (nbytes != 0) {
485 				DS_DBG_LDC(CE_NOTE, "ds@%lx: "
486 				    "read %ld bytes, %d retries" DS_EOL,
487 				    PORTID(port), nbytes, retry_count);
488 
489 				*sizep += nbytes;
490 				msgp += nbytes;
491 				bytes_left -= nbytes;
492 
493 				/* reset counter on a successful read */
494 				retry_count = 0;
495 				continue;
496 			}
497 
498 			/*
499 			 * No data was read. Check if this is the
500 			 * first attempt. If so, just return since
501 			 * nothing has been read yet.
502 			 */
503 			if (bytes_left == bytes_req) {
504 				DS_DBG_LDC(CE_NOTE, "ds@%lx: read zero bytes, "
505 				    " no data available" DS_EOL, PORTID(port));
506 				break;
507 			}
508 		}
509 
510 		/*
511 		 * A retry is necessary because the read returned
512 		 * EAGAIN, or a zero length read occurred after
513 		 * reading a partial message.
514 		 */
515 		if (retry_count++ >= ds_retries) {
516 			DS_DBG_LDC(CE_NOTE, "ds@%lx: timed out waiting for "
517 			    "message" DS_EOL, PORTID(port));
518 			break;
519 		}
520 
521 		drv_usecwait(ds_delay);
522 	}
523 
524 	return (rv);
525 }
526 
527 static void
528 ds_handle_recv(void *arg)
529 {
530 	ds_port_t	*port = (ds_port_t *)arg;
531 	char		*hbuf;
532 	size_t		msglen;
533 	size_t		read_size;
534 	boolean_t	hasdata;
535 	ds_hdr_t	hdr;
536 	uint8_t		*msg;
537 	char		*currp;
538 	int		rv;
539 	ds_event_t	*devent;
540 
541 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s..." DS_EOL, PORTID(port), __func__);
542 
543 	/*
544 	 * Read messages from the channel until there are none
545 	 * pending. Valid messages are dispatched to be handled
546 	 * by a separate thread while any malformed messages are
547 	 * dropped.
548 	 */
549 
550 	mutex_enter(&port->rcv_lock);
551 
552 	for (;;) {
553 		mutex_enter(&port->lock);
554 		if (port->ldc.state == LDC_UP) {
555 			rv = ldc_chkq(port->ldc.hdl, &hasdata);
556 		} else
557 			rv = ENXIO;
558 		mutex_exit(&port->lock);
559 		if (rv != 0 || !hasdata)
560 			break;
561 
562 		DS_DBG(CE_NOTE, "ds@%lx: %s: reading next message" DS_EOL,
563 		    PORTID(port), __func__);
564 
565 		/*
566 		 * Read in the next message.
567 		 */
568 		hbuf = (char *)&hdr;
569 		bzero(hbuf, DS_HDR_SZ);
570 		read_size = DS_HDR_SZ;
571 		currp = hbuf;
572 
573 		/* read in the message header */
574 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
575 			break;
576 		}
577 
578 		if (read_size < DS_HDR_SZ) {
579 			/*
580 			 * A zero length read is a valid signal that
581 			 * there is no data left on the channel.
582 			 */
583 			if (read_size != 0) {
584 				cmn_err(CE_WARN, "ds@%lx: invalid message "
585 				    "length, received %ld bytes, expected %ld"
586 				    DS_EOL, PORTID(port), read_size, DS_HDR_SZ);
587 			}
588 			continue;
589 		}
590 
591 		/* get payload size and allocate a buffer */
592 		read_size = ((ds_hdr_t *)hbuf)->payload_len;
593 		msglen = DS_HDR_SZ + read_size;
594 		msg = DS_MALLOC(msglen);
595 		if (!msg) {
596 			cmn_err(CE_WARN, "Memory allocation failed attempting "
597 			    " to allocate %d bytes." DS_EOL, (int)msglen);
598 			continue;
599 		}
600 
601 		DS_DBG(CE_NOTE, "ds@%lx: %s: message payload len %d" DS_EOL,
602 		    PORTID(port), __func__, (int)read_size);
603 
604 		/* move message header into buffer */
605 		(void) memcpy(msg, hbuf, DS_HDR_SZ);
606 		currp = (char *)(msg) + DS_HDR_SZ;
607 
608 		/* read in the message body */
609 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
610 			DS_FREE(msg, msglen);
611 			break;
612 		}
613 
614 		/* validate the size of the message */
615 		if ((DS_HDR_SZ + read_size) != msglen) {
616 			cmn_err(CE_WARN, "ds@%lx: %s: invalid message length, "
617 			    "received %ld bytes, expected %ld" DS_EOL,
618 			    PORTID(port), __func__, (DS_HDR_SZ + read_size),
619 			    msglen);
620 			DS_FREE(msg, msglen);
621 			continue;
622 		}
623 
624 		DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
625 
626 		/*
627 		 * Send the message for processing, and store it
628 		 * in the log. The memory is deallocated only when
629 		 * the message is removed from the log.
630 		 */
631 
632 		devent = DS_MALLOC(sizeof (ds_event_t));
633 		devent->port = port;
634 		devent->buf = (char *)msg;
635 		devent->buflen = msglen;
636 
637 		/* log the message */
638 		(void) ds_log_add_msg(DS_LOG_IN(port->id), msg, msglen);
639 
640 		if (ds_sys_dispatch_func(ds_dispatch_event, devent)) {
641 			cmn_err(CE_WARN, "ds@%lx: error initiating "
642 			    "event handler", PORTID(port));
643 			DS_FREE(devent, sizeof (ds_event_t));
644 		}
645 	}
646 
647 	mutex_exit(&port->rcv_lock);
648 
649 	/* handle connection reset errors returned from ds_recv_msg */
650 	if (rv == ECONNRESET) {
651 		ds_handle_down_reset_events(port);
652 	}
653 
654 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s done" DS_EOL, PORTID(port), __func__);
655 }
656 
657 static void
658 ds_dispatch_event(void *arg)
659 {
660 	ds_event_t	*event = (ds_event_t *)arg;
661 	ds_hdr_t	*hdr;
662 	ds_port_t	*port;
663 
664 	port = event->port;
665 
666 	hdr = (ds_hdr_t *)event->buf;
667 
668 	if (DS_MSG_TYPE_VALID(hdr->msg_type)) {
669 		DS_DBG(CE_NOTE, "ds@%lx: dispatch_event: msg_type=%d" DS_EOL,
670 		    PORTID(port), hdr->msg_type);
671 
672 		(*ds_msg_handlers[hdr->msg_type])(port, event->buf,
673 		    event->buflen);
674 	} else {
675 		cmn_err(CE_WARN, "ds@%lx: dispatch_event: invalid msg "
676 		    "type (%d)" DS_EOL, PORTID(port), hdr->msg_type);
677 	}
678 
679 	DS_FREE(event->buf, event->buflen);
680 	DS_FREE(event, sizeof (ds_event_t));
681 }
682 
683 int
684 ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen)
685 {
686 	int	rv;
687 	caddr_t	currp = msg;
688 	size_t	amt_left = msglen;
689 	int	loopcnt = 0;
690 
691 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s msglen: %ld" DS_EOL, PORTID(port),
692 	    __func__, msglen);
693 	DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
694 
695 	(void) ds_log_add_msg(DS_LOG_OUT(port->id), (uint8_t *)msg, msglen);
696 
697 	/*
698 	 * Ensure that no other messages can be sent on this port by holding
699 	 * the tx_lock mutex in case the write doesn't get sent with one write.
700 	 * This guarantees that the message doesn't become fragmented.
701 	 */
702 	mutex_enter(&port->tx_lock);
703 
704 	do {
705 		mutex_enter(&port->lock);
706 		if (port->ldc.state == LDC_UP) {
707 			rv = ldc_write(port->ldc.hdl, currp, &msglen);
708 		} else
709 			rv = ENXIO;
710 		mutex_exit(&port->lock);
711 		if (rv != 0) {
712 			if (rv == ECONNRESET) {
713 				mutex_exit(&port->tx_lock);
714 				(void) ds_sys_dispatch_func((void (*)(void *))
715 				    ds_handle_down_reset_events, port);
716 				return (rv);
717 			} else if ((rv == EWOULDBLOCK) &&
718 			    (loopcnt++ < ds_retries)) {
719 				drv_usecwait(ds_delay);
720 			} else {
721 				DS_DBG_PRCL(CE_NOTE, "ds@%lx: send_msg: "
722 				    "ldc_write failed (%d), %d bytes "
723 				    "remaining" DS_EOL, PORTID(port), rv,
724 				    (int)amt_left);
725 				goto error;
726 			}
727 		} else {
728 			amt_left -= msglen;
729 			currp += msglen;
730 			msglen = amt_left;
731 			loopcnt = 0;
732 		}
733 	} while (amt_left > 0);
734 error:
735 	mutex_exit(&port->tx_lock);
736 
737 	return (rv);
738 }
739 
740 /* END LDC SUPPORT FUNCTIONS */
741 
742 
743 /* BEGIN DS PROTOCOL SUPPORT FUNCTIONS */
744 
745 static void
746 ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len)
747 {
748 	ds_hdr_t	*hdr;
749 	ds_init_ack_t	*ack;
750 	ds_init_nack_t	*nack;
751 	char		*msg;
752 	size_t		msglen;
753 	ds_init_req_t	*req;
754 	size_t		explen = DS_MSG_LEN(ds_init_req_t);
755 	uint16_t	new_major;
756 	uint16_t	new_minor;
757 	boolean_t	match;
758 
759 	/* sanity check the incoming message */
760 	if (len != explen) {
761 		cmn_err(CE_WARN, "ds@%lx: <init_req: invalid message "
762 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
763 		    explen);
764 		return;
765 	}
766 
767 	req = (ds_init_req_t *)(buf + DS_HDR_SZ);
768 
769 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_req: ver=%d.%d" DS_EOL,
770 	    PORTID(port), req->major_vers, req->minor_vers);
771 
772 	match = negotiate_version(DS_NUM_VER, &ds_vers[0],
773 	    req->major_vers, &new_major, &new_minor);
774 
775 	/*
776 	 * Check version info. ACK only if the major numbers exactly
777 	 * match. The service entity can retry with a new minor
778 	 * based on the response sent as part of the NACK.
779 	 */
780 	if (match) {
781 		msglen = DS_MSG_LEN(ds_init_ack_t);
782 		msg = DS_MALLOC(msglen);
783 
784 		hdr = (ds_hdr_t *)msg;
785 		hdr->msg_type = DS_INIT_ACK;
786 		hdr->payload_len = sizeof (ds_init_ack_t);
787 
788 		ack = (ds_init_ack_t *)(msg + DS_HDR_SZ);
789 		ack->minor_vers = MIN(new_minor, req->minor_vers);
790 
791 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_ack>: minor=0x%04X" DS_EOL,
792 		    PORTID(port), MIN(new_minor, req->minor_vers));
793 	} else {
794 		msglen = DS_MSG_LEN(ds_init_nack_t);
795 		msg = DS_MALLOC(msglen);
796 
797 		hdr = (ds_hdr_t *)msg;
798 		hdr->msg_type = DS_INIT_NACK;
799 		hdr->payload_len = sizeof (ds_init_nack_t);
800 
801 		nack = (ds_init_nack_t *)(msg + DS_HDR_SZ);
802 		nack->major_vers = new_major;
803 
804 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_nack>: major=0x%04X" DS_EOL,
805 		    PORTID(port), new_major);
806 	}
807 
808 	/*
809 	 * Send the response
810 	 */
811 	(void) ds_send_msg(port, msg, msglen);
812 	DS_FREE(msg, msglen);
813 
814 	if (match) {
815 		ds_set_port_ready(port, req->major_vers, ack->minor_vers);
816 	}
817 }
818 
819 static void
820 ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len)
821 {
822 	ds_init_ack_t	*ack;
823 	ds_ver_t	*ver;
824 	uint16_t	major;
825 	uint16_t	minor;
826 	size_t		explen = DS_MSG_LEN(ds_init_ack_t);
827 
828 	/* sanity check the incoming message */
829 	if (len != explen) {
830 		cmn_err(CE_WARN, "ds@%lx: <init_ack: invalid message "
831 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
832 		    explen);
833 		return;
834 	}
835 
836 	ack = (ds_init_ack_t *)(buf + DS_HDR_SZ);
837 
838 	mutex_enter(&port->lock);
839 
840 	if (port->state == DS_PORT_READY) {
841 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready" DS_EOL,
842 		    PORTID(port));
843 		mutex_exit(&port->lock);
844 		return;
845 	}
846 
847 	if (port->state != DS_PORT_INIT_REQ) {
848 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: invalid state: %d"
849 		    DS_EOL, PORTID(port), port->state);
850 		mutex_exit(&port->lock);
851 		return;
852 	}
853 
854 	ver = &(ds_vers[port->ver_idx]);
855 	major = ver->major;
856 	minor = MIN(ver->minor, ack->minor_vers);
857 	mutex_exit(&port->lock);
858 
859 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready v%d.%d" DS_EOL,
860 	    PORTID(port), major, minor);
861 
862 	ds_set_port_ready(port, major, minor);
863 }
864 
865 static void
866 ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len)
867 {
868 	int		idx;
869 	ds_init_nack_t	*nack;
870 	ds_ver_t	*ver;
871 	size_t		explen = DS_MSG_LEN(ds_init_nack_t);
872 
873 	/* sanity check the incoming message */
874 	if (len != explen) {
875 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: invalid message "
876 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
877 		    explen);
878 		return;
879 	}
880 
881 	nack = (ds_init_nack_t *)(buf + DS_HDR_SZ);
882 
883 	mutex_enter(&port->lock);
884 
885 	if (port->state != DS_PORT_INIT_REQ) {
886 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: invalid state: %d"
887 		    DS_EOL, PORTID(port), port->state);
888 		mutex_exit(&port->lock);
889 		return;
890 	}
891 
892 	ver = &(ds_vers[port->ver_idx]);
893 
894 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: req=v%d.%d, nack=v%d.x"
895 	    DS_EOL, PORTID(port), ver->major, ver->minor, nack->major_vers);
896 
897 	if (nack->major_vers == 0) {
898 		/* no supported protocol version */
899 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS not supported"
900 		    DS_EOL, PORTID(port));
901 		mutex_exit(&port->lock);
902 		return;
903 	}
904 
905 	/*
906 	 * Walk the version list, looking for a major version
907 	 * that is as close to the requested major version as
908 	 * possible.
909 	 */
910 	for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) {
911 		if (ds_vers[idx].major <= nack->major_vers) {
912 			/* found a version to try */
913 			goto done;
914 		}
915 	}
916 
917 	if (idx == DS_NUM_VER) {
918 		/* no supported version */
919 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS v%d.x not "
920 		    "supported" DS_EOL, PORTID(port), nack->major_vers);
921 
922 		mutex_exit(&port->lock);
923 		return;
924 	}
925 
926 done:
927 	/* start the handshake again */
928 	port->ver_idx = idx;
929 	port->state = DS_PORT_LDC_INIT;
930 	mutex_exit(&port->lock);
931 
932 	ds_send_init_req(port);
933 
934 }
935 
936 static ds_svc_t *
937 ds_find_svc_by_id_port(char *svc_id, int is_client, ds_port_t *port)
938 {
939 	int		idx;
940 	ds_svc_t	*svc, *found_svc = 0;
941 	uint32_t	flag_match = is_client ? DSSF_ISCLIENT : 0;
942 
943 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
944 
945 	/* walk every table entry */
946 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
947 		svc = ds_svcs.tbl[idx];
948 		if (DS_SVC_ISFREE(svc))
949 			continue;
950 		if (strcmp(svc->cap.svc_id, svc_id) != 0)
951 			continue;
952 		if ((svc->flags & DSSF_ISCLIENT) != flag_match)
953 			continue;
954 		if (port != NULL && svc->port == port) {
955 			return (svc);
956 		} else if (svc->state == DS_SVC_INACTIVE) {
957 			found_svc = svc;
958 		} else if (!found_svc) {
959 			found_svc = svc;
960 		}
961 	}
962 
963 	return (found_svc);
964 }
965 
966 static void
967 ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len)
968 {
969 	ds_reg_req_t	*req;
970 	ds_hdr_t	*hdr;
971 	ds_reg_ack_t	*ack;
972 	ds_reg_nack_t	*nack;
973 	char		*msg;
974 	size_t		msglen;
975 	size_t		explen = DS_MSG_LEN(ds_reg_req_t);
976 	ds_svc_t	*svc = NULL;
977 	ds_ver_t	version;
978 	uint16_t	new_major;
979 	uint16_t	new_minor;
980 	boolean_t	match;
981 
982 	/* sanity check the incoming message */
983 	if (len < explen) {
984 		cmn_err(CE_WARN, "ds@%lx: <reg_req: invalid message "
985 		    "length (%ld), expected at least %ld" DS_EOL,
986 		    PORTID(port), len, explen);
987 		return;
988 	}
989 
990 	req = (ds_reg_req_t *)(buf + DS_HDR_SZ);
991 
992 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' ver=%d.%d, hdl=0x%llx"
993 	    DS_EOL, PORTID(port), req->svc_id, req->major_vers, req->minor_vers,
994 	    (u_longlong_t)req->svc_handle);
995 
996 	mutex_enter(&ds_svcs.lock);
997 	svc = ds_find_svc_by_id_port(req->svc_id,
998 	    DS_HDL_ISCLIENT(req->svc_handle) == 0, port);
999 	if (svc == NULL) {
1000 
1001 do_reg_nack:
1002 		mutex_exit(&ds_svcs.lock);
1003 
1004 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1005 		msg = DS_MALLOC(msglen);
1006 
1007 		hdr = (ds_hdr_t *)msg;
1008 		hdr->msg_type = DS_REG_NACK;
1009 		hdr->payload_len = sizeof (ds_reg_nack_t);
1010 
1011 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1012 		nack->svc_handle = req->svc_handle;
1013 		nack->result = DS_REG_VER_NACK;
1014 		nack->major_vers = 0;
1015 
1016 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s'" DS_EOL,
1017 		    PORTID(port), req->svc_id);
1018 		/*
1019 		 * Send the response
1020 		 */
1021 		(void) ds_send_msg(port, msg, msglen);
1022 		DS_FREE(msg, msglen);
1023 		return;
1024 	}
1025 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' found, hdl: 0x%llx" DS_EOL,
1026 	    PORTID(port), req->svc_id, (u_longlong_t)svc->hdl);
1027 
1028 	/*
1029 	 * A client sends out a reg req in order to force service providers to
1030 	 * initiate a reg req from their end (limitation in the protocol).  We
1031 	 * expect the service provider to be in the inactive (DS_SVC_INACTIVE)
1032 	 * state.  If the service provider has already sent out a reg req (the
1033 	 * state is DS_SVC_REG_PENDING) or has already handshaken (the
1034 	 * state is DS_SVC_ACTIVE), then we can simply ignore this reg
1035 	 * req.  For any other state, we force an unregister before initiating
1036 	 * a reg req.
1037 	 */
1038 
1039 	if (DS_HDL_ISCLIENT(req->svc_handle)) {
1040 		switch (svc->state) {
1041 
1042 		case DS_SVC_REG_PENDING:
1043 		case DS_SVC_ACTIVE:
1044 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1045 			    "client, state (%x)" DS_EOL, PORTID(port),
1046 			    req->svc_id, svc->state);
1047 			mutex_exit(&ds_svcs.lock);
1048 			return;
1049 
1050 		case DS_SVC_INACTIVE:
1051 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1052 			    "client" DS_EOL, PORTID(port), req->svc_id);
1053 			break;
1054 
1055 		default:
1056 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1057 			    "client forced unreg, state (%x)" DS_EOL,
1058 			    PORTID(port), req->svc_id, svc->state);
1059 			(void) ds_svc_unregister(svc, port);
1060 			break;
1061 		}
1062 		(void) ds_svc_port_up(svc, port);
1063 		(void) ds_svc_register_onport(svc, port);
1064 		mutex_exit(&ds_svcs.lock);
1065 		return;
1066 	}
1067 
1068 	/*
1069 	 * Only remote service providers can initiate a registration.  The
1070 	 * local sevice from here must be a client service.
1071 	 */
1072 
1073 	match = negotiate_version(svc->cap.nvers, svc->cap.vers,
1074 	    req->major_vers, &new_major, &new_minor);
1075 
1076 	/*
1077 	 * Check version info. ACK only if the major numbers exactly
1078 	 * match. The service entity can retry with a new minor
1079 	 * based on the response sent as part of the NACK.
1080 	 */
1081 	if (match) {
1082 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' svc%d: state: %x "
1083 		    "svc_portid: %d" DS_EOL, PORTID(port), req->svc_id,
1084 		    (int)DS_HDL2IDX(svc->hdl), svc->state,
1085 		    (int)(svc->port == NULL ? -1 : PORTID(svc->port)));
1086 		/*
1087 		 * If the current local service is already in use and
1088 		 * it's not on this port, clone it.
1089 		 */
1090 		if (svc->state != DS_SVC_INACTIVE) {
1091 			if (svc->port != NULL && port == svc->port) {
1092 				/*
1093 				 * Someone probably dropped an unreg req
1094 				 * somewhere.  Force a local unreg.
1095 				 */
1096 				(void) ds_svc_unregister(svc, port);
1097 			} else if (!DS_HDL_ISCLIENT(svc->hdl)) {
1098 				/*
1099 				 * Can't clone a non-client (service provider)
1100 				 * handle.  This is because old in-kernel
1101 				 * service providers can't deal with multiple
1102 				 * handles.
1103 				 */
1104 				goto do_reg_nack;
1105 			} else {
1106 				svc = ds_svc_clone(svc);
1107 			}
1108 		}
1109 		svc->port = port;
1110 		svc->svc_hdl = req->svc_handle;
1111 		svc->state = DS_SVC_ACTIVE;
1112 
1113 		msglen = DS_MSG_LEN(ds_reg_ack_t);
1114 		msg = DS_MALLOC(msglen);
1115 
1116 		hdr = (ds_hdr_t *)msg;
1117 		hdr->msg_type = DS_REG_ACK;
1118 		hdr->payload_len = sizeof (ds_reg_ack_t);
1119 
1120 		ack = (ds_reg_ack_t *)(msg + DS_HDR_SZ);
1121 		ack->svc_handle = req->svc_handle;
1122 		ack->minor_vers = MIN(new_minor, req->minor_vers);
1123 
1124 
1125 		if (svc->ops.ds_reg_cb) {
1126 			/* Call the registration callback */
1127 			version.major = req->major_vers;
1128 			version.minor = ack->minor_vers;
1129 			(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &version,
1130 			    svc->hdl);
1131 		}
1132 		mutex_exit(&ds_svcs.lock);
1133 
1134 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_ack>: '%s' minor=0x%04X"
1135 		    DS_EOL, PORTID(port), svc->cap.svc_id,
1136 		    MIN(new_minor, req->minor_vers));
1137 	} else {
1138 		mutex_exit(&ds_svcs.lock);
1139 
1140 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1141 		msg = DS_MALLOC(msglen);
1142 
1143 		hdr = (ds_hdr_t *)msg;
1144 		hdr->msg_type = DS_REG_NACK;
1145 		hdr->payload_len = sizeof (ds_reg_nack_t);
1146 
1147 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1148 		nack->svc_handle = req->svc_handle;
1149 		nack->result = DS_REG_VER_NACK;
1150 		nack->major_vers = new_major;
1151 
1152 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s' major=0x%04X"
1153 		    DS_EOL, PORTID(port), svc->cap.svc_id, new_major);
1154 	}
1155 
1156 	/* send message */
1157 	(void) ds_send_msg(port, msg, msglen);
1158 	DS_FREE(msg, msglen);
1159 }
1160 
1161 static void
1162 ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len)
1163 {
1164 	ds_reg_ack_t	*ack;
1165 	ds_ver_t	*ver;
1166 	ds_ver_t	tmpver;
1167 	ds_svc_t	*svc;
1168 	size_t		explen = DS_MSG_LEN(ds_reg_ack_t);
1169 
1170 	/* sanity check the incoming message */
1171 	if (len != explen) {
1172 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid message "
1173 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1174 		    explen);
1175 		return;
1176 	}
1177 
1178 	ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ);
1179 
1180 	mutex_enter(&ds_svcs.lock);
1181 
1182 	/*
1183 	 * This searches for service based on how we generate handles
1184 	 * and so only works because this is a reg ack.
1185 	 */
1186 	if (DS_HDL_ISCLIENT(ack->svc_handle) ||
1187 	    (svc = ds_get_svc(ack->svc_handle)) == NULL) {
1188 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid handle 0x%llx"
1189 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1190 		goto done;
1191 	}
1192 
1193 	/* make sure the message makes sense */
1194 	if (svc->state != DS_SVC_REG_PENDING) {
1195 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid state (%d)" DS_EOL,
1196 		    PORTID(port), svc->state);
1197 		goto done;
1198 	}
1199 
1200 	ver = &(svc->cap.vers[svc->ver_idx]);
1201 
1202 	/* major version has been agreed upon */
1203 	svc->ver.major = ver->major;
1204 
1205 	if (ack->minor_vers >= ver->minor) {
1206 		/*
1207 		 * Use the minor version specified in the
1208 		 * original request.
1209 		 */
1210 		svc->ver.minor = ver->minor;
1211 	} else {
1212 		/*
1213 		 * Use the lower minor version returned in
1214 		 * the ack. By defninition, all lower minor
1215 		 * versions must be supported.
1216 		 */
1217 		svc->ver.minor = ack->minor_vers;
1218 	}
1219 
1220 	svc->state = DS_SVC_ACTIVE;
1221 	svc->port = port;
1222 
1223 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_ack: '%s' v%d.%d ready, hdl=0x%llx"
1224 	    DS_EOL, PORTID(port), svc->cap.svc_id, svc->ver.major,
1225 	    svc->ver.minor, (u_longlong_t)svc->hdl);
1226 
1227 	/* notify the client that registration is complete */
1228 	if (svc->ops.ds_reg_cb) {
1229 		/*
1230 		 * Use a temporary version structure so that
1231 		 * the copy in the svc structure cannot be
1232 		 * modified by the client.
1233 		 */
1234 		tmpver.major = svc->ver.major;
1235 		tmpver.minor = svc->ver.minor;
1236 
1237 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl);
1238 	}
1239 
1240 done:
1241 	mutex_exit(&ds_svcs.lock);
1242 }
1243 
1244 static void
1245 ds_try_next_port(ds_svc_t *svc, int portid)
1246 {
1247 	ds_port_t *port;
1248 	ds_portset_t totry;
1249 	int i;
1250 
1251 	DS_DBG_LDC(CE_NOTE, "ds@%x %s" DS_EOL, portid, __func__);
1252 
1253 	/*
1254 	 * Get the ports that haven't been tried yet and are available to try.
1255 	 */
1256 	DS_PORTSET_DUP(totry, svc->avail);
1257 	for (i = 0; i < DS_MAX_PORTS; i++) {
1258 		if (DS_PORT_IN_SET(svc->tried, i))
1259 			DS_PORTSET_DEL(totry, i);
1260 	}
1261 
1262 	if (DS_PORTSET_ISNULL(totry))
1263 		return;
1264 
1265 	for (i = 0; i < DS_MAX_PORTS; i++, portid++) {
1266 		if (portid >= DS_MAX_PORTS) {
1267 			portid = 0;
1268 		}
1269 
1270 		/*
1271 		 * If the port is not in the available list,
1272 		 * it is not a candidate for registration.
1273 		 */
1274 		if (!DS_PORT_IN_SET(totry, portid)) {
1275 			continue;
1276 		}
1277 
1278 		port = &ds_ports[portid];
1279 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s trying ldc.id: %d" DS_EOL,
1280 		    portid, __func__, (uint_t)(port->ldc.id));
1281 		if (ds_send_reg_req(svc, port) == 0) {
1282 			DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send OK" DS_EOL,
1283 			    portid, __func__);
1284 			/* register sent successfully */
1285 			break;
1286 		}
1287 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send FAIL" DS_EOL,
1288 		    portid, __func__);
1289 
1290 		/* reset the service to try the next port */
1291 		ds_reset_svc(svc, port);
1292 	}
1293 }
1294 
1295 static void
1296 ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len)
1297 {
1298 	ds_reg_nack_t	*nack;
1299 	ds_svc_t	*svc;
1300 	int		idx;
1301 	size_t		explen = DS_MSG_LEN(ds_reg_nack_t);
1302 
1303 	/* sanity check the incoming message */
1304 	if (len != explen) {
1305 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid message "
1306 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1307 		    explen);
1308 		return;
1309 	}
1310 
1311 	nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ);
1312 
1313 	mutex_enter(&ds_svcs.lock);
1314 
1315 	/*
1316 	 * We expect a reg_nack for a client ping.
1317 	 */
1318 	if (DS_HDL_ISCLIENT(nack->svc_handle)) {
1319 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: ping hdl: 0x%llx"
1320 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1321 		goto done;
1322 	}
1323 
1324 	/*
1325 	 * This searches for service based on how we generate handles
1326 	 * and so only works because this is a reg nack.
1327 	 */
1328 	if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1329 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid handle 0x%llx"
1330 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1331 		goto done;
1332 	}
1333 
1334 	/* make sure the message makes sense */
1335 	if (svc->state != DS_SVC_REG_PENDING) {
1336 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' handle: 0x%llx "
1337 		    "invalid state (%d)" DS_EOL, PORTID(port), svc->cap.svc_id,
1338 		    (u_longlong_t)nack->svc_handle, svc->state);
1339 		goto done;
1340 	}
1341 
1342 	if (nack->result == DS_REG_DUP) {
1343 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: duplicate registration "
1344 		    " for %s" DS_EOL, PORTID(port), svc->cap.svc_id);
1345 		ds_reset_svc(svc, port);
1346 		goto done;
1347 	}
1348 
1349 	/*
1350 	 * A major version of zero indicates that the
1351 	 * service is not supported at all.
1352 	 */
1353 	if (nack->major_vers == 0) {
1354 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' not supported"
1355 		    DS_EOL, PORTID(port), svc->cap.svc_id);
1356 		ds_reset_svc(svc, port);
1357 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1358 			ds_try_next_port(svc, PORTID(port) + 1);
1359 		goto done;
1360 	}
1361 
1362 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' hdl=0x%llx, nack=%d.x"
1363 	    DS_EOL, PORTID(port), svc->cap.svc_id,
1364 	    (u_longlong_t)nack->svc_handle, nack->major_vers);
1365 
1366 	/*
1367 	 * Walk the version list for the service, looking for
1368 	 * a major version that is as close to the requested
1369 	 * major version as possible.
1370 	 */
1371 	for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) {
1372 		if (svc->cap.vers[idx].major <= nack->major_vers) {
1373 			/* found a version to try */
1374 			break;
1375 		}
1376 	}
1377 
1378 	if (idx == svc->cap.nvers) {
1379 		/* no supported version */
1380 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: %s v%d.x not supported"
1381 		    DS_EOL, PORTID(port), svc->cap.svc_id, nack->major_vers);
1382 		ds_reset_svc(svc, port);
1383 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1384 			ds_try_next_port(svc, PORTID(port) + 1);
1385 		goto done;
1386 	}
1387 
1388 	/* start the handshake again */
1389 	svc->state = DS_SVC_INACTIVE;
1390 	svc->ver_idx = idx;
1391 
1392 	(void) ds_svc_register(svc, NULL);
1393 
1394 done:
1395 	mutex_exit(&ds_svcs.lock);
1396 }
1397 
1398 static void
1399 ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len)
1400 {
1401 	ds_hdr_t	*hdr;
1402 	ds_unreg_req_t	*req;
1403 	ds_unreg_ack_t	*ack;
1404 	ds_svc_t	*svc;
1405 	char		*msg;
1406 	size_t		msglen;
1407 	size_t		explen = DS_MSG_LEN(ds_unreg_req_t);
1408 	boolean_t	is_up;
1409 
1410 	/* sanity check the incoming message */
1411 	if (len != explen) {
1412 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid message "
1413 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1414 		    explen);
1415 		return;
1416 	}
1417 
1418 	req = (ds_unreg_req_t *)(buf + DS_HDR_SZ);
1419 
1420 	mutex_enter(&ds_svcs.lock);
1421 
1422 	/* lookup appropriate client or service */
1423 	if (DS_HDL_ISCLIENT(req->svc_handle) ||
1424 	    ((svc = ds_find_clnt_svc_by_hdl_port(req->svc_handle, port))
1425 	    == NULL && ((svc = ds_get_svc(req->svc_handle)) == NULL ||
1426 	    svc->port != port))) {
1427 		mutex_exit(&ds_svcs.lock);
1428 		mutex_enter(&port->lock);
1429 		is_up = (port->ldc.state == LDC_UP);
1430 		mutex_exit(&port->lock);
1431 		if (!is_up)
1432 			return;
1433 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: invalid handle 0x%llx"
1434 		    DS_EOL, PORTID(port), (u_longlong_t)req->svc_handle);
1435 		ds_send_unreg_nack(port, req->svc_handle);
1436 		return;
1437 	}
1438 
1439 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: '%s' handle 0x%llx" DS_EOL,
1440 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1441 
1442 	(void) ds_svc_unregister(svc, svc->port);
1443 
1444 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_ack>: '%s' hdl=0x%llx" DS_EOL,
1445 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1446 
1447 	ds_check_for_dup_services(svc);
1448 
1449 	mutex_exit(&ds_svcs.lock);
1450 
1451 	msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t);
1452 	msg = DS_MALLOC(msglen);
1453 
1454 	hdr = (ds_hdr_t *)msg;
1455 	hdr->msg_type = DS_UNREG_ACK;
1456 	hdr->payload_len = sizeof (ds_unreg_ack_t);
1457 
1458 	ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ);
1459 	ack->svc_handle = req->svc_handle;
1460 
1461 	/* send message */
1462 	(void) ds_send_msg(port, msg, msglen);
1463 	DS_FREE(msg, msglen);
1464 
1465 }
1466 
1467 static void
1468 ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len)
1469 {
1470 	ds_unreg_ack_t	*ack;
1471 	size_t		explen = DS_MSG_LEN(ds_unreg_ack_t);
1472 
1473 	/* sanity check the incoming message */
1474 	if (len != explen) {
1475 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: invalid message "
1476 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1477 		    explen);
1478 		return;
1479 	}
1480 
1481 	ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ);
1482 
1483 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: hdl=0x%llx" DS_EOL,
1484 	    PORTID(port), (u_longlong_t)ack->svc_handle);
1485 
1486 #ifdef DEBUG
1487 	mutex_enter(&ds_svcs.lock);
1488 
1489 	/*
1490 	 * Since the unregister request was initiated locally,
1491 	 * the service structure has already been torn down.
1492 	 * Just perform a sanity check to make sure the message
1493 	 * is appropriate.
1494 	 */
1495 	if (ds_get_svc(ack->svc_handle) != NULL) {
1496 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: handle 0x%llx in use"
1497 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1498 	}
1499 
1500 	mutex_exit(&ds_svcs.lock);
1501 #endif	/* DEBUG */
1502 }
1503 
1504 static void
1505 ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len)
1506 {
1507 	ds_unreg_nack_t	*nack;
1508 	size_t		explen = DS_MSG_LEN(ds_unreg_nack_t);
1509 
1510 	/* sanity check the incoming message */
1511 	if (len != explen) {
1512 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: invalid message "
1513 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1514 		    explen);
1515 		return;
1516 	}
1517 
1518 	nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ);
1519 
1520 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: hdl=0x%llx" DS_EOL,
1521 	    PORTID(port), (u_longlong_t)nack->svc_handle);
1522 
1523 #ifdef DEBUG
1524 	mutex_enter(&ds_svcs.lock);
1525 
1526 	/*
1527 	 * Since the unregister request was initiated locally,
1528 	 * the service structure has already been torn down.
1529 	 * Just perform a sanity check to make sure the message
1530 	 * is appropriate.
1531 	 */
1532 	if (ds_get_svc(nack->svc_handle) != NULL) {
1533 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: handle 0x%llx in use"
1534 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1535 	}
1536 
1537 	mutex_exit(&ds_svcs.lock);
1538 #endif	/* DEBUG */
1539 }
1540 
1541 static void
1542 ds_handle_data(ds_port_t *port, caddr_t buf, size_t len)
1543 {
1544 	ds_data_handle_t	*data;
1545 	ds_svc_t		*svc;
1546 	char			*msg;
1547 	int			msgsz;
1548 	int			hdrsz;
1549 	size_t			explen = DS_MSG_LEN(ds_data_handle_t);
1550 
1551 	/* sanity check the incoming message */
1552 	if (len < explen) {
1553 		cmn_err(CE_WARN, "ds@%lx: <data: invalid message length "
1554 		    "(%ld), expected at least %ld" DS_EOL, PORTID(port), len,
1555 		    explen);
1556 		return;
1557 	}
1558 
1559 	data = (ds_data_handle_t *)(buf + DS_HDR_SZ);
1560 
1561 	hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t);
1562 	msgsz = len - hdrsz;
1563 
1564 	/* strip off the header for the client */
1565 	msg = (msgsz) ? (buf + hdrsz) : NULL;
1566 
1567 	mutex_enter(&ds_svcs.lock);
1568 
1569 	if ((svc = ds_find_clnt_svc_by_hdl_port(data->svc_handle, port))
1570 	    == NULL) {
1571 		if ((svc = ds_get_svc(data->svc_handle)) == NULL) {
1572 			mutex_exit(&ds_svcs.lock);
1573 			cmn_err(CE_WARN, "ds@%lx: <data: invalid handle 0x%llx"
1574 			    DS_EOL, PORTID(port),
1575 			    (u_longlong_t)data->svc_handle);
1576 			ds_send_data_nack(port, data->svc_handle);
1577 			return;
1578 		}
1579 	}
1580 
1581 	mutex_exit(&ds_svcs.lock);
1582 
1583 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <data: '%s' hdl=0x%llx" DS_EOL,
1584 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)svc->hdl);
1585 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msgsz);
1586 
1587 	/* dispatch this message to the client */
1588 	(*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz);
1589 }
1590 
1591 static void
1592 ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len)
1593 {
1594 	ds_svc_t	*svc;
1595 	ds_data_nack_t	*nack;
1596 	size_t		explen = DS_MSG_LEN(ds_data_nack_t);
1597 
1598 	/* sanity check the incoming message */
1599 	if (len != explen) {
1600 		cmn_err(CE_WARN, "ds@%lx: <data_nack: invalid message "
1601 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1602 		    explen);
1603 		return;
1604 	}
1605 
1606 	nack = (ds_data_nack_t *)(buf + DS_HDR_SZ);
1607 
1608 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack: hdl=0x%llx, result=0x%llx"
1609 	    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle,
1610 	    (u_longlong_t)nack->result);
1611 
1612 	if (nack->result == DS_INV_HDL) {
1613 
1614 		mutex_enter(&ds_svcs.lock);
1615 
1616 		if ((svc = ds_find_clnt_svc_by_hdl_port(nack->svc_handle,
1617 		    port)) == NULL) {
1618 			if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1619 				mutex_exit(&ds_svcs.lock);
1620 				return;
1621 			}
1622 		}
1623 
1624 		cmn_err(CE_WARN, "ds@%lx: <data_nack: handle 0x%llx reported "
1625 		    " as invalid" DS_EOL, PORTID(port),
1626 		    (u_longlong_t)nack->svc_handle);
1627 
1628 		(void) ds_svc_unregister(svc, svc->port);
1629 
1630 		mutex_exit(&ds_svcs.lock);
1631 	}
1632 }
1633 
1634 /* Initialize the port */
1635 void
1636 ds_send_init_req(ds_port_t *port)
1637 {
1638 	ds_hdr_t	*hdr;
1639 	ds_init_req_t	*init_req;
1640 	size_t		msglen;
1641 	ds_ver_t	*vers = &ds_vers[port->ver_idx];
1642 
1643 	mutex_enter(&port->lock);
1644 	if (port->state != DS_PORT_LDC_INIT) {
1645 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: invalid state: %d"
1646 		    DS_EOL, PORTID(port), port->state);
1647 		mutex_exit(&port->lock);
1648 		return;
1649 	}
1650 	mutex_exit(&port->lock);
1651 
1652 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: req=v%d.%d" DS_EOL,
1653 	    PORTID(port), vers->major, vers->minor);
1654 
1655 	msglen = DS_HDR_SZ + sizeof (ds_init_req_t);
1656 	hdr = DS_MALLOC(msglen);
1657 
1658 	hdr->msg_type = DS_INIT_REQ;
1659 	hdr->payload_len = sizeof (ds_init_req_t);
1660 
1661 	init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ);
1662 	init_req->major_vers = vers->major;
1663 	init_req->minor_vers = vers->minor;
1664 
1665 	if (ds_send_msg(port, (caddr_t)hdr, msglen) == 0) {
1666 		/*
1667 		 * We've left the port state unlocked over the malloc/send,
1668 		 * make sure no one has changed the state under us before
1669 		 * we update the state.
1670 		 */
1671 		mutex_enter(&port->lock);
1672 		if (port->state == DS_PORT_LDC_INIT)
1673 			port->state = DS_PORT_INIT_REQ;
1674 		mutex_exit(&port->lock);
1675 	}
1676 	DS_FREE(hdr, msglen);
1677 }
1678 
1679 static int
1680 ds_send_reg_req(ds_svc_t *svc, ds_port_t *port)
1681 {
1682 	ds_ver_t	*ver;
1683 	ds_hdr_t	*hdr;
1684 	caddr_t		msg;
1685 	size_t		msglen;
1686 	ds_reg_req_t	*req;
1687 	size_t		idlen;
1688 	int		rv;
1689 
1690 	if ((svc->state != DS_SVC_INACTIVE) &&
1691 	    ((svc->flags & DSSF_ISCLIENT) == 0)) {
1692 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: invalid svc state (%d) "
1693 		    "for svc '%s'" DS_EOL, PORTID(port), svc->state,
1694 		    svc->cap.svc_id);
1695 		return (-1);
1696 	}
1697 
1698 	mutex_enter(&port->lock);
1699 
1700 	/* check on the LDC to Zeus */
1701 	if (port->ldc.state != LDC_UP) {
1702 		/* can not send message */
1703 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: channel %ld is not up"
1704 		    DS_EOL, PORTID(port), port->ldc.id);
1705 		mutex_exit(&port->lock);
1706 		return (-1);
1707 	}
1708 
1709 	/* make sure port is ready */
1710 	if (port->state != DS_PORT_READY) {
1711 		/* can not send message */
1712 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: port is not ready"
1713 		    DS_EOL, PORTID(port));
1714 		mutex_exit(&port->lock);
1715 		return (-1);
1716 	}
1717 
1718 	mutex_exit(&port->lock);
1719 
1720 	/* allocate the message buffer */
1721 	idlen = strlen(svc->cap.svc_id);
1722 	msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen;
1723 	msg = DS_MALLOC(msglen);
1724 
1725 	/* copy in the header data */
1726 	hdr = (ds_hdr_t *)msg;
1727 	hdr->msg_type = DS_REG_REQ;
1728 	hdr->payload_len = sizeof (ds_reg_req_t) + idlen;
1729 
1730 	req = (ds_reg_req_t *)(msg + DS_HDR_SZ);
1731 	req->svc_handle = svc->hdl;
1732 	ver = &(svc->cap.vers[svc->ver_idx]);
1733 	req->major_vers = ver->major;
1734 	req->minor_vers = ver->minor;
1735 
1736 	/* copy in the service id */
1737 	(void) memcpy(req->svc_id, svc->cap.svc_id, idlen + 1);
1738 
1739 	/* send the message */
1740 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: '%s' ver=%d.%d, hdl=0x%llx"
1741 	    DS_EOL, PORTID(port), svc->cap.svc_id, ver->major, ver->minor,
1742 	    (u_longlong_t)svc->hdl);
1743 
1744 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1745 		svc->port = port;
1746 		rv = -1;
1747 	} else if ((svc->flags & DSSF_ISCLIENT) == 0) {
1748 		svc->state = DS_SVC_REG_PENDING;
1749 	}
1750 	DS_FREE(msg, msglen);
1751 
1752 	return (rv);
1753 }
1754 
1755 /*
1756  * Keep around in case we want this later
1757  */
1758 int
1759 ds_send_unreg_req(ds_svc_t *svc)
1760 {
1761 	caddr_t		msg;
1762 	size_t		msglen;
1763 	ds_hdr_t	*hdr;
1764 	ds_unreg_req_t	*req;
1765 	ds_port_t	*port = svc->port;
1766 	int		rv;
1767 
1768 	if (port == NULL) {
1769 		DS_DBG(CE_NOTE, "send_unreg_req: service '%s' not "
1770 		    "associated with a port" DS_EOL, svc->cap.svc_id);
1771 		return (-1);
1772 	}
1773 
1774 	mutex_enter(&port->lock);
1775 
1776 	/* check on the LDC to Zeus */
1777 	if (port->ldc.state != LDC_UP) {
1778 		/* can not send message */
1779 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: channel %ld is not up"
1780 		    DS_EOL, PORTID(port), port->ldc.id);
1781 		mutex_exit(&port->lock);
1782 		return (-1);
1783 	}
1784 
1785 	/* make sure port is ready */
1786 	if (port->state != DS_PORT_READY) {
1787 		/* can not send message */
1788 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: port is not ready" DS_EOL,
1789 		    PORTID(port));
1790 		mutex_exit(&port->lock);
1791 		return (-1);
1792 	}
1793 
1794 	mutex_exit(&port->lock);
1795 
1796 	msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t);
1797 	msg = DS_MALLOC(msglen);
1798 
1799 	/* copy in the header data */
1800 	hdr = (ds_hdr_t *)msg;
1801 	hdr->msg_type = DS_UNREG;
1802 	hdr->payload_len = sizeof (ds_unreg_req_t);
1803 
1804 	req = (ds_unreg_req_t *)(msg + DS_HDR_SZ);
1805 	if (svc->flags & DSSF_ISCLIENT) {
1806 		req->svc_handle = svc->svc_hdl;
1807 	} else {
1808 		req->svc_handle = svc->hdl;
1809 	}
1810 
1811 	/* send the message */
1812 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_req>: '%s' hdl=0x%llx" DS_EOL,
1813 	    PORTID(port), (svc->cap.svc_id) ? svc->cap.svc_id : "NULL",
1814 	    (u_longlong_t)svc->hdl);
1815 
1816 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1817 		rv = -1;
1818 	}
1819 	DS_FREE(msg, msglen);
1820 
1821 	return (rv);
1822 }
1823 
1824 static void
1825 ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1826 {
1827 	caddr_t		msg;
1828 	size_t		msglen;
1829 	ds_hdr_t	*hdr;
1830 	ds_unreg_nack_t	*nack;
1831 
1832 	mutex_enter(&port->lock);
1833 
1834 	/* check on the LDC to Zeus */
1835 	if (port->ldc.state != LDC_UP) {
1836 		/* can not send message */
1837 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: channel %ld is not up"
1838 		    DS_EOL, PORTID(port), port->ldc.id);
1839 		mutex_exit(&port->lock);
1840 		return;
1841 	}
1842 
1843 	/* make sure port is ready */
1844 	if (port->state != DS_PORT_READY) {
1845 		/* can not send message */
1846 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: port is not ready"
1847 		    DS_EOL, PORTID(port));
1848 		mutex_exit(&port->lock);
1849 		return;
1850 	}
1851 
1852 	mutex_exit(&port->lock);
1853 
1854 	msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t);
1855 	msg = DS_MALLOC(msglen);
1856 
1857 	/* copy in the header data */
1858 	hdr = (ds_hdr_t *)msg;
1859 	hdr->msg_type = DS_UNREG_NACK;
1860 	hdr->payload_len = sizeof (ds_unreg_nack_t);
1861 
1862 	nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ);
1863 	nack->svc_handle = bad_hdl;
1864 
1865 	/* send the message */
1866 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_nack>: hdl=0x%llx" DS_EOL,
1867 	    PORTID(port), (u_longlong_t)bad_hdl);
1868 
1869 	(void) ds_send_msg(port, msg, msglen);
1870 	DS_FREE(msg, msglen);
1871 }
1872 
1873 static void
1874 ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1875 {
1876 	caddr_t		msg;
1877 	size_t		msglen;
1878 	ds_hdr_t	*hdr;
1879 	ds_data_nack_t	*nack;
1880 
1881 	mutex_enter(&port->lock);
1882 
1883 	/* check on the LDC to Zeus */
1884 	if (port->ldc.state != LDC_UP) {
1885 		/* can not send message */
1886 		cmn_err(CE_WARN, "ds@%lx: data_nack>: channel %ld is not up"
1887 		    DS_EOL, PORTID(port), port->ldc.id);
1888 		mutex_exit(&port->lock);
1889 		return;
1890 	}
1891 
1892 	/* make sure port is ready */
1893 	if (port->state != DS_PORT_READY) {
1894 		/* can not send message */
1895 		cmn_err(CE_WARN, "ds@%lx: data_nack>: port is not ready" DS_EOL,
1896 		    PORTID(port));
1897 		mutex_exit(&port->lock);
1898 		return;
1899 	}
1900 
1901 	mutex_exit(&port->lock);
1902 
1903 	msglen = DS_HDR_SZ + sizeof (ds_data_nack_t);
1904 	msg = DS_MALLOC(msglen);
1905 
1906 	/* copy in the header data */
1907 	hdr = (ds_hdr_t *)msg;
1908 	hdr->msg_type = DS_NACK;
1909 	hdr->payload_len = sizeof (ds_data_nack_t);
1910 
1911 	nack = (ds_data_nack_t *)(msg + DS_HDR_SZ);
1912 	nack->svc_handle = bad_hdl;
1913 	nack->result = DS_INV_HDL;
1914 
1915 	/* send the message */
1916 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack>: hdl=0x%llx" DS_EOL,
1917 	    PORTID(port), (u_longlong_t)bad_hdl);
1918 
1919 	(void) ds_send_msg(port, msg, msglen);
1920 	DS_FREE(msg, msglen);
1921 }
1922 
1923 /* END DS PROTOCOL SUPPORT FUNCTIONS */
1924 
1925 #ifdef DEBUG
1926 
1927 #define	BYTESPERLINE	8
1928 #define	LINEWIDTH	((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1)
1929 #define	ASCIIOFFSET	((BYTESPERLINE * 3) + 2)
1930 #define	ISPRINT(c)	((c >= ' ') && (c <= '~'))
1931 
1932 /*
1933  * Output a buffer formatted with a set number of bytes on
1934  * each line. Append each line with the ASCII equivalent of
1935  * each byte if it falls within the printable ASCII range,
1936  * and '.' otherwise.
1937  */
1938 void
1939 ds_dump_msg(void *vbuf, size_t len)
1940 {
1941 	int	i, j;
1942 	char	*curr;
1943 	char	*aoff;
1944 	char	line[LINEWIDTH];
1945 	uint8_t	*buf = vbuf;
1946 
1947 	if (len > 128)
1948 		len = 128;
1949 
1950 	/* walk the buffer one line at a time */
1951 	for (i = 0; i < len; i += BYTESPERLINE) {
1952 
1953 		bzero(line, LINEWIDTH);
1954 
1955 		curr = line;
1956 		aoff = line + ASCIIOFFSET;
1957 
1958 		/*
1959 		 * Walk the bytes in the current line, storing
1960 		 * the hex value for the byte as well as the
1961 		 * ASCII representation in a temporary buffer.
1962 		 * All ASCII values are placed at the end of
1963 		 * the line.
1964 		 */
1965 		for (j = 0; (j < BYTESPERLINE) && ((i + j) < len); j++) {
1966 			(void) sprintf(curr, " %02x", buf[i + j]);
1967 			*aoff = (ISPRINT(buf[i + j])) ? buf[i + j] : '.';
1968 			curr += 3;
1969 			aoff++;
1970 		}
1971 
1972 		/*
1973 		 * Fill in to the start of the ASCII translation
1974 		 * with spaces. This will only be necessary if
1975 		 * this is the last line and there are not enough
1976 		 * bytes to fill the whole line.
1977 		 */
1978 		while (curr != (line + ASCIIOFFSET))
1979 			*curr++ = ' ';
1980 
1981 		cmn_err(CE_NOTE, "%s" DS_EOL, line);
1982 	}
1983 }
1984 #endif /* DEBUG */
1985 
1986 
1987 /*
1988  * Walk the table of registered services, executing the specified callback
1989  * function for each service on a port. A non-zero return value from the
1990  * callback is used to terminate the walk, not to indicate an error. Returns
1991  * the index of the last service visited.
1992  */
1993 int
1994 ds_walk_svcs(svc_cb_t svc_cb, void *arg)
1995 {
1996 	int		idx;
1997 	ds_svc_t	*svc;
1998 
1999 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2000 
2001 	/* walk every table entry */
2002 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
2003 		svc = ds_svcs.tbl[idx];
2004 
2005 		/* execute the callback */
2006 		if ((*svc_cb)(svc, arg) != 0)
2007 			break;
2008 	}
2009 
2010 	return (idx);
2011 }
2012 
2013 static int
2014 ds_svc_isfree(ds_svc_t *svc, void *arg)
2015 {
2016 	_NOTE(ARGUNUSED(arg))
2017 
2018 	/*
2019 	 * Looking for a free service. This may be a NULL entry
2020 	 * in the table, or an unused structure that could be
2021 	 * reused.
2022 	 */
2023 
2024 	if (DS_SVC_ISFREE(svc)) {
2025 		/* yes, it is free */
2026 		return (1);
2027 	}
2028 
2029 	/* not a candidate */
2030 	return (0);
2031 }
2032 
2033 int
2034 ds_svc_ismatch(ds_svc_t *svc, void *arg)
2035 {
2036 	if (DS_SVC_ISFREE(svc)) {
2037 		return (0);
2038 	}
2039 
2040 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2041 	    (svc->flags & DSSF_ISCLIENT) == 0) {
2042 		/* found a match */
2043 		return (1);
2044 	}
2045 
2046 	return (0);
2047 }
2048 
2049 int
2050 ds_svc_clnt_ismatch(ds_svc_t *svc, void *arg)
2051 {
2052 	if (DS_SVC_ISFREE(svc)) {
2053 		return (0);
2054 	}
2055 
2056 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2057 	    (svc->flags & DSSF_ISCLIENT) != 0) {
2058 		/* found a match */
2059 		return (1);
2060 	}
2061 
2062 	return (0);
2063 }
2064 
2065 int
2066 ds_svc_free(ds_svc_t *svc, void *arg)
2067 {
2068 	_NOTE(ARGUNUSED(arg))
2069 
2070 	if (svc == NULL) {
2071 		return (0);
2072 	}
2073 
2074 	if (svc->cap.svc_id) {
2075 		DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
2076 		svc->cap.svc_id = NULL;
2077 	}
2078 
2079 	if (svc->cap.vers) {
2080 		DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
2081 		svc->cap.vers = NULL;
2082 	}
2083 
2084 	DS_FREE(svc, sizeof (ds_svc_t));
2085 
2086 	return (0);
2087 }
2088 
2089 static void
2090 ds_set_svc_port_tried(char *svc_id, ds_port_t *port)
2091 {
2092 	int		idx;
2093 	ds_svc_t	*svc;
2094 
2095 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2096 
2097 	/* walk every table entry */
2098 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
2099 		svc = ds_svcs.tbl[idx];
2100 		if (!DS_SVC_ISFREE(svc) && (svc->flags & DSSF_ISCLIENT) != 0 &&
2101 		    strcmp(svc_id, svc->cap.svc_id) == 0)
2102 			DS_PORTSET_ADD(svc->tried, PORTID(port));
2103 	}
2104 }
2105 
2106 static int
2107 ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port)
2108 {
2109 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2110 
2111 	if (DS_SVC_ISFREE(svc))
2112 		return (0);
2113 
2114 	if (!DS_PORT_IN_SET(svc->avail, PORTID(port)))
2115 		return (0);
2116 
2117 	if (DS_PORT_IN_SET(svc->tried, PORTID(port)))
2118 		return (0);
2119 
2120 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2121 		DS_PORTSET_ADD(svc->tried, PORTID(port));
2122 		if (svc->state != DS_SVC_INACTIVE)
2123 			return (0);
2124 	} else {
2125 		ds_set_svc_port_tried(svc->cap.svc_id, port);
2126 
2127 		/*
2128 		 * Never send a client reg req to the SP.
2129 		 */
2130 		if (PORTID(port) == ds_sp_port_id) {
2131 			return (0);
2132 		}
2133 	}
2134 
2135 	if (ds_send_reg_req(svc, port) == 0) {
2136 		/* register sent successfully */
2137 		return (1);
2138 	}
2139 
2140 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2141 		/* reset the service */
2142 		ds_reset_svc(svc, port);
2143 	}
2144 	return (0);
2145 }
2146 
2147 static int
2148 ds_svc_register_onport_walker(ds_svc_t *svc, void *arg)
2149 {
2150 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2151 
2152 	if (DS_SVC_ISFREE(svc))
2153 		return (0);
2154 
2155 	(void) ds_svc_register_onport(svc, arg);
2156 	return (0);
2157 }
2158 
2159 int
2160 ds_svc_register(ds_svc_t *svc, void *arg)
2161 {
2162 	_NOTE(ARGUNUSED(arg))
2163 	ds_portset_t ports;
2164 	ds_port_t *port;
2165 	int	idx;
2166 
2167 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2168 
2169 	if (DS_SVC_ISFREE(svc))
2170 		return (0);
2171 
2172 	DS_PORTSET_DUP(ports, svc->avail);
2173 	if (svc->flags & DSSF_ISCLIENT) {
2174 		for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2175 			if (DS_PORT_IN_SET(svc->tried, idx))
2176 				DS_PORTSET_DEL(ports, idx);
2177 		}
2178 	} else if (svc->state != DS_SVC_INACTIVE)
2179 		return (0);
2180 
2181 	if (DS_PORTSET_ISNULL(ports))
2182 		return (0);
2183 
2184 	/*
2185 	 * Attempt to register the service. Start with the lowest
2186 	 * numbered port and continue until a registration message
2187 	 * is sent successfully, or there are no ports left to try.
2188 	 */
2189 	for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2190 
2191 		/*
2192 		 * If the port is not in the available list,
2193 		 * it is not a candidate for registration.
2194 		 */
2195 		if (!DS_PORT_IN_SET(ports, idx)) {
2196 			continue;
2197 		}
2198 
2199 		port = &ds_ports[idx];
2200 		if (ds_svc_register_onport(svc, port)) {
2201 			if ((svc->flags & DSSF_ISCLIENT) == 0)
2202 				break;
2203 		}
2204 	}
2205 
2206 	return (0);
2207 }
2208 
2209 static int
2210 ds_svc_unregister(ds_svc_t *svc, void *arg)
2211 {
2212 	ds_port_t *port = (ds_port_t *)arg;
2213 	ds_svc_hdl_t hdl;
2214 
2215 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2216 
2217 	if (DS_SVC_ISFREE(svc)) {
2218 		return (0);
2219 	}
2220 
2221 	/* make sure the service is using this port */
2222 	if (svc->port != port) {
2223 		return (0);
2224 	}
2225 
2226 	if (port) {
2227 		DS_DBG(CE_NOTE, "ds@%lx: svc_unreg: id='%s', ver=%d.%d, "
2228 		    " hdl=0x%09lx" DS_EOL, PORTID(port), svc->cap.svc_id,
2229 		    svc->ver.major, svc->ver.minor, svc->hdl);
2230 	} else {
2231 		DS_DBG(CE_NOTE, "port=NULL: svc_unreg: id='%s', ver=%d.%d, "
2232 		    " hdl=0x%09lx" DS_EOL, svc->cap.svc_id, svc->ver.major,
2233 		    svc->ver.minor, svc->hdl);
2234 	}
2235 
2236 	/* reset the service structure */
2237 	ds_reset_svc(svc, port);
2238 
2239 	/* call the client unregister callback */
2240 	if (svc->ops.ds_unreg_cb) {
2241 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2242 	}
2243 
2244 	/* increment the count in the handle to prevent reuse */
2245 	hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl));
2246 	if (DS_HDL_ISCLIENT(svc->hdl)) {
2247 		DS_HDL_SET_ISCLIENT(hdl);
2248 	}
2249 	svc->hdl = hdl;
2250 
2251 	if (svc->state != DS_SVC_UNREG_PENDING) {
2252 		/* try to initiate a new registration */
2253 		(void) ds_svc_register(svc, NULL);
2254 	}
2255 
2256 	return (0);
2257 }
2258 
2259 static int
2260 ds_svc_port_up(ds_svc_t *svc, void *arg)
2261 {
2262 	ds_port_t *port = (ds_port_t *)arg;
2263 
2264 	if (DS_SVC_ISFREE(svc)) {
2265 		/* nothing to do */
2266 		return (0);
2267 	}
2268 
2269 	DS_PORTSET_ADD(svc->avail, port->id);
2270 	DS_PORTSET_DEL(svc->tried, port->id);
2271 
2272 	return (0);
2273 }
2274 
2275 static void
2276 ds_set_port_ready(ds_port_t *port, uint16_t major, uint16_t minor)
2277 {
2278 	boolean_t was_ready;
2279 
2280 	mutex_enter(&port->lock);
2281 	was_ready = (port->state == DS_PORT_READY);
2282 	if (!was_ready) {
2283 		port->state = DS_PORT_READY;
2284 		port->ver.major = major;
2285 		port->ver.minor = minor;
2286 	}
2287 	mutex_exit(&port->lock);
2288 
2289 	if (!was_ready) {
2290 
2291 		/*
2292 		 * The port came up, so update all the services
2293 		 * with this information. Follow that up with an
2294 		 * attempt to register any service that is not
2295 		 * already registered.
2296 		 */
2297 		mutex_enter(&ds_svcs.lock);
2298 
2299 		(void) ds_walk_svcs(ds_svc_port_up, port);
2300 		(void) ds_walk_svcs(ds_svc_register_onport_walker, port);
2301 
2302 		mutex_exit(&ds_svcs.lock);
2303 	}
2304 }
2305 
2306 ds_svc_t *
2307 ds_alloc_svc(void)
2308 {
2309 	int		idx;
2310 	uint_t		newmaxsvcs;
2311 	ds_svc_t	**newtbl;
2312 	ds_svc_t	*newsvc;
2313 
2314 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2315 
2316 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2317 
2318 	if (idx != ds_svcs.maxsvcs) {
2319 		goto found;
2320 	}
2321 
2322 	/*
2323 	 * There was no free space in the table. Grow
2324 	 * the table to double its current size.
2325 	 */
2326 	newmaxsvcs = ds_svcs.maxsvcs * 2;
2327 	newtbl = DS_MALLOC(newmaxsvcs * sizeof (ds_svc_t *));
2328 
2329 	/* copy old table data to the new table */
2330 	(void) memcpy(newtbl, ds_svcs.tbl,
2331 	    ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2332 
2333 	/* clean up the old table */
2334 	DS_FREE(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2335 	ds_svcs.tbl = newtbl;
2336 	ds_svcs.maxsvcs = newmaxsvcs;
2337 
2338 	/* search for a free space again */
2339 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2340 
2341 	/* the table is locked so should find a free slot */
2342 	ASSERT(idx != ds_svcs.maxsvcs);
2343 
2344 found:
2345 	/* allocate a new svc structure if necessary */
2346 	if ((newsvc = ds_svcs.tbl[idx]) == NULL) {
2347 		/* allocate a new service */
2348 		newsvc = DS_MALLOC(sizeof (ds_svc_t));
2349 		ds_svcs.tbl[idx] = newsvc;
2350 	}
2351 
2352 	/* fill in the handle */
2353 	newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl));
2354 	newsvc->state = DS_SVC_FREE;	/* Mark as free temporarily */
2355 
2356 	return (newsvc);
2357 }
2358 
2359 static void
2360 ds_reset_svc(ds_svc_t *svc, ds_port_t *port)
2361 {
2362 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2363 
2364 	if (svc->state != DS_SVC_UNREG_PENDING)
2365 		svc->state = DS_SVC_INACTIVE;
2366 	svc->ver_idx = 0;
2367 	svc->ver.major = 0;
2368 	svc->ver.minor = 0;
2369 	svc->port = NULL;
2370 	if (port) {
2371 		DS_PORTSET_DEL(svc->avail, port->id);
2372 	}
2373 }
2374 
2375 ds_svc_t *
2376 ds_get_svc(ds_svc_hdl_t hdl)
2377 {
2378 	int		idx;
2379 	ds_svc_t	*svc;
2380 
2381 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2382 
2383 	if (hdl == DS_INVALID_HDL)
2384 		return (NULL);
2385 
2386 	idx = DS_HDL2IDX(hdl);
2387 
2388 	/* check if index is out of bounds */
2389 	if ((idx < 0) || (idx >= ds_svcs.maxsvcs))
2390 		return (NULL);
2391 
2392 	svc = ds_svcs.tbl[idx];
2393 
2394 	/* check for a valid service */
2395 	if (DS_SVC_ISFREE(svc))
2396 		return (NULL);
2397 
2398 	/* make sure the handle is an exact match */
2399 	if (svc->hdl != hdl)
2400 		return (NULL);
2401 
2402 	return (svc);
2403 }
2404 
2405 static void
2406 ds_port_reset(ds_port_t *port)
2407 {
2408 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2409 	ASSERT(MUTEX_HELD(&port->lock));
2410 
2411 	/* connection went down, mark everything inactive */
2412 	(void) ds_walk_svcs(ds_svc_unregister, port);
2413 
2414 	port->ver_idx = 0;
2415 	port->ver.major = 0;
2416 	port->ver.minor = 0;
2417 	port->state = DS_PORT_LDC_INIT;
2418 }
2419 
2420 /*
2421  * Verify that a version array is sorted as expected for the
2422  * version negotiation to work correctly.
2423  */
2424 ds_vers_check_t
2425 ds_vers_isvalid(ds_ver_t *vers, int nvers)
2426 {
2427 	uint16_t	curr_major;
2428 	uint16_t	curr_minor;
2429 	int		idx;
2430 
2431 	curr_major = vers[0].major;
2432 	curr_minor = vers[0].minor;
2433 
2434 	/*
2435 	 * Walk the version array, verifying correct ordering.
2436 	 * The array must be sorted from highest supported
2437 	 * version to lowest supported version.
2438 	 */
2439 	for (idx = 0; idx < nvers; idx++) {
2440 		if (vers[idx].major > curr_major) {
2441 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2442 			    " increasing major versions" DS_EOL);
2443 			return (DS_VERS_INCREASING_MAJOR_ERR);
2444 		}
2445 
2446 		if (vers[idx].major < curr_major) {
2447 			curr_major = vers[idx].major;
2448 			curr_minor = vers[idx].minor;
2449 			continue;
2450 		}
2451 
2452 		if (vers[idx].minor > curr_minor) {
2453 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2454 			    " increasing minor versions" DS_EOL);
2455 			return (DS_VERS_INCREASING_MINOR_ERR);
2456 		}
2457 
2458 		curr_minor = vers[idx].minor;
2459 	}
2460 
2461 	return (DS_VERS_OK);
2462 }
2463 
2464 /*
2465  * Extended user capability init.
2466  */
2467 int
2468 ds_ucap_init(ds_capability_t *cap, ds_clnt_ops_t *ops, uint32_t flags,
2469     int instance, ds_svc_hdl_t *hdlp)
2470 {
2471 	ds_vers_check_t	status;
2472 	ds_svc_t	*svc;
2473 	int		rv = 0;
2474 	ds_svc_hdl_t 	lb_hdl, hdl;
2475 	int		is_loopback;
2476 	int		is_client;
2477 
2478 	/* sanity check the args */
2479 	if ((cap == NULL) || (ops == NULL)) {
2480 		cmn_err(CE_NOTE, "%s: invalid arguments" DS_EOL, __func__);
2481 		return (EINVAL);
2482 	}
2483 
2484 	/* sanity check the capability specifier */
2485 	if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) {
2486 		cmn_err(CE_NOTE, "%s: invalid capability specifier" DS_EOL,
2487 		    __func__);
2488 		return (EINVAL);
2489 	}
2490 
2491 	/* sanity check the version array */
2492 	if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) {
2493 		cmn_err(CE_NOTE, "%s: invalid capability version array "
2494 		    "for %s service: %s" DS_EOL, __func__, cap->svc_id,
2495 		    (status == DS_VERS_INCREASING_MAJOR_ERR) ?
2496 		    "increasing major versions" :
2497 		    "increasing minor versions");
2498 		return (EINVAL);
2499 	}
2500 
2501 	/* data and register callbacks are required */
2502 	if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) {
2503 		cmn_err(CE_NOTE, "%s: invalid ops specifier for %s service"
2504 		    DS_EOL, __func__, cap->svc_id);
2505 		return (EINVAL);
2506 	}
2507 
2508 	flags &= DSSF_USERFLAGS;
2509 	is_client = flags & DSSF_ISCLIENT;
2510 
2511 	DS_DBG_USR(CE_NOTE, "%s: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx"
2512 	    DS_EOL, __func__, cap->svc_id, PTR_TO_LONG(ops->ds_data_cb),
2513 	    PTR_TO_LONG(ops->cb_arg));
2514 
2515 	mutex_enter(&ds_svcs.lock);
2516 
2517 	/* check if the service is already registered */
2518 	if (i_ds_hdl_lookup(cap->svc_id, is_client, NULL, 1) == 1) {
2519 		/* already registered */
2520 		cmn_err(CE_NOTE, "Service '%s'/%s already registered" DS_EOL,
2521 		    cap->svc_id,
2522 		    (flags & DSSF_ISCLIENT) ? "client" : "service");
2523 		mutex_exit(&ds_svcs.lock);
2524 		return (EALREADY);
2525 	}
2526 
2527 	svc = ds_alloc_svc();
2528 	if (is_client) {
2529 		DS_HDL_SET_ISCLIENT(svc->hdl);
2530 	}
2531 
2532 	svc->state = DS_SVC_FREE;
2533 	svc->svc_hdl = DS_BADHDL1;
2534 
2535 	svc->flags = flags;
2536 	svc->drvi = instance;
2537 	svc->drv_psp = NULL;
2538 
2539 	/*
2540 	 * Check for loopback.  "pri" is a legacy service that assumes it
2541 	 * will never use loopback mode.
2542 	 */
2543 	if (strcmp(cap->svc_id, "pri") == 0) {
2544 		is_loopback = 0;
2545 	} else if (i_ds_hdl_lookup(cap->svc_id, is_client == 0, &lb_hdl, 1)
2546 	    == 1) {
2547 		if ((rv = ds_loopback_set_svc(svc, cap, &lb_hdl)) != 0) {
2548 			DS_DBG_USR(CE_NOTE, "%s: ds_loopback_set_svc '%s' err "
2549 			    " (%d)" DS_EOL, __func__, cap->svc_id, rv);
2550 			mutex_exit(&ds_svcs.lock);
2551 			return (rv);
2552 		}
2553 		is_loopback = 1;
2554 	} else
2555 		is_loopback = 0;
2556 
2557 	/* copy over all the client information */
2558 	(void) memcpy(&svc->cap, cap, sizeof (ds_capability_t));
2559 
2560 	/* make a copy of the service name */
2561 	svc->cap.svc_id = ds_strdup(cap->svc_id);
2562 
2563 	/* make a copy of the version array */
2564 	svc->cap.vers = DS_MALLOC(cap->nvers * sizeof (ds_ver_t));
2565 	(void) memcpy(svc->cap.vers, cap->vers, cap->nvers * sizeof (ds_ver_t));
2566 
2567 	/* copy the client ops vector */
2568 	(void) memcpy(&svc->ops, ops, sizeof (ds_clnt_ops_t));
2569 
2570 	svc->state = DS_SVC_INACTIVE;
2571 	svc->ver_idx = 0;
2572 	DS_PORTSET_DUP(svc->avail, ds_allports);
2573 	DS_PORTSET_SETNULL(svc->tried);
2574 
2575 	ds_svcs.nsvcs++;
2576 
2577 	hdl = svc->hdl;
2578 
2579 	/*
2580 	 * kludge to allow user callback code to get handle and user args.
2581 	 * Make sure the callback arg points to the svc structure.
2582 	 */
2583 	if ((flags & DSSF_ISUSER) != 0) {
2584 		ds_cbarg_set_cookie(svc);
2585 	}
2586 
2587 	if (is_loopback) {
2588 		ds_loopback_register(hdl);
2589 		ds_loopback_register(lb_hdl);
2590 	}
2591 
2592 	/*
2593 	 * If this is a client or a non-loopback service provider, send
2594 	 * out register requests.
2595 	 */
2596 	if (!is_loopback || (flags & DSSF_ISCLIENT) != 0)
2597 		(void) ds_svc_register(svc, NULL);
2598 
2599 	if (hdlp) {
2600 		*hdlp = hdl;
2601 	}
2602 
2603 	mutex_exit(&ds_svcs.lock);
2604 
2605 	DS_DBG_USR(CE_NOTE, "%s: service '%s' assigned handle 0x%09lx" DS_EOL,
2606 	    __func__, svc->cap.svc_id, hdl);
2607 
2608 	return (0);
2609 }
2610 
2611 /*
2612  * ds_cap_init interface for previous revision.
2613  */
2614 int
2615 ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops)
2616 {
2617 	return (ds_ucap_init(cap, ops, 0, DS_INVALID_INSTANCE, NULL));
2618 }
2619 
2620 /*
2621  * Interface for ds_unreg_hdl in lds driver.
2622  */
2623 int
2624 ds_unreg_hdl(ds_svc_hdl_t hdl)
2625 {
2626 	ds_svc_t	*svc;
2627 	int		is_loopback;
2628 	ds_svc_hdl_t	lb_hdl;
2629 
2630 	DS_DBG_USR(CE_NOTE, "%s: hdl=0x%09lx" DS_EOL, __func__, hdl);
2631 
2632 	mutex_enter(&ds_svcs.lock);
2633 	if ((svc = ds_get_svc(hdl)) == NULL) {
2634 		mutex_exit(&ds_svcs.lock);
2635 		DS_DBG_USR(CE_NOTE, "%s: unknown hdl: 0x%llx" DS_EOL, __func__,
2636 		    (u_longlong_t)hdl);
2637 		return (ENXIO);
2638 	}
2639 
2640 	DS_DBG_USR(CE_NOTE, "%s: svcid='%s', hdl=0x%llx" DS_EOL, __func__,
2641 	    svc->cap.svc_id, (u_longlong_t)svc->hdl);
2642 
2643 	svc->state = DS_SVC_UNREG_PENDING;
2644 
2645 	is_loopback = ((svc->flags & DSSF_LOOPBACK) != 0);
2646 	lb_hdl = svc->svc_hdl;
2647 
2648 	if (svc->port) {
2649 		(void) ds_send_unreg_req(svc);
2650 	}
2651 
2652 	(void) ds_svc_unregister(svc, svc->port);
2653 
2654 	ds_delete_svc_entry(svc);
2655 
2656 	if (is_loopback) {
2657 		ds_loopback_unregister(lb_hdl);
2658 	}
2659 
2660 	mutex_exit(&ds_svcs.lock);
2661 
2662 	return (0);
2663 }
2664 
2665 int
2666 ds_cap_fini(ds_capability_t *cap)
2667 {
2668 	ds_svc_hdl_t	hdl;
2669 	int rv;
2670 	uint_t nhdls = 0;
2671 
2672 	DS_DBG(CE_NOTE, "%s: '%s'" DS_EOL, __func__, cap->svc_id);
2673 	if ((rv = ds_hdl_lookup(cap->svc_id, 0, &hdl, 1, &nhdls)) != 0) {
2674 		DS_DBG(CE_NOTE, "%s: ds_hdl_lookup '%s' err (%d)" DS_EOL,
2675 		    __func__, cap->svc_id, rv);
2676 		return (rv);
2677 	}
2678 
2679 	if (nhdls == 0) {
2680 		DS_DBG(CE_NOTE, "%s: no such service '%s'" DS_EOL,
2681 		    __func__, cap->svc_id);
2682 		return (ENXIO);
2683 	}
2684 
2685 	if ((rv = ds_is_my_hdl(hdl, DS_INVALID_INSTANCE)) != 0) {
2686 		DS_DBG(CE_NOTE, "%s: ds_is_my_handle err (%d)" DS_EOL, __func__,
2687 		    rv);
2688 		return (rv);
2689 	}
2690 
2691 	if ((rv = ds_unreg_hdl(hdl)) != 0) {
2692 		DS_DBG(CE_NOTE, "%s: ds_unreg_hdl err (%d)" DS_EOL, __func__,
2693 		    rv);
2694 		return (rv);
2695 	}
2696 
2697 	return (0);
2698 }
2699 
2700 int
2701 ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len)
2702 {
2703 	int		rv;
2704 	ds_hdr_t	*hdr;
2705 	caddr_t		msg;
2706 	size_t		msglen;
2707 	size_t		hdrlen;
2708 	caddr_t		payload;
2709 	ds_svc_t	*svc;
2710 	ds_port_t	*port;
2711 	ds_data_handle_t *data;
2712 	ds_svc_hdl_t	svc_hdl;
2713 	int		is_client = 0;
2714 
2715 	DS_DBG(CE_NOTE, "%s: hdl: 0x%llx, buf: %lx, len: %ld" DS_EOL, __func__,
2716 	    (u_longlong_t)hdl, (ulong_t)buf, len);
2717 
2718 	mutex_enter(&ds_svcs.lock);
2719 
2720 	if ((svc = ds_get_svc(hdl)) == NULL) {
2721 		cmn_err(CE_WARN, "%s: invalid handle 0x%llx" DS_EOL, __func__,
2722 		    (u_longlong_t)hdl);
2723 		mutex_exit(&ds_svcs.lock);
2724 		return (ENXIO);
2725 	}
2726 
2727 	if (svc->state != DS_SVC_ACTIVE) {
2728 		/* channel is up, but svc is not registered */
2729 		DS_DBG(CE_NOTE, "%s: invalid service state 0x%x" DS_EOL,
2730 		    __func__, svc->state);
2731 		mutex_exit(&ds_svcs.lock);
2732 		return (ENOTCONN);
2733 	}
2734 
2735 	if (svc->flags & DSSF_LOOPBACK) {
2736 		hdl = svc->svc_hdl;
2737 		mutex_exit(&ds_svcs.lock);
2738 		ds_loopback_send(hdl, buf, len);
2739 		return (0);
2740 	}
2741 
2742 	if ((port = svc->port) == NULL) {
2743 		DS_DBG(CE_NOTE, "%s: service '%s' not associated with a port"
2744 		    DS_EOL, __func__, svc->cap.svc_id);
2745 		mutex_exit(&ds_svcs.lock);
2746 		return (ECONNRESET);
2747 	}
2748 
2749 	if (svc->flags & DSSF_ISCLIENT) {
2750 		is_client = 1;
2751 		svc_hdl = svc->svc_hdl;
2752 	}
2753 
2754 	mutex_exit(&ds_svcs.lock);
2755 
2756 	/* check that the LDC channel is ready */
2757 	if (port->ldc.state != LDC_UP) {
2758 		DS_DBG(CE_NOTE, "%s: LDC channel is not up" DS_EOL, __func__);
2759 		return (ECONNRESET);
2760 	}
2761 
2762 	hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t);
2763 
2764 	msg = DS_MALLOC(len + hdrlen);
2765 	hdr = (ds_hdr_t *)msg;
2766 	payload = msg + hdrlen;
2767 	msglen = len + hdrlen;
2768 
2769 	hdr->payload_len = len + sizeof (ds_data_handle_t);
2770 	hdr->msg_type = DS_DATA;
2771 
2772 	data = (ds_data_handle_t *)(msg + DS_HDR_SZ);
2773 	if (is_client) {
2774 		data->svc_handle = svc_hdl;
2775 	} else {
2776 		data->svc_handle = hdl;
2777 	}
2778 
2779 	if ((buf != NULL) && (len != 0)) {
2780 		(void) memcpy(payload, buf, len);
2781 	}
2782 
2783 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data>: hdl=0x%llx, len=%ld, "
2784 	    " payload_len=%d" DS_EOL, PORTID(port), (u_longlong_t)svc->hdl,
2785 	    msglen, hdr->payload_len);
2786 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msglen);
2787 
2788 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
2789 		rv = (rv == EIO) ? ECONNRESET : rv;
2790 	}
2791 	DS_FREE(msg, msglen);
2792 
2793 	return (rv);
2794 }
2795 
2796 void
2797 ds_port_common_init(ds_port_t *port)
2798 {
2799 	int rv;
2800 
2801 	if ((port->flags & DS_PORT_MUTEX_INITED) == 0) {
2802 		mutex_init(&port->lock, NULL, MUTEX_DRIVER, NULL);
2803 		mutex_init(&port->tx_lock, NULL, MUTEX_DRIVER, NULL);
2804 		mutex_init(&port->rcv_lock, NULL, MUTEX_DRIVER, NULL);
2805 		port->flags |= DS_PORT_MUTEX_INITED;
2806 	}
2807 
2808 	port->state = DS_PORT_INIT;
2809 	DS_PORTSET_ADD(ds_allports, port->id);
2810 
2811 	ds_sys_port_init(port);
2812 
2813 	mutex_enter(&port->lock);
2814 	rv = ds_ldc_init(port);
2815 	mutex_exit(&port->lock);
2816 
2817 	/*
2818 	 * If LDC successfully init'ed, try to kick off protocol for this port.
2819 	 */
2820 	if (rv == 0) {
2821 		ds_handle_up_event(port);
2822 	}
2823 }
2824 
2825 void
2826 ds_port_common_fini(ds_port_t *port)
2827 {
2828 	ASSERT(MUTEX_HELD(&port->lock));
2829 
2830 	port->state = DS_PORT_FREE;
2831 
2832 	DS_PORTSET_DEL(ds_allports, port->id);
2833 
2834 	ds_sys_port_fini(port);
2835 }
2836 
2837 /*
2838  * Initialize table of registered service classes
2839  */
2840 void
2841 ds_init_svcs_tbl(uint_t nentries)
2842 {
2843 	int	tblsz;
2844 
2845 	ds_svcs.maxsvcs = nentries;
2846 
2847 	tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *);
2848 	ds_svcs.tbl = (ds_svc_t **)DS_MALLOC(tblsz);
2849 
2850 	ds_svcs.nsvcs = 0;
2851 }
2852 
2853 /*
2854  * Find the max and min version supported.
2855  * Hacked from zeus workspace, support.c
2856  */
2857 static void
2858 min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
2859     uint16_t *min_major, uint16_t *max_major)
2860 {
2861 	int i;
2862 
2863 	*min_major = sup_versionsp[0].major;
2864 	*max_major = *min_major;
2865 
2866 	for (i = 1; i < num_versions; i++) {
2867 		if (sup_versionsp[i].major < *min_major)
2868 			*min_major = sup_versionsp[i].major;
2869 
2870 		if (sup_versionsp[i].major > *max_major)
2871 			*max_major = sup_versionsp[i].major;
2872 	}
2873 }
2874 
2875 /*
2876  * Check whether the major and minor numbers requested by the peer can be
2877  * satisfied. If the requested major is supported, true is returned, and the
2878  * agreed minor is returned in new_minor. If the requested major is not
2879  * supported, the routine returns false, and the closest major is returned in
2880  * *new_major, upon which the peer should re-negotiate. The closest major is
2881  * the just lower that the requested major number.
2882  *
2883  * Hacked from zeus workspace, support.c
2884  */
2885 boolean_t
2886 negotiate_version(int num_versions, ds_ver_t *sup_versionsp,
2887     uint16_t req_major, uint16_t *new_majorp, uint16_t *new_minorp)
2888 {
2889 	int i;
2890 	uint16_t major, lower_major;
2891 	uint16_t min_major = 0, max_major;
2892 	boolean_t found_match = B_FALSE;
2893 
2894 	min_max_versions(num_versions, sup_versionsp, &min_major, &max_major);
2895 
2896 	DS_DBG(CE_NOTE, "negotiate_version: req_major = %u, min = %u, max = %u"
2897 	    DS_EOL, req_major, min_major, max_major);
2898 
2899 	/*
2900 	 * If the minimum version supported is greater than
2901 	 * the version requested, return the lowest version
2902 	 * supported
2903 	 */
2904 	if (min_major > req_major) {
2905 		*new_majorp = min_major;
2906 		return (B_FALSE);
2907 	}
2908 
2909 	/*
2910 	 * If the largest version supported is lower than
2911 	 * the version requested, return the largest version
2912 	 * supported
2913 	 */
2914 	if (max_major < req_major) {
2915 		*new_majorp = max_major;
2916 		return (B_FALSE);
2917 	}
2918 
2919 	/*
2920 	 * Now we know that the requested version lies between the
2921 	 * min and max versions supported. Check if the requested
2922 	 * major can be found in supported versions.
2923 	 */
2924 	lower_major = min_major;
2925 	for (i = 0; i < num_versions; i++) {
2926 		major = sup_versionsp[i].major;
2927 		if (major == req_major) {
2928 			found_match = B_TRUE;
2929 			*new_majorp = req_major;
2930 			*new_minorp = sup_versionsp[i].minor;
2931 			break;
2932 		} else {
2933 			if ((major < req_major) && (major > lower_major))
2934 				lower_major = major;
2935 		}
2936 	}
2937 
2938 	/*
2939 	 * If no match is found, return the closest available number
2940 	 */
2941 	if (!found_match)
2942 		*new_majorp = lower_major;
2943 
2944 	return (found_match);
2945 }
2946 
2947 /*
2948  * Specific errno's that are used by ds.c and ldc.c
2949  */
2950 static struct {
2951 	int ds_errno;
2952 	char *estr;
2953 } ds_errno_to_str_tab[] = {
2954 	{ EIO,		"I/O error" },
2955 	{ ENXIO,	"No such device or address" },
2956 	{ EAGAIN,	"Resource temporarily unavailable" },
2957 	{ ENOMEM,	"Not enough space" },
2958 	{ EACCES,	"Permission denied" },
2959 	{ EFAULT,	"Bad address" },
2960 	{ EBUSY,	"Device busy" },
2961 	{ EINVAL,	"Invalid argument" },
2962 	{ ENOSPC,	"No space left on device" },
2963 	{ ENOMSG,	"No message of desired type" },
2964 #ifdef	ECHRNG
2965 	{ ECHRNG,	"Channel number out of range" },
2966 #endif
2967 	{ ENOTSUP,	"Operation not supported" },
2968 	{ EMSGSIZE,	"Message too long" },
2969 	{ EADDRINUSE,	"Address already in use" },
2970 	{ ECONNRESET,	"Connection reset by peer" },
2971 	{ ENOBUFS,	"No buffer space available" },
2972 	{ ENOTCONN,	"Socket is not connected" },
2973 	{ ECONNREFUSED,	"Connection refused" },
2974 	{ EALREADY,	"Operation already in progress" },
2975 	{ 0,		NULL },
2976 };
2977 
2978 char *
2979 ds_errno_to_str(int ds_errno, char *ebuf)
2980 {
2981 	int i, en;
2982 
2983 	for (i = 0; (en = ds_errno_to_str_tab[i].ds_errno) != 0; i++) {
2984 		if (en == ds_errno) {
2985 			(void) strcpy(ebuf, ds_errno_to_str_tab[i].estr);
2986 			return (ebuf);
2987 		}
2988 	}
2989 
2990 	(void) sprintf(ebuf, "ds_errno (%d)", ds_errno);
2991 	return (ebuf);
2992 }
2993 
2994 static void
2995 ds_loopback_register(ds_svc_hdl_t hdl)
2996 {
2997 	ds_ver_t ds_ver;
2998 	ds_svc_t *svc;
2999 
3000 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3001 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
3002 	    (u_longlong_t)hdl);
3003 	if ((svc = ds_get_svc(hdl)) == NULL) {
3004 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
3005 		    (u_longlong_t)hdl);
3006 		return;
3007 	}
3008 
3009 	svc->state = DS_SVC_ACTIVE;
3010 
3011 	if (svc->ops.ds_reg_cb) {
3012 		DS_DBG_LOOP(CE_NOTE, "%s: loopback regcb: hdl: 0x%llx" DS_EOL,
3013 		    __func__, (u_longlong_t)hdl);
3014 		ds_ver.major = svc->ver.major;
3015 		ds_ver.minor = svc->ver.minor;
3016 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &ds_ver, hdl);
3017 	}
3018 }
3019 
3020 static void
3021 ds_loopback_unregister(ds_svc_hdl_t hdl)
3022 {
3023 	ds_svc_t *svc;
3024 
3025 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3026 	if ((svc = ds_get_svc(hdl)) == NULL) {
3027 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
3028 		    (u_longlong_t)hdl);
3029 		return;
3030 	}
3031 
3032 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
3033 	    (u_longlong_t)hdl);
3034 
3035 	svc->flags &= ~DSSF_LOOPBACK;
3036 	svc->svc_hdl = DS_BADHDL2;
3037 	svc->state = DS_SVC_INACTIVE;
3038 
3039 	if (svc->ops.ds_unreg_cb) {
3040 		DS_DBG_LOOP(CE_NOTE, "%s: loopback unregcb: hdl: 0x%llx" DS_EOL,
3041 		    __func__, (u_longlong_t)hdl);
3042 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
3043 	}
3044 }
3045 
3046 static void
3047 ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen)
3048 {
3049 	ds_svc_t *svc;
3050 
3051 	mutex_enter(&ds_svcs.lock);
3052 	if ((svc = ds_get_svc(hdl)) == NULL) {
3053 		mutex_exit(&ds_svcs.lock);
3054 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
3055 		    (u_longlong_t)hdl);
3056 		return;
3057 	}
3058 	mutex_exit(&ds_svcs.lock);
3059 
3060 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
3061 	    (u_longlong_t)hdl);
3062 
3063 	if (svc->ops.ds_data_cb) {
3064 		DS_DBG_LOOP(CE_NOTE, "%s: loopback datacb hdl: 0x%llx" DS_EOL,
3065 		    __func__, (u_longlong_t)hdl);
3066 		(*svc->ops.ds_data_cb)(svc->ops.cb_arg, buf, buflen);
3067 	}
3068 }
3069 
3070 static int
3071 ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap, ds_svc_hdl_t *lb_hdlp)
3072 {
3073 	ds_svc_t *lb_svc;
3074 	ds_svc_hdl_t lb_hdl = *lb_hdlp;
3075 	int i;
3076 	int match = 0;
3077 	uint16_t new_major;
3078 	uint16_t new_minor;
3079 
3080 	if ((lb_svc = ds_get_svc(lb_hdl)) == NULL) {
3081 		DS_DBG_LOOP(CE_NOTE, "%s: loopback: hdl: 0x%llx invalid" DS_EOL,
3082 		    __func__, (u_longlong_t)lb_hdl);
3083 		return (ENXIO);
3084 	}
3085 
3086 	/* negotiate a version between loopback services, if possible */
3087 	for (i = 0; i < lb_svc->cap.nvers && match == 0; i++) {
3088 		match = negotiate_version(cap->nvers, cap->vers,
3089 		    lb_svc->cap.vers[i].major, &new_major, &new_minor);
3090 	}
3091 	if (!match) {
3092 		DS_DBG_LOOP(CE_NOTE, "%s: loopback version negotiate failed"
3093 		    DS_EOL, __func__);
3094 		return (ENOTSUP);
3095 	}
3096 	if (lb_svc->state != DS_SVC_INACTIVE) {
3097 		DS_DBG_LOOP(CE_NOTE, "%s: loopback active: hdl: 0x%llx"
3098 		    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3099 		if ((lb_svc->flags & DSSF_ISCLIENT) == 0) {
3100 			DS_DBG_LOOP(CE_NOTE, "%s: loopback busy hdl: 0x%llx"
3101 			    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3102 			return (EBUSY);
3103 		}
3104 		svc->state = DS_SVC_INACTIVE;	/* prevent alloc'ing svc */
3105 		lb_svc = ds_svc_clone(lb_svc);
3106 		DS_DBG_LOOP(CE_NOTE, "%s: loopback clone: ohdl: 0x%llx "
3107 		    "nhdl: 0x%llx" DS_EOL, __func__, (u_longlong_t)lb_hdl,
3108 		    (u_longlong_t)lb_svc->hdl);
3109 		*lb_hdlp = lb_svc->hdl;
3110 	}
3111 
3112 	svc->flags |= DSSF_LOOPBACK;
3113 	svc->svc_hdl = lb_svc->hdl;
3114 	svc->port = NULL;
3115 	svc->ver.major = new_major;
3116 	svc->ver.minor = new_minor;
3117 
3118 	lb_svc->flags |= DSSF_LOOPBACK;
3119 	lb_svc->svc_hdl = svc->hdl;
3120 	lb_svc->port = NULL;
3121 	lb_svc->ver.major = new_major;
3122 	lb_svc->ver.minor = new_minor;
3123 
3124 	DS_DBG_LOOP(CE_NOTE, "%s: setting loopback between: 0x%llx and 0x%llx"
3125 	    DS_EOL, __func__, (u_longlong_t)svc->hdl,
3126 	    (u_longlong_t)lb_svc->hdl);
3127 	return (0);
3128 }
3129 
3130 static ds_svc_t *
3131 ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl, ds_port_t *port)
3132 {
3133 	int		idx;
3134 	ds_svc_t	*svc;
3135 
3136 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s looking up clnt hdl: 0x%llx" DS_EOL,
3137 	    PORTID(port), __func__, (u_longlong_t)hdl);
3138 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3139 
3140 	/* walk every table entry */
3141 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3142 		svc = ds_svcs.tbl[idx];
3143 		if (DS_SVC_ISFREE(svc))
3144 			continue;
3145 		if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3146 		    svc->svc_hdl == hdl && svc->port == port) {
3147 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s found clnt hdl "
3148 			    "0x%llx: svc%d" DS_EOL, PORTID(port), __func__,
3149 			    (u_longlong_t)hdl, (uint_t)DS_HDL2IDX(svc->hdl));
3150 			return (svc);
3151 		}
3152 	}
3153 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s clnt hdl: 0x%llx not found" DS_EOL,
3154 	    PORTID(port), __func__, (u_longlong_t)hdl);
3155 
3156 	return (NULL);
3157 }
3158 
3159 static ds_svc_t *
3160 ds_svc_clone(ds_svc_t *svc)
3161 {
3162 	ds_svc_t *newsvc;
3163 	ds_svc_hdl_t hdl;
3164 
3165 	ASSERT(svc->flags & DSSF_ISCLIENT);
3166 
3167 	newsvc = ds_alloc_svc();
3168 
3169 	/* Can only clone clients for now */
3170 	hdl = newsvc->hdl | DS_HDL_ISCLIENT_BIT;
3171 	DS_DBG_USR(CE_NOTE, "%s: cloning client: old hdl: 0x%llx new hdl: "
3172 	    "0x%llx" DS_EOL, __func__, (u_longlong_t)svc->hdl,
3173 	    (u_longlong_t)hdl);
3174 	(void) memcpy(newsvc, svc, sizeof (ds_svc_t));
3175 	newsvc->hdl = hdl;
3176 	newsvc->flags &= ~DSSF_LOOPBACK;
3177 	newsvc->port = NULL;
3178 	newsvc->svc_hdl = DS_BADHDL2;
3179 	newsvc->cap.svc_id = ds_strdup(svc->cap.svc_id);
3180 	newsvc->cap.vers = DS_MALLOC(svc->cap.nvers * sizeof (ds_ver_t));
3181 	(void) memcpy(newsvc->cap.vers, svc->cap.vers,
3182 	    svc->cap.nvers * sizeof (ds_ver_t));
3183 
3184 	/*
3185 	 * Kludge to allow lds driver user callbacks to get access to current
3186 	 * svc structure.  Arg could be index to svc table or some other piece
3187 	 * of info to get to the svc table entry.
3188 	 */
3189 	if (newsvc->flags & DSSF_ISUSER) {
3190 		newsvc->ops.cb_arg = (ds_cb_arg_t)(newsvc);
3191 	}
3192 	return (newsvc);
3193 }
3194 
3195 /*
3196  * Internal handle lookup function.
3197  */
3198 static int
3199 i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3200     uint_t maxhdls)
3201 {
3202 	int idx;
3203 	int nhdls = 0;
3204 	ds_svc_t *svc;
3205 	uint32_t client_flag = is_client ? DSSF_ISCLIENT : 0;
3206 
3207 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3208 
3209 	for (idx = 0; idx < ds_svcs.maxsvcs && nhdls < maxhdls; idx++) {
3210 		svc = ds_svcs.tbl[idx];
3211 		if (DS_SVC_ISFREE(svc))
3212 			continue;
3213 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3214 		    (svc->flags & DSSF_ISCLIENT) == client_flag) {
3215 			if (hdlp != NULL && nhdls < maxhdls) {
3216 				hdlp[nhdls] = svc->hdl;
3217 				nhdls++;
3218 			} else {
3219 				nhdls++;
3220 			}
3221 		}
3222 	}
3223 	return (nhdls);
3224 }
3225 
3226 /*
3227  * Interface for ds_hdl_lookup in lds driver.
3228  */
3229 int
3230 ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3231     uint_t maxhdls, uint_t *nhdlsp)
3232 {
3233 	mutex_enter(&ds_svcs.lock);
3234 	*nhdlsp = i_ds_hdl_lookup(service, is_client, hdlp, maxhdls);
3235 	mutex_exit(&ds_svcs.lock);
3236 	return (0);
3237 }
3238 
3239 /*
3240  * After an UNREG REQ, check if this is a client service with multiple
3241  * handles.  If it is, then we can eliminate this entry.
3242  */
3243 static void
3244 ds_check_for_dup_services(ds_svc_t *svc)
3245 {
3246 	if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3247 	    svc->state == DS_SVC_INACTIVE &&
3248 	    i_ds_hdl_lookup(svc->cap.svc_id, 1, NULL, 2) == 2) {
3249 		ds_delete_svc_entry(svc);
3250 	}
3251 }
3252 
3253 static void
3254 ds_delete_svc_entry(ds_svc_t *svc)
3255 {
3256 	ds_svc_hdl_t tmp_hdl;
3257 
3258 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3259 
3260 	/*
3261 	 * Clear out the structure, but do not deallocate the
3262 	 * memory. It can be reused for the next registration.
3263 	 */
3264 	DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
3265 	DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
3266 
3267 	/* save the handle to prevent reuse */
3268 	tmp_hdl = svc->hdl;
3269 	bzero((void *)svc, sizeof (ds_svc_t));
3270 
3271 	/* initialize for next use */
3272 	svc->hdl = tmp_hdl;
3273 	svc->state = DS_SVC_FREE;
3274 
3275 	ds_svcs.nsvcs--;
3276 }
3277