xref: /illumos-gate/usr/src/uts/sun4v/io/ds_common.c (revision a0dcbafd041a2ff7fdb68a14128e3744213dafac)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * Domain Services Module Common Code.
29  *
30  * This module is intended to be used by both Solaris and the VBSC
31  * module.
32  */
33 
34 #include <sys/modctl.h>
35 #include <sys/ksynch.h>
36 #include <sys/taskq.h>
37 #include <sys/disp.h>
38 #include <sys/cmn_err.h>
39 #include <sys/note.h>
40 #include <sys/mach_descrip.h>
41 #include <sys/mdesc.h>
42 #include <sys/ldc.h>
43 #include <sys/ds.h>
44 #include <sys/ds_impl.h>
45 
46 #ifndef MIN
47 #define	MIN(a, b)	((a) < (b) ? (a) : (b))
48 #endif
49 
50 #define	DS_DECODE_BUF_LEN		30
51 
52 /*
53  * All DS ports in the system
54  *
55  * The list of DS ports is read in from the MD when the DS module is
56  * initialized and is never modified. This eliminates the need for
57  * locking to access the port array itself. Access to the individual
58  * ports are synchronized at the port level.
59  */
60 ds_port_t	ds_ports[DS_MAX_PORTS];
61 ds_portset_t	ds_allports;	/* all DS ports in the system */
62 ds_portset_t	ds_nullport;	/* allows test against null portset */
63 
64 /*
65  * Table of registered services
66  *
67  * Locking: Accesses to the table of services are synchronized using
68  *   a single mutex lock (ds_svcs.lock). The lock must be held when
69  *   looking up service information in the table and when any
70  *   service information is being modified.
71  */
72 ds_svcs_t	ds_svcs;
73 
74 /*
75  * Flag to prevent callbacks while in the middle of DS teardown.
76  */
77 boolean_t ds_enabled = B_FALSE;	/* enable/disable taskq processing */
78 
79 /*
80  * Retry count and delay for LDC reads and writes
81  */
82 #ifndef DS_DEFAULT_RETRIES
83 #define	DS_DEFAULT_RETRIES	10000	/* number of times to retry */
84 #endif
85 #ifndef DS_DEFAULT_DELAY
86 #define	DS_DEFAULT_DELAY	1000	/* usecs to wait between retries */
87 #endif
88 
89 static int ds_retries = DS_DEFAULT_RETRIES;
90 static clock_t ds_delay = DS_DEFAULT_DELAY;
91 
92 /*
93  * Supported versions of the DS message protocol
94  *
95  * The version array must be sorted in order from the highest
96  * supported version to the lowest. Support for a particular
97  * <major>.<minor> version implies all lower minor versions of
98  * that same major version are supported as well.
99  */
100 static ds_ver_t ds_vers[] = { { 1, 0 } };
101 
102 #define	DS_NUM_VER	(sizeof (ds_vers) / sizeof (ds_vers[0]))
103 
104 
105 /* incoming message handling functions */
106 typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len);
107 static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len);
108 static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len);
109 static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len);
110 static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len);
111 static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len);
112 static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len);
113 static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len);
114 static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len);
115 static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len);
116 static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len);
117 static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len);
118 
119 /*
120  * DS Message Handler Dispatch Table
121  *
122  * A table used to dispatch all incoming messages. This table
123  * contains handlers for all the fixed message types, as well as
124  * the messages defined in the 1.0 version of the DS protocol.
125  * The handlers are indexed based on the DS header msg_type values
126  */
127 static const ds_msg_handler_t ds_msg_handlers[] = {
128 	ds_handle_init_req,		/* DS_INIT_REQ */
129 	ds_handle_init_ack,		/* DS_INIT_ACK */
130 	ds_handle_init_nack,		/* DS_INIT_NACK */
131 	ds_handle_reg_req,		/* DS_REG_REQ */
132 	ds_handle_reg_ack,		/* DS_REG_ACK */
133 	ds_handle_reg_nack,		/* DS_REG_NACK */
134 	ds_handle_unreg_req,		/* DS_UNREG */
135 	ds_handle_unreg_ack,		/* DS_UNREG_ACK */
136 	ds_handle_unreg_nack,		/* DS_UNREG_NACK */
137 	ds_handle_data,			/* DS_DATA */
138 	ds_handle_nack			/* DS_NACK */
139 };
140 
141 
142 
143 /* initialization functions */
144 static int ds_ldc_init(ds_port_t *port);
145 
146 /* event processing functions */
147 static uint_t ds_ldc_cb(uint64_t event, caddr_t arg);
148 static int ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep);
149 static void ds_handle_up_event(ds_port_t *port);
150 static void ds_handle_down_reset_events(ds_port_t *port);
151 static void ds_handle_recv(void *arg);
152 static void ds_dispatch_event(void *arg);
153 
154 /* message sending functions */
155 static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen);
156 static int ds_send_reg_req(ds_svc_t *svc, ds_port_t *port);
157 static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
158 static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
159 
160 /* walker functions */
161 static int ds_svc_isfree(ds_svc_t *svc, void *arg);
162 static int ds_svc_unregister(ds_svc_t *svc, void *arg);
163 static int ds_svc_port_up(ds_svc_t *svc, void *arg);
164 
165 /* service utilities */
166 static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port);
167 static int ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port);
168 
169 /* port utilities */
170 static void ds_port_reset(ds_port_t *port);
171 static ldc_status_t ds_update_ldc_state(ds_port_t *port);
172 
173 /* misc utilities */
174 static void min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
175     uint16_t *min_major, uint16_t *max_major);
176 
177 /* debug */
178 static char *decode_ldc_events(uint64_t event, char *buf);
179 
180 /* loopback */
181 static void ds_loopback_register(ds_svc_hdl_t hdl);
182 static void ds_loopback_unregister(ds_svc_hdl_t hdl);
183 static void ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen);
184 static int ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap,
185     ds_svc_hdl_t *lb_hdlp);
186 
187 /* client handling */
188 static int i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
189     uint_t maxhdls);
190 static ds_svc_t *ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl,
191     ds_port_t *port);
192 static ds_svc_t *ds_find_svc_by_id_port(char *svc_id, int is_client,
193     ds_port_t *port);
194 static ds_svc_t *ds_svc_clone(ds_svc_t *svc);
195 static void ds_portset_del_active_clients(char *service, ds_portset_t *portsp);
196 static void ds_check_for_dup_services(ds_svc_t *svc);
197 static void ds_delete_svc_entry(ds_svc_t *svc);
198 
/*
 * Return a newly allocated copy of the NUL-terminated string 'str'.
 * The copy is obtained from DS_MALLOC() and is sized to hold the
 * string plus its terminator.
 */
