xref: /titanic_50/usr/src/uts/sun4v/io/ds_common.c (revision cb8a054b1ab30d5caa746e6c44f29d4c9d3071c1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  */
25 
26 /*
27  * Domain Services Module Common Code.
28  *
29  * This module is intended to be used by both Solaris and the VBSC
30  * module.
31  */
32 
33 #include <sys/modctl.h>
34 #include <sys/ksynch.h>
35 #include <sys/taskq.h>
36 #include <sys/disp.h>
37 #include <sys/cmn_err.h>
38 #include <sys/note.h>
39 #include <sys/mach_descrip.h>
40 #include <sys/mdesc.h>
41 #include <sys/ldc.h>
42 #include <sys/ds.h>
43 #include <sys/ds_impl.h>
44 
/*
 * Fallback definition of MIN() for builds whose headers do not already
 * supply one. Being a macro, it may evaluate each argument twice.
 */
#ifndef MIN
#define	MIN(a, b)	((a) < (b) ? (a) : (b))
#endif

/*
 * Size of the scratch buffer passed to decode_ldc_events(). With every
 * event bit set that routine emits " DOWN RESET UP READ WRITE", which is
 * 25 characters plus the terminating NUL, so 30 bytes is sufficient.
 */
#define	DS_DECODE_BUF_LEN		30
50 
/*
 * All DS ports in the system
 *
 * The list of DS ports is read in from the MD when the DS module is
 * initialized and is never modified. This eliminates the need for
 * locking to access the port array itself. Access to the individual
 * ports is synchronized at the port level (see the per-port 'lock',
 * 'rcv_lock' and 'tx_lock' mutexes used throughout this file).
 */
ds_port_t	ds_ports[DS_MAX_PORTS];
ds_portset_t	ds_allports;	/* all DS ports in the system */
ds_portset_t	ds_nullport;	/* allows test against null portset */

/* DS SP port id; starts out as DS_PORTID_INVALID */
uint64_t ds_sp_port_id = DS_PORTID_INVALID;
65 
/*
 * Table of registered services
 *
 * Locking: Accesses to the table of services are synchronized using
 *   a single mutex, ds_svcs.lock. The mutex must be held both when
 *   looking up service information in the table and when any service
 *   information is being modified. Where both are needed, this file
 *   takes ds_svcs.lock before a port's lock.
 */
ds_svcs_t	ds_svcs;

/*
 * Flag to prevent callbacks while in the middle of DS teardown.
 * It is set by ds_common_init(); while it is B_FALSE, ds_ldc_cb()
 * returns without acting on the LDC event it was given.
 */
boolean_t ds_enabled = B_FALSE;	/* enable/disable taskq processing */
80 
/*
 * Retry count and delay for LDC reads and writes
 *
 * ds_recv_msg() and ds_send_msg() retry a transfer that would block up
 * to ds_retries times, busy-waiting ds_delay microseconds (via
 * drv_usecwait()) between attempts. The defaults may be overridden at
 * build time by pre-defining the DS_DEFAULT_* macros.
 */
#ifndef DS_DEFAULT_RETRIES
#define	DS_DEFAULT_RETRIES	10000	/* number of times to retry */
#endif
#ifndef DS_DEFAULT_DELAY
#define	DS_DEFAULT_DELAY	1000	/* usecs to wait between retries */
#endif

static int ds_retries = DS_DEFAULT_RETRIES;
static clock_t ds_delay = DS_DEFAULT_DELAY;
93 
/*
 * Supported versions of the DS message protocol
 *
 * The version array must be sorted in order from the highest
 * supported version to the lowest. Support for a particular
 * <major>.<minor> version implies all lower minor versions of
 * that same major version are supported as well.
 *
 * During the init handshake a port's ver_idx field indexes this
 * array to select the version currently being proposed.
 */
static ds_ver_t ds_vers[] = { { 1, 0 } };

/* number of entries in ds_vers[] */
#define	DS_NUM_VER	(sizeof (ds_vers) / sizeof (ds_vers[0]))
105 
106 
/*
 * incoming message handling functions
 *
 * Every handler receives the port the message arrived on, a pointer to
 * the complete message (DS header followed by the payload) and the total
 * length of that message in bytes.
 */
typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len);
static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len);

/*
 * DS Message Handler Dispatch Table
 *
 * A table used to dispatch all incoming messages. This table
 * contains handlers for all the fixed message types, as well as
 * the messages defined in the 1.0 version of the DS protocol.
 * The handlers are indexed based on the DS header msg_type values,
 * so the order of the entries below must match the numeric order of
 * the message types named in the trailing comments; ds_dispatch_event()
 * uses hdr->msg_type directly as the index into this array.
 */
static const ds_msg_handler_t ds_msg_handlers[] = {
	ds_handle_init_req,		/* DS_INIT_REQ */
	ds_handle_init_ack,		/* DS_INIT_ACK */
	ds_handle_init_nack,		/* DS_INIT_NACK */
	ds_handle_reg_req,		/* DS_REG_REQ */
	ds_handle_reg_ack,		/* DS_REG_ACK */
	ds_handle_reg_nack,		/* DS_REG_NACK */
	ds_handle_unreg_req,		/* DS_UNREG */
	ds_handle_unreg_ack,		/* DS_UNREG_ACK */
	ds_handle_unreg_nack,		/* DS_UNREG_NACK */
	ds_handle_data,			/* DS_DATA */
	ds_handle_nack			/* DS_NACK */
};
142 
143 
144 
/*
 * Forward declarations of the file-local (static) functions, grouped
 * by role.
 */

/* initialization functions */
static int ds_ldc_init(ds_port_t *port);

/* event processing functions */
static uint_t ds_ldc_cb(uint64_t event, caddr_t arg);
static int ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep);
static void ds_handle_up_event(ds_port_t *port);
static void ds_handle_down_reset_events(ds_port_t *port);
static void ds_handle_recv(void *arg);
static void ds_dispatch_event(void *arg);

/* message sending functions */
static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen);
static int ds_send_reg_req(ds_svc_t *svc, ds_port_t *port);
static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);
static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl);

/* walker functions */
static int ds_svc_isfree(ds_svc_t *svc, void *arg);
static int ds_svc_unregister(ds_svc_t *svc, void *arg);
static int ds_svc_port_up(ds_svc_t *svc, void *arg);

/* service utilities */
static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port);
static int ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port);
static int ds_svc_register_onport_walker(ds_svc_t *svc, void *arg);
static void ds_set_port_ready(ds_port_t *port, uint16_t major, uint16_t minor);

/* port utilities */
static void ds_port_reset(ds_port_t *port);
static ldc_status_t ds_update_ldc_state(ds_port_t *port);

/* misc utilities */
static void min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
    uint16_t *min_major, uint16_t *max_major);

/* debug */
static char *decode_ldc_events(uint64_t event, char *buf);

/* loopback */
static void ds_loopback_register(ds_svc_hdl_t hdl);
static void ds_loopback_unregister(ds_svc_hdl_t hdl);
static void ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen);
static int ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap,
    ds_svc_hdl_t *lb_hdlp);

/* client handling */
static int i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
    uint_t maxhdls);
static ds_svc_t *ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl,
    ds_port_t *port);
static ds_svc_t *ds_find_svc_by_id_port(char *svc_id, int is_client,
    ds_port_t *port);
static ds_svc_t *ds_svc_clone(ds_svc_t *svc);
static void ds_check_for_dup_services(ds_svc_t *svc);
static void ds_delete_svc_entry(ds_svc_t *svc);
201 
/*
 * Return a newly allocated copy of the NUL-terminated string 'str'.
 *
 * The copy is obtained from DS_MALLOC() and is exactly strlen(str) + 1
 * bytes long; the caller owns it and is expected to release it with
 * DS_FREE() using that size. Returns NULL if DS_MALLOC() fails (other
 * code in this file, e.g. ds_handle_recv(), treats DS_MALLOC() as an
 * allocation that can fail).
 */
char *
ds_strdup(char *str)
{
	size_t	len = strlen(str) + 1;	/* include the terminating NUL */
	char	*newstr;

	newstr = DS_MALLOC(len);
	if (newstr == NULL)
		return (NULL);

	/* the length is already known, so copy it in one shot */
	(void) memcpy(newstr, str, len);
	return (newstr);
}
211 
/*
 * Initialize the state shared by all users of the common DS code:
 * check the protocol version table, set up the services table and then
 * turn on callback processing.
 */
void
ds_common_init(void)
{
	/* Validate version table */
	ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK);

	/* Initialize services table */
	ds_init_svcs_tbl(DS_MAXSVCS_INIT);

	/*
	 * enable callback processing; done last because ds_ldc_cb()
	 * ignores every LDC event until this flag is set
	 */
	ds_enabled = B_TRUE;
}
224 
225 /* BEGIN LDC SUPPORT FUNCTIONS */
226 
227 static char *
228 decode_ldc_events(uint64_t event, char *buf)
229 {
230 	buf[0] = 0;
231 	if (event & LDC_EVT_DOWN)	(void) strcat(buf, " DOWN");
232 	if (event & LDC_EVT_RESET)	(void) strcat(buf, " RESET");
233 	if (event & LDC_EVT_UP)		(void) strcat(buf, " UP");
234 	if (event & LDC_EVT_READ)	(void) strcat(buf, " READ");
235 	if (event & LDC_EVT_WRITE)	(void) strcat(buf, " WRITE");
236 	return (buf);
237 }
238 
239 static ldc_status_t
240 ds_update_ldc_state(ds_port_t *port)
241 {
242 	ldc_status_t	ldc_state;
243 	int		rv;
244 	char		ebuf[DS_EBUFSIZE];
245 
246 	ASSERT(MUTEX_HELD(&port->lock));
247 
248 	/*
249 	 * Read status and update ldc state info in port structure.
250 	 */
251 	if ((rv = ldc_status(port->ldc.hdl, &ldc_state)) != 0) {
252 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_status error: %s" DS_EOL,
253 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
254 		ldc_state = port->ldc.state;
255 	} else {
256 		port->ldc.state = ldc_state;
257 	}
258 
259 	return (ldc_state);
260 }
261 
/*
 * Handle an LDC DOWN or RESET event (or a connection reset detected
 * while reading or writing) on 'port'.
 *
 * With both the services table lock and the port lock held, pending
 * events for the port are drained, the cached LDC state is refreshed,
 * the port is reset and the channel is brought back up. The locks are
 * then dropped and the normal "channel up" processing is run, which
 * restarts the handshake if the channel is actually up.
 */