char *
ds_strdup(char *str)
{
	size_t	nbytes = strlen(str) + 1;
	char	*copy = DS_MALLOC(nbytes);

	(void) strcpy(copy, str);
	return (copy);
}
208 
209 void
210 ds_common_init(void)
211 {
212 	/* Validate version table */
213 	ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK);
214 
215 	/* Initialize services table */
216 	ds_init_svcs_tbl(DS_MAXSVCS_INIT);
217 
218 	/* enable callback processing */
219 	ds_enabled = B_TRUE;
220 }
221 
222 /* BEGIN LDC SUPPORT FUNCTIONS */
223 
224 static char *
225 decode_ldc_events(uint64_t event, char *buf)
226 {
227 	buf[0] = 0;
228 	if (event & LDC_EVT_DOWN)	(void) strcat(buf, " DOWN");
229 	if (event & LDC_EVT_RESET)	(void) strcat(buf, " RESET");
230 	if (event & LDC_EVT_UP)		(void) strcat(buf, " UP");
231 	if (event & LDC_EVT_READ)	(void) strcat(buf, " READ");
232 	if (event & LDC_EVT_WRITE)	(void) strcat(buf, " WRITE");
233 	return (buf);
234 }
235 
236 static ldc_status_t
237 ds_update_ldc_state(ds_port_t *port)
238 {
239 	ldc_status_t	ldc_state;
240 	int		rv;
241 	char		ebuf[DS_EBUFSIZE];
242 
243 	ASSERT(MUTEX_HELD(&port->lock));
244 
245 	/*
246 	 * Read status and update ldc state info in port structure.
247 	 */
248 	if ((rv = ldc_status(port->ldc.hdl, &ldc_state)) != 0) {
249 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_status error: %s" DS_EOL,
250 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
251 		ldc_state = port->ldc.state;
252 	} else {
253 		port->ldc.state = ldc_state;
254 	}
255 
256 	return (ldc_state);
257 }
258 
259 static void
260 ds_handle_down_reset_events(ds_port_t *port)
261 {
262 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
263 	    __func__);
264 
265 	mutex_enter(&ds_svcs.lock);
266 	mutex_enter(&port->lock);
267 
268 	ds_sys_drain_events(port);
269 
270 	(void) ds_update_ldc_state(port);
271 
272 	/* reset the port state */
273 	ds_port_reset(port);
274 
275 	/* acknowledge the reset */
276 	(void) ldc_up(port->ldc.hdl);
277 
278 	mutex_exit(&port->lock);
279 	mutex_exit(&ds_svcs.lock);
280 
281 	ds_handle_up_event(port);
282 
283 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
284 }
285 
286 static void
287 ds_handle_up_event(ds_port_t *port)
288 {
289 	ldc_status_t	ldc_state;
290 
291 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
292 	    __func__);
293 
294 	mutex_enter(&port->lock);
295 
296 	ldc_state = ds_update_ldc_state(port);
297 
298 	mutex_exit(&port->lock);
299 
300 	if ((ldc_state == LDC_UP) && IS_DS_PORT(port)) {
301 		/*
302 		 * Initiate the handshake.
303 		 */
304 		ds_send_init_req(port);
305 	}
306 
307 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
308 }
309 
310 static uint_t
311 ds_ldc_cb(uint64_t event, caddr_t arg)
312 {
313 	ds_port_t	*port = (ds_port_t *)arg;
314 	char		evstring[DS_DECODE_BUF_LEN];
315 
316 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: %s event (%llx) received" DS_EOL,
317 	    PORTID(port), __func__, decode_ldc_events(event, evstring),
318 	    (u_longlong_t)event);
319 
320 	if (!ds_enabled) {
321 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: callback handling is disabled"
322 		    DS_EOL, PORTID(port), __func__);
323 		return (LDC_SUCCESS);
324 	}
325 
326 	if (event & (LDC_EVT_DOWN | LDC_EVT_RESET)) {
327 		ds_handle_down_reset_events(port);
328 		goto done;
329 	}
330 
331 	if (event & LDC_EVT_UP) {
332 		ds_handle_up_event(port);
333 	}
334 
335 	if (event & LDC_EVT_READ) {
336 		if (port->ldc.state != LDC_UP) {
337 			cmn_err(CE_WARN, "ds@%lx: %s: LDC READ event while "
338 			    "port not up" DS_EOL, PORTID(port), __func__);
339 			goto done;
340 		}
341 
342 		if (ds_sys_dispatch_func(ds_handle_recv, port)) {
343 			cmn_err(CE_WARN, "ds@%lx: error initiating LDC READ "
344 			    " event", PORTID(port));
345 		}
346 	}
347 
348 	if (event & LDC_EVT_WRITE) {
349 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: LDC WRITE event received, "
350 		    "not supported" DS_EOL, PORTID(port), __func__);
351 	}
352 
353 	if (event & ~(LDC_EVT_UP | LDC_EVT_READ)) {
354 		cmn_err(CE_WARN, "ds@%lx: %s: Unexpected LDC event received: "
355 		    "0x%llx" DS_EOL, PORTID(port), __func__,
356 		    (u_longlong_t)event);
357 	}
358 done:
359 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
360 
361 	return (LDC_SUCCESS);
362 }
363 
364 static int
365 ds_ldc_init(ds_port_t *port)
366 {
367 	int		rv;
368 	ldc_attr_t	ldc_attr;
369 	caddr_t		ldc_cb_arg = (caddr_t)port;
370 	char		ebuf[DS_EBUFSIZE];
371 
372 	ASSERT(MUTEX_HELD(&port->lock));
373 
374 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%lld" DS_EOL,
375 	    PORTID(port), __func__, (u_longlong_t)port->ldc.id);
376 
377 	ldc_attr.devclass = LDC_DEV_GENERIC;
378 	ldc_attr.instance = 0;
379 	ldc_attr.mode = LDC_MODE_RELIABLE;
380 	ldc_attr.mtu = DS_STREAM_MTU;
381 
382 	if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) {
383 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_id: %lx, ldc_init error: %s"
384 		    DS_EOL, PORTID(port), __func__, port->ldc.id,
385 		    ds_errno_to_str(rv, ebuf));
386 		return (rv);
387 	}
388 
389 	rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, ldc_cb_arg);
390 	if (rv != 0) {
391 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_reg_callback error: %s"
392 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
393 		return (rv);
394 	}
395 
396 	ds_sys_ldc_init(port);
397 	return (0);
398 }
399 
400 int
401 ds_ldc_fini(ds_port_t *port)
402 {
403 	int	rv;
404 	char	ebuf[DS_EBUFSIZE];
405 
406 	ASSERT(port->state >= DS_PORT_LDC_INIT);
407 
408 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%ld" DS_EOL, PORTID(port),
409 	    __func__, port->ldc.id);
410 
411 	if ((rv = ldc_close(port->ldc.hdl)) != 0) {
412 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_close error: %s" DS_EOL,
413 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
414 		return (rv);
415 	}
416 
417 	if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) {
418 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_unreg_callback error: %s"
419 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
420 		return (rv);
421 	}
422 
423 	if ((rv = ldc_fini(port->ldc.hdl)) != 0) {
424 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_fini error: %s" DS_EOL,
425 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
426 		return (rv);
427 	}
428 
429 	return (rv);
430 }
431 
432 /*
433  * Attempt to read a specified number of bytes from a particular LDC.
434  * Returns zero for success or the return code from the LDC read on
435  * failure. The actual number of bytes read from the LDC is returned
436  * in the size parameter.
437  */
438 static int
439 ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep)
440 {
441 	int	rv = 0;
442 	size_t	bytes_req = *sizep;
443 	size_t	bytes_left = bytes_req;
444 	size_t	nbytes;
445 	int	retry_count = 0;
446 	char	ebuf[DS_EBUFSIZE];
447 
448 	ASSERT(MUTEX_HELD(&port->rcv_lock));
449 
450 	*sizep = 0;
451 
452 	DS_DBG_LDC(CE_NOTE, "ds@%lx: attempting to read %ld bytes" DS_EOL,
453 	    PORTID(port), bytes_req);
454 
455 	while (bytes_left > 0) {
456 
457 		nbytes = bytes_left;
458 
459 		if ((rv = ldc_read(port->ldc.hdl, msgp, &nbytes)) != 0) {
460 			if (rv == ECONNRESET) {
461 				break;
462 			} else if (rv != EAGAIN) {
463 				cmn_err(CE_NOTE, "ds@%lx: %s: %s" DS_EOL,
464 				    PORTID(port), __func__,
465 				    ds_errno_to_str(rv, ebuf));
466 				break;
467 			}
468 		} else {
469 			if (nbytes != 0) {
470 				DS_DBG_LDC(CE_NOTE, "ds@%lx: "
471 				    "read %ld bytes, %d retries" DS_EOL,
472 				    PORTID(port), nbytes, retry_count);
473 
474 				*sizep += nbytes;
475 				msgp += nbytes;
476 				bytes_left -= nbytes;
477 
478 				/* reset counter on a successful read */
479 				retry_count = 0;
480 				continue;
481 			}
482 
483 			/*
484 			 * No data was read. Check if this is the
485 			 * first attempt. If so, just return since
486 			 * nothing has been read yet.
487 			 */
488 			if (bytes_left == bytes_req) {
489 				DS_DBG_LDC(CE_NOTE, "ds@%lx: read zero bytes, "
490 				    " no data available" DS_EOL, PORTID(port));
491 				break;
492 			}
493 		}
494 
495 		/*
496 		 * A retry is necessary because the read returned
497 		 * EAGAIN, or a zero length read occurred after
498 		 * reading a partial message.
499 		 */
500 		if (retry_count++ >= ds_retries) {
501 			DS_DBG_LDC(CE_NOTE, "ds@%lx: timed out waiting for "
502 			    "message" DS_EOL, PORTID(port));
503 			break;
504 		}
505 
506 		drv_usecwait(ds_delay);
507 	}
508 
509 	return (rv);
510 }
511 
512 static void
513 ds_handle_recv(void *arg)
514 {
515 	ds_port_t	*port = (ds_port_t *)arg;
516 	char		*hbuf;
517 	size_t		msglen;
518 	size_t		read_size;
519 	boolean_t	hasdata;
520 	ds_hdr_t	hdr;
521 	uint8_t		*msg;
522 	char		*currp;
523 	int		rv;
524 	ds_event_t	*devent;
525 
526 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s..." DS_EOL, PORTID(port), __func__);
527 
528 	/*
529 	 * Read messages from the channel until there are none
530 	 * pending. Valid messages are dispatched to be handled
531 	 * by a separate thread while any malformed messages are
532 	 * dropped.
533 	 */
534 
535 	mutex_enter(&port->rcv_lock);
536 
537 	while (((rv = ldc_chkq(port->ldc.hdl, &hasdata)) == 0) && hasdata) {
538 
539 		DS_DBG(CE_NOTE, "ds@%lx: %s: reading next message" DS_EOL,
540 		    PORTID(port), __func__);
541 
542 		/*
543 		 * Read in the next message.
544 		 */
545 		hbuf = (char *)&hdr;
546 		bzero(hbuf, DS_HDR_SZ);
547 		read_size = DS_HDR_SZ;
548 		currp = hbuf;
549 
550 		/* read in the message header */
551 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
552 			break;
553 		}
554 
555 		if (read_size < DS_HDR_SZ) {
556 			/*
557 			 * A zero length read is a valid signal that
558 			 * there is no data left on the channel.
559 			 */
560 			if (read_size != 0) {
561 				cmn_err(CE_WARN, "ds@%lx: invalid message "
562 				    "length, received %ld bytes, expected %ld"
563 				    DS_EOL, PORTID(port), read_size, DS_HDR_SZ);
564 			}
565 			continue;
566 		}
567 
568 		/* get payload size and allocate a buffer */
569 		read_size = ((ds_hdr_t *)hbuf)->payload_len;
570 		msglen = DS_HDR_SZ + read_size;
571 		msg = DS_MALLOC(msglen);
572 		if (!msg) {
573 			cmn_err(CE_WARN, "Memory allocation failed attempting "
574 			    " to allocate %d bytes." DS_EOL, (int)msglen);
575 			continue;
576 		}
577 
578 		DS_DBG(CE_NOTE, "ds@%lx: %s: message payload len %d" DS_EOL,
579 		    PORTID(port), __func__, (int)read_size);
580 
581 		/* move message header into buffer */
582 		(void) memcpy(msg, hbuf, DS_HDR_SZ);
583 		currp = (char *)(msg) + DS_HDR_SZ;
584 
585 		/* read in the message body */
586 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
587 			DS_FREE(msg, msglen);
588 			break;
589 		}
590 
591 		/* validate the size of the message */
592 		if ((DS_HDR_SZ + read_size) != msglen) {
593 			cmn_err(CE_WARN, "ds@%lx: %s: invalid message length, "
594 			    "received %ld bytes, expected %ld" DS_EOL,
595 			    PORTID(port), __func__, (DS_HDR_SZ + read_size),
596 			    msglen);
597 			DS_FREE(msg, msglen);
598 			continue;
599 		}
600 
601 		DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
602 
603 		/*
604 		 * Send the message for processing, and store it
605 		 * in the log. The memory is deallocated only when
606 		 * the message is removed from the log.
607 		 */
608 
609 		devent = DS_MALLOC(sizeof (ds_event_t));
610 		devent->port = port;
611 		devent->buf = (char *)msg;
612 		devent->buflen = msglen;
613 
614 		/* log the message */
615 		(void) ds_log_add_msg(DS_LOG_IN(port->id), msg, msglen);
616 
617 		if (ds_sys_dispatch_func(ds_dispatch_event, devent)) {
618 			cmn_err(CE_WARN, "ds@%lx: error initiating "
619 			    "event handler", PORTID(port));
620 			DS_FREE(devent, sizeof (ds_event_t));
621 		}
622 	}
623 
624 	mutex_exit(&port->rcv_lock);
625 
626 	/* handle connection reset errors returned from ds_recv_msg */
627 	if (rv == ECONNRESET) {
628 		ds_handle_down_reset_events(port);
629 	}
630 
631 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s done" DS_EOL, PORTID(port), __func__);
632 }
633 
634 static void
635 ds_dispatch_event(void *arg)
636 {
637 	ds_event_t	*event = (ds_event_t *)arg;
638 	ds_hdr_t	*hdr;
639 	ds_port_t	*port;
640 
641 	port = event->port;
642 
643 	hdr = (ds_hdr_t *)event->buf;
644 
645 	if (DS_MSG_TYPE_VALID(hdr->msg_type)) {
646 		DS_DBG(CE_NOTE, "ds@%lx: dispatch_event: msg_type=%d" DS_EOL,
647 		    PORTID(port), hdr->msg_type);
648 
649 		(*ds_msg_handlers[hdr->msg_type])(port, event->buf,
650 		    event->buflen);
651 	} else {
652 		cmn_err(CE_WARN, "ds@%lx: dispatch_event: invalid msg "
653 		    "type (%d)" DS_EOL, PORTID(port), hdr->msg_type);
654 	}
655 
656 	DS_FREE(event->buf, event->buflen);
657 	DS_FREE(event, sizeof (ds_event_t));
658 }
659 
660 int
661 ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen)
662 {
663 	int	rv;
664 	caddr_t	currp = msg;
665 	size_t	amt_left = msglen;
666 	int	loopcnt = 0;
667 
668 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s msglen: %ld" DS_EOL, PORTID(port),
669 	    __func__, msglen);
670 	DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
671 
672 	/*
673 	 * Ensure that no other messages can be sent on this port by holding
674 	 * the tx_lock mutex in case the write doesn't get sent with one write.
675 	 * This guarantees that the message doesn't become fragmented.
676 	 */
677 	mutex_enter(&port->tx_lock);
678 
679 	do {
680 		if ((rv = ldc_write(port->ldc.hdl, currp, &msglen)) != 0) {
681 			if (rv == ECONNRESET) {
682 				mutex_exit(&port->tx_lock);
683 				ds_handle_down_reset_events(port);
684 				return (rv);
685 			} else if ((rv == EWOULDBLOCK) &&
686 			    (loopcnt++ < ds_retries)) {
687 				drv_usecwait(ds_delay);
688 			} else {
689 				cmn_err(CE_WARN, "ds@%lx: send_msg: ldc_write "
690 				    "failed (%d), %d bytes remaining" DS_EOL,
691 				    PORTID(port), rv, (int)amt_left);
692 				goto error;
693 			}
694 		} else {
695 			amt_left -= msglen;
696 			currp += msglen;
697 			msglen = amt_left;
698 			loopcnt = 0;
699 		}
700 	} while (amt_left > 0);
701 error:
702 	mutex_exit(&port->tx_lock);
703 
704 	return (rv);
705 }
706 
707 /* END LDC SUPPORT FUNCTIONS */
708 
709 
710 /* BEGIN DS PROTOCOL SUPPORT FUNCTIONS */
711 
712 static void
713 ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len)
714 {
715 	ds_hdr_t	*hdr;
716 	ds_init_ack_t	*ack;
717 	ds_init_nack_t	*nack;
718 	char		*msg;
719 	size_t		msglen;
720 	ds_init_req_t	*req;
721 	size_t		explen = DS_MSG_LEN(ds_init_req_t);
722 	uint16_t	new_major;
723 	uint16_t	new_minor;
724 	boolean_t	match;
725 
726 	/* sanity check the incoming message */
727 	if (len != explen) {
728 		cmn_err(CE_WARN, "ds@%lx: <init_req: invalid message "
729 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
730 		    explen);
731 		return;
732 	}
733 
734 	req = (ds_init_req_t *)(buf + DS_HDR_SZ);
735 
736 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_req: ver=%d.%d" DS_EOL,
737 	    PORTID(port), req->major_vers, req->minor_vers);
738 
739 	match = negotiate_version(DS_NUM_VER, &ds_vers[0],
740 	    req->major_vers, &new_major, &new_minor);
741 
742 	/*
743 	 * Check version info. ACK only if the major numbers exactly
744 	 * match. The service entity can retry with a new minor
745 	 * based on the response sent as part of the NACK.
746 	 */
747 	if (match) {
748 		msglen = DS_MSG_LEN(ds_init_ack_t);
749 		msg = DS_MALLOC(msglen);
750 
751 		hdr = (ds_hdr_t *)msg;
752 		hdr->msg_type = DS_INIT_ACK;
753 		hdr->payload_len = sizeof (ds_init_ack_t);
754 
755 		ack = (ds_init_ack_t *)(msg + DS_HDR_SZ);
756 		ack->minor_vers = MIN(new_minor, req->minor_vers);
757 
758 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_ack>: minor=0x%04X" DS_EOL,
759 		    PORTID(port), MIN(new_minor, req->minor_vers));
760 	} else {
761 		msglen = DS_MSG_LEN(ds_init_nack_t);
762 		msg = DS_MALLOC(msglen);
763 
764 		hdr = (ds_hdr_t *)msg;
765 		hdr->msg_type = DS_INIT_NACK;
766 		hdr->payload_len = sizeof (ds_init_nack_t);
767 
768 		nack = (ds_init_nack_t *)(msg + DS_HDR_SZ);
769 		nack->major_vers = new_major;
770 
771 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_nack>: major=0x%04X" DS_EOL,
772 		    PORTID(port), new_major);
773 	}
774 
775 	/*
776 	 * Send the response
777 	 */
778 	(void) ds_send_msg(port, msg, msglen);
779 	DS_FREE(msg, msglen);
780 }
781 
782 static void
783 ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len)
784 {
785 	ds_init_ack_t	*ack;
786 	ds_ver_t	*ver;
787 	size_t		explen = DS_MSG_LEN(ds_init_ack_t);
788 
789 	/* sanity check the incoming message */
790 	if (len != explen) {
791 		cmn_err(CE_WARN, "ds@%lx: <init_ack: invalid message "
792 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
793 		    explen);
794 		return;
795 	}
796 
797 	ack = (ds_init_ack_t *)(buf + DS_HDR_SZ);
798 
799 	mutex_enter(&port->lock);
800 
801 	if (port->state != DS_PORT_INIT_REQ) {
802 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: invalid state: %d"
803 		    DS_EOL, PORTID(port), port->state);
804 		mutex_exit(&port->lock);
805 		return;
806 	}
807 
808 	ver = &(ds_vers[port->ver_idx]);
809 
810 	/* agreed upon a major version */
811 	port->ver.major = ver->major;
812 
813 	/*
814 	 * If the returned minor version is larger than
815 	 * the requested minor version, use the lower of
816 	 * the two, i.e. the requested version.
817 	 */
818 	if (ack->minor_vers >= ver->minor) {
819 		/*
820 		 * Use the minor version specified in the
821 		 * original request.
822 		 */
823 		port->ver.minor = ver->minor;
824 	} else {
825 		/*
826 		 * Use the lower minor version returned in
827 		 * the ack. By definition, all lower minor
828 		 * versions must be supported.
829 		 */
830 		port->ver.minor = ack->minor_vers;
831 	}
832 
833 	port->state = DS_PORT_READY;
834 
835 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready v%d.%d" DS_EOL,
836 	    PORTID(port), port->ver.major, port->ver.minor);
837 
838 	mutex_exit(&port->lock);
839 
840 	/*
841 	 * The port came up, so update all the services
842 	 * with this information. Follow that up with an
843 	 * attempt to register any service that is not
844 	 * already registered.
845 	 */
846 	mutex_enter(&ds_svcs.lock);
847 
848 	(void) ds_walk_svcs(ds_svc_port_up, port);
849 	(void) ds_walk_svcs(ds_svc_register, NULL);
850 
851 	mutex_exit(&ds_svcs.lock);
852 }
853 
854 static void
855 ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len)
856 {
857 	int		idx;
858 	ds_init_nack_t	*nack;
859 	ds_ver_t	*ver;
860 	size_t		explen = DS_MSG_LEN(ds_init_nack_t);
861 
862 	/* sanity check the incoming message */
863 	if (len != explen) {
864 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: invalid message "
865 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
866 		    explen);
867 		return;
868 	}
869 
870 	nack = (ds_init_nack_t *)(buf + DS_HDR_SZ);
871 
872 	mutex_enter(&port->lock);
873 
874 	if (port->state != DS_PORT_INIT_REQ) {
875 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: invalid state: %d"
876 		    DS_EOL, PORTID(port), port->state);
877 		mutex_exit(&port->lock);
878 		return;
879 	}
880 
881 	ver = &(ds_vers[port->ver_idx]);
882 
883 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: req=v%d.%d, nack=v%d.x"
884 	    DS_EOL, PORTID(port), ver->major, ver->minor, nack->major_vers);
885 
886 	if (nack->major_vers == 0) {
887 		/* no supported protocol version */
888 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS not supported"
889 		    DS_EOL, PORTID(port));
890 		mutex_exit(&port->lock);
891 		return;
892 	}
893 
894 	/*
895 	 * Walk the version list, looking for a major version
896 	 * that is as close to the requested major version as
897 	 * possible.
898 	 */
899 	for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) {
900 		if (ds_vers[idx].major <= nack->major_vers) {
901 			/* found a version to try */
902 			goto done;
903 		}
904 	}
905 
906 	if (idx == DS_NUM_VER) {
907 		/* no supported version */
908 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS v%d.x not "
909 		    "supported" DS_EOL, PORTID(port), nack->major_vers);
910 
911 		mutex_exit(&port->lock);
912 		return;
913 	}
914 
915 done:
916 	/* start the handshake again */
917 	port->ver_idx = idx;
918 	port->state = DS_PORT_LDC_INIT;
919 	mutex_exit(&port->lock);
920 
921 	ds_send_init_req(port);
922 
923 }
924 
925 static ds_svc_t *
926 ds_find_svc_by_id_port(char *svc_id, int is_client, ds_port_t *port)
927 {
928 	int		idx;
929 	ds_svc_t	*svc, *found_svc = 0;
930 	uint32_t	flag_match = is_client ? DSSF_ISCLIENT : 0;
931 
932 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
933 
934 	/* walk every table entry */
935 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
936 		svc = ds_svcs.tbl[idx];
937 		if (DS_SVC_ISFREE(svc))
938 			continue;
939 		if (strcmp(svc->cap.svc_id, svc_id) != 0)
940 			continue;
941 		if ((svc->flags & DSSF_ISCLIENT) != flag_match)
942 			continue;
943 		if (port != NULL && svc->port == port) {
944 			return (svc);
945 		} else if (svc->state == DS_SVC_INACTIVE) {
946 			found_svc = svc;
947 		} else if (!found_svc) {
948 			found_svc = svc;
949 		}
950 	}
951 
952 	return (found_svc);
953 }
954 
955 static void
956 ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len)
957 {
958 	ds_reg_req_t	*req;
959 	ds_hdr_t	*hdr;
960 	ds_reg_ack_t	*ack;
961 	ds_reg_nack_t	*nack;
962 	char		*msg;
963 	size_t		msglen;
964 	size_t		explen = DS_MSG_LEN(ds_reg_req_t);
965 	ds_svc_t	*svc = NULL;
966 	ds_ver_t	version;
967 	uint16_t	new_major;
968 	uint16_t	new_minor;
969 	boolean_t	match;
970 
971 	/* sanity check the incoming message */
972 	if (len < explen) {
973 		cmn_err(CE_WARN, "ds@%lx: <reg_req: invalid message "
974 		    "length (%ld), expected at least %ld" DS_EOL,
975 		    PORTID(port), len, explen);
976 		return;
977 	}
978 
979 	req = (ds_reg_req_t *)(buf + DS_HDR_SZ);
980 
981 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' ver=%d.%d, hdl=0x%llx"
982 	    DS_EOL, PORTID(port), req->svc_id, req->major_vers, req->minor_vers,
983 	    (u_longlong_t)req->svc_handle);
984 
985 	mutex_enter(&ds_svcs.lock);
986 	svc = ds_find_svc_by_id_port(req->svc_id,
987 	    DS_HDL_ISCLIENT(req->svc_handle) == 0, port);
988 	if (svc == NULL) {
989 
990 do_reg_nack:
991 		mutex_exit(&ds_svcs.lock);
992 
993 		msglen = DS_MSG_LEN(ds_reg_nack_t);
994 		msg = DS_MALLOC(msglen);
995 
996 		hdr = (ds_hdr_t *)msg;
997 		hdr->msg_type = DS_REG_NACK;
998 		hdr->payload_len = sizeof (ds_reg_nack_t);
999 
1000 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1001 		nack->svc_handle = req->svc_handle;
1002 		nack->result = DS_REG_VER_NACK;
1003 		nack->major_vers = 0;
1004 
1005 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s'" DS_EOL,
1006 		    PORTID(port), req->svc_id);
1007 		/*
1008 		 * Send the response
1009 		 */
1010 		(void) ds_send_msg(port, msg, msglen);
1011 		DS_FREE(msg, msglen);
1012 		return;
1013 	}
1014 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' found, hdl: 0x%llx" DS_EOL,
1015 	    PORTID(port), req->svc_id, (u_longlong_t)svc->hdl);
1016 
1017 	/*
1018 	 * A client sends out a reg req in order to force service providers to
1019 	 * initiate a reg req from their end (limitation in the protocol).  We
1020 	 * expect the service provider to be in the inactive (DS_SVC_INACTIVE)
1021 	 * state.  If the service provider has already sent out a reg req (the
1022 	 * state is DS_SVC_REG_PENDING) or has already handshaken (the
1023 	 * state is DS_SVC_ACTIVE), then we can simply ignore this reg
1024 	 * req.  For any other state, we force an unregister before initiating
1025 	 * a reg req.
1026 	 */
1027 
1028 	if (DS_HDL_ISCLIENT(req->svc_handle)) {
1029 		switch (svc->state) {
1030 
1031 		case DS_SVC_REG_PENDING:
1032 		case DS_SVC_ACTIVE:
1033 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1034 			    "client, state (%x)" DS_EOL, PORTID(port),
1035 			    req->svc_id, svc->state);
1036 			mutex_exit(&ds_svcs.lock);
1037 			return;
1038 
1039 		case DS_SVC_INACTIVE:
1040 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1041 			    "client" DS_EOL, PORTID(port), req->svc_id);
1042 			break;
1043 
1044 		default:
1045 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1046 			    "client forced unreg, state (%x)" DS_EOL,
1047 			    PORTID(port), req->svc_id, svc->state);
1048 			(void) ds_svc_unregister(svc, port);
1049 			break;
1050 		}
1051 		(void) ds_svc_port_up(svc, port);
1052 		(void) ds_svc_register_onport(svc, port);
1053 		mutex_exit(&ds_svcs.lock);
1054 		return;
1055 	}
1056 
1057 	/*
1058 	 * Only remote service providers can initiate a registration.  The
1059 	 * local service from here must be a client service.
1060 	 */
1061 
1062 	match = negotiate_version(svc->cap.nvers, svc->cap.vers,
1063 	    req->major_vers, &new_major, &new_minor);
1064 
1065 	/*
1066 	 * Check version info. ACK only if the major numbers exactly
1067 	 * match. The service entity can retry with a new minor
1068 	 * based on the response sent as part of the NACK.
1069 	 */
1070 	if (match) {
1071 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' svc%d: state: %x "
1072 		    "svc_portid: %d" DS_EOL, PORTID(port), req->svc_id,
1073 		    (int)DS_HDL2IDX(svc->hdl), svc->state,
1074 		    (int)(svc->port == NULL ? -1 : PORTID(svc->port)));
1075 		/*
1076 		 * If the current local service is already in use and
1077 		 * it's not on this port, clone it.
1078 		 */
1079 		if (svc->state != DS_SVC_INACTIVE) {
1080 			if (svc->port != NULL && port == svc->port) {
1081 				/*
1082 				 * Someone probably dropped an unreg req
1083 				 * somewhere.  Force a local unreg.
1084 				 */
1085 				(void) ds_svc_unregister(svc, port);
1086 			} else if (!DS_HDL_ISCLIENT(svc->hdl)) {
1087 				/*
1088 				 * Can't clone a non-client (service provider)
1089 				 * handle.  This is because old in-kernel
1090 				 * service providers can't deal with multiple
1091 				 * handles.
1092 				 */
1093 				goto do_reg_nack;
1094 			} else {
1095 				svc = ds_svc_clone(svc);
1096 			}
1097 		}
1098 		svc->port = port;
1099 		svc->svc_hdl = req->svc_handle;
1100 		svc->state = DS_SVC_ACTIVE;
1101 
1102 		msglen = DS_MSG_LEN(ds_reg_ack_t);
1103 		msg = DS_MALLOC(msglen);
1104 
1105 		hdr = (ds_hdr_t *)msg;
1106 		hdr->msg_type = DS_REG_ACK;
1107 		hdr->payload_len = sizeof (ds_reg_ack_t);
1108 
1109 		ack = (ds_reg_ack_t *)(msg + DS_HDR_SZ);
1110 		ack->svc_handle = req->svc_handle;
1111 		ack->minor_vers = MIN(new_minor, req->minor_vers);
1112 
1113 
1114 		if (svc->ops.ds_reg_cb) {
1115 			/* Call the registration callback */
1116 			version.major = req->major_vers;
1117 			version.minor = ack->minor_vers;
1118 			(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &version,
1119 			    svc->hdl);
1120 		}
1121 		mutex_exit(&ds_svcs.lock);
1122 
1123 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_ack>: '%s' minor=0x%04X"
1124 		    DS_EOL, PORTID(port), svc->cap.svc_id,
1125 		    MIN(new_minor, req->minor_vers));
1126 	} else {
1127 		mutex_exit(&ds_svcs.lock);
1128 
1129 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1130 		msg = DS_MALLOC(msglen);
1131 
1132 		hdr = (ds_hdr_t *)msg;
1133 		hdr->msg_type = DS_REG_NACK;
1134 		hdr->payload_len = sizeof (ds_reg_nack_t);
1135 
1136 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1137 		nack->svc_handle = req->svc_handle;
1138 		nack->result = DS_REG_VER_NACK;
1139 		nack->major_vers = new_major;
1140 
1141 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s' major=0x%04X"
1142 		    DS_EOL, PORTID(port), svc->cap.svc_id, new_major);
1143 	}
1144 
1145 	/* send message */
1146 	(void) ds_send_msg(port, msg, msglen);
1147 	DS_FREE(msg, msglen);
1148 }
1149 
1150 static void
1151 ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len)
1152 {
1153 	ds_reg_ack_t	*ack;
1154 	ds_ver_t	*ver;
1155 	ds_ver_t	tmpver;
1156 	ds_svc_t	*svc;
1157 	size_t		explen = DS_MSG_LEN(ds_reg_ack_t);
1158 
1159 	/* sanity check the incoming message */
1160 	if (len != explen) {
1161 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid message "
1162 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1163 		    explen);
1164 		return;
1165 	}
1166 
1167 	ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ);
1168 
1169 	mutex_enter(&ds_svcs.lock);
1170 
1171 	/*
1172 	 * This searches for service based on how we generate handles
1173 	 * and so only works because this is a reg ack.
1174 	 */
1175 	if (DS_HDL_ISCLIENT(ack->svc_handle) ||
1176 	    (svc = ds_get_svc(ack->svc_handle)) == NULL) {
1177 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid handle 0x%llx"
1178 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1179 		goto done;
1180 	}
1181 
1182 	/* make sure the message makes sense */
1183 	if (svc->state != DS_SVC_REG_PENDING) {
1184 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid state (%d)" DS_EOL,
1185 		    PORTID(port), svc->state);
1186 		goto done;
1187 	}
1188 
1189 	ver = &(svc->cap.vers[svc->ver_idx]);
1190 
1191 	/* major version has been agreed upon */
1192 	svc->ver.major = ver->major;
1193 
1194 	if (ack->minor_vers >= ver->minor) {
1195 		/*
1196 		 * Use the minor version specified in the
1197 		 * original request.
1198 		 */
1199 		svc->ver.minor = ver->minor;
1200 	} else {
1201 		/*
1202 		 * Use the lower minor version returned in
1203 		 * the ack. By defninition, all lower minor
1204 		 * versions must be supported.
1205 		 */
1206 		svc->ver.minor = ack->minor_vers;
1207 	}
1208 
1209 	svc->state = DS_SVC_ACTIVE;
1210 	svc->port = port;
1211 
1212 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_ack: '%s' v%d.%d ready, hdl=0x%llx"
1213 	    DS_EOL, PORTID(port), svc->cap.svc_id, svc->ver.major,
1214 	    svc->ver.minor, (u_longlong_t)svc->hdl);
1215 
1216 	/* notify the client that registration is complete */
1217 	if (svc->ops.ds_reg_cb) {
1218 		/*
1219 		 * Use a temporary version structure so that
1220 		 * the copy in the svc structure cannot be
1221 		 * modified by the client.
1222 		 */
1223 		tmpver.major = svc->ver.major;
1224 		tmpver.minor = svc->ver.minor;
1225 
1226 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl);
1227 	}
1228 
1229 done:
1230 	mutex_exit(&ds_svcs.lock);
1231 }
1232 
1233 static void
1234 ds_try_next_port(ds_svc_t *svc, int portid)
1235 {
1236 	ds_port_t *port;
1237 	ds_portset_t totry;
1238 	int i;
1239 
1240 	DS_DBG_LDC(CE_NOTE, "ds@%x %s" DS_EOL, portid, __func__);
1241 
1242 	/*
1243 	 * Get the ports that haven't been tried yet and are available to try.
1244 	 */
1245 	DS_PORTSET_SETNULL(totry);
1246 	for (i = 0; i < DS_MAX_PORTS; i++) {
1247 		if (!DS_PORT_IN_SET(svc->tried, i) &&
1248 		    DS_PORT_IN_SET(svc->avail, i))
1249 			DS_PORTSET_ADD(totry, i);
1250 	}
1251 
1252 	if (DS_PORTSET_ISNULL(totry))
1253 		return;
1254 
1255 	for (i = 0; i < DS_MAX_PORTS; i++, portid++) {
1256 		if (portid >= DS_MAX_PORTS) {
1257 			portid = 0;
1258 		}
1259 
1260 		/*
1261 		 * If the port is not in the available list,
1262 		 * it is not a candidate for registration.
1263 		 */
1264 		if (!DS_PORT_IN_SET(totry, portid)) {
1265 			continue;
1266 		}
1267 
1268 		port = &ds_ports[portid];
1269 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s trying ldc.id: %d" DS_EOL,
1270 		    portid, __func__, (uint_t)(port->ldc.id));
1271 		if (ds_send_reg_req(svc, port) == 0) {
1272 			DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send OK" DS_EOL,
1273 			    portid, __func__);
1274 			/* register sent successfully */
1275 			break;
1276 		}
1277 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send FAIL" DS_EOL,
1278 		    portid, __func__);
1279 
1280 		/* reset the service to try the next port */
1281 		ds_reset_svc(svc, port);
1282 	}
1283 }
1284 
1285 static void
1286 ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len)
1287 {
1288 	ds_reg_nack_t	*nack;
1289 	ds_svc_t	*svc;
1290 	int		idx;
1291 	size_t		explen = DS_MSG_LEN(ds_reg_nack_t);
1292 
1293 	/* sanity check the incoming message */
1294 	if (len != explen) {
1295 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid message "
1296 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1297 		    explen);
1298 		return;
1299 	}
1300 
1301 	nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ);
1302 
1303 	mutex_enter(&ds_svcs.lock);
1304 
1305 	/*
1306 	 * We expect a reg_nack for a client ping.
1307 	 */
1308 	if (DS_HDL_ISCLIENT(nack->svc_handle)) {
1309 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: ping hdl: 0x%llx"
1310 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1311 		goto done;
1312 	}
1313 
1314 	/*
1315 	 * This searches for service based on how we generate handles
1316 	 * and so only works because this is a reg nack.
1317 	 */
1318 	if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1319 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid handle 0x%llx"
1320 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1321 		goto done;
1322 	}
1323 
1324 	/* make sure the message makes sense */
1325 	if (svc->state != DS_SVC_REG_PENDING) {
1326 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid state (%d)" DS_EOL,
1327 		    PORTID(port), svc->state);
1328 		goto done;
1329 	}
1330 
1331 	if (nack->result == DS_REG_DUP) {
1332 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: duplicate registration "
1333 		    " for %s" DS_EOL, PORTID(port), svc->cap.svc_id);
1334 		ds_reset_svc(svc, port);
1335 		goto done;
1336 	}
1337 
1338 	/*
1339 	 * A major version of zero indicates that the
1340 	 * service is not supported at all.
1341 	 */
1342 	if (nack->major_vers == 0) {
1343 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' not supported"
1344 		    DS_EOL, PORTID(port), svc->cap.svc_id);
1345 		ds_reset_svc(svc, port);
1346 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1347 			ds_try_next_port(svc, PORTID(port) + 1);
1348 		goto done;
1349 	}
1350 
1351 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' hdl=0x%llx, nack=%d.x"
1352 	    DS_EOL, PORTID(port), svc->cap.svc_id,
1353 	    (u_longlong_t)nack->svc_handle, nack->major_vers);
1354 
1355 	/*
1356 	 * Walk the version list for the service, looking for
1357 	 * a major version that is as close to the requested
1358 	 * major version as possible.
1359 	 */
1360 	for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) {
1361 		if (svc->cap.vers[idx].major <= nack->major_vers) {
1362 			/* found a version to try */
1363 			break;
1364 		}
1365 	}
1366 
1367 	if (idx == svc->cap.nvers) {
1368 		/* no supported version */
1369 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: %s v%d.x not supported"
1370 		    DS_EOL, PORTID(port), svc->cap.svc_id, nack->major_vers);
1371 		ds_reset_svc(svc, port);
1372 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1373 			ds_try_next_port(svc, PORTID(port) + 1);
1374 		goto done;
1375 	}
1376 
1377 	/* start the handshake again */
1378 	svc->state = DS_SVC_INACTIVE;
1379 	svc->ver_idx = idx;
1380 
1381 	(void) ds_svc_register(svc, NULL);
1382 
1383 done:
1384 	mutex_exit(&ds_svcs.lock);
1385 }
1386 
1387 static void
1388 ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len)
1389 {
1390 	ds_hdr_t	*hdr;
1391 	ds_unreg_req_t	*req;
1392 	ds_unreg_ack_t	*ack;
1393 	ds_svc_t	*svc;
1394 	char		*msg;
1395 	size_t		msglen;
1396 	size_t		explen = DS_MSG_LEN(ds_unreg_req_t);
1397 
1398 	/* sanity check the incoming message */
1399 	if (len != explen) {
1400 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid message "
1401 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1402 		    explen);
1403 		return;
1404 	}
1405 
1406 	req = (ds_unreg_req_t *)(buf + DS_HDR_SZ);
1407 
1408 	mutex_enter(&ds_svcs.lock);
1409 
1410 	/* lookup appropriate client or service */
1411 	if (DS_HDL_ISCLIENT(req->svc_handle) ||
1412 	    ((svc = ds_find_clnt_svc_by_hdl_port(req->svc_handle, port))
1413 	    == NULL && ((svc = ds_get_svc(req->svc_handle)) == NULL ||
1414 	    svc->port != port))) {
1415 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid handle 0x%llx"
1416 		    DS_EOL, PORTID(port), (u_longlong_t)req->svc_handle);
1417 		ds_send_unreg_nack(port, req->svc_handle);
1418 		mutex_exit(&ds_svcs.lock);
1419 		return;
1420 	}
1421 
1422 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: '%s' handle 0x%llx" DS_EOL,
1423 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1424 
1425 	(void) ds_svc_unregister(svc, svc->port);
1426 
1427 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_ack>: '%s' hdl=0x%llx" DS_EOL,
1428 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1429 
1430 	ds_check_for_dup_services(svc);
1431 
1432 	mutex_exit(&ds_svcs.lock);
1433 
1434 	msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t);
1435 	msg = DS_MALLOC(msglen);
1436 
1437 	hdr = (ds_hdr_t *)msg;
1438 	hdr->msg_type = DS_UNREG_ACK;
1439 	hdr->payload_len = sizeof (ds_unreg_ack_t);
1440 
1441 	ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ);
1442 	ack->svc_handle = req->svc_handle;
1443 
1444 	/* send message */
1445 	(void) ds_send_msg(port, msg, msglen);
1446 	DS_FREE(msg, msglen);
1447 
1448 }
1449 
1450 static void
1451 ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len)
1452 {
1453 	ds_unreg_ack_t	*ack;
1454 	size_t		explen = DS_MSG_LEN(ds_unreg_ack_t);
1455 
1456 	/* sanity check the incoming message */
1457 	if (len != explen) {
1458 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: invalid message "
1459 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1460 		    explen);
1461 		return;
1462 	}
1463 
1464 	ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ);
1465 
1466 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: hdl=0x%llx" DS_EOL,
1467 	    PORTID(port), (u_longlong_t)ack->svc_handle);
1468 
1469 #ifdef DEBUG
1470 	mutex_enter(&ds_svcs.lock);
1471 
1472 	/*
1473 	 * Since the unregister request was initiated locally,
1474 	 * the service structure has already been torn down.
1475 	 * Just perform a sanity check to make sure the message
1476 	 * is appropriate.
1477 	 */
1478 	if (ds_get_svc(ack->svc_handle) != NULL) {
1479 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: handle 0x%llx in use"
1480 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1481 	}
1482 
1483 	mutex_exit(&ds_svcs.lock);
1484 #endif	/* DEBUG */
1485 }
1486 
1487 static void
1488 ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len)
1489 {
1490 	ds_unreg_nack_t	*nack;
1491 	size_t		explen = DS_MSG_LEN(ds_unreg_nack_t);
1492 
1493 	/* sanity check the incoming message */
1494 	if (len != explen) {
1495 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: invalid message "
1496 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1497 		    explen);
1498 		return;
1499 	}
1500 
1501 	nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ);
1502 
1503 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: hdl=0x%llx" DS_EOL,
1504 	    PORTID(port), (u_longlong_t)nack->svc_handle);
1505 
1506 #ifdef DEBUG
1507 	mutex_enter(&ds_svcs.lock);
1508 
1509 	/*
1510 	 * Since the unregister request was initiated locally,
1511 	 * the service structure has already been torn down.
1512 	 * Just perform a sanity check to make sure the message
1513 	 * is appropriate.
1514 	 */
1515 	if (ds_get_svc(nack->svc_handle) != NULL) {
1516 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: handle 0x%llx in use"
1517 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1518 	}
1519 
1520 	mutex_exit(&ds_svcs.lock);
1521 #endif	/* DEBUG */
1522 }
1523 
1524 static void
1525 ds_handle_data(ds_port_t *port, caddr_t buf, size_t len)
1526 {
1527 	ds_data_handle_t	*data;
1528 	ds_svc_t		*svc;
1529 	char			*msg;
1530 	int			msgsz;
1531 	int			hdrsz;
1532 	size_t			explen = DS_MSG_LEN(ds_data_handle_t);
1533 
1534 	/* sanity check the incoming message */
1535 	if (len < explen) {
1536 		cmn_err(CE_WARN, "ds@%lx: <data: invalid message length "
1537 		    "(%ld), expected at least %ld" DS_EOL, PORTID(port), len,
1538 		    explen);
1539 		return;
1540 	}
1541 
1542 	data = (ds_data_handle_t *)(buf + DS_HDR_SZ);
1543 
1544 	hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t);
1545 	msgsz = len - hdrsz;
1546 
1547 	/* strip off the header for the client */
1548 	msg = (msgsz) ? (buf + hdrsz) : NULL;
1549 
1550 	mutex_enter(&ds_svcs.lock);
1551 
1552 	if ((svc = ds_find_clnt_svc_by_hdl_port(data->svc_handle, port))
1553 	    == NULL) {
1554 		if ((svc = ds_get_svc(data->svc_handle)) == NULL) {
1555 			mutex_exit(&ds_svcs.lock);
1556 			cmn_err(CE_WARN, "ds@%lx: <data: invalid handle 0x%llx"
1557 			    DS_EOL, PORTID(port),
1558 			    (u_longlong_t)data->svc_handle);
1559 			ds_send_data_nack(port, data->svc_handle);
1560 			return;
1561 		}
1562 	}
1563 
1564 	mutex_exit(&ds_svcs.lock);
1565 
1566 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <data: '%s' hdl=0x%llx" DS_EOL,
1567 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)svc->hdl);
1568 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msgsz);
1569 
1570 	/* dispatch this message to the client */
1571 	(*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz);
1572 }
1573 
1574 static void
1575 ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len)
1576 {
1577 	ds_svc_t	*svc;
1578 	ds_data_nack_t	*nack;
1579 	size_t		explen = DS_MSG_LEN(ds_data_nack_t);
1580 
1581 	/* sanity check the incoming message */
1582 	if (len != explen) {
1583 		cmn_err(CE_WARN, "ds@%lx: <data_nack: invalid message "
1584 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1585 		    explen);
1586 		return;
1587 	}
1588 
1589 	nack = (ds_data_nack_t *)(buf + DS_HDR_SZ);
1590 
1591 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack: hdl=0x%llx, result=0x%llx"
1592 	    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle,
1593 	    (u_longlong_t)nack->result);
1594 
1595 	if (nack->result == DS_INV_HDL) {
1596 
1597 		mutex_enter(&ds_svcs.lock);
1598 
1599 		if ((svc = ds_find_clnt_svc_by_hdl_port(nack->svc_handle,
1600 		    port)) == NULL) {
1601 			if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1602 				mutex_exit(&ds_svcs.lock);
1603 				return;
1604 			}
1605 		}
1606 
1607 		cmn_err(CE_WARN, "ds@%lx: <data_nack: handle 0x%llx reported "
1608 		    " as invalid" DS_EOL, PORTID(port),
1609 		    (u_longlong_t)nack->svc_handle);
1610 
1611 		(void) ds_svc_unregister(svc, svc->port);
1612 
1613 		mutex_exit(&ds_svcs.lock);
1614 	}
1615 }
1616 
1617 /* Initialize the port */
1618 void
1619 ds_send_init_req(ds_port_t *port)
1620 {
1621 	ds_hdr_t	*hdr;
1622 	ds_init_req_t	*init_req;
1623 	size_t		msglen;
1624 	ds_ver_t	*vers = &ds_vers[port->ver_idx];
1625 
1626 	mutex_enter(&port->lock);
1627 	if (port->state != DS_PORT_LDC_INIT) {
1628 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: invalid state: %d"
1629 		    DS_EOL, PORTID(port), port->state);
1630 		mutex_exit(&port->lock);
1631 		return;
1632 	}
1633 	mutex_exit(&port->lock);
1634 
1635 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: req=v%d.%d" DS_EOL,
1636 	    PORTID(port), vers->major, vers->minor);
1637 
1638 	msglen = DS_HDR_SZ + sizeof (ds_init_req_t);
1639 	hdr = DS_MALLOC(msglen);
1640 
1641 	hdr->msg_type = DS_INIT_REQ;
1642 	hdr->payload_len = sizeof (ds_init_req_t);
1643 
1644 	init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ);
1645 	init_req->major_vers = vers->major;
1646 	init_req->minor_vers = vers->minor;
1647 
1648 	if (ds_send_msg(port, (caddr_t)hdr, msglen) == 0) {
1649 		/*
1650 		 * We've left the port state unlocked over the malloc/send,
1651 		 * make sure no one has changed the state under us before
1652 		 * we update the state.
1653 		 */
1654 		mutex_enter(&port->lock);
1655 		if (port->state == DS_PORT_LDC_INIT)
1656 			port->state = DS_PORT_INIT_REQ;
1657 		mutex_exit(&port->lock);
1658 	}
1659 	DS_FREE(hdr, msglen);
1660 }
1661 
1662 static int
1663 ds_send_reg_req(ds_svc_t *svc, ds_port_t *port)
1664 {
1665 	ds_ver_t	*ver;
1666 	ds_hdr_t	*hdr;
1667 	caddr_t		msg;
1668 	size_t		msglen;
1669 	ds_reg_req_t	*req;
1670 	size_t		idlen;
1671 	int		rv;
1672 
1673 	if ((svc->state != DS_SVC_INACTIVE) &&
1674 	    ((svc->flags & DSSF_ISCLIENT) == 0)) {
1675 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: invalid svc state (%d) "
1676 		    "for svc '%s'" DS_EOL, PORTID(port), svc->state,
1677 		    svc->cap.svc_id);
1678 		return (-1);
1679 	}
1680 
1681 	mutex_enter(&port->lock);
1682 
1683 	/* check on the LDC to Zeus */
1684 	if (port->ldc.state != LDC_UP) {
1685 		/* can not send message */
1686 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: channel %ld is not up"
1687 		    DS_EOL, PORTID(port), port->ldc.id);
1688 		mutex_exit(&port->lock);
1689 		return (-1);
1690 	}
1691 
1692 	/* make sure port is ready */
1693 	if (port->state != DS_PORT_READY) {
1694 		/* can not send message */
1695 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: port is not ready"
1696 		    DS_EOL, PORTID(port));
1697 		mutex_exit(&port->lock);
1698 		return (-1);
1699 	}
1700 
1701 	mutex_exit(&port->lock);
1702 
1703 	/* allocate the message buffer */
1704 	idlen = strlen(svc->cap.svc_id);
1705 	msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen;
1706 	msg = DS_MALLOC(msglen);
1707 
1708 	/* copy in the header data */
1709 	hdr = (ds_hdr_t *)msg;
1710 	hdr->msg_type = DS_REG_REQ;
1711 	hdr->payload_len = sizeof (ds_reg_req_t) + idlen;
1712 
1713 	req = (ds_reg_req_t *)(msg + DS_HDR_SZ);
1714 	req->svc_handle = svc->hdl;
1715 	ver = &(svc->cap.vers[svc->ver_idx]);
1716 	req->major_vers = ver->major;
1717 	req->minor_vers = ver->minor;
1718 
1719 	/* copy in the service id */
1720 	(void) memcpy(req->svc_id, svc->cap.svc_id, idlen + 1);
1721 
1722 	/* send the message */
1723 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: '%s' ver=%d.%d, hdl=0x%llx"
1724 	    DS_EOL, PORTID(port), svc->cap.svc_id, ver->major, ver->minor,
1725 	    (u_longlong_t)svc->hdl);
1726 
1727 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1728 		svc->port = port;
1729 		rv = -1;
1730 	} else if ((svc->flags & DSSF_ISCLIENT) == 0) {
1731 		svc->state = DS_SVC_REG_PENDING;
1732 	}
1733 	DS_FREE(msg, msglen);
1734 
1735 	return (rv);
1736 }
1737 
1738 /*
1739  * Keep around in case we want this later
1740  */
1741 int
1742 ds_send_unreg_req(ds_svc_t *svc)
1743 {
1744 	caddr_t		msg;
1745 	size_t		msglen;
1746 	ds_hdr_t	*hdr;
1747 	ds_unreg_req_t	*req;
1748 	ds_port_t	*port = svc->port;
1749 	int		rv;
1750 
1751 	if (port == NULL) {
1752 		DS_DBG(CE_NOTE, "send_unreg_req: service '%s' not "
1753 		    "associated with a port" DS_EOL, svc->cap.svc_id);
1754 		return (-1);
1755 	}
1756 
1757 	mutex_enter(&port->lock);
1758 
1759 	/* check on the LDC to Zeus */
1760 	if (port->ldc.state != LDC_UP) {
1761 		/* can not send message */
1762 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: channel %ld is not up"
1763 		    DS_EOL, PORTID(port), port->ldc.id);
1764 		mutex_exit(&port->lock);
1765 		return (-1);
1766 	}
1767 
1768 	/* make sure port is ready */
1769 	if (port->state != DS_PORT_READY) {
1770 		/* can not send message */
1771 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: port is not ready" DS_EOL,
1772 		    PORTID(port));
1773 		mutex_exit(&port->lock);
1774 		return (-1);
1775 	}
1776 
1777 	mutex_exit(&port->lock);
1778 
1779 	msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t);
1780 	msg = DS_MALLOC(msglen);
1781 
1782 	/* copy in the header data */
1783 	hdr = (ds_hdr_t *)msg;
1784 	hdr->msg_type = DS_UNREG;
1785 	hdr->payload_len = sizeof (ds_unreg_req_t);
1786 
1787 	req = (ds_unreg_req_t *)(msg + DS_HDR_SZ);
1788 	if (svc->flags & DSSF_ISCLIENT) {
1789 		req->svc_handle = svc->svc_hdl;
1790 	} else {
1791 		req->svc_handle = svc->hdl;
1792 	}
1793 
1794 	/* send the message */
1795 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_req>: '%s' hdl=0x%llx" DS_EOL,
1796 	    PORTID(port), (svc->cap.svc_id) ? svc->cap.svc_id : "NULL",
1797 	    (u_longlong_t)svc->hdl);
1798 
1799 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1800 		rv = -1;
1801 	}
1802 	DS_FREE(msg, msglen);
1803 
1804 	return (rv);
1805 }
1806 
1807 static void
1808 ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1809 {
1810 	caddr_t		msg;
1811 	size_t		msglen;
1812 	ds_hdr_t	*hdr;
1813 	ds_unreg_nack_t	*nack;
1814 
1815 	mutex_enter(&port->lock);
1816 
1817 	/* check on the LDC to Zeus */
1818 	if (port->ldc.state != LDC_UP) {
1819 		/* can not send message */
1820 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: channel %ld is not up"
1821 		    DS_EOL, PORTID(port), port->ldc.id);
1822 		mutex_exit(&port->lock);
1823 		return;
1824 	}
1825 
1826 	/* make sure port is ready */
1827 	if (port->state != DS_PORT_READY) {
1828 		/* can not send message */
1829 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: port is not ready"
1830 		    DS_EOL, PORTID(port));
1831 		mutex_exit(&port->lock);
1832 		return;
1833 	}
1834 
1835 	mutex_exit(&port->lock);
1836 
1837 	msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t);
1838 	msg = DS_MALLOC(msglen);
1839 
1840 	/* copy in the header data */
1841 	hdr = (ds_hdr_t *)msg;
1842 	hdr->msg_type = DS_UNREG_NACK;
1843 	hdr->payload_len = sizeof (ds_unreg_nack_t);
1844 
1845 	nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ);
1846 	nack->svc_handle = bad_hdl;
1847 
1848 	/* send the message */
1849 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_nack>: hdl=0x%llx" DS_EOL,
1850 	    PORTID(port), (u_longlong_t)bad_hdl);
1851 
1852 	(void) ds_send_msg(port, msg, msglen);
1853 	DS_FREE(msg, msglen);
1854 }
1855 
1856 static void
1857 ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1858 {
1859 	caddr_t		msg;
1860 	size_t		msglen;
1861 	ds_hdr_t	*hdr;
1862 	ds_data_nack_t	*nack;
1863 
1864 	mutex_enter(&port->lock);
1865 
1866 	/* check on the LDC to Zeus */
1867 	if (port->ldc.state != LDC_UP) {
1868 		/* can not send message */
1869 		cmn_err(CE_WARN, "ds@%lx: data_nack>: channel %ld is not up"
1870 		    DS_EOL, PORTID(port), port->ldc.id);
1871 		mutex_exit(&port->lock);
1872 		return;
1873 	}
1874 
1875 	/* make sure port is ready */
1876 	if (port->state != DS_PORT_READY) {
1877 		/* can not send message */
1878 		cmn_err(CE_WARN, "ds@%lx: data_nack>: port is not ready" DS_EOL,
1879 		    PORTID(port));
1880 		mutex_exit(&port->lock);
1881 		return;
1882 	}
1883 
1884 	mutex_exit(&port->lock);
1885 
1886 	msglen = DS_HDR_SZ + sizeof (ds_data_nack_t);
1887 	msg = DS_MALLOC(msglen);
1888 
1889 	/* copy in the header data */
1890 	hdr = (ds_hdr_t *)msg;
1891 	hdr->msg_type = DS_NACK;
1892 	hdr->payload_len = sizeof (ds_data_nack_t);
1893 
1894 	nack = (ds_data_nack_t *)(msg + DS_HDR_SZ);
1895 	nack->svc_handle = bad_hdl;
1896 	nack->result = DS_INV_HDL;
1897 
1898 	/* send the message */
1899 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack>: hdl=0x%llx" DS_EOL,
1900 	    PORTID(port), (u_longlong_t)bad_hdl);
1901 
1902 	(void) ds_send_msg(port, msg, msglen);
1903 	DS_FREE(msg, msglen);
1904 }
1905 
1906 /* END DS PROTOCOL SUPPORT FUNCTIONS */
1907 
#ifdef DEBUG