static void
ds_handle_down_reset_events(ds_port_t *port)
{
	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
	    __func__);

	/* lock order: services table lock first, then the port lock */
	mutex_enter(&ds_svcs.lock);
	mutex_enter(&port->lock);

	ds_sys_drain_events(port);

	(void) ds_update_ldc_state(port);

	/* reset the port state */
	ds_port_reset(port);

	/* acknowledge the reset */
	(void) ldc_up(port->ldc.hdl);

	mutex_exit(&port->lock);
	mutex_exit(&ds_svcs.lock);

	/* ds_handle_up_event() takes port->lock itself, so call it unlocked */
	ds_handle_up_event(port);

	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
}
288 
289 static void
290 ds_handle_up_event(ds_port_t *port)
291 {
292 	ldc_status_t	ldc_state;
293 
294 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: entered" DS_EOL, PORTID(port),
295 	    __func__);
296 
297 	mutex_enter(&port->lock);
298 
299 	ldc_state = ds_update_ldc_state(port);
300 
301 	mutex_exit(&port->lock);
302 
303 	if ((ldc_state == LDC_UP) && IS_DS_PORT(port)) {
304 		/*
305 		 * Initiate the handshake.
306 		 */
307 		ds_send_init_req(port);
308 	}
309 
310 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
311 }
312 
313 static uint_t
314 ds_ldc_cb(uint64_t event, caddr_t arg)
315 {
316 	ds_port_t	*port = (ds_port_t *)arg;
317 	char		evstring[DS_DECODE_BUF_LEN];
318 
319 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: %s event (%llx) received" DS_EOL,
320 	    PORTID(port), __func__, decode_ldc_events(event, evstring),
321 	    (u_longlong_t)event);
322 
323 	if (!ds_enabled) {
324 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: callback handling is disabled"
325 		    DS_EOL, PORTID(port), __func__);
326 		return (LDC_SUCCESS);
327 	}
328 
329 	if (event & (LDC_EVT_DOWN | LDC_EVT_RESET)) {
330 		ds_handle_down_reset_events(port);
331 		goto done;
332 	}
333 
334 	if (event & LDC_EVT_UP) {
335 		ds_handle_up_event(port);
336 	}
337 
338 	if (event & LDC_EVT_READ) {
339 		if (port->ldc.state != LDC_UP) {
340 			cmn_err(CE_WARN, "ds@%lx: %s: LDC READ event while "
341 			    "port not up" DS_EOL, PORTID(port), __func__);
342 			goto done;
343 		}
344 
345 		if (ds_sys_dispatch_func(ds_handle_recv, port)) {
346 			cmn_err(CE_WARN, "ds@%lx: error initiating LDC READ "
347 			    " event", PORTID(port));
348 		}
349 	}
350 
351 	if (event & LDC_EVT_WRITE) {
352 		DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: LDC WRITE event received, "
353 		    "not supported" DS_EOL, PORTID(port), __func__);
354 	}
355 
356 	if (event & ~(LDC_EVT_UP | LDC_EVT_READ)) {
357 		cmn_err(CE_WARN, "ds@%lx: %s: Unexpected LDC event received: "
358 		    "0x%llx" DS_EOL, PORTID(port), __func__,
359 		    (u_longlong_t)event);
360 	}
361 done:
362 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: exit" DS_EOL, PORTID(port), __func__);
363 
364 	return (LDC_SUCCESS);
365 }
366 
367 static int
368 ds_ldc_init(ds_port_t *port)
369 {
370 	int		rv;
371 	ldc_attr_t	ldc_attr;
372 	caddr_t		ldc_cb_arg = (caddr_t)port;
373 	char		ebuf[DS_EBUFSIZE];
374 
375 	ASSERT(MUTEX_HELD(&port->lock));
376 
377 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%lld" DS_EOL,
378 	    PORTID(port), __func__, (u_longlong_t)port->ldc.id);
379 
380 	ldc_attr.devclass = LDC_DEV_GENERIC;
381 	ldc_attr.instance = 0;
382 	ldc_attr.mode = LDC_MODE_RELIABLE;
383 	ldc_attr.mtu = DS_STREAM_MTU;
384 
385 	if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) {
386 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_id: %lx, ldc_init error: %s"
387 		    DS_EOL, PORTID(port), __func__, port->ldc.id,
388 		    ds_errno_to_str(rv, ebuf));
389 		return (rv);
390 	}
391 
392 	rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, ldc_cb_arg);
393 	if (rv != 0) {
394 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_reg_callback error: %s"
395 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
396 		return (rv);
397 	}
398 
399 	ds_sys_ldc_init(port);
400 	return (0);
401 }
402 
403 int
404 ds_ldc_fini(ds_port_t *port)
405 {
406 	int	rv;
407 	char	ebuf[DS_EBUFSIZE];
408 
409 	ASSERT(port->state >= DS_PORT_LDC_INIT);
410 	ASSERT(MUTEX_HELD(&port->lock));
411 
412 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s: ldc_id=%ld" DS_EOL, PORTID(port),
413 	    __func__, port->ldc.id);
414 
415 	if ((rv = ldc_close(port->ldc.hdl)) != 0) {
416 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_close error: %s" DS_EOL,
417 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
418 		return (rv);
419 	}
420 
421 	if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) {
422 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_unreg_callback error: %s"
423 		    DS_EOL, PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
424 		return (rv);
425 	}
426 
427 	if ((rv = ldc_fini(port->ldc.hdl)) != 0) {
428 		cmn_err(CE_WARN, "ds@%lx: %s: ldc_fini error: %s" DS_EOL,
429 		    PORTID(port), __func__, ds_errno_to_str(rv, ebuf));
430 		return (rv);
431 	}
432 
433 	port->ldc.id = (uint64_t)-1;
434 	port->ldc.hdl = NULL;
435 	port->ldc.state = 0;
436 
437 	return (rv);
438 }
439 
440 /*
441  * Attempt to read a specified number of bytes from a particular LDC.
442  * Returns zero for success or the return code from the LDC read on
443  * failure. The actual number of bytes read from the LDC is returned
444  * in the size parameter.
445  */
446 static int
447 ds_recv_msg(ds_port_t *port, caddr_t msgp, size_t *sizep)
448 {
449 	int	rv = 0;
450 	size_t	bytes_req = *sizep;
451 	size_t	bytes_left = bytes_req;
452 	size_t	nbytes;
453 	int	retry_count = 0;
454 	char	ebuf[DS_EBUFSIZE];
455 
456 	ASSERT(MUTEX_HELD(&port->rcv_lock));
457 
458 	*sizep = 0;
459 
460 	DS_DBG_LDC(CE_NOTE, "ds@%lx: attempting to read %ld bytes" DS_EOL,
461 	    PORTID(port), bytes_req);
462 
463 	while (bytes_left > 0) {
464 
465 		nbytes = bytes_left;
466 
467 		mutex_enter(&port->lock);
468 		if (port->ldc.state == LDC_UP) {
469 			rv = ldc_read(port->ldc.hdl, msgp, &nbytes);
470 		} else
471 			rv = ENXIO;
472 		mutex_exit(&port->lock);
473 		if (rv != 0) {
474 			if (rv == ECONNRESET) {
475 				break;
476 			} else if (rv != EAGAIN) {
477 				cmn_err(CE_NOTE, "ds@%lx: %s: %s" DS_EOL,
478 				    PORTID(port), __func__,
479 				    ds_errno_to_str(rv, ebuf));
480 				break;
481 			}
482 		} else {
483 			if (nbytes != 0) {
484 				DS_DBG_LDC(CE_NOTE, "ds@%lx: "
485 				    "read %ld bytes, %d retries" DS_EOL,
486 				    PORTID(port), nbytes, retry_count);
487 
488 				*sizep += nbytes;
489 				msgp += nbytes;
490 				bytes_left -= nbytes;
491 
492 				/* reset counter on a successful read */
493 				retry_count = 0;
494 				continue;
495 			}
496 
497 			/*
498 			 * No data was read. Check if this is the
499 			 * first attempt. If so, just return since
500 			 * nothing has been read yet.
501 			 */
502 			if (bytes_left == bytes_req) {
503 				DS_DBG_LDC(CE_NOTE, "ds@%lx: read zero bytes, "
504 				    " no data available" DS_EOL, PORTID(port));
505 				break;
506 			}
507 		}
508 
509 		/*
510 		 * A retry is necessary because the read returned
511 		 * EAGAIN, or a zero length read occurred after
512 		 * reading a partial message.
513 		 */
514 		if (retry_count++ >= ds_retries) {
515 			DS_DBG_LDC(CE_NOTE, "ds@%lx: timed out waiting for "
516 			    "message" DS_EOL, PORTID(port));
517 			break;
518 		}
519 
520 		drv_usecwait(ds_delay);
521 	}
522 
523 	return (rv);
524 }
525 
526 static void
527 ds_handle_recv(void *arg)
528 {
529 	ds_port_t	*port = (ds_port_t *)arg;
530 	char		*hbuf;
531 	size_t		msglen;
532 	size_t		read_size;
533 	boolean_t	hasdata;
534 	ds_hdr_t	hdr;
535 	uint8_t		*msg;
536 	char		*currp;
537 	int		rv;
538 	ds_event_t	*devent;
539 
540 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s..." DS_EOL, PORTID(port), __func__);
541 
542 	/*
543 	 * Read messages from the channel until there are none
544 	 * pending. Valid messages are dispatched to be handled
545 	 * by a separate thread while any malformed messages are
546 	 * dropped.
547 	 */
548 
549 	mutex_enter(&port->rcv_lock);
550 
551 	for (;;) {
552 		mutex_enter(&port->lock);
553 		if (port->ldc.state == LDC_UP) {
554 			rv = ldc_chkq(port->ldc.hdl, &hasdata);
555 		} else
556 			rv = ENXIO;
557 		mutex_exit(&port->lock);
558 		if (rv != 0 || !hasdata)
559 			break;
560 
561 		DS_DBG(CE_NOTE, "ds@%lx: %s: reading next message" DS_EOL,
562 		    PORTID(port), __func__);
563 
564 		/*
565 		 * Read in the next message.
566 		 */
567 		hbuf = (char *)&hdr;
568 		bzero(hbuf, DS_HDR_SZ);
569 		read_size = DS_HDR_SZ;
570 		currp = hbuf;
571 
572 		/* read in the message header */
573 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
574 			break;
575 		}
576 
577 		if (read_size < DS_HDR_SZ) {
578 			/*
579 			 * A zero length read is a valid signal that
580 			 * there is no data left on the channel.
581 			 */
582 			if (read_size != 0) {
583 				cmn_err(CE_WARN, "ds@%lx: invalid message "
584 				    "length, received %ld bytes, expected %ld"
585 				    DS_EOL, PORTID(port), read_size, DS_HDR_SZ);
586 			}
587 			continue;
588 		}
589 
590 		/* get payload size and allocate a buffer */
591 		read_size = ((ds_hdr_t *)hbuf)->payload_len;
592 		msglen = DS_HDR_SZ + read_size;
593 		msg = DS_MALLOC(msglen);
594 		if (!msg) {
595 			cmn_err(CE_WARN, "Memory allocation failed attempting "
596 			    " to allocate %d bytes." DS_EOL, (int)msglen);
597 			continue;
598 		}
599 
600 		DS_DBG(CE_NOTE, "ds@%lx: %s: message payload len %d" DS_EOL,
601 		    PORTID(port), __func__, (int)read_size);
602 
603 		/* move message header into buffer */
604 		(void) memcpy(msg, hbuf, DS_HDR_SZ);
605 		currp = (char *)(msg) + DS_HDR_SZ;
606 
607 		/* read in the message body */
608 		if ((rv = ds_recv_msg(port, currp, &read_size)) != 0) {
609 			DS_FREE(msg, msglen);
610 			break;
611 		}
612 
613 		/* validate the size of the message */
614 		if ((DS_HDR_SZ + read_size) != msglen) {
615 			cmn_err(CE_WARN, "ds@%lx: %s: invalid message length, "
616 			    "received %ld bytes, expected %ld" DS_EOL,
617 			    PORTID(port), __func__, (DS_HDR_SZ + read_size),
618 			    msglen);
619 			DS_FREE(msg, msglen);
620 			continue;
621 		}
622 
623 		DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
624 
625 		/*
626 		 * Send the message for processing, and store it
627 		 * in the log. The memory is deallocated only when
628 		 * the message is removed from the log.
629 		 */
630 
631 		devent = DS_MALLOC(sizeof (ds_event_t));
632 		devent->port = port;
633 		devent->buf = (char *)msg;
634 		devent->buflen = msglen;
635 
636 		/* log the message */
637 		(void) ds_log_add_msg(DS_LOG_IN(port->id), msg, msglen);
638 
639 		if (ds_sys_dispatch_func(ds_dispatch_event, devent)) {
640 			cmn_err(CE_WARN, "ds@%lx: error initiating "
641 			    "event handler", PORTID(port));
642 			DS_FREE(devent, sizeof (ds_event_t));
643 		}
644 	}
645 
646 	mutex_exit(&port->rcv_lock);
647 
648 	/* handle connection reset errors returned from ds_recv_msg */
649 	if (rv == ECONNRESET) {
650 		ds_handle_down_reset_events(port);
651 	}
652 
653 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s done" DS_EOL, PORTID(port), __func__);
654 }
655 
656 static void
657 ds_dispatch_event(void *arg)
658 {
659 	ds_event_t	*event = (ds_event_t *)arg;
660 	ds_hdr_t	*hdr;
661 	ds_port_t	*port;
662 
663 	port = event->port;
664 
665 	hdr = (ds_hdr_t *)event->buf;
666 
667 	if (DS_MSG_TYPE_VALID(hdr->msg_type)) {
668 		DS_DBG(CE_NOTE, "ds@%lx: dispatch_event: msg_type=%d" DS_EOL,
669 		    PORTID(port), hdr->msg_type);
670 
671 		(*ds_msg_handlers[hdr->msg_type])(port, event->buf,
672 		    event->buflen);
673 	} else {
674 		cmn_err(CE_WARN, "ds@%lx: dispatch_event: invalid msg "
675 		    "type (%d)" DS_EOL, PORTID(port), hdr->msg_type);
676 	}
677 
678 	DS_FREE(event->buf, event->buflen);
679 	DS_FREE(event, sizeof (ds_event_t));
680 }
681 
682 int
683 ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen)
684 {
685 	int	rv;
686 	caddr_t	currp = msg;
687 	size_t	amt_left = msglen;
688 	int	loopcnt = 0;
689 
690 	DS_DBG_LDC(CE_NOTE, "ds@%lx: %s msglen: %ld" DS_EOL, PORTID(port),
691 	    __func__, msglen);
692 	DS_DUMP_MSG(DS_DBG_FLAG_LDC, msg, msglen);
693 
694 	(void) ds_log_add_msg(DS_LOG_OUT(port->id), (uint8_t *)msg, msglen);
695 
696 	/*
697 	 * Ensure that no other messages can be sent on this port by holding
698 	 * the tx_lock mutex in case the write doesn't get sent with one write.
699 	 * This guarantees that the message doesn't become fragmented.
700 	 */
701 	mutex_enter(&port->tx_lock);
702 
703 	do {
704 		mutex_enter(&port->lock);
705 		if (port->ldc.state == LDC_UP) {
706 			rv = ldc_write(port->ldc.hdl, currp, &msglen);
707 		} else
708 			rv = ENXIO;
709 		mutex_exit(&port->lock);
710 		if (rv != 0) {
711 			if (rv == ECONNRESET) {
712 				mutex_exit(&port->tx_lock);
713 				(void) ds_sys_dispatch_func((void (*)(void *))
714 				    ds_handle_down_reset_events, port);
715 				return (rv);
716 			} else if ((rv == EWOULDBLOCK) &&
717 			    (loopcnt++ < ds_retries)) {
718 				drv_usecwait(ds_delay);
719 			} else {
720 				DS_DBG_PRCL(CE_NOTE, "ds@%lx: send_msg: "
721 				    "ldc_write failed (%d), %d bytes "
722 				    "remaining" DS_EOL, PORTID(port), rv,
723 				    (int)amt_left);
724 				goto error;
725 			}
726 		} else {
727 			amt_left -= msglen;
728 			currp += msglen;
729 			msglen = amt_left;
730 			loopcnt = 0;
731 		}
732 	} while (amt_left > 0);
733 error:
734 	mutex_exit(&port->tx_lock);
735 
736 	return (rv);
737 }
738 
739 /* END LDC SUPPORT FUNCTIONS */
740 
741 
742 /* BEGIN DS PROTOCOL SUPPORT FUNCTIONS */
743 
744 static void
745 ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len)
746 {
747 	ds_hdr_t	*hdr;
748 	ds_init_ack_t	*ack;
749 	ds_init_nack_t	*nack;
750 	char		*msg;
751 	size_t		msglen;
752 	ds_init_req_t	*req;
753 	size_t		explen = DS_MSG_LEN(ds_init_req_t);
754 	uint16_t	new_major;
755 	uint16_t	new_minor;
756 	boolean_t	match;
757 
758 	/* sanity check the incoming message */
759 	if (len != explen) {
760 		cmn_err(CE_WARN, "ds@%lx: <init_req: invalid message "
761 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
762 		    explen);
763 		return;
764 	}
765 
766 	req = (ds_init_req_t *)(buf + DS_HDR_SZ);
767 
768 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_req: ver=%d.%d" DS_EOL,
769 	    PORTID(port), req->major_vers, req->minor_vers);
770 
771 	match = negotiate_version(DS_NUM_VER, &ds_vers[0],
772 	    req->major_vers, &new_major, &new_minor);
773 
774 	/*
775 	 * Check version info. ACK only if the major numbers exactly
776 	 * match. The service entity can retry with a new minor
777 	 * based on the response sent as part of the NACK.
778 	 */
779 	if (match) {
780 		msglen = DS_MSG_LEN(ds_init_ack_t);
781 		msg = DS_MALLOC(msglen);
782 
783 		hdr = (ds_hdr_t *)msg;
784 		hdr->msg_type = DS_INIT_ACK;
785 		hdr->payload_len = sizeof (ds_init_ack_t);
786 
787 		ack = (ds_init_ack_t *)(msg + DS_HDR_SZ);
788 		ack->minor_vers = MIN(new_minor, req->minor_vers);
789 
790 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_ack>: minor=0x%04X" DS_EOL,
791 		    PORTID(port), MIN(new_minor, req->minor_vers));
792 	} else {
793 		msglen = DS_MSG_LEN(ds_init_nack_t);
794 		msg = DS_MALLOC(msglen);
795 
796 		hdr = (ds_hdr_t *)msg;
797 		hdr->msg_type = DS_INIT_NACK;
798 		hdr->payload_len = sizeof (ds_init_nack_t);
799 
800 		nack = (ds_init_nack_t *)(msg + DS_HDR_SZ);
801 		nack->major_vers = new_major;
802 
803 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_nack>: major=0x%04X" DS_EOL,
804 		    PORTID(port), new_major);
805 	}
806 
807 	/*
808 	 * Send the response
809 	 */
810 	(void) ds_send_msg(port, msg, msglen);
811 	DS_FREE(msg, msglen);
812 
813 	if (match) {
814 		ds_set_port_ready(port, req->major_vers, ack->minor_vers);
815 	}
816 }
817 
818 static void
819 ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len)
820 {
821 	ds_init_ack_t	*ack;
822 	ds_ver_t	*ver;
823 	uint16_t	major;
824 	uint16_t	minor;
825 	size_t		explen = DS_MSG_LEN(ds_init_ack_t);
826 
827 	/* sanity check the incoming message */
828 	if (len != explen) {
829 		cmn_err(CE_WARN, "ds@%lx: <init_ack: invalid message "
830 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
831 		    explen);
832 		return;
833 	}
834 
835 	ack = (ds_init_ack_t *)(buf + DS_HDR_SZ);
836 
837 	mutex_enter(&port->lock);
838 
839 	if (port->state == DS_PORT_READY) {
840 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready" DS_EOL,
841 		    PORTID(port));
842 		mutex_exit(&port->lock);
843 		return;
844 	}
845 
846 	if (port->state != DS_PORT_INIT_REQ) {
847 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: invalid state: %d"
848 		    DS_EOL, PORTID(port), port->state);
849 		mutex_exit(&port->lock);
850 		return;
851 	}
852 
853 	ver = &(ds_vers[port->ver_idx]);
854 	major = ver->major;
855 	minor = MIN(ver->minor, ack->minor_vers);
856 	mutex_exit(&port->lock);
857 
858 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_ack: port ready v%d.%d" DS_EOL,
859 	    PORTID(port), major, minor);
860 
861 	ds_set_port_ready(port, major, minor);
862 }
863 
864 static void
865 ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len)
866 {
867 	int		idx;
868 	ds_init_nack_t	*nack;
869 	ds_ver_t	*ver;
870 	size_t		explen = DS_MSG_LEN(ds_init_nack_t);
871 
872 	/* sanity check the incoming message */
873 	if (len != explen) {
874 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: invalid message "
875 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
876 		    explen);
877 		return;
878 	}
879 
880 	nack = (ds_init_nack_t *)(buf + DS_HDR_SZ);
881 
882 	mutex_enter(&port->lock);
883 
884 	if (port->state != DS_PORT_INIT_REQ) {
885 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: invalid state: %d"
886 		    DS_EOL, PORTID(port), port->state);
887 		mutex_exit(&port->lock);
888 		return;
889 	}
890 
891 	ver = &(ds_vers[port->ver_idx]);
892 
893 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <init_nack: req=v%d.%d, nack=v%d.x"
894 	    DS_EOL, PORTID(port), ver->major, ver->minor, nack->major_vers);
895 
896 	if (nack->major_vers == 0) {
897 		/* no supported protocol version */
898 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS not supported"
899 		    DS_EOL, PORTID(port));
900 		mutex_exit(&port->lock);
901 		return;
902 	}
903 
904 	/*
905 	 * Walk the version list, looking for a major version
906 	 * that is as close to the requested major version as
907 	 * possible.
908 	 */
909 	for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) {
910 		if (ds_vers[idx].major <= nack->major_vers) {
911 			/* found a version to try */
912 			goto done;
913 		}
914 	}
915 
916 	if (idx == DS_NUM_VER) {
917 		/* no supported version */
918 		DS_DBG_PRCL(CE_WARN, "ds@%lx: <init_nack: DS v%d.x not "
919 		    "supported" DS_EOL, PORTID(port), nack->major_vers);
920 
921 		mutex_exit(&port->lock);
922 		return;
923 	}
924 
925 done:
926 	/* start the handshake again */
927 	port->ver_idx = idx;
928 	port->state = DS_PORT_LDC_INIT;
929 	mutex_exit(&port->lock);
930 
931 	ds_send_init_req(port);
932 
933 }
934 
935 static ds_svc_t *
936 ds_find_svc_by_id_port(char *svc_id, int is_client, ds_port_t *port)
937 {
938 	int		idx;
939 	ds_svc_t	*svc, *found_svc = 0;
940 	uint32_t	flag_match = is_client ? DSSF_ISCLIENT : 0;
941 
942 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
943 
944 	/* walk every table entry */
945 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
946 		svc = ds_svcs.tbl[idx];
947 		if (DS_SVC_ISFREE(svc))
948 			continue;
949 		if (strcmp(svc->cap.svc_id, svc_id) != 0)
950 			continue;
951 		if ((svc->flags & DSSF_ISCLIENT) != flag_match)
952 			continue;
953 		if (port != NULL && svc->port == port) {
954 			return (svc);
955 		} else if (svc->state == DS_SVC_INACTIVE) {
956 			found_svc = svc;
957 		} else if (!found_svc) {
958 			found_svc = svc;
959 		}
960 	}
961 
962 	return (found_svc);
963 }
964 
965 static void
966 ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len)
967 {
968 	ds_reg_req_t	*req;
969 	ds_hdr_t	*hdr;
970 	ds_reg_ack_t	*ack;
971 	ds_reg_nack_t	*nack;
972 	char		*msg;
973 	size_t		msglen;
974 	size_t		explen = DS_MSG_LEN(ds_reg_req_t);
975 	ds_svc_t	*svc = NULL;
976 	ds_ver_t	version;
977 	uint16_t	new_major;
978 	uint16_t	new_minor;
979 	boolean_t	match;
980 
981 	/* sanity check the incoming message */
982 	if (len < explen) {
983 		cmn_err(CE_WARN, "ds@%lx: <reg_req: invalid message "
984 		    "length (%ld), expected at least %ld" DS_EOL,
985 		    PORTID(port), len, explen);
986 		return;
987 	}
988 
989 	req = (ds_reg_req_t *)(buf + DS_HDR_SZ);
990 
991 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' ver=%d.%d, hdl=0x%llx"
992 	    DS_EOL, PORTID(port), req->svc_id, req->major_vers, req->minor_vers,
993 	    (u_longlong_t)req->svc_handle);
994 
995 	mutex_enter(&ds_svcs.lock);
996 	svc = ds_find_svc_by_id_port(req->svc_id,
997 	    DS_HDL_ISCLIENT(req->svc_handle) == 0, port);
998 	if (svc == NULL) {
999 
1000 do_reg_nack:
1001 		mutex_exit(&ds_svcs.lock);
1002 
1003 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1004 		msg = DS_MALLOC(msglen);
1005 
1006 		hdr = (ds_hdr_t *)msg;
1007 		hdr->msg_type = DS_REG_NACK;
1008 		hdr->payload_len = sizeof (ds_reg_nack_t);
1009 
1010 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1011 		nack->svc_handle = req->svc_handle;
1012 		nack->result = DS_REG_VER_NACK;
1013 		nack->major_vers = 0;
1014 
1015 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s'" DS_EOL,
1016 		    PORTID(port), req->svc_id);
1017 		/*
1018 		 * Send the response
1019 		 */
1020 		(void) ds_send_msg(port, msg, msglen);
1021 		DS_FREE(msg, msglen);
1022 		return;
1023 	}
1024 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' found, hdl: 0x%llx" DS_EOL,
1025 	    PORTID(port), req->svc_id, (u_longlong_t)svc->hdl);
1026 
1027 	/*
1028 	 * A client sends out a reg req in order to force service providers to
1029 	 * initiate a reg req from their end (limitation in the protocol).  We
1030 	 * expect the service provider to be in the inactive (DS_SVC_INACTIVE)
1031 	 * state.  If the service provider has already sent out a reg req (the
1032 	 * state is DS_SVC_REG_PENDING) or has already handshaken (the
1033 	 * state is DS_SVC_ACTIVE), then we can simply ignore this reg
1034 	 * req.  For any other state, we force an unregister before initiating
1035 	 * a reg req.
1036 	 */
1037 
1038 	if (DS_HDL_ISCLIENT(req->svc_handle)) {
1039 		switch (svc->state) {
1040 
1041 		case DS_SVC_REG_PENDING:
1042 		case DS_SVC_ACTIVE:
1043 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1044 			    "client, state (%x)" DS_EOL, PORTID(port),
1045 			    req->svc_id, svc->state);
1046 			mutex_exit(&ds_svcs.lock);
1047 			return;
1048 
1049 		case DS_SVC_INACTIVE:
1050 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1051 			    "client" DS_EOL, PORTID(port), req->svc_id);
1052 			break;
1053 
1054 		default:
1055 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' pinging "
1056 			    "client forced unreg, state (%x)" DS_EOL,
1057 			    PORTID(port), req->svc_id, svc->state);
1058 			(void) ds_svc_unregister(svc, port);
1059 			break;
1060 		}
1061 		(void) ds_svc_port_up(svc, port);
1062 		(void) ds_svc_register_onport(svc, port);
1063 		mutex_exit(&ds_svcs.lock);
1064 		return;
1065 	}
1066 
1067 	/*
1068 	 * Only remote service providers can initiate a registration.  The
1069 	 * local sevice from here must be a client service.
1070 	 */
1071 
1072 	match = negotiate_version(svc->cap.nvers, svc->cap.vers,
1073 	    req->major_vers, &new_major, &new_minor);
1074 
1075 	/*
1076 	 * Check version info. ACK only if the major numbers exactly
1077 	 * match. The service entity can retry with a new minor
1078 	 * based on the response sent as part of the NACK.
1079 	 */
1080 	if (match) {
1081 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_req: '%s' svc%d: state: %x "
1082 		    "svc_portid: %d" DS_EOL, PORTID(port), req->svc_id,
1083 		    (int)DS_HDL2IDX(svc->hdl), svc->state,
1084 		    (int)(svc->port == NULL ? -1 : PORTID(svc->port)));
1085 		/*
1086 		 * If the current local service is already in use and
1087 		 * it's not on this port, clone it.
1088 		 */
1089 		if (svc->state != DS_SVC_INACTIVE) {
1090 			if (svc->port != NULL && port == svc->port) {
1091 				/*
1092 				 * Someone probably dropped an unreg req
1093 				 * somewhere.  Force a local unreg.
1094 				 */
1095 				(void) ds_svc_unregister(svc, port);
1096 			} else if (!DS_HDL_ISCLIENT(svc->hdl)) {
1097 				/*
1098 				 * Can't clone a non-client (service provider)
1099 				 * handle.  This is because old in-kernel
1100 				 * service providers can't deal with multiple
1101 				 * handles.
1102 				 */
1103 				goto do_reg_nack;
1104 			} else {
1105 				svc = ds_svc_clone(svc);
1106 			}
1107 		}
1108 		svc->port = port;
1109 		svc->svc_hdl = req->svc_handle;
1110 		svc->state = DS_SVC_ACTIVE;
1111 
1112 		msglen = DS_MSG_LEN(ds_reg_ack_t);
1113 		msg = DS_MALLOC(msglen);
1114 
1115 		hdr = (ds_hdr_t *)msg;
1116 		hdr->msg_type = DS_REG_ACK;
1117 		hdr->payload_len = sizeof (ds_reg_ack_t);
1118 
1119 		ack = (ds_reg_ack_t *)(msg + DS_HDR_SZ);
1120 		ack->svc_handle = req->svc_handle;
1121 		ack->minor_vers = MIN(new_minor, req->minor_vers);
1122 
1123 
1124 		if (svc->ops.ds_reg_cb) {
1125 			/* Call the registration callback */
1126 			version.major = req->major_vers;
1127 			version.minor = ack->minor_vers;
1128 			(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &version,
1129 			    svc->hdl);
1130 		}
1131 		mutex_exit(&ds_svcs.lock);
1132 
1133 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_ack>: '%s' minor=0x%04X"
1134 		    DS_EOL, PORTID(port), svc->cap.svc_id,
1135 		    MIN(new_minor, req->minor_vers));
1136 	} else {
1137 		mutex_exit(&ds_svcs.lock);
1138 
1139 		msglen = DS_MSG_LEN(ds_reg_nack_t);
1140 		msg = DS_MALLOC(msglen);
1141 
1142 		hdr = (ds_hdr_t *)msg;
1143 		hdr->msg_type = DS_REG_NACK;
1144 		hdr->payload_len = sizeof (ds_reg_nack_t);
1145 
1146 		nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ);
1147 		nack->svc_handle = req->svc_handle;
1148 		nack->result = DS_REG_VER_NACK;
1149 		nack->major_vers = new_major;
1150 
1151 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_nack>: '%s' major=0x%04X"
1152 		    DS_EOL, PORTID(port), svc->cap.svc_id, new_major);
1153 	}
1154 
1155 	/* send message */
1156 	(void) ds_send_msg(port, msg, msglen);
1157 	DS_FREE(msg, msglen);
1158 }
1159 
1160 static void
1161 ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len)
1162 {
1163 	ds_reg_ack_t	*ack;
1164 	ds_ver_t	*ver;
1165 	ds_ver_t	tmpver;
1166 	ds_svc_t	*svc;
1167 	size_t		explen = DS_MSG_LEN(ds_reg_ack_t);
1168 
1169 	/* sanity check the incoming message */
1170 	if (len != explen) {
1171 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid message "
1172 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1173 		    explen);
1174 		return;
1175 	}
1176 
1177 	ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ);
1178 
1179 	mutex_enter(&ds_svcs.lock);
1180 
1181 	/*
1182 	 * This searches for service based on how we generate handles
1183 	 * and so only works because this is a reg ack.
1184 	 */
1185 	if (DS_HDL_ISCLIENT(ack->svc_handle) ||
1186 	    (svc = ds_get_svc(ack->svc_handle)) == NULL) {
1187 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid handle 0x%llx"
1188 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1189 		goto done;
1190 	}
1191 
1192 	/* make sure the message makes sense */
1193 	if (svc->state != DS_SVC_REG_PENDING) {
1194 		cmn_err(CE_WARN, "ds@%lx: <reg_ack: invalid state (%d)" DS_EOL,
1195 		    PORTID(port), svc->state);
1196 		goto done;
1197 	}
1198 
1199 	ver = &(svc->cap.vers[svc->ver_idx]);
1200 
1201 	/* major version has been agreed upon */
1202 	svc->ver.major = ver->major;
1203 
1204 	if (ack->minor_vers >= ver->minor) {
1205 		/*
1206 		 * Use the minor version specified in the
1207 		 * original request.
1208 		 */
1209 		svc->ver.minor = ver->minor;
1210 	} else {
1211 		/*
1212 		 * Use the lower minor version returned in
1213 		 * the ack. By defninition, all lower minor
1214 		 * versions must be supported.
1215 		 */
1216 		svc->ver.minor = ack->minor_vers;
1217 	}
1218 
1219 	svc->state = DS_SVC_ACTIVE;
1220 	svc->port = port;
1221 
1222 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_ack: '%s' v%d.%d ready, hdl=0x%llx"
1223 	    DS_EOL, PORTID(port), svc->cap.svc_id, svc->ver.major,
1224 	    svc->ver.minor, (u_longlong_t)svc->hdl);
1225 
1226 	/* notify the client that registration is complete */
1227 	if (svc->ops.ds_reg_cb) {
1228 		/*
1229 		 * Use a temporary version structure so that
1230 		 * the copy in the svc structure cannot be
1231 		 * modified by the client.
1232 		 */
1233 		tmpver.major = svc->ver.major;
1234 		tmpver.minor = svc->ver.minor;
1235 
1236 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl);
1237 	}
1238 
1239 done:
1240 	mutex_exit(&ds_svcs.lock);
1241 }
1242 
1243 static boolean_t
1244 ds_port_is_ready(ds_port_t *port)
1245 {
1246 	boolean_t is_ready;
1247 
1248 	mutex_enter(&port->lock);
1249 	is_ready = (port->ldc.state == LDC_UP) &&
1250 	    (port->state == DS_PORT_READY);
1251 	mutex_exit(&port->lock);
1252 	return (is_ready);
1253 }
1254 
1255 static void
1256 ds_try_next_port(ds_svc_t *svc, int portid)
1257 {
1258 	ds_port_t *port;
1259 	ds_portset_t totry;
1260 	int i;
1261 
1262 	DS_DBG_LDC(CE_NOTE, "ds@%x %s" DS_EOL, portid, __func__);
1263 
1264 	/*
1265 	 * Get the ports that haven't been tried yet and are available to try.
1266 	 */
1267 	DS_PORTSET_DUP(totry, svc->avail);
1268 	for (i = 0; i < DS_MAX_PORTS; i++) {
1269 		if (DS_PORT_IN_SET(svc->tried, i))
1270 			DS_PORTSET_DEL(totry, i);
1271 	}
1272 
1273 	if (DS_PORTSET_ISNULL(totry))
1274 		return;
1275 
1276 	for (i = 0; i < DS_MAX_PORTS; i++, portid++) {
1277 		if (portid >= DS_MAX_PORTS) {
1278 			portid = 0;
1279 		}
1280 
1281 		/*
1282 		 * If the port is not in the available list,
1283 		 * it is not a candidate for registration.
1284 		 */
1285 		if (!DS_PORT_IN_SET(totry, portid)) {
1286 			continue;
1287 		}
1288 
1289 		port = &ds_ports[portid];
1290 
1291 		if (!ds_port_is_ready(port))
1292 			continue;
1293 
1294 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s trying ldc.id: %d" DS_EOL,
1295 		    portid, __func__, (uint_t)(port->ldc.id));
1296 
1297 		DS_PORTSET_ADD(svc->tried, portid);
1298 
1299 		if (ds_send_reg_req(svc, port) == 0) {
1300 			DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send OK" DS_EOL,
1301 			    portid, __func__);
1302 			/* register sent successfully */
1303 			break;
1304 		}
1305 		DS_DBG_LDC(CE_NOTE, "ds@%x: %s reg msg send FAIL" DS_EOL,
1306 		    portid, __func__);
1307 
1308 		/* reset the service to try the next port */
1309 		ds_reset_svc(svc, port);
1310 	}
1311 }
1312 
1313 static void
1314 ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len)
1315 {
1316 	ds_reg_nack_t	*nack;
1317 	ds_svc_t	*svc;
1318 	int		idx;
1319 	size_t		explen = DS_MSG_LEN(ds_reg_nack_t);
1320 
1321 	/* sanity check the incoming message */
1322 	if (len != explen) {
1323 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid message "
1324 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1325 		    explen);
1326 		return;
1327 	}
1328 
1329 	nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ);
1330 
1331 	mutex_enter(&ds_svcs.lock);
1332 
1333 	/*
1334 	 * We expect a reg_nack for a client ping.
1335 	 */
1336 	if (DS_HDL_ISCLIENT(nack->svc_handle)) {
1337 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: ping hdl: 0x%llx"
1338 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1339 		goto done;
1340 	}
1341 
1342 	/*
1343 	 * This searches for service based on how we generate handles
1344 	 * and so only works because this is a reg nack.
1345 	 */
1346 	if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1347 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: invalid handle 0x%llx"
1348 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1349 		goto done;
1350 	}
1351 
1352 	/* make sure the message makes sense */
1353 	if (svc->state != DS_SVC_REG_PENDING) {
1354 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' handle: 0x%llx "
1355 		    "invalid state (%d)" DS_EOL, PORTID(port), svc->cap.svc_id,
1356 		    (u_longlong_t)nack->svc_handle, svc->state);
1357 		goto done;
1358 	}
1359 
1360 	if (nack->result == DS_REG_DUP) {
1361 		cmn_err(CE_WARN, "ds@%lx: <reg_nack: duplicate registration "
1362 		    " for %s" DS_EOL, PORTID(port), svc->cap.svc_id);
1363 		ds_reset_svc(svc, port);
1364 		goto done;
1365 	}
1366 
1367 	/*
1368 	 * A major version of zero indicates that the
1369 	 * service is not supported at all.
1370 	 */
1371 	if (nack->major_vers == 0) {
1372 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' not supported"
1373 		    DS_EOL, PORTID(port), svc->cap.svc_id);
1374 		ds_reset_svc(svc, port);
1375 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1376 			ds_try_next_port(svc, PORTID(port) + 1);
1377 		goto done;
1378 	}
1379 
1380 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: '%s' hdl=0x%llx, nack=%d.x"
1381 	    DS_EOL, PORTID(port), svc->cap.svc_id,
1382 	    (u_longlong_t)nack->svc_handle, nack->major_vers);
1383 
1384 	/*
1385 	 * Walk the version list for the service, looking for
1386 	 * a major version that is as close to the requested
1387 	 * major version as possible.
1388 	 */
1389 	for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) {
1390 		if (svc->cap.vers[idx].major <= nack->major_vers) {
1391 			/* found a version to try */
1392 			break;
1393 		}
1394 	}
1395 
1396 	if (idx == svc->cap.nvers) {
1397 		/* no supported version */
1398 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <reg_nack: %s v%d.x not supported"
1399 		    DS_EOL, PORTID(port), svc->cap.svc_id, nack->major_vers);
1400 		ds_reset_svc(svc, port);
1401 		if ((svc->flags & DSSF_ISCLIENT) == 0)
1402 			ds_try_next_port(svc, PORTID(port) + 1);
1403 		goto done;
1404 	}
1405 
1406 	/* start the handshake again */
1407 	svc->state = DS_SVC_INACTIVE;
1408 	svc->ver_idx = idx;
1409 
1410 	(void) ds_svc_register(svc, NULL);
1411 
1412 done:
1413 	mutex_exit(&ds_svcs.lock);
1414 }
1415 
1416 static void
1417 ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len)
1418 {
1419 	ds_hdr_t	*hdr;
1420 	ds_unreg_req_t	*req;
1421 	ds_unreg_ack_t	*ack;
1422 	ds_svc_t	*svc;
1423 	char		*msg;
1424 	size_t		msglen;
1425 	size_t		explen = DS_MSG_LEN(ds_unreg_req_t);
1426 	boolean_t	is_up;
1427 
1428 	/* sanity check the incoming message */
1429 	if (len != explen) {
1430 		cmn_err(CE_WARN, "ds@%lx: <unreg_req: invalid message "
1431 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1432 		    explen);
1433 		return;
1434 	}
1435 
1436 	req = (ds_unreg_req_t *)(buf + DS_HDR_SZ);
1437 
1438 	mutex_enter(&ds_svcs.lock);
1439 
1440 	/* lookup appropriate client or service */
1441 	if (DS_HDL_ISCLIENT(req->svc_handle) ||
1442 	    ((svc = ds_find_clnt_svc_by_hdl_port(req->svc_handle, port))
1443 	    == NULL && ((svc = ds_get_svc(req->svc_handle)) == NULL ||
1444 	    svc->port != port))) {
1445 		mutex_exit(&ds_svcs.lock);
1446 		mutex_enter(&port->lock);
1447 		is_up = (port->ldc.state == LDC_UP);
1448 		mutex_exit(&port->lock);
1449 		if (!is_up)
1450 			return;
1451 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: invalid handle 0x%llx"
1452 		    DS_EOL, PORTID(port), (u_longlong_t)req->svc_handle);
1453 		ds_send_unreg_nack(port, req->svc_handle);
1454 		return;
1455 	}
1456 
1457 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_req: '%s' handle 0x%llx" DS_EOL,
1458 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1459 
1460 	(void) ds_svc_unregister(svc, svc->port);
1461 
1462 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_ack>: '%s' hdl=0x%llx" DS_EOL,
1463 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)req->svc_handle);
1464 
1465 	ds_check_for_dup_services(svc);
1466 
1467 	mutex_exit(&ds_svcs.lock);
1468 
1469 	msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t);
1470 	msg = DS_MALLOC(msglen);
1471 
1472 	hdr = (ds_hdr_t *)msg;
1473 	hdr->msg_type = DS_UNREG_ACK;
1474 	hdr->payload_len = sizeof (ds_unreg_ack_t);
1475 
1476 	ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ);
1477 	ack->svc_handle = req->svc_handle;
1478 
1479 	/* send message */
1480 	(void) ds_send_msg(port, msg, msglen);
1481 	DS_FREE(msg, msglen);
1482 
1483 }
1484 
1485 static void
1486 ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len)
1487 {
1488 	ds_unreg_ack_t	*ack;
1489 	size_t		explen = DS_MSG_LEN(ds_unreg_ack_t);
1490 
1491 	/* sanity check the incoming message */
1492 	if (len != explen) {
1493 		cmn_err(CE_WARN, "ds@%lx: <unreg_ack: invalid message "
1494 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1495 		    explen);
1496 		return;
1497 	}
1498 
1499 	ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ);
1500 
1501 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: hdl=0x%llx" DS_EOL,
1502 	    PORTID(port), (u_longlong_t)ack->svc_handle);
1503 
1504 #ifdef DEBUG
1505 	mutex_enter(&ds_svcs.lock);
1506 
1507 	/*
1508 	 * Since the unregister request was initiated locally,
1509 	 * the service structure has already been torn down.
1510 	 * Just perform a sanity check to make sure the message
1511 	 * is appropriate.
1512 	 */
1513 	if (ds_get_svc(ack->svc_handle) != NULL) {
1514 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_ack: handle 0x%llx in use"
1515 		    DS_EOL, PORTID(port), (u_longlong_t)ack->svc_handle);
1516 	}
1517 
1518 	mutex_exit(&ds_svcs.lock);
1519 #endif	/* DEBUG */
1520 }
1521 
1522 static void
1523 ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len)
1524 {
1525 	ds_unreg_nack_t	*nack;
1526 	size_t		explen = DS_MSG_LEN(ds_unreg_nack_t);
1527 
1528 	/* sanity check the incoming message */
1529 	if (len != explen) {
1530 		cmn_err(CE_WARN, "ds@%lx: <unreg_nack: invalid message "
1531 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1532 		    explen);
1533 		return;
1534 	}
1535 
1536 	nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ);
1537 
1538 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: hdl=0x%llx" DS_EOL,
1539 	    PORTID(port), (u_longlong_t)nack->svc_handle);
1540 
1541 #ifdef DEBUG
1542 	mutex_enter(&ds_svcs.lock);
1543 
1544 	/*
1545 	 * Since the unregister request was initiated locally,
1546 	 * the service structure has already been torn down.
1547 	 * Just perform a sanity check to make sure the message
1548 	 * is appropriate.
1549 	 */
1550 	if (ds_get_svc(nack->svc_handle) != NULL) {
1551 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: <unreg_nack: handle 0x%llx in use"
1552 		    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle);
1553 	}
1554 
1555 	mutex_exit(&ds_svcs.lock);
1556 #endif	/* DEBUG */
1557 }
1558 
1559 static void
1560 ds_handle_data(ds_port_t *port, caddr_t buf, size_t len)
1561 {
1562 	ds_data_handle_t	*data;
1563 	ds_svc_t		*svc;
1564 	char			*msg;
1565 	int			msgsz;
1566 	int			hdrsz;
1567 	size_t			explen = DS_MSG_LEN(ds_data_handle_t);
1568 
1569 	/* sanity check the incoming message */
1570 	if (len < explen) {
1571 		cmn_err(CE_WARN, "ds@%lx: <data: invalid message length "
1572 		    "(%ld), expected at least %ld" DS_EOL, PORTID(port), len,
1573 		    explen);
1574 		return;
1575 	}
1576 
1577 	data = (ds_data_handle_t *)(buf + DS_HDR_SZ);
1578 
1579 	hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t);
1580 	msgsz = len - hdrsz;
1581 
1582 	/* strip off the header for the client */
1583 	msg = (msgsz) ? (buf + hdrsz) : NULL;
1584 
1585 	mutex_enter(&ds_svcs.lock);
1586 
1587 	if ((svc = ds_find_clnt_svc_by_hdl_port(data->svc_handle, port))
1588 	    == NULL) {
1589 		if ((svc = ds_get_svc(data->svc_handle)) == NULL) {
1590 			mutex_exit(&ds_svcs.lock);
1591 			cmn_err(CE_WARN, "ds@%lx: <data: invalid handle 0x%llx"
1592 			    DS_EOL, PORTID(port),
1593 			    (u_longlong_t)data->svc_handle);
1594 			ds_send_data_nack(port, data->svc_handle);
1595 			return;
1596 		}
1597 	}
1598 
1599 	mutex_exit(&ds_svcs.lock);
1600 
1601 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: <data: '%s' hdl=0x%llx" DS_EOL,
1602 	    PORTID(port), svc->cap.svc_id, (u_longlong_t)svc->hdl);
1603 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msgsz);
1604 
1605 	/* dispatch this message to the client */
1606 	(*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz);
1607 }
1608 
1609 static void
1610 ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len)
1611 {
1612 	ds_svc_t	*svc;
1613 	ds_data_nack_t	*nack;
1614 	size_t		explen = DS_MSG_LEN(ds_data_nack_t);
1615 
1616 	/* sanity check the incoming message */
1617 	if (len != explen) {
1618 		cmn_err(CE_WARN, "ds@%lx: <data_nack: invalid message "
1619 		    "length (%ld), expected %ld" DS_EOL, PORTID(port), len,
1620 		    explen);
1621 		return;
1622 	}
1623 
1624 	nack = (ds_data_nack_t *)(buf + DS_HDR_SZ);
1625 
1626 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack: hdl=0x%llx, result=0x%llx"
1627 	    DS_EOL, PORTID(port), (u_longlong_t)nack->svc_handle,
1628 	    (u_longlong_t)nack->result);
1629 
1630 	if (nack->result == DS_INV_HDL) {
1631 
1632 		mutex_enter(&ds_svcs.lock);
1633 
1634 		if ((svc = ds_find_clnt_svc_by_hdl_port(nack->svc_handle,
1635 		    port)) == NULL) {
1636 			if ((svc = ds_get_svc(nack->svc_handle)) == NULL) {
1637 				mutex_exit(&ds_svcs.lock);
1638 				return;
1639 			}
1640 		}
1641 
1642 		cmn_err(CE_WARN, "ds@%lx: <data_nack: handle 0x%llx reported "
1643 		    " as invalid" DS_EOL, PORTID(port),
1644 		    (u_longlong_t)nack->svc_handle);
1645 
1646 		(void) ds_svc_unregister(svc, svc->port);
1647 
1648 		mutex_exit(&ds_svcs.lock);
1649 	}
1650 }
1651 
1652 /* Initialize the port */
1653 void
1654 ds_send_init_req(ds_port_t *port)
1655 {
1656 	ds_hdr_t	*hdr;
1657 	ds_init_req_t	*init_req;
1658 	size_t		msglen;
1659 	ds_ver_t	*vers = &ds_vers[port->ver_idx];
1660 
1661 	mutex_enter(&port->lock);
1662 	if (port->state != DS_PORT_LDC_INIT) {
1663 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: invalid state: %d"
1664 		    DS_EOL, PORTID(port), port->state);
1665 		mutex_exit(&port->lock);
1666 		return;
1667 	}
1668 	mutex_exit(&port->lock);
1669 
1670 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: init_req>: req=v%d.%d" DS_EOL,
1671 	    PORTID(port), vers->major, vers->minor);
1672 
1673 	msglen = DS_HDR_SZ + sizeof (ds_init_req_t);
1674 	hdr = DS_MALLOC(msglen);
1675 
1676 	hdr->msg_type = DS_INIT_REQ;
1677 	hdr->payload_len = sizeof (ds_init_req_t);
1678 
1679 	init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ);
1680 	init_req->major_vers = vers->major;
1681 	init_req->minor_vers = vers->minor;
1682 
1683 	if (ds_send_msg(port, (caddr_t)hdr, msglen) == 0) {
1684 		/*
1685 		 * We've left the port state unlocked over the malloc/send,
1686 		 * make sure no one has changed the state under us before
1687 		 * we update the state.
1688 		 */
1689 		mutex_enter(&port->lock);
1690 		if (port->state == DS_PORT_LDC_INIT)
1691 			port->state = DS_PORT_INIT_REQ;
1692 		mutex_exit(&port->lock);
1693 	}
1694 	DS_FREE(hdr, msglen);
1695 }
1696 
1697 static int
1698 ds_send_reg_req(ds_svc_t *svc, ds_port_t *port)
1699 {
1700 	ds_ver_t	*ver;
1701 	ds_hdr_t	*hdr;
1702 	caddr_t		msg;
1703 	size_t		msglen;
1704 	ds_reg_req_t	*req;
1705 	size_t		idlen;
1706 	int		rv;
1707 
1708 	if ((svc->state != DS_SVC_INACTIVE) &&
1709 	    ((svc->flags & DSSF_ISCLIENT) == 0)) {
1710 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: invalid svc state (%d) "
1711 		    "for svc '%s'" DS_EOL, PORTID(port), svc->state,
1712 		    svc->cap.svc_id);
1713 		return (-1);
1714 	}
1715 
1716 	mutex_enter(&port->lock);
1717 
1718 	/* check on the LDC to Zeus */
1719 	if (port->ldc.state != LDC_UP) {
1720 		/* can not send message */
1721 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: channel %ld is not up"
1722 		    DS_EOL, PORTID(port), port->ldc.id);
1723 		mutex_exit(&port->lock);
1724 		return (-1);
1725 	}
1726 
1727 	/* make sure port is ready */
1728 	if (port->state != DS_PORT_READY) {
1729 		/* can not send message */
1730 		DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: port is not ready"
1731 		    DS_EOL, PORTID(port));
1732 		mutex_exit(&port->lock);
1733 		return (-1);
1734 	}
1735 
1736 	mutex_exit(&port->lock);
1737 
1738 	/* allocate the message buffer */
1739 	idlen = strlen(svc->cap.svc_id);
1740 	msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen;
1741 	msg = DS_MALLOC(msglen);
1742 
1743 	/* copy in the header data */
1744 	hdr = (ds_hdr_t *)msg;
1745 	hdr->msg_type = DS_REG_REQ;
1746 	hdr->payload_len = sizeof (ds_reg_req_t) + idlen;
1747 
1748 	req = (ds_reg_req_t *)(msg + DS_HDR_SZ);
1749 	req->svc_handle = svc->hdl;
1750 	ver = &(svc->cap.vers[svc->ver_idx]);
1751 	req->major_vers = ver->major;
1752 	req->minor_vers = ver->minor;
1753 
1754 	/* copy in the service id */
1755 	(void) memcpy(req->svc_id, svc->cap.svc_id, idlen + 1);
1756 
1757 	/* send the message */
1758 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: reg_req>: '%s' ver=%d.%d, hdl=0x%llx"
1759 	    DS_EOL, PORTID(port), svc->cap.svc_id, ver->major, ver->minor,
1760 	    (u_longlong_t)svc->hdl);
1761 
1762 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1763 		svc->port = port;
1764 		rv = -1;
1765 	} else if ((svc->flags & DSSF_ISCLIENT) == 0) {
1766 		svc->state = DS_SVC_REG_PENDING;
1767 	}
1768 	DS_FREE(msg, msglen);
1769 
1770 	return (rv);
1771 }
1772 
1773 /*
1774  * Keep around in case we want this later
1775  */
1776 int
1777 ds_send_unreg_req(ds_svc_t *svc)
1778 {
1779 	caddr_t		msg;
1780 	size_t		msglen;
1781 	ds_hdr_t	*hdr;
1782 	ds_unreg_req_t	*req;
1783 	ds_port_t	*port = svc->port;
1784 	int		rv;
1785 
1786 	if (port == NULL) {
1787 		DS_DBG(CE_NOTE, "send_unreg_req: service '%s' not "
1788 		    "associated with a port" DS_EOL, svc->cap.svc_id);
1789 		return (-1);
1790 	}
1791 
1792 	mutex_enter(&port->lock);
1793 
1794 	/* check on the LDC to Zeus */
1795 	if (port->ldc.state != LDC_UP) {
1796 		/* can not send message */
1797 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: channel %ld is not up"
1798 		    DS_EOL, PORTID(port), port->ldc.id);
1799 		mutex_exit(&port->lock);
1800 		return (-1);
1801 	}
1802 
1803 	/* make sure port is ready */
1804 	if (port->state != DS_PORT_READY) {
1805 		/* can not send message */
1806 		cmn_err(CE_WARN, "ds@%lx: unreg_req>: port is not ready" DS_EOL,
1807 		    PORTID(port));
1808 		mutex_exit(&port->lock);
1809 		return (-1);
1810 	}
1811 
1812 	mutex_exit(&port->lock);
1813 
1814 	msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t);
1815 	msg = DS_MALLOC(msglen);
1816 
1817 	/* copy in the header data */
1818 	hdr = (ds_hdr_t *)msg;
1819 	hdr->msg_type = DS_UNREG;
1820 	hdr->payload_len = sizeof (ds_unreg_req_t);
1821 
1822 	req = (ds_unreg_req_t *)(msg + DS_HDR_SZ);
1823 	if (svc->flags & DSSF_ISCLIENT) {
1824 		req->svc_handle = svc->svc_hdl;
1825 	} else {
1826 		req->svc_handle = svc->hdl;
1827 	}
1828 
1829 	/* send the message */
1830 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_req>: '%s' hdl=0x%llx" DS_EOL,
1831 	    PORTID(port), (svc->cap.svc_id) ? svc->cap.svc_id : "NULL",
1832 	    (u_longlong_t)svc->hdl);
1833 
1834 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
1835 		rv = -1;
1836 	}
1837 	DS_FREE(msg, msglen);
1838 
1839 	return (rv);
1840 }
1841 
1842 static void
1843 ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1844 {
1845 	caddr_t		msg;
1846 	size_t		msglen;
1847 	ds_hdr_t	*hdr;
1848 	ds_unreg_nack_t	*nack;
1849 
1850 	mutex_enter(&port->lock);
1851 
1852 	/* check on the LDC to Zeus */
1853 	if (port->ldc.state != LDC_UP) {
1854 		/* can not send message */
1855 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: channel %ld is not up"
1856 		    DS_EOL, PORTID(port), port->ldc.id);
1857 		mutex_exit(&port->lock);
1858 		return;
1859 	}
1860 
1861 	/* make sure port is ready */
1862 	if (port->state != DS_PORT_READY) {
1863 		/* can not send message */
1864 		cmn_err(CE_WARN, "ds@%lx: unreg_nack>: port is not ready"
1865 		    DS_EOL, PORTID(port));
1866 		mutex_exit(&port->lock);
1867 		return;
1868 	}
1869 
1870 	mutex_exit(&port->lock);
1871 
1872 	msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t);
1873 	msg = DS_MALLOC(msglen);
1874 
1875 	/* copy in the header data */
1876 	hdr = (ds_hdr_t *)msg;
1877 	hdr->msg_type = DS_UNREG_NACK;
1878 	hdr->payload_len = sizeof (ds_unreg_nack_t);
1879 
1880 	nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ);
1881 	nack->svc_handle = bad_hdl;
1882 
1883 	/* send the message */
1884 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: unreg_nack>: hdl=0x%llx" DS_EOL,
1885 	    PORTID(port), (u_longlong_t)bad_hdl);
1886 
1887 	(void) ds_send_msg(port, msg, msglen);
1888 	DS_FREE(msg, msglen);
1889 }
1890 
1891 static void
1892 ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl)
1893 {
1894 	caddr_t		msg;
1895 	size_t		msglen;
1896 	ds_hdr_t	*hdr;
1897 	ds_data_nack_t	*nack;
1898 
1899 	mutex_enter(&port->lock);
1900 
1901 	/* check on the LDC to Zeus */
1902 	if (port->ldc.state != LDC_UP) {
1903 		/* can not send message */
1904 		cmn_err(CE_WARN, "ds@%lx: data_nack>: channel %ld is not up"
1905 		    DS_EOL, PORTID(port), port->ldc.id);
1906 		mutex_exit(&port->lock);
1907 		return;
1908 	}
1909 
1910 	/* make sure port is ready */
1911 	if (port->state != DS_PORT_READY) {
1912 		/* can not send message */
1913 		cmn_err(CE_WARN, "ds@%lx: data_nack>: port is not ready" DS_EOL,
1914 		    PORTID(port));
1915 		mutex_exit(&port->lock);
1916 		return;
1917 	}
1918 
1919 	mutex_exit(&port->lock);
1920 
1921 	msglen = DS_HDR_SZ + sizeof (ds_data_nack_t);
1922 	msg = DS_MALLOC(msglen);
1923 
1924 	/* copy in the header data */
1925 	hdr = (ds_hdr_t *)msg;
1926 	hdr->msg_type = DS_NACK;
1927 	hdr->payload_len = sizeof (ds_data_nack_t);
1928 
1929 	nack = (ds_data_nack_t *)(msg + DS_HDR_SZ);
1930 	nack->svc_handle = bad_hdl;
1931 	nack->result = DS_INV_HDL;
1932 
1933 	/* send the message */
1934 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data_nack>: hdl=0x%llx" DS_EOL,
1935 	    PORTID(port), (u_longlong_t)bad_hdl);
1936 
1937 	(void) ds_send_msg(port, msg, msglen);
1938 	DS_FREE(msg, msglen);
1939 }
1940 
1941 /* END DS PROTOCOL SUPPORT FUNCTIONS */
1942 
#ifdef DEBUG

#define	BYTESPERLINE	8
#define	LINEWIDTH	((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1)
#define	ASCIIOFFSET	((BYTESPERLINE * 3) + 2)
#define	ISPRINT(c)	((c >= ' ') && (c <= '~'))

/*
 * Dump up to the first 128 bytes of a buffer via cmn_err(), with
 * BYTESPERLINE bytes per output line.  Each line holds the bytes in
 * hex followed, at column ASCIIOFFSET, by their ASCII rendering:
 * the character itself if it lies between ' ' and '~', '.' otherwise.
 */
void
ds_dump_msg(void *vbuf, size_t len)
{
	uint8_t	*bytes = vbuf;
	char	line[LINEWIDTH];
	char	*hexp;
	char	*ascp;
	int	off, k;

	/* never dump more than 128 bytes */
	if (len > 128)
		len = 128;

	/* one output line per BYTESPERLINE bytes of input */
	for (off = 0; off < len; off += BYTESPERLINE) {

		bzero(line, LINEWIDTH);

		hexp = line;
		ascp = line + ASCIIOFFSET;

		/*
		 * Emit the hex form at the front of the line and the
		 * ASCII form in the trailing column, byte by byte.
		 */
		for (k = 0; (k < BYTESPERLINE) && ((off + k) < len); k++) {
			uint8_t b = bytes[off + k];

			(void) sprintf(hexp, " %02x", b);
			hexp += 3;
			*ascp++ = (ISPRINT(b)) ? b : '.';
		}

		/*
		 * Pad with spaces from the end of the hex output up to
		 * the ASCII column, so the two parts form one string.
		 */
		while (hexp != (line + ASCIIOFFSET))
			*hexp++ = ' ';

		cmn_err(CE_NOTE, "%s" DS_EOL, line);
	}
}
#endif /* DEBUG */
2003 
2004 
2005 /*
2006  * Walk the table of registered services, executing the specified callback
2007  * function for each service on a port. A non-zero return value from the
2008  * callback is used to terminate the walk, not to indicate an error. Returns
2009  * the index of the last service visited.
2010  */
2011 int
2012 ds_walk_svcs(svc_cb_t svc_cb, void *arg)
2013 {
2014 	int		idx;
2015 	ds_svc_t	*svc;
2016 
2017 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2018 
2019 	/* walk every table entry */
2020 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
2021 		svc = ds_svcs.tbl[idx];
2022 
2023 		/* execute the callback */
2024 		if ((*svc_cb)(svc, arg) != 0)
2025 			break;
2026 	}
2027 
2028 	return (idx);
2029 }
2030 
2031 static int
2032 ds_svc_isfree(ds_svc_t *svc, void *arg)
2033 {
2034 	_NOTE(ARGUNUSED(arg))
2035 
2036 	/*
2037 	 * Looking for a free service. This may be a NULL entry
2038 	 * in the table, or an unused structure that could be
2039 	 * reused.
2040 	 */
2041 
2042 	if (DS_SVC_ISFREE(svc)) {
2043 		/* yes, it is free */
2044 		return (1);
2045 	}
2046 
2047 	/* not a candidate */
2048 	return (0);
2049 }
2050 
2051 int
2052 ds_svc_ismatch(ds_svc_t *svc, void *arg)
2053 {
2054 	if (DS_SVC_ISFREE(svc)) {
2055 		return (0);
2056 	}
2057 
2058 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2059 	    (svc->flags & DSSF_ISCLIENT) == 0) {
2060 		/* found a match */
2061 		return (1);
2062 	}
2063 
2064 	return (0);
2065 }
2066 
2067 int
2068 ds_svc_clnt_ismatch(ds_svc_t *svc, void *arg)
2069 {
2070 	if (DS_SVC_ISFREE(svc)) {
2071 		return (0);
2072 	}
2073 
2074 	if (strcmp(svc->cap.svc_id, arg) == 0 &&
2075 	    (svc->flags & DSSF_ISCLIENT) != 0) {
2076 		/* found a match */
2077 		return (1);
2078 	}
2079 
2080 	return (0);
2081 }
2082 
2083 int
2084 ds_svc_free(ds_svc_t *svc, void *arg)
2085 {
2086 	_NOTE(ARGUNUSED(arg))
2087 
2088 	if (svc == NULL) {
2089 		return (0);
2090 	}
2091 
2092 	if (svc->cap.svc_id) {
2093 		DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
2094 		svc->cap.svc_id = NULL;
2095 	}
2096 
2097 	if (svc->cap.vers) {
2098 		DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
2099 		svc->cap.vers = NULL;
2100 	}
2101 
2102 	DS_FREE(svc, sizeof (ds_svc_t));
2103 
2104 	return (0);
2105 }
2106 
2107 static void
2108 ds_set_svc_port_tried(char *svc_id, ds_port_t *port)
2109 {
2110 	int		idx;
2111 	ds_svc_t	*svc;
2112 
2113 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2114 
2115 	/* walk every table entry */
2116 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
2117 		svc = ds_svcs.tbl[idx];
2118 		if (!DS_SVC_ISFREE(svc) && (svc->flags & DSSF_ISCLIENT) != 0 &&
2119 		    strcmp(svc_id, svc->cap.svc_id) == 0)
2120 			DS_PORTSET_ADD(svc->tried, PORTID(port));
2121 	}
2122 }
2123 
2124 static int
2125 ds_svc_register_onport(ds_svc_t *svc, ds_port_t *port)
2126 {
2127 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2128 
2129 	if (DS_SVC_ISFREE(svc))
2130 		return (0);
2131 
2132 	if (!DS_PORT_IN_SET(svc->avail, PORTID(port)))
2133 		return (0);
2134 
2135 	if (DS_PORT_IN_SET(svc->tried, PORTID(port)))
2136 		return (0);
2137 
2138 	if (!ds_port_is_ready(port))
2139 		return (0);
2140 
2141 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2142 		if (svc->state != DS_SVC_INACTIVE)
2143 			return (0);
2144 		DS_PORTSET_ADD(svc->tried, PORTID(port));
2145 	} else {
2146 		ds_set_svc_port_tried(svc->cap.svc_id, port);
2147 
2148 		/*
2149 		 * Never send a client reg req to the SP.
2150 		 */
2151 		if (PORTID(port) == ds_sp_port_id) {
2152 			return (0);
2153 		}
2154 	}
2155 
2156 	if (ds_send_reg_req(svc, port) == 0) {
2157 		/* register sent successfully */
2158 		return (1);
2159 	}
2160 
2161 	if ((svc->flags & DSSF_ISCLIENT) == 0) {
2162 		/* reset the service */
2163 		ds_reset_svc(svc, port);
2164 	}
2165 	return (0);
2166 }
2167 
2168 static int
2169 ds_svc_register_onport_walker(ds_svc_t *svc, void *arg)
2170 {
2171 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2172 
2173 	if (DS_SVC_ISFREE(svc))
2174 		return (0);
2175 
2176 	(void) ds_svc_register_onport(svc, arg);
2177 	return (0);
2178 }
2179 
2180 int
2181 ds_svc_register(ds_svc_t *svc, void *arg)
2182 {
2183 	_NOTE(ARGUNUSED(arg))
2184 	ds_portset_t ports;
2185 	ds_port_t *port;
2186 	int	idx;
2187 
2188 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2189 
2190 	if (DS_SVC_ISFREE(svc))
2191 		return (0);
2192 
2193 	DS_PORTSET_DUP(ports, svc->avail);
2194 	if (svc->flags & DSSF_ISCLIENT) {
2195 		for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2196 			if (DS_PORT_IN_SET(svc->tried, idx))
2197 				DS_PORTSET_DEL(ports, idx);
2198 		}
2199 	} else if (svc->state != DS_SVC_INACTIVE)
2200 		return (0);
2201 
2202 	if (DS_PORTSET_ISNULL(ports))
2203 		return (0);
2204 
2205 	/*
2206 	 * Attempt to register the service. Start with the lowest
2207 	 * numbered port and continue until a registration message
2208 	 * is sent successfully, or there are no ports left to try.
2209 	 */
2210 	for (idx = 0; idx < DS_MAX_PORTS; idx++) {
2211 
2212 		/*
2213 		 * If the port is not in the available list,
2214 		 * it is not a candidate for registration.
2215 		 */
2216 		if (!DS_PORT_IN_SET(ports, idx)) {
2217 			continue;
2218 		}
2219 
2220 		port = &ds_ports[idx];
2221 		if (ds_svc_register_onport(svc, port)) {
2222 			if ((svc->flags & DSSF_ISCLIENT) == 0)
2223 				break;
2224 		}
2225 	}
2226 
2227 	return (0);
2228 }
2229 
2230 static int
2231 ds_svc_unregister(ds_svc_t *svc, void *arg)
2232 {
2233 	ds_port_t *port = (ds_port_t *)arg;
2234 	ds_svc_hdl_t hdl;
2235 
2236 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2237 
2238 	if (DS_SVC_ISFREE(svc)) {
2239 		return (0);
2240 	}
2241 
2242 	/* make sure the service is using this port */
2243 	if (svc->port != port) {
2244 		return (0);
2245 	}
2246 
2247 	if (port) {
2248 		DS_DBG(CE_NOTE, "ds@%lx: svc_unreg: id='%s', ver=%d.%d, "
2249 		    " hdl=0x%09lx" DS_EOL, PORTID(port), svc->cap.svc_id,
2250 		    svc->ver.major, svc->ver.minor, svc->hdl);
2251 	} else {
2252 		DS_DBG(CE_NOTE, "port=NULL: svc_unreg: id='%s', ver=%d.%d, "
2253 		    " hdl=0x%09lx" DS_EOL, svc->cap.svc_id, svc->ver.major,
2254 		    svc->ver.minor, svc->hdl);
2255 	}
2256 
2257 	/* reset the service structure */
2258 	ds_reset_svc(svc, port);
2259 
2260 	/* call the client unregister callback */
2261 	if (svc->ops.ds_unreg_cb) {
2262 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
2263 	}
2264 
2265 	/* increment the count in the handle to prevent reuse */
2266 	hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl));
2267 	if (DS_HDL_ISCLIENT(svc->hdl)) {
2268 		DS_HDL_SET_ISCLIENT(hdl);
2269 	}
2270 	svc->hdl = hdl;
2271 
2272 	if (svc->state != DS_SVC_UNREG_PENDING) {
2273 		/* try to initiate a new registration */
2274 		(void) ds_svc_register(svc, NULL);
2275 	}
2276 
2277 	return (0);
2278 }
2279 
2280 static int
2281 ds_svc_port_up(ds_svc_t *svc, void *arg)
2282 {
2283 	ds_port_t *port = (ds_port_t *)arg;
2284 
2285 	if (DS_SVC_ISFREE(svc)) {
2286 		/* nothing to do */
2287 		return (0);
2288 	}
2289 
2290 	DS_PORTSET_ADD(svc->avail, port->id);
2291 	DS_PORTSET_DEL(svc->tried, port->id);
2292 
2293 	return (0);
2294 }
2295 
2296 static void
2297 ds_set_port_ready(ds_port_t *port, uint16_t major, uint16_t minor)
2298 {
2299 	boolean_t was_ready;
2300 
2301 	mutex_enter(&port->lock);
2302 	was_ready = (port->state == DS_PORT_READY);
2303 	if (!was_ready) {
2304 		port->state = DS_PORT_READY;
2305 		port->ver.major = major;
2306 		port->ver.minor = minor;
2307 	}
2308 	mutex_exit(&port->lock);
2309 
2310 	if (!was_ready) {
2311 
2312 		/*
2313 		 * The port came up, so update all the services
2314 		 * with this information. Follow that up with an
2315 		 * attempt to register any service that is not
2316 		 * already registered.
2317 		 */
2318 		mutex_enter(&ds_svcs.lock);
2319 
2320 		(void) ds_walk_svcs(ds_svc_port_up, port);
2321 		(void) ds_walk_svcs(ds_svc_register_onport_walker, port);
2322 
2323 		mutex_exit(&ds_svcs.lock);
2324 	}
2325 }
2326 
2327 ds_svc_t *
2328 ds_alloc_svc(void)
2329 {
2330 	int		idx;
2331 	uint_t		newmaxsvcs;
2332 	ds_svc_t	**newtbl;
2333 	ds_svc_t	*newsvc;
2334 
2335 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2336 
2337 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2338 
2339 	if (idx != ds_svcs.maxsvcs) {
2340 		goto found;
2341 	}
2342 
2343 	/*
2344 	 * There was no free space in the table. Grow
2345 	 * the table to double its current size.
2346 	 */
2347 	newmaxsvcs = ds_svcs.maxsvcs * 2;
2348 	newtbl = DS_MALLOC(newmaxsvcs * sizeof (ds_svc_t *));
2349 
2350 	/* copy old table data to the new table */
2351 	(void) memcpy(newtbl, ds_svcs.tbl,
2352 	    ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2353 
2354 	/* clean up the old table */
2355 	DS_FREE(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *));
2356 	ds_svcs.tbl = newtbl;
2357 	ds_svcs.maxsvcs = newmaxsvcs;
2358 
2359 	/* search for a free space again */
2360 	idx = ds_walk_svcs(ds_svc_isfree, NULL);
2361 
2362 	/* the table is locked so should find a free slot */
2363 	ASSERT(idx != ds_svcs.maxsvcs);
2364 
2365 found:
2366 	/* allocate a new svc structure if necessary */
2367 	if ((newsvc = ds_svcs.tbl[idx]) == NULL) {
2368 		/* allocate a new service */
2369 		newsvc = DS_MALLOC(sizeof (ds_svc_t));
2370 		ds_svcs.tbl[idx] = newsvc;
2371 	}
2372 
2373 	/* fill in the handle */
2374 	newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl));
2375 	newsvc->state = DS_SVC_FREE;	/* Mark as free temporarily */
2376 
2377 	return (newsvc);
2378 }
2379 
2380 static void
2381 ds_reset_svc(ds_svc_t *svc, ds_port_t *port)
2382 {
2383 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2384 
2385 	if (svc->state != DS_SVC_UNREG_PENDING)
2386 		svc->state = DS_SVC_INACTIVE;
2387 	svc->ver_idx = 0;
2388 	svc->ver.major = 0;
2389 	svc->ver.minor = 0;
2390 	svc->port = NULL;
2391 	if (port) {
2392 		DS_PORTSET_DEL(svc->avail, port->id);
2393 	}
2394 }
2395 
2396 ds_svc_t *
2397 ds_get_svc(ds_svc_hdl_t hdl)
2398 {
2399 	int		idx;
2400 	ds_svc_t	*svc;
2401 
2402 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2403 
2404 	if (hdl == DS_INVALID_HDL)
2405 		return (NULL);
2406 
2407 	idx = DS_HDL2IDX(hdl);
2408 
2409 	/* check if index is out of bounds */
2410 	if ((idx < 0) || (idx >= ds_svcs.maxsvcs))
2411 		return (NULL);
2412 
2413 	svc = ds_svcs.tbl[idx];
2414 
2415 	/* check for a valid service */
2416 	if (DS_SVC_ISFREE(svc))
2417 		return (NULL);
2418 
2419 	/* make sure the handle is an exact match */
2420 	if (svc->hdl != hdl)
2421 		return (NULL);
2422 
2423 	return (svc);
2424 }
2425 
2426 static void
2427 ds_port_reset(ds_port_t *port)
2428 {
2429 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
2430 	ASSERT(MUTEX_HELD(&port->lock));
2431 
2432 	/* connection went down, mark everything inactive */
2433 	(void) ds_walk_svcs(ds_svc_unregister, port);
2434 
2435 	port->ver_idx = 0;
2436 	port->ver.major = 0;
2437 	port->ver.minor = 0;
2438 	port->state = DS_PORT_LDC_INIT;
2439 }
2440 
2441 /*
2442  * Verify that a version array is sorted as expected for the
2443  * version negotiation to work correctly.
2444  */
2445 ds_vers_check_t
2446 ds_vers_isvalid(ds_ver_t *vers, int nvers)
2447 {
2448 	uint16_t	curr_major;
2449 	uint16_t	curr_minor;
2450 	int		idx;
2451 
2452 	curr_major = vers[0].major;
2453 	curr_minor = vers[0].minor;
2454 
2455 	/*
2456 	 * Walk the version array, verifying correct ordering.
2457 	 * The array must be sorted from highest supported
2458 	 * version to lowest supported version.
2459 	 */
2460 	for (idx = 0; idx < nvers; idx++) {
2461 		if (vers[idx].major > curr_major) {
2462 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2463 			    " increasing major versions" DS_EOL);
2464 			return (DS_VERS_INCREASING_MAJOR_ERR);
2465 		}
2466 
2467 		if (vers[idx].major < curr_major) {
2468 			curr_major = vers[idx].major;
2469 			curr_minor = vers[idx].minor;
2470 			continue;
2471 		}
2472 
2473 		if (vers[idx].minor > curr_minor) {
2474 			DS_DBG(CE_NOTE, "ds_vers_isvalid: version array has "
2475 			    " increasing minor versions" DS_EOL);
2476 			return (DS_VERS_INCREASING_MINOR_ERR);
2477 		}
2478 
2479 		curr_minor = vers[idx].minor;
2480 	}
2481 
2482 	return (DS_VERS_OK);
2483 }
2484 
2485 /*
2486  * Extended user capability init.
2487  */
2488 int
2489 ds_ucap_init(ds_capability_t *cap, ds_clnt_ops_t *ops, uint32_t flags,
2490     int instance, ds_svc_hdl_t *hdlp)
2491 {
2492 	ds_vers_check_t	status;
2493 	ds_svc_t	*svc;
2494 	int		rv = 0;
2495 	ds_svc_hdl_t 	lb_hdl, hdl;
2496 	int		is_loopback;
2497 	int		is_client;
2498 
2499 	/* sanity check the args */
2500 	if ((cap == NULL) || (ops == NULL)) {
2501 		cmn_err(CE_NOTE, "%s: invalid arguments" DS_EOL, __func__);
2502 		return (EINVAL);
2503 	}
2504 
2505 	/* sanity check the capability specifier */
2506 	if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) {
2507 		cmn_err(CE_NOTE, "%s: invalid capability specifier" DS_EOL,
2508 		    __func__);
2509 		return (EINVAL);
2510 	}
2511 
2512 	/* sanity check the version array */
2513 	if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) {
2514 		cmn_err(CE_NOTE, "%s: invalid capability version array "
2515 		    "for %s service: %s" DS_EOL, __func__, cap->svc_id,
2516 		    (status == DS_VERS_INCREASING_MAJOR_ERR) ?
2517 		    "increasing major versions" :
2518 		    "increasing minor versions");
2519 		return (EINVAL);
2520 	}
2521 
2522 	/* data and register callbacks are required */
2523 	if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) {
2524 		cmn_err(CE_NOTE, "%s: invalid ops specifier for %s service"
2525 		    DS_EOL, __func__, cap->svc_id);
2526 		return (EINVAL);
2527 	}
2528 
2529 	flags &= DSSF_USERFLAGS;
2530 	is_client = flags & DSSF_ISCLIENT;
2531 
2532 	DS_DBG_USR(CE_NOTE, "%s: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx"
2533 	    DS_EOL, __func__, cap->svc_id, PTR_TO_LONG(ops->ds_data_cb),
2534 	    PTR_TO_LONG(ops->cb_arg));
2535 
2536 	mutex_enter(&ds_svcs.lock);
2537 
2538 	/* check if the service is already registered */
2539 	if (i_ds_hdl_lookup(cap->svc_id, is_client, NULL, 1) == 1) {
2540 		/* already registered */
2541 		DS_DBG_USR(CE_NOTE, "Service '%s'/%s already registered" DS_EOL,
2542 		    cap->svc_id,
2543 		    (flags & DSSF_ISCLIENT) ? "client" : "service");
2544 		mutex_exit(&ds_svcs.lock);
2545 		return (EALREADY);
2546 	}
2547 
2548 	svc = ds_alloc_svc();
2549 	if (is_client) {
2550 		DS_HDL_SET_ISCLIENT(svc->hdl);
2551 	}
2552 
2553 	svc->state = DS_SVC_FREE;
2554 	svc->svc_hdl = DS_BADHDL1;
2555 
2556 	svc->flags = flags;
2557 	svc->drvi = instance;
2558 	svc->drv_psp = NULL;
2559 
2560 	/*
2561 	 * Check for loopback.  "pri" is a legacy service that assumes it
2562 	 * will never use loopback mode.
2563 	 */
2564 	if (strcmp(cap->svc_id, "pri") == 0) {
2565 		is_loopback = 0;
2566 	} else if (i_ds_hdl_lookup(cap->svc_id, is_client == 0, &lb_hdl, 1)
2567 	    == 1) {
2568 		if ((rv = ds_loopback_set_svc(svc, cap, &lb_hdl)) != 0) {
2569 			DS_DBG_USR(CE_NOTE, "%s: ds_loopback_set_svc '%s' err "
2570 			    " (%d)" DS_EOL, __func__, cap->svc_id, rv);
2571 			mutex_exit(&ds_svcs.lock);
2572 			return (rv);
2573 		}
2574 		is_loopback = 1;
2575 	} else
2576 		is_loopback = 0;
2577 
2578 	/* copy over all the client information */
2579 	(void) memcpy(&svc->cap, cap, sizeof (ds_capability_t));
2580 
2581 	/* make a copy of the service name */
2582 	svc->cap.svc_id = ds_strdup(cap->svc_id);
2583 
2584 	/* make a copy of the version array */
2585 	svc->cap.vers = DS_MALLOC(cap->nvers * sizeof (ds_ver_t));
2586 	(void) memcpy(svc->cap.vers, cap->vers, cap->nvers * sizeof (ds_ver_t));
2587 
2588 	/* copy the client ops vector */
2589 	(void) memcpy(&svc->ops, ops, sizeof (ds_clnt_ops_t));
2590 
2591 	svc->state = DS_SVC_INACTIVE;
2592 	svc->ver_idx = 0;
2593 	DS_PORTSET_DUP(svc->avail, ds_allports);
2594 	DS_PORTSET_SETNULL(svc->tried);
2595 
2596 	ds_svcs.nsvcs++;
2597 
2598 	hdl = svc->hdl;
2599 
2600 	/*
2601 	 * kludge to allow user callback code to get handle and user args.
2602 	 * Make sure the callback arg points to the svc structure.
2603 	 */
2604 	if ((flags & DSSF_ISUSER) != 0) {
2605 		ds_cbarg_set_cookie(svc);
2606 	}
2607 
2608 	if (is_loopback) {
2609 		ds_loopback_register(hdl);
2610 		ds_loopback_register(lb_hdl);
2611 	}
2612 
2613 	/*
2614 	 * If this is a client or a non-loopback service provider, send
2615 	 * out register requests.
2616 	 */
2617 	if (!is_loopback || (flags & DSSF_ISCLIENT) != 0)
2618 		(void) ds_svc_register(svc, NULL);
2619 
2620 	if (hdlp) {
2621 		*hdlp = hdl;
2622 	}
2623 
2624 	mutex_exit(&ds_svcs.lock);
2625 
2626 	DS_DBG_USR(CE_NOTE, "%s: service '%s' assigned handle 0x%09lx" DS_EOL,
2627 	    __func__, svc->cap.svc_id, hdl);
2628 
2629 	return (0);
2630 }
2631 
2632 /*
2633  * ds_cap_init interface for previous revision.
2634  */
2635 int
2636 ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops)
2637 {
2638 	return (ds_ucap_init(cap, ops, 0, DS_INVALID_INSTANCE, NULL));
2639 }
2640 
2641 /*
2642  * Interface for ds_unreg_hdl in lds driver.
2643  */
2644 int
2645 ds_unreg_hdl(ds_svc_hdl_t hdl)
2646 {
2647 	ds_svc_t	*svc;
2648 	int		is_loopback;
2649 	ds_svc_hdl_t	lb_hdl;
2650 
2651 	DS_DBG_USR(CE_NOTE, "%s: hdl=0x%09lx" DS_EOL, __func__, hdl);
2652 
2653 	mutex_enter(&ds_svcs.lock);
2654 	if ((svc = ds_get_svc(hdl)) == NULL) {
2655 		mutex_exit(&ds_svcs.lock);
2656 		DS_DBG_USR(CE_NOTE, "%s: unknown hdl: 0x%llx" DS_EOL, __func__,
2657 		    (u_longlong_t)hdl);
2658 		return (ENXIO);
2659 	}
2660 
2661 	DS_DBG_USR(CE_NOTE, "%s: svcid='%s', hdl=0x%llx" DS_EOL, __func__,
2662 	    svc->cap.svc_id, (u_longlong_t)svc->hdl);
2663 
2664 	svc->state = DS_SVC_UNREG_PENDING;
2665 
2666 	is_loopback = ((svc->flags & DSSF_LOOPBACK) != 0);
2667 	lb_hdl = svc->svc_hdl;
2668 
2669 	if (svc->port) {
2670 		(void) ds_send_unreg_req(svc);
2671 	}
2672 
2673 	(void) ds_svc_unregister(svc, svc->port);
2674 
2675 	ds_delete_svc_entry(svc);
2676 
2677 	if (is_loopback) {
2678 		ds_loopback_unregister(lb_hdl);
2679 	}
2680 
2681 	mutex_exit(&ds_svcs.lock);
2682 
2683 	return (0);
2684 }
2685 
2686 int
2687 ds_cap_fini(ds_capability_t *cap)
2688 {
2689 	ds_svc_hdl_t	hdl;
2690 	int rv;
2691 	uint_t nhdls = 0;
2692 
2693 	DS_DBG(CE_NOTE, "%s: '%s'" DS_EOL, __func__, cap->svc_id);
2694 	if ((rv = ds_hdl_lookup(cap->svc_id, 0, &hdl, 1, &nhdls)) != 0) {
2695 		DS_DBG(CE_NOTE, "%s: ds_hdl_lookup '%s' err (%d)" DS_EOL,
2696 		    __func__, cap->svc_id, rv);
2697 		return (rv);
2698 	}
2699 
2700 	if (nhdls == 0) {
2701 		DS_DBG(CE_NOTE, "%s: no such service '%s'" DS_EOL,
2702 		    __func__, cap->svc_id);
2703 		return (ENXIO);
2704 	}
2705 
2706 	if ((rv = ds_is_my_hdl(hdl, DS_INVALID_INSTANCE)) != 0) {
2707 		DS_DBG(CE_NOTE, "%s: ds_is_my_handle err (%d)" DS_EOL, __func__,
2708 		    rv);
2709 		return (rv);
2710 	}
2711 
2712 	if ((rv = ds_unreg_hdl(hdl)) != 0) {
2713 		DS_DBG(CE_NOTE, "%s: ds_unreg_hdl err (%d)" DS_EOL, __func__,
2714 		    rv);
2715 		return (rv);
2716 	}
2717 
2718 	return (0);
2719 }
2720 
2721 int
2722 ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len)
2723 {
2724 	int		rv;
2725 	ds_hdr_t	*hdr;
2726 	caddr_t		msg;
2727 	size_t		msglen;
2728 	size_t		hdrlen;
2729 	caddr_t		payload;
2730 	ds_svc_t	*svc;
2731 	ds_port_t	*port;
2732 	ds_data_handle_t *data;
2733 	ds_svc_hdl_t	svc_hdl;
2734 	int		is_client = 0;
2735 
2736 	DS_DBG(CE_NOTE, "%s: hdl: 0x%llx, buf: %lx, len: %ld" DS_EOL, __func__,
2737 	    (u_longlong_t)hdl, (ulong_t)buf, len);
2738 
2739 	mutex_enter(&ds_svcs.lock);
2740 
2741 	if ((svc = ds_get_svc(hdl)) == NULL) {
2742 		cmn_err(CE_WARN, "%s: invalid handle 0x%llx" DS_EOL, __func__,
2743 		    (u_longlong_t)hdl);
2744 		mutex_exit(&ds_svcs.lock);
2745 		return (ENXIO);
2746 	}
2747 
2748 	if (svc->state != DS_SVC_ACTIVE) {
2749 		/* channel is up, but svc is not registered */
2750 		DS_DBG(CE_NOTE, "%s: invalid service state 0x%x" DS_EOL,
2751 		    __func__, svc->state);
2752 		mutex_exit(&ds_svcs.lock);
2753 		return (ENOTCONN);
2754 	}
2755 
2756 	if (svc->flags & DSSF_LOOPBACK) {
2757 		hdl = svc->svc_hdl;
2758 		mutex_exit(&ds_svcs.lock);
2759 		ds_loopback_send(hdl, buf, len);
2760 		return (0);
2761 	}
2762 
2763 	if ((port = svc->port) == NULL) {
2764 		DS_DBG(CE_NOTE, "%s: service '%s' not associated with a port"
2765 		    DS_EOL, __func__, svc->cap.svc_id);
2766 		mutex_exit(&ds_svcs.lock);
2767 		return (ECONNRESET);
2768 	}
2769 
2770 	if (svc->flags & DSSF_ISCLIENT) {
2771 		is_client = 1;
2772 		svc_hdl = svc->svc_hdl;
2773 	}
2774 
2775 	mutex_exit(&ds_svcs.lock);
2776 
2777 	/* check that the LDC channel is ready */
2778 	if (port->ldc.state != LDC_UP) {
2779 		DS_DBG(CE_NOTE, "%s: LDC channel is not up" DS_EOL, __func__);
2780 		return (ECONNRESET);
2781 	}
2782 
2783 	hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t);
2784 
2785 	msg = DS_MALLOC(len + hdrlen);
2786 	hdr = (ds_hdr_t *)msg;
2787 	payload = msg + hdrlen;
2788 	msglen = len + hdrlen;
2789 
2790 	hdr->payload_len = len + sizeof (ds_data_handle_t);
2791 	hdr->msg_type = DS_DATA;
2792 
2793 	data = (ds_data_handle_t *)(msg + DS_HDR_SZ);
2794 	if (is_client) {
2795 		data->svc_handle = svc_hdl;
2796 	} else {
2797 		data->svc_handle = hdl;
2798 	}
2799 
2800 	if ((buf != NULL) && (len != 0)) {
2801 		(void) memcpy(payload, buf, len);
2802 	}
2803 
2804 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: data>: hdl=0x%llx, len=%ld, "
2805 	    " payload_len=%d" DS_EOL, PORTID(port), (u_longlong_t)svc->hdl,
2806 	    msglen, hdr->payload_len);
2807 	DS_DUMP_MSG(DS_DBG_FLAG_PRCL, msg, msglen);
2808 
2809 	if ((rv = ds_send_msg(port, msg, msglen)) != 0) {
2810 		rv = (rv == EIO) ? ECONNRESET : rv;
2811 	}
2812 	DS_FREE(msg, msglen);
2813 
2814 	return (rv);
2815 }
2816 
2817 void
2818 ds_port_common_init(ds_port_t *port)
2819 {
2820 	int rv;
2821 
2822 	if ((port->flags & DS_PORT_MUTEX_INITED) == 0) {
2823 		mutex_init(&port->lock, NULL, MUTEX_DRIVER, NULL);
2824 		mutex_init(&port->tx_lock, NULL, MUTEX_DRIVER, NULL);
2825 		mutex_init(&port->rcv_lock, NULL, MUTEX_DRIVER, NULL);
2826 		port->flags |= DS_PORT_MUTEX_INITED;
2827 	}
2828 
2829 	port->state = DS_PORT_INIT;
2830 	DS_PORTSET_ADD(ds_allports, port->id);
2831 
2832 	ds_sys_port_init(port);
2833 
2834 	mutex_enter(&port->lock);
2835 	rv = ds_ldc_init(port);
2836 	mutex_exit(&port->lock);
2837 
2838 	/*
2839 	 * If LDC successfully init'ed, try to kick off protocol for this port.
2840 	 */
2841 	if (rv == 0) {
2842 		ds_handle_up_event(port);
2843 	}
2844 }
2845 
2846 void
2847 ds_port_common_fini(ds_port_t *port)
2848 {
2849 	ASSERT(MUTEX_HELD(&port->lock));
2850 
2851 	port->state = DS_PORT_FREE;
2852 
2853 	DS_PORTSET_DEL(ds_allports, port->id);
2854 
2855 	ds_sys_port_fini(port);
2856 }
2857 
2858 /*
2859  * Initialize table of registered service classes
2860  */
2861 void
2862 ds_init_svcs_tbl(uint_t nentries)
2863 {
2864 	int	tblsz;
2865 
2866 	ds_svcs.maxsvcs = nentries;
2867 
2868 	tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *);
2869 	ds_svcs.tbl = (ds_svc_t **)DS_MALLOC(tblsz);
2870 
2871 	ds_svcs.nsvcs = 0;
2872 }
2873 
2874 /*
2875  * Find the max and min version supported.
2876  * Hacked from zeus workspace, support.c
2877  */
2878 static void
2879 min_max_versions(int num_versions, ds_ver_t *sup_versionsp,
2880     uint16_t *min_major, uint16_t *max_major)
2881 {
2882 	int i;
2883 
2884 	*min_major = sup_versionsp[0].major;
2885 	*max_major = *min_major;
2886 
2887 	for (i = 1; i < num_versions; i++) {
2888 		if (sup_versionsp[i].major < *min_major)
2889 			*min_major = sup_versionsp[i].major;
2890 
2891 		if (sup_versionsp[i].major > *max_major)
2892 			*max_major = sup_versionsp[i].major;
2893 	}
2894 }
2895 
2896 /*
2897  * Check whether the major and minor numbers requested by the peer can be
2898  * satisfied. If the requested major is supported, true is returned, and the
2899  * agreed minor is returned in new_minor. If the requested major is not
2900  * supported, the routine returns false, and the closest major is returned in
2901  * *new_major, upon which the peer should re-negotiate. The closest major is
2902  * the just lower that the requested major number.
2903  *
2904  * Hacked from zeus workspace, support.c
2905  */
2906 boolean_t
2907 negotiate_version(int num_versions, ds_ver_t *sup_versionsp,
2908     uint16_t req_major, uint16_t *new_majorp, uint16_t *new_minorp)
2909 {
2910 	int i;
2911 	uint16_t major, lower_major;
2912 	uint16_t min_major = 0, max_major;
2913 	boolean_t found_match = B_FALSE;
2914 
2915 	min_max_versions(num_versions, sup_versionsp, &min_major, &max_major);
2916 
2917 	DS_DBG(CE_NOTE, "negotiate_version: req_major = %u, min = %u, max = %u"
2918 	    DS_EOL, req_major, min_major, max_major);
2919 
2920 	/*
2921 	 * If the minimum version supported is greater than
2922 	 * the version requested, return the lowest version
2923 	 * supported
2924 	 */
2925 	if (min_major > req_major) {
2926 		*new_majorp = min_major;
2927 		return (B_FALSE);
2928 	}
2929 
2930 	/*
2931 	 * If the largest version supported is lower than
2932 	 * the version requested, return the largest version
2933 	 * supported
2934 	 */
2935 	if (max_major < req_major) {
2936 		*new_majorp = max_major;
2937 		return (B_FALSE);
2938 	}
2939 
2940 	/*
2941 	 * Now we know that the requested version lies between the
2942 	 * min and max versions supported. Check if the requested
2943 	 * major can be found in supported versions.
2944 	 */
2945 	lower_major = min_major;
2946 	for (i = 0; i < num_versions; i++) {
2947 		major = sup_versionsp[i].major;
2948 		if (major == req_major) {
2949 			found_match = B_TRUE;
2950 			*new_majorp = req_major;
2951 			*new_minorp = sup_versionsp[i].minor;
2952 			break;
2953 		} else {
2954 			if ((major < req_major) && (major > lower_major))
2955 				lower_major = major;
2956 		}
2957 	}
2958 
2959 	/*
2960 	 * If no match is found, return the closest available number
2961 	 */
2962 	if (!found_match)
2963 		*new_majorp = lower_major;
2964 
2965 	return (found_match);
2966 }
2967 
/*
 * Specific errno's that are used by ds.c and ldc.c, each paired with
 * the text ds_errno_to_str() reports for it.  The table ends with an
 * entry whose ds_errno is 0.
 */