#define	BYTESPERLINE	8
#define	LINEWIDTH	((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1)
#define	ASCIIOFFSET	((BYTESPERLINE * 3) + 2)
/* NB: evaluates its argument twice; do not pass side effects */
#define	ISPRINT(c)	(((c) >= ' ') && ((c) <= '~'))

/*
 * Output a buffer formatted with a set number of bytes on
 * each line. Append each line with the ASCII equivalent of
 * each byte if it falls within the printable ASCII range,
 * and '.' otherwise.
 *
 * 'vbuf' is the buffer to dump and 'len' its length in bytes.  At
 * most the first 128 bytes are printed.  A NULL buffer or a zero
 * length produces no output.
 */
void
ds_dump_msg(void *vbuf, size_t len)
{
	size_t	i, j;
	char	*curr;
	char	*aoff;
	char	line[LINEWIDTH];
	uint8_t	*buf = vbuf;

	/* nothing to print; also avoids dereferencing a NULL buffer */
	if (buf == NULL || len == 0)
		return;

	/* cap the amount of output per message */
	if (len > 128)
		len = 128;

	/* walk the buffer one line at a time */
	for (i = 0; i < len; i += BYTESPERLINE) {

		bzero(line, LINEWIDTH);

		curr = line;
		aoff = line + ASCIIOFFSET;

		/*
		 * Walk the bytes in the current line, storing
		 * the hex value for the byte as well as the
		 * ASCII representation in a temporary buffer.
		 * All ASCII values are placed at the end of
		 * the line.
		 */
		for (j = 0; (j < BYTESPERLINE) && ((i + j) < len); j++) {
			(void) sprintf(curr, " %02x", buf[i + j]);
			*aoff = (ISPRINT(buf[i + j])) ? buf[i + j] : '.';
			curr += 3;
			aoff++;
		}

		/*
		 * Fill in to the start of the ASCII translation
		 * with spaces.  This also overwrites the NUL left
		 * behind by the last sprintf, so that the hex and
		 * ASCII parts form a single string.
		 */
		while (curr != (line + ASCIIOFFSET))
			*curr++ = ' ';

		cmn_err(CE_NOTE, "%s" DS_EOL, line);
	}
}
#endif /* DEBUG */
1968 
1969 
1970 /*
1971  * Walk the table of registered services, executing the specified callback
1972  * function for each service on a port. A non-zero return value from the
1973  * callback is used to terminate the walk, not to indicate an error. Returns
1974  * the index of the last service visited.
1975  */
1976 int
1977 ds_walk_svcs(svc_cb_t svc_cb, void *arg)
1978 {
1979 	int		idx;
1980 	ds_svc_t	*svc;
1981 
1982 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
1983 
1984 	/* walk every table entry */
1985 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
1986 		svc = ds_svcs.tbl[idx];
1987 
1988 		/* execute the callback */
1989 		if ((*svc_cb)(svc, arg) != 0)
1990 			break;
1991 	}
1992 
1993 	return (idx);
1994 }
1995 
1996 static int
1997 ds_svc_isfree(ds_svc_t *svc, void *arg)
1998 {
1999 	_NOTE(ARGUNUSED(arg))
2000 
2001 	/*
2002 	 * Looking for a free service. This may be a NULL entry
2003 	 * in the table, or an unused structure that could be
2004 	 * reused.
2005 	 */
2006 
2007 	if (DS_SVC_ISFREE(svc)) {
2008 		/* yes, it is free */
2009 		return (1);
2010 	}
2011 
2012 	/* not a candidate */
2013 	return (0);
2014 }
2015 
2016 int
2017 ds_svc_ismatch(ds_svc_t *svc, void *arg)
2018 {
2019 	if (DS_SVC_ISFREE(svc)) {
2020 		return (0);
2021 	}
2022 
2023 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2024 	    (svc->flags & DSSF_ISCLIENT) == 0) {
2025 		/* found a match */
2026 		return (1);
2027 	}
2028 
2029 	return (0);
2030 }
2031 
2032 int
2033 ds_svc_clnt_ismatch(ds_svc_t *svc, void *arg)
2034 {
2035 	if (DS_SVC_ISFREE(svc)) {
2036 		return (0);
2037 	}
2038 
2039 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2040 	    (svc->flags & DSSF_ISCLIENT) != 0) {
2041 		/* found a match */
2042 		return (1);
2043 	}
2044 
2045 	return (0);
2046 }
2047 
2048 int
2049 ds_svc_free(ds_svc_t *svc, void *arg)
2050 {
2051 	_NOTE(ARGUNUSED(arg))
2052 
2053 	if (svc == NULL) {
2054 		return (0);
2055 	}
2056 
2057 	if (svc->cap.svc_id) {
2058 		DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
2059 		svc->cap.svc_id = NULL;
2060 	}
2061 
2062 	if (svc->cap.vers) {
2063 		DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
2064 		svc->cap.vers = NULL;
2065 	}
2066 
2067 	DS_FREE(svc, sizeof (ds_svc_t));
2068 
2069 	return (0);
2070 }
2071 
2072 static int
2073 ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port)
2074 {
2075 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2076 
2077 	if (DS_SVC_ISFREE(svc))
2078 		return (0);
2079 
2080 	if (!DS_PORT_IN_SET(svc->avail, PORTID(port)))
2081 		return (0);
2082 
2083 	DS_PORTSET_ADD(svc->tried, PORTID(port));
2084 
2085 	if (ds_send_reg_req(svc, port) == 0) {
2086 		/* register sent successfully */
2087 		return (1);
2088 	}
2089 
2090 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2091 		/* reset the service */
2092 		ds_reset_svc(svc, port);
2093 	}
2094 	return (0);
2095 }
2096 
2097 int
2098 ds_svc_register(ds_svc_t *svc, void *arg)
2099 {
2100 	_NOTE(ARGUNUSED(arg))
2101 	ds_portset_t ports;
2102 	ds_port_t *port;
2103 	int	idx;
2104 
2105 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2106 
2107 	if (DS_SVC_ISFREE(svc))
2108 		return (0);
2109 
2110 	DS_PORTSET_DUP(ports, svc->avail);
2111 	if (svc->flags & DSSF_ISCLIENT) {
2112 		ds_portset_del_active_clients(svc->cap.svc_id, &ports);
2113 	} else if (svc->state != DS_SVC_INACTIVE)
2114 		return (0);
2115 
2116 	if (DS_PORTSET_ISNULL(ports))
2117 		return (0);
2118 
2119 	/*
2120 	 * Attempt to register the service. Start with the lowest
2121 	 * numbered port and continue until a registration message
2122 	 * is sent successfully, or there are no ports left to try.
2123 	 */
2124 	for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2125 
2126 		/*
2127 		 * If the port is not in the available list,
2128 		 * it is not a candidate for registration.
2129 		 */
2130 		if (!DS_PORT_IN_SET(ports, idx)) {
2131 			continue;
2132 		}
2133 
2134 		port = &ds_ports[idx];
2135 		if (ds_svc_register_onport(svc, port)) {
2136 			if ((svc->flags & DSSF_ISCLIENT) == 0)
2137 				break;
2138 			DS_PORTSET_DEL(svc->avail, idx);
2139 		}
2140 	}
2141 
2142 	return (0);
2143 }
2144 
2145 static int
2146 ds_svc_unregister(ds_svc_t *svc, void *arg)
2147 {
2148 	ds_port_t *port = (ds_port_t *)arg;
2149 	ds_svc_hdl_t hdl;
2150 
2151 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2152 
2153 	if (DS_SVC_ISFREE(svc)) {
2154 		return (0);
2155 	}
2156 
2157 	/* make sure the service is using this port */
2158 	if (svc->port != port) {
2159 		return (0);
2160 	}
2161 
2162 	if (port) {
2163 		DS_DBG(CE_NOTE, "ds@%lx: svc_unreg: id='%s', ver=%d.%d, "
2164 		    " hdl=0x%09lx" DS_EOL, PORTID(port), svc->cap.svc_id,
2165 		    svc->ver.major, svc->ver.minor, svc->hdl);
2166 	} else {
2167 		DS_DBG(CE_NOTE, "port=NULL: svc_unreg: id='%s', ver=%d.%d, "
2168 		    " hdl=0x%09lx" DS_EOL, svc->cap.svc_id, svc->ver.major,
2169 		    svc->ver.minor, svc->hdl);
2170 	}
2171 
2172 	/* reset the service structure */
2173 	ds_reset_svc(svc, port);
2174 
2175 	/* call the client unregister callback */
2176 	if (svc->ops.ds_unreg_cb) {
2177 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2178 	}
2179 
2180 	/* increment the count in the handle to prevent reuse */
2181 	hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl));
2182 	if (DS_HDL_ISCLIENT(svc->hdl)) {
2183 		DS_HDL_SET_ISCLIENT(hdl);
2184 	}
2185 	svc->hdl = hdl;
2186 
2187 	if (svc->state != DS_SVC_UNREG_PENDING) {
2188 		/* try to initiate a new registration */
2189 		(void) ds_svc_register(svc, NULL);
2190 	}
2191 
2192 	return (0);
2193 }
2194 
2195 static int
2196 ds_svc_port_up(ds_svc_t *svc, void *arg)
2197 {
2198 	ds_port_t *port = (ds_port_t *)arg;
2199 
2200 	if (DS_SVC_ISFREE(svc)) {
2201 		/* nothing to do */
2202 		return (0);
2203 	}
2204 
2205 	DS_PORTSET_ADD(svc->avail, port->id);
2206 	DS_PORTSET_DEL(svc->tried, port->id);
2207 
2208 	return (0);
2209 }
2210 
2211 ds_svc_t *
2212 ds_alloc_svc(void)
2213 {
2214 	int		idx;
2215 	uint_t		newmaxsvcs;
2216 	ds_svc_t	**newtbl;
2217 	ds_svc_t	*newsvc;
2218 
2219 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2220 
2221 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2222 
2223 	if (idx != ds_svcs.maxsvcs) {
2224 		goto found;
2225 	}
2226 
2227 	/*
2228 	 * There was no free space in the table. Grow
2229 	 * the table to double its current size.
2230 	 */
2231 	newmaxsvcs = ds_svcs.maxsvcs * 2;
2232 	newtbl = DS_MALLOC(newmaxsvcs * sizeof (ds_svc_t *));
2233 
2234 	/* copy old table data to the new table */
2235 	(void) memcpy(newtbl, ds_svcs.tbl,
2236 	    ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2237 
2238 	/* clean up the old table */
2239 	DS_FREE(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2240 	ds_svcs.tbl = newtbl;
2241 	ds_svcs.maxsvcs = newmaxsvcs;
2242 
2243 	/* search for a free space again */
2244 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2245 
2246 	/* the table is locked so should find a free slot */
2247 	ASSERT(idx != ds_svcs.maxsvcs);
2248 
2249 found:
2250 	/* allocate a new svc structure if necessary */
2251 	if ((newsvc = ds_svcs.tbl[idx]) == NULL) {
2252 		/* allocate a new service */
2253 		newsvc = DS_MALLOC(sizeof (ds_svc_t));
2254 		ds_svcs.tbl[idx] = newsvc;
2255 	}
2256 
2257 	/* fill in the handle */
2258 	newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl));
2259 	newsvc->state = DS_SVC_FREE;	/* Mark as free temporarily */
2260 
2261 	return (newsvc);
2262 }
2263 
2264 static void
2265 ds_reset_svc(ds_svc_t *svc, ds_port_t *port)
2266 {
2267 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2268 
2269 	if (svc->state != DS_SVC_UNREG_PENDING)
2270 		svc->state = DS_SVC_INACTIVE;
2271 	svc->ver_idx = 0;
2272 	svc->ver.major = 0;
2273 	svc->ver.minor = 0;
2274 	svc->port = NULL;
2275 	if (port) {
2276 		DS_PORTSET_DEL(svc->avail, port->id);
2277 	}
2278 }
2279 
2280 ds_svc_t *
2281 ds_get_svc(ds_svc_hdl_t hdl)
2282 {
2283 	int		idx;
2284 	ds_svc_t	*svc;
2285 
2286 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2287 
2288 	if (hdl == DS_INVALID_HDL)
2289 		return (NULL);
2290 
2291 	idx = DS_HDL2IDX(hdl);
2292 
2293 	/* check if index is out of bounds */
2294 	if ((idx < 0) || (idx >= ds_svcs.maxsvcs))
2295 		return (NULL);
2296 
2297 	svc = ds_svcs.tbl[idx];
2298 
2299 	/* check for a valid service */
2300 	if (DS_SVC_ISFREE(svc))
2301 		return (NULL);
2302 
2303 	/* make sure the handle is an exact match */
2304 	if (svc->hdl != hdl)
2305 		return (NULL);
2306 
2307 	return (svc);
2308 }
2309 
2310 static void
2311 ds_port_reset(ds_port_t *port)
2312 {
2313 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2314 	ASSERT(MUTEX_HELD(&port->lock));
2315 
2316 	/* connection went down, mark everything inactive */
2317 	(void) ds_walk_svcs(ds_svc_unregister, port);
2318 
2319 	port->ver_idx = 0;
2320 	port->ver.major = 0;
2321 	port->ver.minor = 0;
2322 	port->state = DS_PORT_LDC_INIT;
2323 }
2324 
2325 /*
2326  * Verify that a version array is sorted as expected for the
2327  * version negotiation to work correctly.
2328  */
2329 ds_vers_check_t
2330 ds_vers_isvalid(ds_ver_t *vers, int nvers)
2331 {
2332 	uint16_t	curr_major;
2333 	uint16_t	curr_minor;
2334 	int		idx;
2335 
2336 	curr_major = vers[0].major;
2337 	curr_minor = vers[0].minor;
2338 
2339 	/*
2340 	 * Walk the version array, verifying correct ordering.
2341 	 * The array must be sorted from highest supported
2342 	 * version to lowest supported version.
2343 	 */
2344 	for (idx = 0; idx < nvers; idx++) {
2345 		if (vers[idx].major > curr_major) {
2346 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2347 			    " increasing major versions" DS_EOL);
2348 			return (DS_VERS_INCREASING_MAJOR_ERR);
2349 		}
2350 
2351 		if (vers[idx].major < curr_major) {
2352 			curr_major = vers[idx].major;
2353 			curr_minor = vers[idx].minor;
2354 			continue;
2355 		}
2356 
2357 		if (vers[idx].minor > curr_minor) {
2358 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2359 			    " increasing minor versions" DS_EOL);
2360 			return (DS_VERS_INCREASING_MINOR_ERR);
2361 		}
2362 
2363 		curr_minor = vers[idx].minor;
2364 	}
2365 
2366 	return (DS_VERS_OK);
2367 }
2368 
2369 /*
2370  * Extended user capability init.
2371  */
2372 int
2373 ds_ucap_init(ds_capability_t *cap, ds_clnt_ops_t *ops, uint32_t flags,
2374     int instance, ds_svc_hdl_t *hdlp)
2375 {
2376 	ds_vers_check_t	status;
2377 	ds_svc_t	*svc;
2378 	int		rv = 0;
2379 	ds_svc_hdl_t 	lb_hdl, hdl;
2380 	int		is_loopback;
2381 	int		is_client;
2382 
2383 	/* sanity check the args */
2384 	if ((cap == NULL) || (ops == NULL)) {
2385 		cmn_err(CE_NOTE, "%s: invalid arguments" DS_EOL, __func__);
2386 		return (EINVAL);
2387 	}
2388 
2389 	/* sanity check the capability specifier */
2390 	if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) {
2391 		cmn_err(CE_NOTE, "%s: invalid capability specifier" DS_EOL,
2392 		    __func__);
2393 		return (EINVAL);
2394 	}
2395 
2396 	/* sanity check the version array */
2397 	if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) {
2398 		cmn_err(CE_NOTE, "%s: invalid capability version array "
2399 		    "for %s service: %s" DS_EOL, __func__, cap->svc_id,
2400 		    (status == DS_VERS_INCREASING_MAJOR_ERR) ?
2401 		    "increasing major versions" :
2402 		    "increasing minor versions");
2403 		return (EINVAL);
2404 	}
2405 
2406 	/* data and register callbacks are required */
2407 	if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) {
2408 		cmn_err(CE_NOTE, "%s: invalid ops specifier for %s service"
2409 		    DS_EOL, __func__, cap->svc_id);
2410 		return (EINVAL);
2411 	}
2412 
2413 	flags &= DSSF_USERFLAGS;
2414 	is_client = flags & DSSF_ISCLIENT;
2415 
2416 	DS_DBG_USR(CE_NOTE, "%s: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx"
2417 	    DS_EOL, __func__, cap->svc_id, PTR_TO_LONG(ops->ds_data_cb),
2418 	    PTR_TO_LONG(ops->cb_arg));
2419 
2420 	mutex_enter(&ds_svcs.lock);
2421 
2422 	/* check if the service is already registered */
2423 	if (i_ds_hdl_lookup(cap->svc_id, is_client, NULL, 1) == 1) {
2424 		/* already registered */
2425 		cmn_err(CE_NOTE, "Service '%s'/%s already registered" DS_EOL,
2426 		    cap->svc_id,
2427 		    (flags & DSSF_ISCLIENT) ? "client" : "service");
2428 		mutex_exit(&ds_svcs.lock);
2429 		return (EALREADY);
2430 	}
2431 
2432 	svc = ds_alloc_svc();
2433 	if (is_client) {
2434 		DS_HDL_SET_ISCLIENT(svc->hdl);
2435 	}
2436 
2437 	svc->state = DS_SVC_FREE;
2438 	svc->svc_hdl = DS_BADHDL1;
2439 
2440 	svc->flags = flags;
2441 	svc->drvi = instance;
2442 	svc->drv_psp = NULL;
2443 
2444 	/*
2445 	 * Check for loopback.  "pri" is a legacy service that assumes it
2446 	 * will never use loopback mode.
2447 	 */
2448 	if (strcmp(cap->svc_id, "pri") == 0) {
2449 		is_loopback = 0;
2450 	} else if (i_ds_hdl_lookup(cap->svc_id, is_client == 0, &lb_hdl, 1)
2451 	    == 1) {
2452 		if ((rv = ds_loopback_set_svc(svc, cap, &lb_hdl)) != 0) {
2453 			DS_DBG_USR(CE_NOTE, "%s: ds_loopback_set_svc '%s' err "
2454 			    " (%d)" DS_EOL, __func__, cap->svc_id, rv);
2455 			mutex_exit(&ds_svcs.lock);
2456 			return (rv);
2457 		}
2458 		is_loopback = 1;
2459 	} else
2460 		is_loopback = 0;
2461 
2462 	/* copy over all the client information */
2463 	(void) memcpy(&svc->cap, cap, sizeof (ds_capability_t));
2464 
2465 	/* make a copy of the service name */
2466 	svc->cap.svc_id = ds_strdup(cap->svc_id);
2467 
2468 	/* make a copy of the version array */
2469 	svc->cap.vers = DS_MALLOC(cap->nvers * sizeof (ds_ver_t));
2470 	(void) memcpy(svc->cap.vers, cap->vers, cap->nvers * sizeof (ds_ver_t));
2471 
2472 	/* copy the client ops vector */
2473 	(void) memcpy(&svc->ops, ops, sizeof (ds_clnt_ops_t));
2474 
2475 	svc->state = DS_SVC_INACTIVE;
2476 	svc->ver_idx = 0;
2477 	DS_PORTSET_DUP(svc->avail, ds_allports);
2478 	DS_PORTSET_SETNULL(svc->tried);
2479 
2480 	ds_svcs.nsvcs++;
2481 
2482 	hdl = svc->hdl;
2483 
2484 	/*
2485 	 * kludge to allow user callback code to get handle and user args.
2486 	 * Make sure the callback arg points to the svc structure.
2487 	 */
2488 	if ((flags & DSSF_ISUSER) != 0) {
2489 		ds_cbarg_set_cookie(svc);
2490 	}
2491 
2492 	if (is_loopback) {
2493 		ds_loopback_register(hdl);
2494 		ds_loopback_register(lb_hdl);
2495 	}
2496 
2497 	/*
2498 	 * If this is a client or a non-loopback service provider, send
2499 	 * out register requests.
2500 	 */
2501 	if (!is_loopback || (flags & DSSF_ISCLIENT) != 0)
2502 		(void) ds_svc_register(svc, NULL);
2503 
2504 	if (hdlp) {
2505 		*hdlp = hdl;
2506 	}
2507 
2508 	mutex_exit(&ds_svcs.lock);
2509 
2510 	DS_DBG_USR(CE_NOTE, "%s: service '%s' assigned handle 0x%09lx" DS_EOL,
2511 	    __func__, svc->cap.svc_id, hdl);
2512 
2513 	return (0);
2514 }
2515 
2516 /*
2517  * ds_cap_init interface for previous revision.
2518  */
2519 int
2520 ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops)
2521 {
2522 	return (ds_ucap_init(cap, ops, 0, DS_INVALID_INSTANCE, NULL));
2523 }
2524 
2525 /*
2526  * Interface for ds_unreg_hdl in lds driver.
2527  */
2528 int
2529 ds_unreg_hdl(ds_svc_hdl_t hdl)
2530 {
2531 	ds_svc_t	*svc;
2532 	int		is_loopback;
2533 	ds_svc_hdl_t	lb_hdl;
2534 
2535 	DS_DBG_USR(CE_NOTE, "%s: hdl=0x%09lx" DS_EOL, __func__, hdl);
2536 
2537 	mutex_enter(&ds_svcs.lock);
2538 	if ((svc = ds_get_svc(hdl)) == NULL) {
2539 		mutex_exit(&ds_svcs.lock);
2540 		DS_DBG_USR(CE_NOTE, "%s: unknown hdl: 0x%llx" DS_EOL, __func__,
2541 		    (u_longlong_t)hdl);
2542 		return (ENXIO);
2543 	}
2544 
2545 	DS_DBG_USR(CE_NOTE, "%s: svcid='%s', hdl=0x%llx" DS_EOL, __func__,
2546 	    svc->cap.svc_id, (u_longlong_t)svc->hdl);
2547 
2548 	svc->state = DS_SVC_UNREG_PENDING;
2549 
2550 	is_loopback = ((svc->flags & DSSF_LOOPBACK) != 0);
2551 	lb_hdl = svc->svc_hdl;
2552 
2553 	if (svc->port) {
2554 		(void) ds_send_unreg_req(svc);
2555 	}
2556 
2557 	(void) ds_svc_unregister(svc, svc->port);
2558 
2559 	ds_delete_svc_entry(svc);
2560 
2561 	if (is_loopback) {
2562 		ds_loopback_unregister(lb_hdl);
2563 	}
2564 
2565 	mutex_exit(&ds_svcs.lock);
2566 
2567 	return (0);
2568 }
2569 
2570 int
2571 ds_cap_fini(ds_capability_t *cap)
2572 {
2573 	ds_svc_hdl_t	hdl;
2574 	int rv;
2575 	uint_t nhdls = 0;
2576 
2577 	DS_DBG(CE_NOTE, "%s: '%s'" DS_EOL, __func__, cap->svc_id);
2578 	if ((rv = ds_hdl_lookup(cap->svc_id, 0, &hdl, 1, &nhdls)) != 0) {
2579 		DS_DBG(CE_NOTE, "%s: ds_hdl_lookup '%s' err (%d)" DS_EOL,
2580 		    __func__, cap->svc_id, rv);
2581 		return (rv);
2582 	}
2583 
2584 	if (nhdls == 0) {
2585 		DS_DBG(CE_NOTE, "%s: no such service '%s'" DS_EOL,
2586 		    __func__, cap->svc_id);
2587 		return (ENXIO);
2588 	}
2589 
2590 	if ((rv = ds_is_my_hdl(hdl, DS_INVALID_INSTANCE)) != 0) {
2591 		DS_DBG(CE_NOTE, "%s: ds_is_my_handle err (%d)" DS_EOL, __func__,
2592 		    rv);
2593 		return (rv);
2594 	}
2595 
2596 	if ((rv = ds_unreg_hdl(hdl)) != 0) {
2597 		DS_DBG(CE_NOTE, "%s: ds_unreg_hdl err (%d)" DS_EOL, __func__,
2598 		    rv);
2599 		return (rv);
2600 	}
2601 
2602 	return (0);
2603 }
2604 
2605 int
2606 ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len)
2607 {
2608 	int		rv;
2609 	ds_hdr_t	*hdr;
2610 	caddr_t		msg;
2611 	size_t		msglen;
2612 	size_t		hdrlen;
2613 	caddr_t		payload;
2614 	ds_svc_t	*svc;
2615 	ds_port_t	*port;
2616 	ds_data_handle_t *data;
2617 	ds_svc_hdl_t	svc_hdl;
2618 	int		is_client = 0;
2619 
2620 	DS_DBG(CE_NOTE, "%s: hdl: 0x%llx, buf: %lx, len: %ld" DS_EOL, __func__,
2621 	    (u_longlong_t)hdl, (ulong_t)buf, len);
2622 
2623 	mutex_enter(&ds_svcs.lock);
2624 
2625 	if ((svc = ds_get_svc(hdl)) == NULL) {
2626 		cmn_err(CE_WARN, "%s: invalid handle 0x%llx" DS_EOL, __func__,
2627 		    (u_longlong_t)hdl);
2628 		mutex_exit(&ds_svcs.lock);
2629 		return (ENXIO);
2630 	}
2631 
2632 	if (svc->state != DS_SVC_ACTIVE) {
2633 		/* channel is up, but svc is not registered */
2634 		DS_DBG(CE_NOTE, "%s: invalid service state 0x%x" DS_EOL,
2635 		    __func__, svc->state);
2636 		mutex_exit(&ds_svcs.lock);
2637 		return (ENOTCONN);
2638 	}
2639 
2640 	if (svc->flags & DSSF_LOOPBACK) {
2641 		hdl = svc->svc_hdl;
2642 		mutex_exit(&ds_svcs.lock);
2643 		ds_loopback_send(hdl, buf, len);
2644 		return (0);
2645 	}
2646 
2647 	if ((port = svc->port) == NULL) {
2648 		DS_DBG(CE_NOTE, "%s: service '%s' not associated with a port"
2649 		    DS_EOL, __func__, svc->cap.svc_id);
2650 		mutex_exit(&ds_svcs.lock);
2651 		return (ECONNRESET);
2652 	}
2653 
2654 	if (svc->flags & DSSF_ISCLIENT) {
2655 		is_client = 1;
2656 		svc_hdl = svc->svc_hdl;
2657 	}
2658 
2659 	mutex_exit(&ds_svcs.lock);
2660 
2661 	/* check that the LDC channel is ready */
2662 	if (port->ldc.state != LDC_UP) {
2663 		DS_DBG(CE_NOTE, "%s: LDC channel is not up" DS_EOL, __func__);
2664 		return (ECONNRESET);
2665 	}
2666 
2667 	hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t);
2668 
2669 	msg = DS_MALLOC(len + hdrlen);
2670 	hdr = (ds_hdr_t *)msg;
2671 	payload = msg + hdrlen;
2672 	msglen = len + hdrlen;
2673 
2674 	hdr->payload_len = len + sizeof (ds_data_handle_t);
2675 	hdr->msg_type = DS_DATA;
2676 
2677 	data = (ds_data_handle_t *)(msg + DS_HDR_SZ);
2678 	if (is_client) {
2679 		data->svc_handle = svc_hdl;
2680 	} else {
2681 		data->svc_handle = hdl;
2682 	}
2683 
2684 	if ((buf != NULL) && (len != 0)) {
2685 		(void) memcpy(payload, buf, len);
2686 	}
2687 
2688 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data>: hdl=0x%llx, len=%ld, "
2689 	    " payload_len=%d" DS_EOL, PORTID(port), (u_longlong_t)svc->hdl,
2690 	    msglen, hdr->payload_len);
2691 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msglen);
2692 
2693 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
2694 		rv = (rv == EIO) ? ECONNRESET : rv;
2695 	}
2696 	DS_FREE(msg, msglen);
2697 
2698 	return (rv);
2699 }
2700 
2701 void
2702 ds_port_common_init(ds_port_t *port)
2703 {
2704 	int rv;
2705 
2706 	if ((port->flags & DS_PORT_MUTEX_INITED) == 0) {
2707 		mutex_init(&port->lock, NULL, MUTEX_DRIVER, NULL);
2708 		mutex_init(&port->tx_lock, NULL, MUTEX_DRIVER, NULL);
2709 		mutex_init(&port->rcv_lock, NULL, MUTEX_DRIVER, NULL);
2710 		port->flags |= DS_PORT_MUTEX_INITED;
2711 	}
2712 
2713 	port->state = DS_PORT_INIT;
2714 	DS_PORTSET_ADD(ds_allports, port->id);
2715 
2716 	ds_sys_port_init(port);
2717 
2718 	mutex_enter(&port->lock);
2719 	rv = ds_ldc_init(port);
2720 	mutex_exit(&port->lock);
2721 
2722 	/*
2723 	 * If LDC successfully init'ed, try to kick off protocol for this port.
2724 	 */
2725 	if (rv == 0) {
2726 		ds_handle_up_event(port);
2727 	}
2728 }
2729 
2730 void
2731 ds_port_common_fini(ds_port_t *port, int is_fini)
2732 {
2733 	port->state = DS_PORT_FREE;
2734 
2735 	if (is_fini && (port->flags & DS_PORT_MUTEX_INITED) != 0) {
2736 		mutex_destroy(&port->lock);
2737 		mutex_destroy(&port->tx_lock);
2738 		mutex_destroy(&port->rcv_lock);
2739 		port->flags &= ~DS_PORT_MUTEX_INITED;
2740 	}
2741 
2742 	DS_PORTSET_DEL(ds_allports, port->id);
2743 
2744 	ds_sys_port_fini(port);
2745 }
2746 
2747 /*
2748  * Initialize table of registered service classes
2749  */
2750 void
2751 ds_init_svcs_tbl(uint_t nentries)
2752 {
2753 	int	tblsz;
2754 
2755 	ds_svcs.maxsvcs = nentries;
2756 
2757 	tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *);
2758 	ds_svcs.tbl = (ds_svc_t **)DS_MALLOC(tblsz);
2759 
2760 	ds_svcs.nsvcs = 0;
2761 }
2762 
2763 /*
2764  * Find the max and min version supported.
2765  * Hacked from zeus workspace, support.c
2766  */
2767 static void
2768 min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
2769     uint16_t *min_major, uint16_t *max_major)
2770 {
2771 	int i;
2772 
2773 	*min_major = sup_versionsp[0].major;
2774 	*max_major = *min_major;
2775 
2776 	for (i = 1; i < num_versions; i++) {
2777 		if (sup_versionsp[i].major < *min_major)
2778 			*min_major = sup_versionsp[i].major;
2779 
2780 		if (sup_versionsp[i].major > *max_major)
2781 			*max_major = sup_versionsp[i].major;
2782 	}
2783 }
2784 
2785 /*
2786  * Check whether the major and minor numbers requested by the peer can be
2787  * satisfied. If the requested major is supported, true is returned, and the
2788  * agreed minor is returned in new_minor. If the requested major is not
2789  * supported, the routine returns false, and the closest major is returned in
2790  * *new_major, upon which the peer should re-negotiate. The closest major is
2791  * the just lower that the requested major number.
2792  *
2793  * Hacked from zeus workspace, support.c
2794  */
2795 boolean_t
2796 negotiate_version(int num_versions, ds_ver_t *sup_versionsp,
2797     uint16_t req_major, uint16_t *new_majorp, uint16_t *new_minorp)
2798 {
2799 	int i;
2800 	uint16_t major, lower_major;
2801 	uint16_t min_major = 0, max_major;
2802 	boolean_t found_match = B_FALSE;
2803 
2804 	min_max_versions(num_versions, sup_versionsp, &min_major, &max_major);
2805 
2806 	DS_DBG(CE_NOTE, "negotiate_version: req_major = %u, min = %u, max = %u"
2807 	    DS_EOL, req_major, min_major, max_major);
2808 
2809 	/*
2810 	 * If the minimum version supported is greater than
2811 	 * the version requested, return the lowest version
2812 	 * supported
2813 	 */
2814 	if (min_major > req_major) {
2815 		*new_majorp = min_major;
2816 		return (B_FALSE);
2817 	}
2818 
2819 	/*
2820 	 * If the largest version supported is lower than
2821 	 * the version requested, return the largest version
2822 	 * supported
2823 	 */
2824 	if (max_major < req_major) {
2825 		*new_majorp = max_major;
2826 		return (B_FALSE);
2827 	}
2828 
2829 	/*
2830 	 * Now we know that the requested version lies between the
2831 	 * min and max versions supported. Check if the requested
2832 	 * major can be found in supported versions.
2833 	 */
2834 	lower_major = min_major;
2835 	for (i = 0; i < num_versions; i++) {
2836 		major = sup_versionsp[i].major;
2837 		if (major == req_major) {
2838 			found_match = B_TRUE;
2839 			*new_majorp = req_major;
2840 			*new_minorp = sup_versionsp[i].minor;
2841 			break;
2842 		} else {
2843 			if ((major < req_major) && (major > lower_major))
2844 				lower_major = major;
2845 		}
2846 	}
2847 
2848 	/*
2849 	 * If no match is found, return the closest available number
2850 	 */
2851 	if (!found_match)
2852 		*new_majorp = lower_major;
2853 
2854 	return (found_match);
2855 }
2856 
/*
 * Specific errno's that are used by ds.c and ldc.c, paired with the text
 * reported for each. The table ends with an entry whose errno is 0.
 */