static const struct {
	int		ds_errno;
	const char	*estr;
} ds_errno_to_str_tab[] = {
	{ EIO,		"I/O error" },
	{ ENXIO,	"No such device or address" },
	{ EAGAIN,	"Resource temporarily unavailable" },
	{ ENOMEM,	"Not enough space" },
	{ EACCES,	"Permission denied" },
	{ EFAULT,	"Bad address" },
	{ EBUSY,	"Device busy" },
	{ EINVAL,	"Invalid argument" },
	{ ENOSPC,	"No space left on device" },
	{ ENOMSG,	"No message of desired type" },
#ifdef	ECHRNG
	{ ECHRNG,	"Channel number out of range" },
#endif
	{ ENOTSUP,	"Operation not supported" },
	{ EMSGSIZE,	"Message too long" },
	{ EADDRINUSE,	"Address already in use" },
	{ ECONNRESET,	"Connection reset by peer" },
	{ ENOBUFS,	"No buffer space available" },
	{ ENOTCONN,	"Socket is not connected" },
	{ ECONNREFUSED,	"Connection refused" },
	{ EALREADY,	"Operation already in progress" },
	{ 0,		NULL },
};

/*
 * Write a description of 'ds_errno' into the caller's buffer 'ebuf'
 * and return 'ebuf'.  Values found in ds_errno_to_str_tab get the
 * table text; anything else is rendered as "ds_errno (<number>)".
 *
 * No buffer length is passed in, so the caller must supply a buffer
 * large enough for the longest message.
 */
char *
ds_errno_to_str(int ds_errno, char *ebuf)
{
	int	n;

	for (n = 0; ds_errno_to_str_tab[n].ds_errno != 0; n++) {
		if (ds_errno_to_str_tab[n].ds_errno != ds_errno)
			continue;

		(void) strcpy(ebuf, ds_errno_to_str_tab[n].estr);
		return (ebuf);
	}

	/* not a value the table knows about */
	(void) sprintf(ebuf, "ds_errno (%d)", ds_errno);
	return (ebuf);
}
3014 
3015 static void
3016 ds_loopback_register(ds_svc_hdl_t hdl)
3017 {
3018 	ds_ver_t ds_ver;
3019 	ds_svc_t *svc;
3020 
3021 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3022 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
3023 	    (u_longlong_t)hdl);
3024 	if ((svc = ds_get_svc(hdl)) == NULL) {
3025 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
3026 		    (u_longlong_t)hdl);
3027 		return;
3028 	}
3029 
3030 	svc->state = DS_SVC_ACTIVE;
3031 
3032 	if (svc->ops.ds_reg_cb) {
3033 		DS_DBG_LOOP(CE_NOTE, "%s: loopback regcb: hdl: 0x%llx" DS_EOL,
3034 		    __func__, (u_longlong_t)hdl);
3035 		ds_ver.major = svc->ver.major;
3036 		ds_ver.minor = svc->ver.minor;
3037 		(*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &ds_ver, hdl);
3038 	}
3039 }
3040 
3041 static void
3042 ds_loopback_unregister(ds_svc_hdl_t hdl)
3043 {
3044 	ds_svc_t *svc;
3045 
3046 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3047 	if ((svc = ds_get_svc(hdl)) == NULL) {
3048 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
3049 		    (u_longlong_t)hdl);
3050 		return;
3051 	}
3052 
3053 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
3054 	    (u_longlong_t)hdl);
3055 
3056 	svc->flags &= ~DSSF_LOOPBACK;
3057 	svc->svc_hdl = DS_BADHDL2;
3058 	svc->state = DS_SVC_INACTIVE;
3059 
3060 	if (svc->ops.ds_unreg_cb) {
3061 		DS_DBG_LOOP(CE_NOTE, "%s: loopback unregcb: hdl: 0x%llx" DS_EOL,
3062 		    __func__, (u_longlong_t)hdl);
3063 		(*svc->ops.ds_unreg_cb)(svc->ops.cb_arg);
3064 	}
3065 }
3066 
3067 static void
3068 ds_loopback_send(ds_svc_hdl_t hdl, void *buf, size_t buflen)
3069 {
3070 	ds_svc_t *svc;
3071 
3072 	mutex_enter(&ds_svcs.lock);
3073 	if ((svc = ds_get_svc(hdl)) == NULL) {
3074 		mutex_exit(&ds_svcs.lock);
3075 		DS_DBG_LOOP(CE_NOTE, "%s: invalid hdl: 0x%llx" DS_EOL, __func__,
3076 		    (u_longlong_t)hdl);
3077 		return;
3078 	}
3079 	mutex_exit(&ds_svcs.lock);
3080 
3081 	DS_DBG_LOOP(CE_NOTE, "%s: entered hdl: 0x%llx" DS_EOL, __func__,
3082 	    (u_longlong_t)hdl);
3083 
3084 	if (svc->ops.ds_data_cb) {
3085 		DS_DBG_LOOP(CE_NOTE, "%s: loopback datacb hdl: 0x%llx" DS_EOL,
3086 		    __func__, (u_longlong_t)hdl);
3087 		(*svc->ops.ds_data_cb)(svc->ops.cb_arg, buf, buflen);
3088 	}
3089 }
3090 
3091 static int
3092 ds_loopback_set_svc(ds_svc_t *svc, ds_capability_t *cap, ds_svc_hdl_t *lb_hdlp)
3093 {
3094 	ds_svc_t *lb_svc;
3095 	ds_svc_hdl_t lb_hdl = *lb_hdlp;
3096 	int i;
3097 	int match = 0;
3098 	uint16_t new_major;
3099 	uint16_t new_minor;
3100 
3101 	if ((lb_svc = ds_get_svc(lb_hdl)) == NULL) {
3102 		DS_DBG_LOOP(CE_NOTE, "%s: loopback: hdl: 0x%llx invalid" DS_EOL,
3103 		    __func__, (u_longlong_t)lb_hdl);
3104 		return (ENXIO);
3105 	}
3106 
3107 	/* negotiate a version between loopback services, if possible */
3108 	for (i = 0; i < lb_svc->cap.nvers && match == 0; i++) {
3109 		match = negotiate_version(cap->nvers, cap->vers,
3110 		    lb_svc->cap.vers[i].major, &new_major, &new_minor);
3111 	}
3112 	if (!match) {
3113 		DS_DBG_LOOP(CE_NOTE, "%s: loopback version negotiate failed"
3114 		    DS_EOL, __func__);
3115 		return (ENOTSUP);
3116 	}
3117 
3118 	/*
3119 	 * If a client service is not inactive, clone it.  If the service is
3120 	 * not a client service and has a reg req pending (usually from OBP
3121 	 * in boot state not acking/nacking reg req's), it's OK to ignore that,
3122 	 * since there are never multiple service clients.  Also reg req pending
3123 	 * only happens for non-client services, so it's OK to skip
3124 	 * this block that does client service cloning.
3125 	 */
3126 	if (lb_svc->state != DS_SVC_INACTIVE &&
3127 	    lb_svc->state != DS_SVC_REG_PENDING) {
3128 		DS_DBG_LOOP(CE_NOTE, "%s: loopback active: hdl: 0x%llx"
3129 		    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3130 		if ((lb_svc->flags & DSSF_ISCLIENT) == 0) {
3131 			DS_DBG_LOOP(CE_NOTE, "%s: loopback busy hdl: 0x%llx"
3132 			    DS_EOL, __func__, (u_longlong_t)lb_hdl);
3133 			return (EBUSY);
3134 		}
3135 		svc->state = DS_SVC_INACTIVE;	/* prevent alloc'ing svc */
3136 		lb_svc = ds_svc_clone(lb_svc);
3137 		DS_DBG_LOOP(CE_NOTE, "%s: loopback clone: ohdl: 0x%llx "
3138 		    "nhdl: 0x%llx" DS_EOL, __func__, (u_longlong_t)lb_hdl,
3139 		    (u_longlong_t)lb_svc->hdl);
3140 		*lb_hdlp = lb_svc->hdl;
3141 	}
3142 
3143 	svc->flags |= DSSF_LOOPBACK;
3144 	svc->svc_hdl = lb_svc->hdl;
3145 	svc->port = NULL;
3146 	svc->ver.major = new_major;
3147 	svc->ver.minor = new_minor;
3148 
3149 	lb_svc->flags |= DSSF_LOOPBACK;
3150 	lb_svc->svc_hdl = svc->hdl;
3151 	lb_svc->port = NULL;
3152 	lb_svc->ver.major = new_major;
3153 	lb_svc->ver.minor = new_minor;
3154 
3155 	DS_DBG_LOOP(CE_NOTE, "%s: setting loopback between: 0x%llx and 0x%llx"
3156 	    DS_EOL, __func__, (u_longlong_t)svc->hdl,
3157 	    (u_longlong_t)lb_svc->hdl);
3158 	return (0);
3159 }
3160 
3161 static ds_svc_t *
3162 ds_find_clnt_svc_by_hdl_port(ds_svc_hdl_t hdl, ds_port_t *port)
3163 {
3164 	int		idx;
3165 	ds_svc_t	*svc;
3166 
3167 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s looking up clnt hdl: 0x%llx" DS_EOL,
3168 	    PORTID(port), __func__, (u_longlong_t)hdl);
3169 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3170 
3171 	/* walk every table entry */
3172 	for (idx = 0; idx < ds_svcs.maxsvcs; idx++) {
3173 		svc = ds_svcs.tbl[idx];
3174 		if (DS_SVC_ISFREE(svc))
3175 			continue;
3176 		if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3177 		    svc->svc_hdl == hdl && svc->port == port) {
3178 			DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s found clnt hdl "
3179 			    "0x%llx: svc%d" DS_EOL, PORTID(port), __func__,
3180 			    (u_longlong_t)hdl, (uint_t)DS_HDL2IDX(svc->hdl));
3181 			return (svc);
3182 		}
3183 	}
3184 	DS_DBG_PRCL(CE_NOTE, "ds@%lx: %s clnt hdl: 0x%llx not found" DS_EOL,
3185 	    PORTID(port), __func__, (u_longlong_t)hdl);
3186 
3187 	return (NULL);
3188 }
3189 
3190 static ds_svc_t *
3191 ds_svc_clone(ds_svc_t *svc)
3192 {
3193 	ds_svc_t *newsvc;
3194 	ds_svc_hdl_t hdl;
3195 
3196 	ASSERT(svc->flags & DSSF_ISCLIENT);
3197 
3198 	newsvc = ds_alloc_svc();
3199 
3200 	/* Can only clone clients for now */
3201 	hdl = newsvc->hdl | DS_HDL_ISCLIENT_BIT;
3202 	DS_DBG_USR(CE_NOTE, "%s: cloning client: old hdl: 0x%llx new hdl: "
3203 	    "0x%llx" DS_EOL, __func__, (u_longlong_t)svc->hdl,
3204 	    (u_longlong_t)hdl);
3205 	(void) memcpy(newsvc, svc, sizeof (ds_svc_t));
3206 	newsvc->hdl = hdl;
3207 	newsvc->flags &= ~DSSF_LOOPBACK;
3208 	newsvc->port = NULL;
3209 	newsvc->svc_hdl = DS_BADHDL2;
3210 	newsvc->cap.svc_id = ds_strdup(svc->cap.svc_id);
3211 	newsvc->cap.vers = DS_MALLOC(svc->cap.nvers * sizeof (ds_ver_t));
3212 	(void) memcpy(newsvc->cap.vers, svc->cap.vers,
3213 	    svc->cap.nvers * sizeof (ds_ver_t));
3214 
3215 	/*
3216 	 * Kludge to allow lds driver user callbacks to get access to current
3217 	 * svc structure.  Arg could be index to svc table or some other piece
3218 	 * of info to get to the svc table entry.
3219 	 */
3220 	if (newsvc->flags & DSSF_ISUSER) {
3221 		newsvc->ops.cb_arg = (ds_cb_arg_t)(newsvc);
3222 	}
3223 	return (newsvc);
3224 }
3225 
3226 /*
3227  * Internal handle lookup function.
3228  */
3229 static int
3230 i_ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3231     uint_t maxhdls)
3232 {
3233 	int idx;
3234 	int nhdls = 0;
3235 	ds_svc_t *svc;
3236 	uint32_t client_flag = is_client ? DSSF_ISCLIENT : 0;
3237 
3238 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3239 
3240 	for (idx = 0; idx < ds_svcs.maxsvcs && nhdls < maxhdls; idx++) {
3241 		svc = ds_svcs.tbl[idx];
3242 		if (DS_SVC_ISFREE(svc))
3243 			continue;
3244 		if (strcmp(svc->cap.svc_id, service) == 0 &&
3245 		    (svc->flags & DSSF_ISCLIENT) == client_flag) {
3246 			if (hdlp != NULL && nhdls < maxhdls) {
3247 				hdlp[nhdls] = svc->hdl;
3248 				nhdls++;
3249 			} else {
3250 				nhdls++;
3251 			}
3252 		}
3253 	}
3254 	return (nhdls);
3255 }
3256 
3257 /*
3258  * Interface for ds_hdl_lookup in lds driver.
3259  */
3260 int
3261 ds_hdl_lookup(char *service, uint_t is_client, ds_svc_hdl_t *hdlp,
3262     uint_t maxhdls, uint_t *nhdlsp)
3263 {
3264 	mutex_enter(&ds_svcs.lock);
3265 	*nhdlsp = i_ds_hdl_lookup(service, is_client, hdlp, maxhdls);
3266 	mutex_exit(&ds_svcs.lock);
3267 	return (0);
3268 }
3269 
3270 /*
3271  * After an UNREG REQ, check if this is a client service with multiple
3272  * handles.  If it is, then we can eliminate this entry.
3273  */
3274 static void
3275 ds_check_for_dup_services(ds_svc_t *svc)
3276 {
3277 	if ((svc->flags & DSSF_ISCLIENT) != 0 &&
3278 	    svc->state == DS_SVC_INACTIVE &&
3279 	    i_ds_hdl_lookup(svc->cap.svc_id, 1, NULL, 2) == 2) {
3280 		ds_delete_svc_entry(svc);
3281 	}
3282 }
3283 
3284 static void
3285 ds_delete_svc_entry(ds_svc_t *svc)
3286 {
3287 	ds_svc_hdl_t tmp_hdl;
3288 
3289 	ASSERT(MUTEX_HELD(&ds_svcs.lock));
3290 
3291 	/*
3292 	 * Clear out the structure, but do not deallocate the
3293 	 * memory. It can be reused for the next registration.
3294 	 */
3295 	DS_FREE(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1);
3296 	DS_FREE(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t));
3297 
3298 	/* save the handle to prevent reuse */
3299 	tmp_hdl = svc->hdl;
3300 	bzero((void *)svc, sizeof (ds_svc_t));
3301 
3302 	/* initialize for next use */
3303 	svc->hdl = tmp_hdl;
3304 	svc->state = DS_SVC_FREE;
3305 
3306 	ds_svcs.nsvcs--;
3307 }
3308