static struct {
	int ds_errno;
	char *estr;
} ds_errno_to_str_tab[] = {
	{ EIO,		"I/O error" },
	{ ENXIO,	"No such device or address" },
	{ EAGAIN,	"Resource temporarily unavailable" },
	{ ENOMEM,	"Not enough space" },
	{ EACCES,	"Permission denied" },
	{ EFAULT,	"Bad address" },
	{ EBUSY,	"Device busy" },
	{ EINVAL,	"Invalid argument" },
	{ ENOSPC,	"No space left on device" },
	{ ENOMSG,	"No message of desired type" },
#ifdef	ECHRNG
	{ ECHRNG,	"Channel number out of range" },
#endif
	{ ENOTSUP,	"Operation not supported" },
	{ EMSGSIZE,	"Message too long" },
	{ EADDRINUSE,	"Address already in use" },
	{ ECONNRESET,	"Connection reset by peer" },
	{ ENOBUFS,	"No buffer space available" },
	{ ENOTCONN,	"Socket is not connected" },
	{ ECONNREFUSED,	"Connection refused" },
	{ EALREADY,	"Operation already in progress" },
	{ 0,		NULL },
};

/*
 * Write a description of 'ds_errno' into 'ebuf' and return 'ebuf'.
 *
 * Values present in ds_errno_to_str_tab get the table text; any other
 * value (including 0) is formatted as "ds_errno (<value>)". No length is
 * passed in, so the caller must supply a buffer large enough for the
 * longest possible result.
 */
char *
ds_errno_to_str(int ds_errno, char *ebuf)
{
	int slot = 0;

	/* scan up to, but not including, the terminating entry */
	while (ds_errno_to_str_tab[slot].ds_errno != 0) {
		if (ds_errno_to_str_tab[slot].ds_errno == ds_errno) {
			(void) strcpy(ebuf, ds_errno_to_str_tab[slot].estr);
			return (ebuf);
		}
		slot++;
	}

	/* not in the table: fall back to the numeric value */
	(void) sprintf(ebuf, "ds_errno (%d)", ds_errno);
	return (ebuf);
}
2903 
2904 static void
2905 ds_loopback_register(ds_svc_hdl_t hdl)
2906 {
2907 	ds_ver_t ds_ver;
2908 	ds_svc_t *svc;
2909 
2910 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2911 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2912 	    (u_longlong_t)hdl);
2913 	if ((svc = ds_get_svc(hdl)) == NULL) {
2914 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2915 		    (u_longlong_t)hdl);
2916 		return;
2917 	}
2918 
2919 	svc->state = DS_SVC_ACTIVE;
2920 
2921 	if (svc->ops.ds_reg_cb) {
2922 		DS_DBG_LOOP(CE_NOTE, "%s: loopback regcb: hdl: 0x%llx" DS_EOL,
2923 		    __func__, (u_longlong_t)hdl);
2924 		ds_ver.major = svc->ver.major;
2925 		ds_ver.minor = svc->ver.minor;
2926 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &ds_ver, hdl);
2927 	}
2928 }
2929 
2930 static void
2931 ds_loopback_unregister(ds_svc_hdl_t hdl)
2932 {
2933 	ds_svc_t *svc;
2934 
2935 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2936 	if ((svc = ds_get_svc(hdl)) == NULL) {
2937 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2938 		    (u_longlong_t)hdl);
2939 		return;
2940 	}
2941 
2942 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2943 	    (u_longlong_t)hdl);
2944 
2945 	svc->flags &= ~DSSF_LOOPBACK;
2946 	svc->svc_hdl = DS_BADHDL2;
2947 	svc->state = DS_SVC_INACTIVE;
2948 
2949 	if (svc->ops.ds_unreg_cb) {
2950 		DS_DBG_LOOP(CE_NOTE, "%s: loopback unregcb: hdl: 0x%llx" DS_EOL,
2951 		    __func__, (u_longlong_t)hdl);
2952 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2953 	}
2954 }
2955 
2956 static void
2957 ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen)
2958 {
2959 	ds_svc_t *svc;
2960 
2961 	mutex_enter(&ds_svcs.lock);
2962 	if ((svc = ds_get_svc(hdl)) == NULL) {
2963 		mutex_exit(&ds_svcs.lock);
2964 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
2965 		    (u_longlong_t)hdl);
2966 		return;
2967 	}
2968 	mutex_exit(&ds_svcs.lock);
2969 
2970 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
2971 	    (u_longlong_t)hdl);
2972 
2973 	if (svc->ops.ds_data_cb) {
2974 		DS_DBG_LOOP(CE_NOTE, "%s: loopback datacb hdl: 0x%llx" DS_EOL,
2975 		    __func__, (u_longlong_t)hdl);
2976 		(*svc->ops.ds_data_cb)(svc->ops.cb_arg, buf, buflen);
2977 	}
2978 }
2979 
2980 static int
2981 ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap, ds_svc_hdl_t *lb_hdlp)
2982 {
2983 	ds_svc_t *lb_svc;
2984 	ds_svc_hdl_t lb_hdl = *lb_hdlp;
2985 	int i;
2986 	int match = 0;
2987 	uint16_t new_major;
2988 	uint16_t new_minor;
2989 
2990 	if ((lb_svc = ds_get_svc(lb_hdl)) == NULL) {
2991 		DS_DBG_LOOP(CE_NOTE, "%s: loopback: hdl: 0x%llx invalid" DS_EOL,
2992 		    __func__, (u_longlong_t)lb_hdl);
2993 		return (ENXIO);
2994 	}
2995 
2996 	/* negotiate a version between loopback services, if possible */
2997 	for (i = 0; i < lb_svc->cap.nvers && match == 0; i++) {
2998 		match = negotiate_version(cap->nvers, cap->vers,
2999 		    lb_svc->cap.vers[i].major, &new_major, &new_minor);
3000 	}
3001 	if (!match) {
3002 		DS_DBG_LOOP(CE_NOTE, "%s: loopback version negotiate failed"
3003 		    DS_EOL, __func__);
3004 		return (ENOTSUP);
3005 	}
3006 	if (lb_svc->state != DS_SVC_INACTIVE) {
3007 		DS_DBG_LOOP(CE_NOTE, "%s: loopback active: hdl: 0x%llx"
3008 		    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3009 		if ((lb_svc->flags & DSSF_ISCLIENT) == 0) {
3010 			DS_DBG_LOOP(CE_NOTE, "%s: loopback busy hdl: 0x%llx"
3011 			    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3012 			return (EBUSY);
3013 		}
3014 		svc->state = DS_SVC_INACTIVE;	/* prevent alloc'ing svc */
3015 		lb_svc = ds_svc_clone(lb_svc);
3016 		DS_DBG_LOOP(CE_NOTE, "%s: loopback clone: ohdl: 0x%llx "
3017 		    "nhdl: 0x%llx" DS_EOL, __func__, (u_longlong_t)lb_hdl,
3018 		    (u_longlong_t)lb_svc->hdl);
3019 		*lb_hdlp = lb_svc->hdl;
3020 	}
3021 
3022 	svc->flags |= DSSF_LOOPBACK;
3023 	svc->svc_hdl = lb_svc->hdl;
3024 	svc->port = NULL;
3025 	svc->ver.major = new_major;
3026 	svc->ver.minor = new_minor;
3027 
3028 	lb_svc->flags |= DSSF_LOOPBACK;
3029 	lb_svc->svc_hdl = svc->hdl;
3030 	lb_svc->port = NULL;
3031 	lb_svc->ver.major = new_major;
3032 	lb_svc->ver.minor = new_minor;
3033 
3034 	DS_DBG_LOOP(CE_NOTE, "%s: setting loopback between: 0x%llx and 0x%llx"
3035 	    DS_EOL, __func__, (u_longlong_t)svc->hdl,
3036 	    (u_longlong_t)lb_svc->hdl);
3037 	return (0);
3038 }
3039 
3040 static ds_svc_t *
3041 ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl, ds_port_t *port)
3042 {
3043 	int		idx;
3044 	ds_svc_t	*svc;
3045 
3046 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s looking up clnt hdl: 0x%llx" DS_EOL,
3047 	    PORTID(port), __func__, (u_longlong_t)hdl);
3048 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3049 
3050 	/* walk every table entry */
3051 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3052 		svc = ds_svcs.tbl[idx];
3053 		if (DS_SVC_ISFREE(svc))
3054 			continue;
3055 		if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3056 		    svc->svc_hdl == hdl && svc->port == port) {
3057 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s found clnt hdl "
3058 			    "0x%llx: svc%d" DS_EOL, PORTID(port), __func__,
3059 			    (u_longlong_t)hdl, (uint_t)DS_HDL2IDX(svc->hdl));
3060 			return (svc);
3061 		}
3062 	}
3063 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s clnt hdl: 0x%llx not found" DS_EOL,
3064 	    PORTID(port), __func__, (u_longlong_t)hdl);
3065 
3066 	return (NULL);
3067 }
3068 
3069 static ds_svc_t *
3070 ds_svc_clone(ds_svc_t *svc)
3071 {
3072 	ds_svc_t *newsvc;
3073 	ds_svc_hdl_t hdl;
3074 
3075 	ASSERT(svc->flags & DSSF_ISCLIENT);
3076 
3077 	newsvc = ds_alloc_svc();
3078 
3079 	/* Can only clone clients for now */
3080 	hdl = newsvc->hdl | DS_HDL_ISCLIENT_BIT;
3081 	DS_DBG_USR(CE_NOTE, "%s: cloning client: old hdl: 0x%llx new hdl: "
3082 	    "0x%llx" DS_EOL, __func__, (u_longlong_t)svc->hdl,
3083 	    (u_longlong_t)hdl);
3084 	(void) memcpy(newsvc, svc, sizeof (ds_svc_t));
3085 	newsvc->hdl = hdl;
3086 	newsvc->flags &= ~DSSF_LOOPBACK;
3087 	newsvc->port = NULL;
3088 	newsvc->svc_hdl = DS_BADHDL2;
3089 	newsvc->cap.svc_id = ds_strdup(svc->cap.svc_id);
3090 	newsvc->cap.vers = DS_MALLOC(svc->cap.nvers * sizeof (ds_ver_t));
3091 	(void) memcpy(newsvc->cap.vers, svc->cap.vers,
3092 	    svc->cap.nvers * sizeof (ds_ver_t));
3093 
3094 	/*
3095 	 * Kludge to allow lds driver user callbacks to get access to current
3096 	 * svc structure.  Arg could be index to svc table or some other piece
3097 	 * of info to get to the svc table entry.
3098 	 */
3099 	if (newsvc->flags & DSSF_ISUSER) {
3100 		newsvc->ops.cb_arg = (ds_cb_arg_t)(newsvc);
3101 	}
3102 	return (newsvc);
3103 }
3104 
3105 /*
3106  * Internal handle lookup function.
3107  */
3108 static int
3109 i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3110     uint_t maxhdls)
3111 {
3112 	int idx;
3113 	int nhdls = 0;
3114 	ds_svc_t *svc;
3115 	uint32_t client_flag = is_client ? DSSF_ISCLIENT : 0;
3116 
3117 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3118 
3119 	for (idx = 0; idx < ds_svcs.maxsvcs && nhdls < maxhdls; idx++) {
3120 		svc = ds_svcs.tbl[idx];
3121 		if (DS_SVC_ISFREE(svc))
3122 			continue;
3123 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3124 		    (svc->flags & DSSF_ISCLIENT) == client_flag) {
3125 			if (hdlp != NULL && nhdls < maxhdls) {
3126 				hdlp[nhdls] = svc->hdl;
3127 				nhdls++;
3128 			} else {
3129 				nhdls++;
3130 			}
3131 		}
3132 	}
3133 	return (nhdls);
3134 }
3135 
3136 /*
3137  * Interface for ds_hdl_lookup in lds driver.
3138  */
3139 int
3140 ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3141     uint_t maxhdls, uint_t *nhdlsp)
3142 {
3143 	mutex_enter(&ds_svcs.lock);
3144 	*nhdlsp = i_ds_hdl_lookup(service, is_client, hdlp, maxhdls);
3145 	mutex_exit(&ds_svcs.lock);
3146 	return (0);
3147 }
3148 
3149 static void
3150 ds_portset_del_active_clients(char *service, ds_portset_t *portsp)
3151 {
3152 	ds_portset_t ports;
3153 	int idx;
3154 	ds_svc_t *svc;
3155 	ds_svc_hdl_t hdl;
3156 
3157 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3158 
3159 	DS_PORTSET_DUP(ports, *portsp);
3160 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3161 		svc = ds_svcs.tbl[idx];
3162 		if (DS_SVC_ISFREE(svc))
3163 			continue;
3164 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3165 		    (svc->flags & DSSF_ISCLIENT) != 0 &&
3166 		    svc->state != DS_SVC_INACTIVE &&
3167 		    svc->port != NULL) {
3168 			DS_PORTSET_DEL(ports, PORTID(svc->port));
3169 		}
3170 	}
3171 
3172 	/*
3173 	 * Legacy "pri" client service should not try to make a
3174 	 * connection to the SP if the existing "pri" provider
3175 	 * service has a connection.
3176 	 */
3177 	if (strcmp(service, "pri") == 0 &&
3178 	    i_ds_hdl_lookup(service, 0, &hdl, 1) == 1 &&
3179 	    (svc = ds_get_svc(hdl)) != NULL && svc->state == DS_SVC_ACTIVE &&
3180 	    svc->port != NULL) {
3181 		DS_PORTSET_DEL(ports, PORTID(svc->port));
3182 	}
3183 	DS_PORTSET_DUP(*portsp, ports);
3184 }
3185 
3186 /*
3187  * After an UNREG REQ, check if this is a client service with multiple
3188  * handles.  If it is, then we can eliminate this entry.
3189  */
3190 static void
3191 ds_check_for_dup_services(ds_svc_t *svc)
3192 {
3193 	if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3194 	    svc->state == DS_SVC_INACTIVE &&
3195 	    i_ds_hdl_lookup(svc->cap.svc_id, 1, NULL, 2) == 2) {
3196 		ds_delete_svc_entry(svc);
3197 	}
3198 }
3199 
3200 static void
3201 ds_delete_svc_entry(ds_svc_t *svc)
3202 {
3203 	ds_svc_hdl_t tmp_hdl;
3204 
3205 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3206 
3207 	/*
3208 	 * Clear out the structure, but do not deallocate the
3209 	 * memory. It can be reused for the next registration.
3210 	 */
3211 	DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
3212 	DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
3213 
3214 	/* save the handle to prevent reuse */
3215 	tmp_hdl = svc->hdl;
3216 	bzero((void *)svc, sizeof (ds_svc_t));
3217 
3218 	/* initialize for next use */
3219 	svc->hdl = tmp_hdl;
3220 	svc->state = DS_SVC_FREE;
3221 
3222 	ds_svcs.nsvcs--;
3223 }
3224