xref: /titanic_51/usr/src/uts/common/io/rsm/rsmka_pathmanager.c (revision 88447a05f537aabe9a1bc3d5313f22581ec992a7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * This module provides for the management of interconnect adapters
31  * inter-node connections (aka paths), and IPC.  Adapter descriptors are
32  * maintained on a linked list; one list per adapter devname.  Each
33  * adapter descriptor heads a linked list of path descriptors.  There is
34  * also a linked list of ipc_info descriptors; one for each node.  Each
35  * ipc_info descriptor heads a circular list of ipc tokens (the tokens are
36  * embedded within a path descriptor). The tokens are used in round robin
37  * fashion.
38  *
39  *
40  * The exported interface consists of the following functions:
41  *	- rsmka_add_adapter
42  *	- rsmka_remove_adapter
43  *
44  *      [add_path and remove_path only called for current adapters]
45  *	- rsmka_add_path
46  *	- rsmka_remove_path	[a path down request is implicit]
47  *
48  *	- rsmka_path_up           [called at clock ipl for Sun Cluster]
49  *	- rsmka_path_down         [called at clock ipl for Sun Cluster]
50  *	- rsmka_disconnect_node   [called at clock ipl for Sun Cluster;
51  *				treat like path-down for all node paths;
52  *				can be before node_alive; always before
53  *				node_died.]
54  *
55  *	[node_alive and node_died are always paired]
56  *	- rsmka_node_alive   called after the first cluster path is up
57  *                           for this node
58  *	- rsmka_node_died
59  *
60  *      [set the local node id]
61  *      - rsmka_set_my_nodeid    called to set the variable my_nodeid to the
62  *                           local node id
63  *
64  * Processing for these functions is setup as a state machine supported
65  * by the data structures described above.
66  *
67  * For Sun Cluster these are called from the Path-Manager/Kernel-Agent
68  * Interface (rsmka_pm_interface.cc).
69  *
70  * The functions rsm_path_up, rsm_path_down, and rsm_disconnect_node are
71  * called at clock interrupt level from the Path-Manager/Kernel-Agent
72  * Interface which precludes sleeping; so these functions may (optionally)
73  * defer processing to an independent thread running at normal ipl.
74  *
75  *
76  * lock definitions:
77  *
78  *	(mutex) work_queue.work_mutex
79  *			protects linked list of work tokens and used
80  *			with cv_wait/cv_signal thread synchronization.
81  *			No other locks acquired when held.
82  *
83  *	(mutex) adapter_listhead_base.listlock
84  *			protects linked list of adapter listheads
85  *			Always acquired before listhead->mutex
86  *
87  *
88  *	(mutex) ipc_info_lock
89  *			protects ipc_info list and sendq token lists
90  *			Always acquired before listhead->mutex
91  *
92  *      (mutex) listhead->mutex
93  *			protects adapter listhead, linked list of
94  *			adapters, and linked list of paths.
95  *
96  *      (mutex) path->mutex
97  *			protects the path descriptor.
98  *			work_queue.work_mutex may be acquired when holding
99  *			this lock.
100  *
101  *	(mutex) adapter->mutex
102  *			protects adapter descriptor contents.  used
103  *			mainly for ref_cnt update.
104  */
105 
106 #include <sys/param.h>
107 #include <sys/kmem.h>
108 #include <sys/errno.h>
109 #include <sys/time.h>
110 #include <sys/devops.h>
111 #include <sys/ddi_impldefs.h>
112 #include <sys/cmn_err.h>
113 #include <sys/ddi.h>
114 #include <sys/sunddi.h>
115 #include <sys/proc.h>
116 #include <sys/thread.h>
117 #include <sys/taskq.h>
118 #include <sys/callb.h>
119 
120 #include <sys/rsm/rsm.h>
121 #include <rsm_in.h>
122 #include <sys/rsm/rsmka_path_int.h>
123 
124 extern void _cplpl_init();
125 extern void _cplpl_fini();
126 extern pri_t maxclsyspri;
127 extern int   rsm_hash_size;
128 
129 extern rsm_node_id_t my_nodeid;
130 extern rsmhash_table_t rsm_import_segs;
131 extern rsm_intr_hand_ret_t rsm_srv_func();
132 extern void rsmseg_unload(rsmseg_t *);
133 extern void rsm_suspend_complete(rsm_node_id_t src_node, int flag);
134 extern int rsmipc_send_controlmsg(path_t *path, int msgtype);
135 extern void rsmka_path_monitor_initialize();
136 extern void rsmka_path_monitor_terminate();
137 
138 extern adapter_t loopback_adapter;
139 /*
140  * Lint errors and warnings are displayed; informational messages
141  * are suppressed.
142  */
143 /* lint -w2 */
144 
145 
146 /*
147  * macros SQ_TOKEN_TO_PATH and WORK_TOKEN_TO_PATH use a null pointer
148  * for computational purposes.  Ignore the lint warning.
149  */
150 /* lint -save -e413 */
151 /* FUNCTION PROTOTYPES */
152 static adapter_t *init_adapter(char *, int, rsm_addr_t,
153     rsm_controller_handle_t, rsm_ops_t *, srv_handler_arg_t *);
154 adapter_t *rsmka_lookup_adapter(char *, int);
155 static ipc_info_t *lookup_ipc_info(rsm_node_id_t);
156 static ipc_info_t *init_ipc_info(rsm_node_id_t, boolean_t);
157 static path_t *lookup_path(char *, int, rsm_node_id_t, rsm_addr_t);
158 static void pathup_to_pathactive(ipc_info_t *, rsm_node_id_t);
159 static void path_importer_disconnect(path_t *);
160 boolean_t rsmka_do_path_active(path_t *, int);
161 static boolean_t do_path_up(path_t *, int);
162 static void do_path_down(path_t *, int);
163 static void enqueue_work(work_token_t *);
164 static boolean_t cancel_work(work_token_t *);
165 static void link_path(path_t *);
166 static void destroy_path(path_t *);
167 static void link_sendq_token(sendq_token_t *, rsm_node_id_t);
168 static void unlink_sendq_token(sendq_token_t *, rsm_node_id_t);
169 boolean_t rsmka_check_node_alive(rsm_node_id_t);
170 static void do_deferred_work(caddr_t);
171 static int create_ipc_sendq(path_t *);
172 static void destroy_ipc_info(ipc_info_t *);
173 void rsmka_pathmanager_cleanup();
174 void rsmka_release_adapter(adapter_t *);
175 
176 kt_did_t rsm_thread_id;
177 int rsmka_terminate_workthread_loop = 0;
178 
179 static struct adapter_listhead_list adapter_listhead_base;
180 static work_queue_t work_queue;
181 
182 /* protect ipc_info descriptor manipulation */
183 static kmutex_t ipc_info_lock;
184 
185 static ipc_info_t *ipc_info_head = NULL;
186 
187 static int category = RSM_PATH_MANAGER | RSM_KERNEL_AGENT;
188 
189 /* for synchronization with rsmipc_send() in rsm.c */
190 kmutex_t ipc_info_cvlock;
191 kcondvar_t ipc_info_cv;
192 
193 
194 
195 /*
196  * RSMKA PATHMANAGER INITIALIZATION AND CLEANUP ROUTINES
197  *
198  */
199 
200 
201 /*
202  * Called from the rsm module (rsm.c)  _init() routine
203  */
204 void
205 rsmka_pathmanager_init()
206 {
207 	kthread_t *tp;
208 
209 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
210 	    "rsmka_pathmanager_init enter\n"));
211 
212 	/* initialization for locks and condition variables  */
213 	mutex_init(&work_queue.work_mutex, NULL, MUTEX_DEFAULT, NULL);
214 	mutex_init(&ipc_info_lock, NULL, MUTEX_DEFAULT, NULL);
215 	mutex_init(&ipc_info_cvlock, NULL, MUTEX_DEFAULT, NULL);
216 	mutex_init(&adapter_listhead_base.listlock, NULL,
217 	    MUTEX_DEFAULT, NULL);
218 
219 	cv_init(&work_queue.work_cv, NULL, CV_DEFAULT, NULL);
220 	cv_init(&ipc_info_cv, NULL, CV_DEFAULT, NULL);
221 
222 	tp = thread_create(NULL, 0, do_deferred_work, NULL, 0, &p0,
223 	    TS_RUN, maxclsyspri);
224 	rsm_thread_id = tp->t_did;
225 
226 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
227 	    "rsmka_pathmanager_init done\n"));
228 }
229 
230 void
231 rsmka_pathmanager_cleanup()
232 {
233 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
234 	    "rsmka_pathmanager_cleanup enter\n"));
235 
236 	ASSERT(work_queue.head == NULL);
237 
238 	/*
239 	 * In processing the remove path callbacks from the path monitor
240 	 * object, all deferred work will have been completed. So
241 	 * awaken the deferred work thread to give it a chance to exit
242 	 * the loop.
243 	 */
244 	mutex_enter(&work_queue.work_mutex);
245 	rsmka_terminate_workthread_loop++;
246 	cv_signal(&work_queue.work_cv);
247 	mutex_exit(&work_queue.work_mutex);
248 
249 	/*
250 	 * Wait for the deferred work thread to exit before
251 	 * destroying the locks and cleaning up other data
252 	 * structures.
253 	 */
254 	if (rsm_thread_id)
255 		thread_join(rsm_thread_id);
256 
257 	/*
258 	 * Destroy locks & condition variables
259 	 */
260 	mutex_destroy(&work_queue.work_mutex);
261 	cv_destroy(&work_queue.work_cv);
262 
263 	mutex_enter(&ipc_info_lock);
264 	while (ipc_info_head)
265 		destroy_ipc_info(ipc_info_head);
266 	mutex_exit(&ipc_info_lock);
267 
268 	mutex_destroy(&ipc_info_lock);
269 
270 	mutex_destroy(&ipc_info_cvlock);
271 	cv_destroy(&ipc_info_cv);
272 
273 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
274 	    "rsmka_pathmanager_cleanup done\n"));
275 
276 }
277 
278 void
279 rsmka_set_my_nodeid(rsm_node_id_t local_nodeid)
280 {
281 	my_nodeid = local_nodeid;
282 
283 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
284 	    "rsm: node %d \n", my_nodeid));
285 
286 }
287 
288 /*
289  * DEFERRED WORK THREAD AND WORK QUEUE SUPPORT ROUTINES
290  *
291  */
292 
293 /*
294  * This function is the executable code of the thread which handles
295  * deferred work.  Work is deferred when a function is called at
296  * clock ipl and processing may require blocking.
297  *
298  *
299  * The thread is created by a call to taskq_create in rsmka_pathmanager_init.
300  * After creation, a call to taskq_dispatch causes this function to
301  * execute.  It loops forever - blocked until work is enqueued from
302  * rsmka_do_path_active, do_path_down, or rsmka_disconnect_node.
303  * rsmka_pathmanager_cleanup (called from _fini) will
304  * set rsmka_terminate_workthread_loop and the task processing will
305  * terminate.
306  */
307 static void
308 do_deferred_work(caddr_t arg /*ARGSUSED*/)
309 {
310 
311 	adapter_t 			*adapter;
312 	path_t				*path;
313 	work_token_t			*work_token;
314 	int				work_opcode;
315 	rsm_send_q_handle_t		sendq_handle;
316 	int				error;
317 	timespec_t			tv;
318 	callb_cpr_t			cprinfo;
319 
320 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_deferred_work enter\n"));
321 
322 	CALLB_CPR_INIT(&cprinfo, &work_queue.work_mutex, callb_generic_cpr,
323 	    "rsm_deferred_work");
324 
325 	for (;;) {
326 		mutex_enter(&work_queue.work_mutex);
327 
328 		if (rsmka_terminate_workthread_loop) {
329 			goto exit;
330 		}
331 
332 		/* When there is no work to do, block here */
333 		while (work_queue.head == NULL) {
334 			/* Since no work to do, Safe to CPR */
335 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
336 			cv_wait(&work_queue.work_cv, &work_queue.work_mutex);
337 			CALLB_CPR_SAFE_END(&cprinfo, &work_queue.work_mutex);
338 
339 			if (rsmka_terminate_workthread_loop) {
340 				goto exit;
341 			}
342 		}
343 
344 		/*
345 		 * Remove a work token and begin work
346 		 */
347 		work_token = work_queue.head;
348 		work_queue.head = work_token->next;
349 		if (work_queue.tail == work_token)
350 			work_queue.tail = NULL;
351 
352 		work_opcode = work_token->opcode;
353 		path = WORK_TOKEN_TO_PATH(work_token, work_opcode -1);
354 		work_token->next = NULL;
355 		mutex_exit(&work_queue.work_mutex);
356 
357 
358 		switch (work_opcode) {
359 		case RSMKA_IPC_UP:
360 			DBG_PRINTF((category, RSM_DEBUG,
361 			    "do_deferred_work:up,  path = %lx\n", path));
362 			error = create_ipc_sendq(path);
363 			mutex_enter(&path->mutex);
364 			if (path->state != RSMKA_PATH_UP) {
365 				/*
366 				 * path state has changed, if sendq was created,
367 				 * destroy it and return. Don't need to worry
368 				 * about sendq ref_cnt since no one starts
369 				 * using the sendq till path state becomes
370 				 * active
371 				 */
372 				if (error == RSM_SUCCESS) {
373 					sendq_handle = path->sendq_token.
374 					    rsmpi_sendq_handle;
375 					path->sendq_token.rsmpi_sendq_handle =
376 					    NULL;
377 					adapter = path->local_adapter;
378 					mutex_exit(&path->mutex);
379 
380 					if (sendq_handle != NULL) {
381 						adapter->rsmpi_ops->
382 						    rsm_sendq_destroy(
383 							    sendq_handle);
384 					}
385 					mutex_enter(&path->mutex);
386 				}
387 				/* free up work token */
388 				work_token->opcode = 0;
389 
390 				/*
391 				 * decrement reference count for the path
392 				 * descriptor and signal for synchronization
393 				 * with rsmka_remove_path. PATH_HOLD_NOLOCK was
394 				 * done by rsmka_path_up.
395 				 */
396 				PATH_RELE_NOLOCK(path);
397 				mutex_exit(&path->mutex);
398 				break;
399 			}
400 
401 			if (error == RSM_SUCCESS) {
402 				DBG_PRINTF((category, RSM_DEBUG,
403 				    "do_deferred_work:success on up\n"));
404 				/* clear flag since sendq_create succeeded */
405 				path->flags &= ~RSMKA_SQCREATE_PENDING;
406 				path->state = RSMKA_PATH_ACTIVE;
407 
408 				/*
409 				 * now that path is active we send the
410 				 * RSMIPC_MSG_SQREADY to the remote endpoint
411 				 */
412 				path->procmsg_cnt = 0;
413 				path->sendq_token.msgbuf_avail = 0;
414 
415 				/* Calculate local incarnation number */
416 				gethrestime(&tv);
417 				if (tv.tv_sec == RSM_UNKNOWN_INCN)
418 					tv.tv_sec = 1;
419 				path->local_incn = (int64_t)tv.tv_sec;
420 
421 				/*
422 				 * if send fails here its due to some
423 				 * non-transient error because QUEUE_FULL is
424 				 * not possible here since we are the first
425 				 * message on this sendq. The error will cause
426 				 * the path to go down anyways, so ignore
427 				 * the return value.
428 				 */
429 				(void) rsmipc_send_controlmsg(path,
430 				    RSMIPC_MSG_SQREADY);
431 				/* wait for SQREADY_ACK message */
432 				path->flags |= RSMKA_WAIT_FOR_SQACK;
433 			} else {
434 				/*
435 				 * sendq create failed possibly because
436 				 * the remote end is not yet ready eg.
437 				 * handler not registered, set a flag
438 				 * so that when there is an indication
439 				 * that the remote end is ready
440 				 * rsmka_do_path_active will be retried.
441 				 */
442 				path->flags |= RSMKA_SQCREATE_PENDING;
443 			}
444 
445 			/* free up work token */
446 			work_token->opcode = 0;
447 
448 			/*
449 			 * decrement reference count for the path
450 			 * descriptor and signal for synchronization with
451 			 * rsmka_remove_path. PATH_HOLD_NOLOCK was done
452 			 * by rsmka_path_up.
453 			 */
454 			PATH_RELE_NOLOCK(path);
455 			mutex_exit(&path->mutex);
456 
457 			break;
458 		case RSMKA_IPC_DOWN:
459 			DBG_PRINTF((category, RSM_DEBUG,
460 			    "do_deferred_work:down, path = %lx\n", path));
461 
462 			/*
463 			 * Unlike the processing of path_down in the case
464 			 * where the RSMKA_NO_SLEEP flag is not set, here,
465 			 * the state of the path is changed directly to
466 			 * RSMKA_PATH_DOWN. This is because in this case
467 			 * where the RSMKA_NO_SLEEP flag is set, any other
468 			 * calls referring this path will just queue up
469 			 * and will be processed only after the path
470 			 * down processing has completed.
471 			 */
472 			mutex_enter(&path->mutex);
473 			path->state = RSMKA_PATH_DOWN;
474 			/*
475 			 * clear the WAIT_FOR_SQACK flag since path is down.
476 			 */
477 			path->flags &= ~RSMKA_WAIT_FOR_SQACK;
478 
479 			/*
480 			 * this wakes up any thread waiting to receive credits
481 			 * in rsmipc_send to tell it that the path is down
482 			 * thus releasing the sendq.
483 			 */
484 			cv_broadcast(&path->sendq_token.sendq_cv);
485 
486 			mutex_exit(&path->mutex);
487 
488 			/* drain the messages from the receive msgbuf */
489 			taskq_wait(path->recv_taskq);
490 
491 			/*
492 			 * The path_importer_disconnect function has to
493 			 * be called after releasing the mutex on the path
494 			 * in order to avoid any recursive mutex enter panics
495 			 */
496 			path_importer_disconnect(path);
497 			DBG_PRINTF((category, RSM_DEBUG,
498 			    "do_deferred_work: success on down\n"));
499 			/*
500 			 * decrement reference count for the path
501 			 * descriptor and signal for synchronization with
502 			 * rsmka_remove_path. PATH_HOLD_NOLOCK was done
503 			 * by rsmka_path_down.
504 			 */
505 			mutex_enter(&path->mutex);
506 
507 #ifdef DEBUG
508 			/*
509 			 * Some IPC messages left in the recv_buf,
510 			 * they'll be dropped
511 			 */
512 			if (path->msgbuf_cnt != 0)
513 				cmn_err(CE_NOTE,
514 				    "path=%lx msgbuf_cnt != 0\n",
515 				    (uintptr_t)path);
516 #endif
517 
518 			/*
519 			 * Don't want to destroy a send queue when a token
520 			 * has been acquired; so wait 'til the token is
521 			 * no longer referenced (with a cv_wait).
522 			 */
523 			while (path->sendq_token.ref_cnt != 0)
524 				cv_wait(&path->sendq_token.sendq_cv,
525 				    &path->mutex);
526 
527 			sendq_handle = path->sendq_token.rsmpi_sendq_handle;
528 			path->sendq_token.rsmpi_sendq_handle = NULL;
529 
530 			/* destroy the send queue and release the handle */
531 			if (sendq_handle != NULL) {
532 				adapter = path->local_adapter;
533 				adapter->rsmpi_ops->rsm_sendq_destroy(
534 				    sendq_handle);
535 			}
536 
537 			work_token->opcode = 0;
538 			PATH_RELE_NOLOCK(path);
539 			mutex_exit(&path->mutex);
540 			break;
541 		default:
542 			DBG_PRINTF((category, RSM_DEBUG,
543 			    "do_deferred_work: bad work token opcode\n"));
544 			break;
545 		}
546 	}
547 
548 exit:
549 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_deferred_work done\n"));
550 	/*
551 	 * CALLB_CPR_EXIT does a mutex_exit for
552 	 * the work_queue.work_mutex
553 	 */
554 	CALLB_CPR_EXIT(&cprinfo);
555 }
556 
557 /*
558  * Work is inserted at the tail of the list and processed from the
559  * head of the list.
560  */
561 static void
562 enqueue_work(work_token_t *token)
563 {
564 	work_token_t	*tail_token;
565 
566 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "enqueue_work enter\n"));
567 
568 	ASSERT(MUTEX_HELD(&work_queue.work_mutex));
569 
570 	token->next = NULL;
571 	if (work_queue.head == NULL) {
572 		work_queue.head = work_queue.tail = token;
573 	} else {
574 		tail_token = work_queue.tail;
575 		work_queue.tail = tail_token->next = token;
576 	}
577 
578 	/* wake up deferred work thread */
579 	cv_signal(&work_queue.work_cv);
580 
581 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "enqueue_work done\n"));
582 }
583 
584 
585 /*
586  * If the work_token is found on the work queue, the work is cancelled
587  * by removing the token from the work queue.
588  *
589  * Return true if a work_token was found and cancelled, otherwise return false
590  *
591  * enqueue_work increments the path refcnt to make sure that the path doesn't
592  * go away, callers of cancel_work need to decrement the refcnt of the path to
593  * which this work_token belongs if a work_token is found in the work_queue
594  * and cancelled ie. when the return value is B_TRUE.
595  */
596 static boolean_t
597 cancel_work(work_token_t *work_token)
598 {
599 	work_token_t	*current_token;
600 	work_token_t	*prev_token = NULL;
601 	boolean_t	cancelled = B_FALSE;
602 
603 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "cancel_work enter\n"));
604 
605 	ASSERT(MUTEX_HELD(&work_queue.work_mutex));
606 
607 
608 	current_token = work_queue.head;
609 	while (current_token != NULL) {
610 		if (current_token == work_token) {
611 			if (work_token == work_queue.head)
612 				work_queue.head = work_token->next;
613 			else
614 				prev_token->next = work_token->next;
615 			if (work_token == work_queue.tail)
616 				work_queue.tail = prev_token;
617 
618 			current_token->opcode = 0;
619 			current_token->next = NULL;
620 			/* found and cancelled work */
621 			cancelled = B_TRUE;
622 			DBG_PRINTF((category, RSM_DEBUG,
623 			    "cancelled_work = 0x%p\n", work_token));
624 			break;
625 		}
626 		prev_token = current_token;
627 		current_token = current_token->next;
628 	}
629 
630 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "cancel_work done\n"));
631 	return (cancelled);
632 }
633 
634 /*
635  * EXTERNAL INTERFACES
636  *
637  * For Galileo Clustering, these routine are called from
638  * rsmka_pm_interface.cc
639  *
640  */
641 
642 /*
643  *
644  * If the adapter is supported by rsmpi then initialize an adapter descriptor
645  * and link it to the list of adapters.  The adapter attributes are obtained
646  * from rsmpi and stored in the descriptor.  Finally, a service handler
647  * for incoming ipc on this adapter is registered with rsmpi.
648  * A pointer for the adapter descriptor is returned as a cookie to the
649  * caller.  The cookie may be use with subsequent calls to save the time of
650  * adapter descriptor lookup.
651  *
652  * The adapter descriptor maintains a reference count which is intialized
653  * to 1 and incremented on lookups; when a cookie is used in place of
654  * a lookup, an explicit ADAPTER_HOLD is required.
655  */
656 
657 void *
658 rsmka_add_adapter(char *name, int instance, rsm_addr_t hwaddr)
659 {
660 	adapter_t		*adapter;
661 	rsm_controller_object_t	rsmpi_adapter_object;
662 	rsm_controller_handle_t	rsmpi_adapter_handle;
663 	rsm_ops_t		*rsmpi_ops_vector;
664 	int			adapter_is_supported;
665 	rsm_controller_attr_t	*attr;
666 	srv_handler_arg_t	*srv_hdlr_argp;
667 	int result;
668 
669 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_adapter enter\n"));
670 
671 	DBG_PRINTF((category, RSM_DEBUG,
672 	    "rsmka_add_adapter: name = %s instance = %d hwaddr = %llx \n",
673 	    name, instance, hwaddr));
674 
675 	/* verify name length */
676 	if (strlen(name) >= MAXNAMELEN) {
677 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
678 		    "rsmka_add_adapter done: name too long\n"));
679 		return (NULL);
680 	}
681 
682 
683 	/* Check if rsmpi supports this adapter type */
684 	adapter_is_supported = rsm_get_controller(name, instance,
685 	    &rsmpi_adapter_object, RSM_VERSION);
686 
687 	if (adapter_is_supported != RSM_SUCCESS) {
688 		DBG_PRINTF((category, RSM_ERR,
689 		    "rsmka_add_adapter done: adapter not supported\n"));
690 		return (NULL);
691 	}
692 
693 	rsmpi_adapter_handle = rsmpi_adapter_object.handle;
694 	rsmpi_ops_vector = rsmpi_adapter_object.ops;
695 
696 	/* Get adapter attributes */
697 	result = rsm_get_controller_attr(rsmpi_adapter_handle, &attr);
698 	if (result != RSM_SUCCESS) {
699 		DBG_PRINTF((category, RSM_ERR,
700 		    "rsm: get_controller_attr(%d) Failed %x\n",
701 		    instance, result));
702 		(void) rsm_release_controller(name, instance,
703 		    &rsmpi_adapter_object);
704 		return (NULL);
705 	}
706 
707 	DBG_PRINTF((category, RSM_DEBUG,
708 	    "rsmka_add_adapter: register service offset = %d\n", hwaddr));
709 
710 	/*
711 	 * create a srv_handler_arg_t object, initialize it and register
712 	 * it along with rsm_srv_func. This get passed as the
713 	 * rsm_intr_hand_arg_t when the handler gets invoked.
714 	 */
715 	srv_hdlr_argp = kmem_zalloc(sizeof (srv_handler_arg_t), KM_SLEEP);
716 
717 	(void) strcpy(srv_hdlr_argp->adapter_name, name);
718 	srv_hdlr_argp->adapter_instance = instance;
719 	srv_hdlr_argp->adapter_hwaddr = hwaddr;
720 
721 	/* Have rsmpi register the ipc receive handler for this adapter */
722 	/*
723 	 * Currently, we need to pass in a separate service identifier for
724 	 * each adapter. In order to obtain a unique service identifier
725 	 * value for an adapter, we add the hardware address of the
726 	 * adapter to the base service identifier(RSM_SERVICE which is
727 	 * defined as RSM_INTR_T_KA as per the RSMPI specification).
728 	 * NOTE: This may result in using some of the service identifier
729 	 * values defined for RSM_INTR_T_XPORT(the Sun Cluster Transport).
730 	 */
731 	result = rsmpi_ops_vector->rsm_register_handler(
732 	    rsmpi_adapter_handle, &rsmpi_adapter_object,
733 	    RSM_SERVICE+(uint_t)hwaddr, rsm_srv_func,
734 	    (rsm_intr_hand_arg_t)srv_hdlr_argp, NULL, 0);
735 
736 	if (result != RSM_SUCCESS) {
737 		DBG_PRINTF((category, RSM_ERR,
738 		    "rsmka_add_adapter done: rsm_register_handler"
739 		    "failed %d\n",
740 		    instance));
741 		return (NULL);
742 	}
743 
744 	/* Initialize an adapter descriptor and add it to the adapter list */
745 	adapter = init_adapter(name, instance, hwaddr,
746 	    rsmpi_adapter_handle, rsmpi_ops_vector, srv_hdlr_argp);
747 
748 	/* Copy over the attributes from the pointer returned to us */
749 	adapter->rsm_attr = *attr;
750 
751 	/*
752 	 * With the addition of the topology obtainment interface, applications
753 	 * now get the local nodeid from the topology data structure.
754 	 *
755 	 * adapter->rsm_attr.attr_node_id = my_nodeid;
756 	 */
757 	DBG_PRINTF((category, RSM_ERR,
758 	    "rsmka_add_adapter: adapter = %lx\n", adapter));
759 
760 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_adapter done\n"));
761 
762 	/* return adapter pointer as a cookie for later fast access */
763 	return ((void *)adapter);
764 }
765 
766 
767 /*
768  * Unlink the adapter descriptor and call rsmka_release_adapter which
769  * will decrement the reference count and possibly free the desriptor.
770  */
771 boolean_t
772 rsmka_remove_adapter(char *name, uint_t instance, void *cookie, int flags)
773 {
774 	adapter_t		*adapter;
775 	adapter_listhead_t	*listhead;
776 	adapter_t		*prev, *current;
777 	rsm_controller_object_t	rsm_cntl_obj;
778 
779 
780 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
781 	    "rsmka_remove_adapter enter\n"));
782 
783 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
784 	    "rsmka_remove_adapter: cookie = %lx\n", cookie));
785 
786 	if (flags & RSMKA_USE_COOKIE) {
787 		adapter = (adapter_t *)cookie;
788 	} else {
789 		adapter = rsmka_lookup_adapter(name, instance);
790 		/*
791 		 * rsmka_lookup_adapter increments the ref_cnt; need
792 		 * to decrement here to get true count
793 		 */
794 		ADAPTER_RELE(adapter);
795 	}
796 	ASSERT(adapter->next_path == NULL);
797 
798 	listhead = adapter->listhead;
799 
800 	mutex_enter(&adapter_listhead_base.listlock);
801 
802 	mutex_enter(&listhead->mutex);
803 
804 	/* find the adapter in the list and remove it */
805 	prev = NULL;
806 	current = listhead->next_adapter;
807 	while (current != NULL) {
808 		if (adapter->instance == current->instance) {
809 			break;
810 		} else {
811 			prev = current;
812 			current = current->next;
813 		}
814 	}
815 	ASSERT(current != NULL);
816 
817 	if (prev == NULL)
818 		listhead->next_adapter = current->next;
819 	else
820 		prev->next = current->next;
821 
822 	listhead->adapter_count--;
823 
824 	mutex_exit(&listhead->mutex);
825 
826 	mutex_exit(&adapter_listhead_base.listlock);
827 
828 	mutex_enter(&current->mutex);
829 
830 	/*
831 	 * unregister the handler
832 	 */
833 	current->rsmpi_ops->rsm_unregister_handler(current->rsmpi_handle,
834 	    RSM_SERVICE+current->hwaddr, rsm_srv_func,
835 	    (rsm_intr_hand_arg_t)current->hdlr_argp);
836 
837 	DBG_PRINTF((category, RSM_DEBUG, "rsmka_remove_adapter: unreg hdlr "
838 	    ":adapter=%lx, hwaddr=%lx\n", current, current->hwaddr));
839 
840 	rsm_cntl_obj.handle = current->rsmpi_handle;
841 	rsm_cntl_obj.ops = current->rsmpi_ops;
842 
843 	(void) rsm_release_controller(current->listhead->adapter_devname,
844 	    current->instance, &rsm_cntl_obj);
845 
846 	mutex_exit(&current->mutex);
847 
848 	rsmka_release_adapter(current);
849 
850 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
851 	    "rsmka_remove_adapter done\n"));
852 
853 	return (B_TRUE);
854 }
855 
856 /*
857  * An adapter descriptor will exist from an earlier add_adapter. This
858  * function does:
859  *		initialize the path descriptor
860  *		initialize the ipc descriptor (it may already exist)
861  *		initialize and link a sendq token for this path
862  */
863 void *
864 rsmka_add_path(char *adapter_name, int adapter_instance,
865     rsm_node_id_t remote_node,
866     rsm_addr_t remote_hwaddr, int rem_adapt_instance,
867     void *cookie, int flags)
868 {
869 
870 	path_t			*path;
871 	adapter_t		*adapter;
872 	char			tq_name[TASKQ_NAMELEN];
873 
874 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_path enter\n"));
875 
876 	/* allocate new path descriptor */
877 	path = kmem_zalloc(sizeof (path_t), KM_SLEEP);
878 
879 	if (flags & RSMKA_USE_COOKIE) {
880 		adapter = (adapter_t *)cookie;
881 		ADAPTER_HOLD(adapter);
882 	} else {
883 		adapter = rsmka_lookup_adapter(adapter_name, adapter_instance);
884 	}
885 
886 	DBG_PRINTF((category, RSM_DEBUG,
887 	    "rsmka_add_path: adapter = %lx\n", adapter));
888 
889 	/*
890 	 * initialize path descriptor
891 	 * don't need to increment adapter reference count because
892 	 * it can't be removed if paths exist for it.
893 	 */
894 	mutex_init(&path->mutex, NULL, MUTEX_DEFAULT, NULL);
895 
896 	PATH_HOLD(path);
897 	path->state = RSMKA_PATH_DOWN;
898 	path->remote_node = remote_node;
899 	path->remote_hwaddr = remote_hwaddr;
900 	path->remote_devinst = rem_adapt_instance;
901 	path->local_adapter = adapter;
902 
903 	/* taskq is for sendq on adapter with remote_hwaddr on remote_node */
904 	(void) snprintf(tq_name, sizeof (tq_name), "%x_%llx",
905 	    remote_node, (unsigned long long) remote_hwaddr);
906 
907 	path->recv_taskq = taskq_create_instance(tq_name, adapter_instance,
908 	    RSMKA_ONE_THREAD, maxclsyspri, RSMIPC_MAX_MESSAGES,
909 	    RSMIPC_MAX_MESSAGES, TASKQ_PREPOPULATE);
910 
911 	/* allocate the message buffer array */
912 	path->msgbuf_queue = (msgbuf_elem_t *)kmem_zalloc(
913 	    RSMIPC_MAX_MESSAGES * sizeof (msgbuf_elem_t), KM_SLEEP);
914 
915 	/*
916 	 * init cond variables for synch with rsmipc_send()
917 	 * and rsmka_remove_path
918 	 */
919 	cv_init(&path->sendq_token.sendq_cv, NULL, CV_DEFAULT, NULL);
920 	cv_init(&path->hold_cv, NULL, CV_DEFAULT, NULL);
921 
922 	/* link path descriptor on adapter path list */
923 	link_path(path);
924 
925 	/* link the path sendq token on the ipc_info token list */
926 	link_sendq_token(&path->sendq_token, remote_node);
927 
928 	/* ADAPTER_HOLD done above by rsmka_lookup_adapter */
929 	ADAPTER_RELE(adapter);
930 
931 	DBG_PRINTF((category, RSM_DEBUG, "rsmka_add_path: path = %lx\n", path));
932 
933 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_path done\n"));
934 	return ((void *)path);
935 }
936 
937 /*
938  * Wait for the path descriptor reference count to become zero then
939  * directly call path down processing.  Finally, unlink the sendq token and
940  * free the path descriptor memory.
941  *
942  * Note: lookup_path locks the path and increments the path hold count
943  */
944 void
945 rsmka_remove_path(char *adapter_name, int instance, rsm_node_id_t remote_node,
946     rsm_addr_t remote_hwaddr, void *path_cookie, int flags)
947 {
948 	path_t		*path;
949 
950 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_remove_path enter\n"));
951 
952 	if (flags & RSMKA_USE_COOKIE) {
953 		path = (path_t *)path_cookie;
954 		mutex_enter(&path->mutex);
955 	} else {
956 		path = lookup_path(adapter_name, instance,  remote_node,
957 		    remote_hwaddr);
958 
959 		/*
960 		 * remember, lookup_path increments the reference
961 		 * count - so decrement now so we can get to zero
962 		 */
963 		PATH_RELE_NOLOCK(path);
964 	}
965 
966 	DBG_PRINTF((category, RSM_DEBUG,
967 	    "rsmka_remove_path: path = %lx\n", path));
968 
969 	while (path->state == RSMKA_PATH_GOING_DOWN)
970 		cv_wait(&path->hold_cv, &path->mutex);
971 
972 	/* attempt to cancel any possibly pending work */
973 	mutex_enter(&work_queue.work_mutex);
974 	if (cancel_work(&path->work_token[RSMKA_IPC_UP_INDEX])) {
975 		PATH_RELE_NOLOCK(path);
976 	}
977 	if (cancel_work(&path->work_token[RSMKA_IPC_DOWN_INDEX])) {
978 		PATH_RELE_NOLOCK(path);
979 	}
980 	mutex_exit(&work_queue.work_mutex);
981 
982 	/*
983 	 * The path descriptor ref cnt was set to 1 initially when
984 	 * the path was added.  So we need to do a decrement here to
985 	 * balance that.
986 	 */
987 	PATH_RELE_NOLOCK(path);
988 
989 	switch (path->state) {
990 	case RSMKA_PATH_UP:
991 		/* clear the flag */
992 		path->flags &= ~RSMKA_SQCREATE_PENDING;
993 		path->state = RSMKA_PATH_DOWN;
994 		break;
995 	case RSMKA_PATH_DOWN:
996 		break;
997 
998 	case RSMKA_PATH_ACTIVE:
999 		/*
1000 		 * rsmka_remove_path should not call do_path_down
1001 		 * with the RSMKA_NO_SLEEP flag set since for
1002 		 * this code path, the deferred work would
1003 		 * incorrectly do a PATH_RELE_NOLOCK.
1004 		 */
1005 		do_path_down(path, 0);
1006 		break;
1007 	default:
1008 		mutex_exit(&path->mutex);
1009 		DBG_PRINTF((category, RSM_ERR,
1010 		    "rsm_remove_path: invalid path state %d\n",
1011 		    path->state));
1012 		return;
1013 
1014 	}
1015 
1016 	/*
1017 	 * wait for all references to the path to be released. If a thread
1018 	 * was waiting to receive credits do_path_down should wake it up
1019 	 * since the path is going down and that will cause the sleeping
1020 	 * thread to release its hold on the path.
1021 	 */
1022 	while (path->ref_cnt != 0) {
1023 		cv_wait(&path->hold_cv, &path->mutex);
1024 	}
1025 
1026 	mutex_exit(&path->mutex);
1027 
1028 	/*
1029 	 * remove from ipc token list
1030 	 * NOTE: use the remote_node value from the path structure
1031 	 * since for RSMKA_USE_COOKIE being set, the remote_node
1032 	 * value passed into rsmka_remove_path is 0.
1033 	 */
1034 	unlink_sendq_token(&path->sendq_token, path->remote_node);
1035 
1036 	/* unlink from adapter path list and free path descriptor */
1037 	destroy_path(path);
1038 
1039 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_remove_path done\n"));
1040 }
1041 
1042 /*
1043  *
1044  * LOCKING:
1045  * lookup_path locks the path and increments the path hold count. If the remote
1046  * node is not in the alive state, do_path_up will release the lock and
1047  * decrement the hold count.  Otherwise rsmka_do_path_active will release the
1048  * lock prior to waking up the work thread.
1049  *
1050  * REF_CNT:
1051  * The path descriptor ref_cnt is incremented here; it will be decremented
1052  * when path up processing is completed in do_path_up or by the work thread
1053  * if the path up is deferred.
1054  *
1055  */
1056 boolean_t
1057 rsmka_path_up(char *adapter_name, uint_t adapter_instance,
1058     rsm_node_id_t remote_node, rsm_addr_t remote_hwaddr,
1059     void *path_cookie, int flags)
1060 {
1061 
1062 	path_t			*path;
1063 	boolean_t		rval = B_TRUE;
1064 
1065 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_up enter\n"));
1066 
1067 	if (flags & RSMKA_USE_COOKIE) {
1068 		path = (path_t *)path_cookie;
1069 		mutex_enter(&path->mutex);
1070 		PATH_HOLD_NOLOCK(path);
1071 	} else {
1072 		path = lookup_path(adapter_name, adapter_instance,
1073 		    remote_node, remote_hwaddr);
1074 	}
1075 
1076 	while (path->state == RSMKA_PATH_GOING_DOWN)
1077 		cv_wait(&path->hold_cv, &path->mutex);
1078 
1079 	DBG_PRINTF((category, RSM_DEBUG, "rsmka_path_up: path = %lx\n", path));
1080 	rval = do_path_up(path, flags);
1081 	mutex_exit(&path->mutex);
1082 
1083 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_up done\n"));
1084 	return (rval);
1085 }
1086 
1087 /*
1088  *
1089  * LOCKING:
1090  * lookup_path locks the path and increments the path hold count. If the
1091  * current state is ACTIVE the path lock is release prior to waking up
1092  * the work thread in do_path_down .  The work thread will decrement the hold
1093  * count when the work for this is finished.
1094  *
1095  *
1096  * REF_CNT:
1097  * The path descriptor ref_cnt is incremented here; it will be decremented
1098  * when path down processing is completed in do_path_down or by the work thread
1099  * if the path down is deferred.
1100  *
1101  */
1102 boolean_t
1103 rsmka_path_down(char *adapter_devname, int instance, rsm_node_id_t remote_node,
1104     rsm_addr_t remote_hwaddr,  void *path_cookie, int flags)
1105 {
1106 	path_t			*path;
1107 	boolean_t		rval = B_TRUE;
1108 
1109 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_down enter\n"));
1110 
1111 	if (flags & RSMKA_USE_COOKIE) {
1112 		path = (path_t *)path_cookie;
1113 		mutex_enter(&path->mutex);
1114 		PATH_HOLD_NOLOCK(path);
1115 	} else {
1116 		path = lookup_path(adapter_devname, instance, remote_node,
1117 		    remote_hwaddr);
1118 	}
1119 
1120 	while (path->state == RSMKA_PATH_GOING_DOWN)
1121 		cv_wait(&path->hold_cv, &path->mutex);
1122 
1123 	DBG_PRINTF((category, RSM_DEBUG,
1124 	    "rsmka_path_down: path = %lx\n", path));
1125 
1126 	switch (path->state) {
1127 	case RSMKA_PATH_UP:
1128 		/* clear the flag */
1129 		path->flags &= ~RSMKA_SQCREATE_PENDING;
1130 		path->state = RSMKA_PATH_GOING_DOWN;
1131 		mutex_exit(&path->mutex);
1132 
1133 		/*
1134 		 * release path->mutex since enqueued tasks acquire it.
1135 		 * Drain all the enqueued tasks.
1136 		 */
1137 		taskq_wait(path->recv_taskq);
1138 
1139 		mutex_enter(&path->mutex);
1140 		path->state = RSMKA_PATH_DOWN;
1141 		PATH_RELE_NOLOCK(path);
1142 		break;
1143 	case RSMKA_PATH_DOWN:
1144 		PATH_RELE_NOLOCK(path);
1145 		break;
1146 	case RSMKA_PATH_ACTIVE:
1147 		do_path_down(path, flags);
1148 		/*
1149 		 * Need to release the path refcnt. Either done in do_path_down
1150 		 * or do_deferred_work for RSMKA_NO_SLEEP being set. Has to be
1151 		 * done here for RSMKA_NO_SLEEP not set.
1152 		 */
1153 		if (!(flags & RSMKA_NO_SLEEP))
1154 			PATH_RELE_NOLOCK(path);
1155 		break;
1156 	default:
1157 		DBG_PRINTF((category, RSM_ERR,
1158 		    "rsm_path_down: invalid path state %d\n", path->state));
1159 		rval = B_FALSE;
1160 	}
1161 
1162 	mutex_exit(&path->mutex);
1163 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_down done\n"));
1164 	return (rval);
1165 }
1166 
1167 
1168 /*
1169  * Paths cannot become active until node_is_alive is marked true
1170  * in the ipc_info descriptor for the node
1171  *
1172  * In the event this is called before any paths have been added,
1173  * init_ipc_info if called here.
1174  *
1175  */
1176 boolean_t
1177 rsmka_node_alive(rsm_node_id_t remote_node)
1178 {
1179 	ipc_info_t *ipc_info;
1180 
1181 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_alive enter\n"));
1182 
1183 	DBG_PRINTF((category, RSM_DEBUG,
1184 	    "rsmka_node_alive: remote_node = %x\n", remote_node));
1185 
1186 	ipc_info = lookup_ipc_info(remote_node);
1187 
1188 	if (ipc_info == NULL) {
1189 		ipc_info = init_ipc_info(remote_node, B_TRUE);
1190 		DBG_PRINTF((category, RSM_DEBUG,
1191 		    "rsmka_node_alive: new ipc_info = %lx\n", ipc_info));
1192 	} else {
1193 		ASSERT(ipc_info->node_is_alive == B_FALSE);
1194 		ipc_info->node_is_alive = B_TRUE;
1195 	}
1196 
1197 	pathup_to_pathactive(ipc_info, remote_node);
1198 
1199 	mutex_exit(&ipc_info_lock);
1200 
1201 	/* rsmipc_send() may be waiting for a sendq_token */
1202 	mutex_enter(&ipc_info_cvlock);
1203 	cv_broadcast(&ipc_info_cv);
1204 	mutex_exit(&ipc_info_cvlock);
1205 
1206 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_alive done\n"));
1207 
1208 	return (B_TRUE);
1209 }
1210 
1211 
1212 
1213 /*
1214  * Paths cannot become active when node_is_alive is marked false
1215  * in the ipc_info descriptor for the node
1216  */
1217 boolean_t
1218 rsmka_node_died(rsm_node_id_t remote_node)
1219 {
1220 	ipc_info_t *ipc_info;
1221 
1222 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_died enter\n"));
1223 
1224 	DBG_PRINTF((category, RSM_DEBUG,
1225 	    "rsmka_node_died: remote_node = %x\n", remote_node));
1226 
1227 	ipc_info = lookup_ipc_info(remote_node);
1228 	if (ipc_info == NULL)
1229 		return (B_FALSE);
1230 
1231 	ASSERT(ipc_info->node_is_alive == B_TRUE);
1232 	ipc_info->node_is_alive = B_FALSE;
1233 
1234 	rsm_suspend_complete(remote_node, RSM_SUSPEND_NODEDEAD);
1235 
1236 	mutex_exit(&ipc_info_lock);
1237 
1238 	/* rsmipc_send() may be waiting for a sendq_token */
1239 	mutex_enter(&ipc_info_cvlock);
1240 	cv_broadcast(&ipc_info_cv);
1241 	mutex_exit(&ipc_info_cvlock);
1242 
1243 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_died done\n"));
1244 
1245 	return (B_TRUE);
1246 }
1247 
1248 /*
1249  * Treat like path_down for all paths for the specified remote node.
1250  * Always invoked before node died.
1251  *
1252  * NOTE: This routine is not called from the cluster path interface; the
1253  * rsmka_path_down is called directly for each path.
1254  */
1255 void
1256 rsmka_disconnect_node(rsm_node_id_t remote_node, int flags)
1257 {
1258 	ipc_info_t	*ipc_info;
1259 	path_t		*path;
1260 	sendq_token_t	*sendq_token;
1261 	work_token_t 	*up_token;
1262 	work_token_t 	*down_token;
1263 	boolean_t	do_work = B_FALSE;
1264 	boolean_t	cancelled = B_FALSE;
1265 
1266 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1267 	    "rsmka_disconnect_node enter\n"));
1268 
1269 	DBG_PRINTF((category, RSM_DEBUG,
1270 	    "rsmka_disconnect_node: node = %d\n", remote_node));
1271 
1272 	if (flags & RSMKA_NO_SLEEP) {
1273 		ipc_info = lookup_ipc_info(remote_node);
1274 
1275 		sendq_token = ipc_info->token_list;
1276 
1277 		while (sendq_token != NULL) {
1278 			path = SQ_TOKEN_TO_PATH(sendq_token);
1279 			PATH_HOLD(path);
1280 			up_token = &path->work_token[RSMKA_IPC_UP_INDEX];
1281 			down_token = &path->work_token[RSMKA_IPC_DOWN_INDEX];
1282 
1283 			mutex_enter(&work_queue.work_mutex);
1284 
1285 			/* if an up token is enqueued, remove it */
1286 			cancelled = cancel_work(up_token);
1287 
1288 			/*
1289 			 * If the path is active and down work hasn't
1290 			 * already been setup then down work is needed.
1291 			 * else
1292 			 * if up work wasn't canceled because it was
1293 			 * already being processed then down work is needed
1294 			 */
1295 			if (path->state == RSMKA_PATH_ACTIVE) {
1296 				if (down_token->opcode == 0)
1297 					do_work = B_TRUE;
1298 			} else
1299 				if (up_token->opcode == RSMKA_IPC_UP)
1300 					do_work = B_TRUE;
1301 
1302 			if (do_work == B_TRUE) {
1303 				down_token->opcode = RSMKA_IPC_DOWN;
1304 				enqueue_work(down_token);
1305 			}
1306 			mutex_exit(&work_queue.work_mutex);
1307 
1308 			if (do_work == B_FALSE)
1309 				PATH_RELE(path);
1310 
1311 			if (cancelled) {
1312 				PATH_RELE(path);
1313 			}
1314 			sendq_token = sendq_token->next;
1315 		}
1316 
1317 		/*
1318 		 * Now that all the work is enqueued, wakeup the work
1319 		 * thread.
1320 		 */
1321 		mutex_enter(&work_queue.work_mutex);
1322 		cv_signal(&work_queue.work_cv);
1323 		mutex_exit(&work_queue.work_mutex);
1324 
1325 		IPCINFO_RELE_NOLOCK(ipc_info);
1326 		mutex_exit(&ipc_info_lock);
1327 
1328 	} else {
1329 		/* get locked ipc_info descriptor */
1330 		ipc_info = lookup_ipc_info(remote_node);
1331 
1332 		sendq_token = ipc_info->token_list;
1333 		while (sendq_token != NULL) {
1334 			path = SQ_TOKEN_TO_PATH(sendq_token);
1335 			DBG_PRINTF((category, RSM_DEBUG,
1336 			    "rsmka_disconnect_node: path_down"
1337 			    "for path = %x\n",
1338 			    path));
1339 			(void) rsmka_path_down(0, 0, 0, 0,
1340 			    path, RSMKA_USE_COOKIE);
1341 			sendq_token = sendq_token->next;
1342 			if (sendq_token == ipc_info->token_list)
1343 				break;
1344 		}
1345 		mutex_exit(&ipc_info_lock);
1346 	}
1347 
1348 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1349 	    "rsmka_disconnect_node done\n"));
1350 }
1351 
1352 
1353 /*
1354  * Called from rsm_node_alive - if a path to a remote node is in
1355  * state RSMKA_PATH_UP, transition the state to RSMKA_PATH_ACTIVE with a
1356  * call to rsmka_do_path_active.
1357  *
1358  * REF_CNT:
1359  * The path descriptor ref_cnt is incremented here; it will be decremented
1360  * when path up processing is completed in rsmka_do_path_active or by the work
1361  * thread if the path up is deferred.
1362  */
1363 static void
1364 pathup_to_pathactive(ipc_info_t *ipc_info, rsm_node_id_t remote_node)
1365 {
1366 	path_t		*path;
1367 	sendq_token_t	*token;
1368 
1369 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1370 	    "pathup_to_pathactive enter\n"));
1371 
1372 	remote_node = remote_node;
1373 
1374 	ASSERT(MUTEX_HELD(&ipc_info_lock));
1375 
1376 	token = ipc_info->token_list;
1377 	while (token != NULL) {
1378 		path = SQ_TOKEN_TO_PATH(token);
1379 		mutex_enter(&path->mutex);
1380 		if (path->state == RSMKA_PATH_UP)  {
1381 			PATH_HOLD_NOLOCK(path);
1382 			(void) rsmka_do_path_active(path, 0);
1383 		}
1384 		mutex_exit(&path->mutex);
1385 		token = token->next;
1386 		if (token == ipc_info->token_list)
1387 			break;
1388 	}
1389 
1390 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1391 	    "pathup_to_pathactive done\n"));
1392 }
1393 
1394 /*
1395  * Called from pathup_to_pathactive and do_path_up. The objective is to
1396  * create an ipc send queue and transition to state RSMKA_PATH_ACTIVE.
1397  * For the no sleep case we may need to defer the work using a token.
1398  *
1399  */
1400 boolean_t
1401 rsmka_do_path_active(path_t *path, int flags)
1402 {
1403 	work_token_t	*up_token = &path->work_token[RSMKA_IPC_UP_INDEX];
1404 	work_token_t	*down_token = &path->work_token[RSMKA_IPC_DOWN_INDEX];
1405 	boolean_t	do_work = B_FALSE;
1406 	int		error;
1407 	timespec_t	tv;
1408 	adapter_t	*adapter;
1409 	rsm_send_q_handle_t	sqhdl;
1410 
1411 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1412 	    "rsmka_do_path_active enter\n"));
1413 
1414 	ASSERT(MUTEX_HELD(&path->mutex));
1415 
1416 	if (flags & RSMKA_NO_SLEEP) {
1417 		mutex_enter(&work_queue.work_mutex);
1418 
1419 		/* if a down token is enqueued, remove it */
1420 		if (cancel_work(down_token)) {
1421 			PATH_RELE_NOLOCK(path);
1422 		}
1423 
1424 		/*
1425 		 * If the path is not active and up work hasn't
1426 		 * already been setup then up work is needed.
1427 		 * else
1428 		 * if down work wasn't canceled because it was
1429 		 * already being processed then up work is needed
1430 		 */
1431 		if (path->state != RSMKA_PATH_ACTIVE) {
1432 			if (up_token->opcode == 0)
1433 				do_work = B_TRUE;
1434 		} else
1435 			if (down_token->opcode == RSMKA_IPC_DOWN)
1436 				do_work = B_TRUE;
1437 
1438 		if (do_work == B_TRUE) {
1439 			up_token->opcode = RSMKA_IPC_UP;
1440 			enqueue_work(up_token);
1441 		}
1442 		else
1443 			PATH_RELE_NOLOCK(path);
1444 
1445 		mutex_exit(&work_queue.work_mutex);
1446 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1447 		    "rsmka_do_path_active done\n"));
1448 		return (B_TRUE);
1449 	} else {
1450 		/*
1451 		 * Drop the path lock before calling create_ipc_sendq, shouldn't
1452 		 * hold locks across calls to RSMPI routines.
1453 		 */
1454 		mutex_exit(&path->mutex);
1455 
1456 		error = create_ipc_sendq(path);
1457 
1458 		mutex_enter(&path->mutex);
1459 		if (path->state != RSMKA_PATH_UP) {
1460 			/*
1461 			 * path state has changed, if sendq was created,
1462 			 * destroy it and return
1463 			 */
1464 			if (error == RSM_SUCCESS) {
1465 				sqhdl = path->sendq_token.rsmpi_sendq_handle;
1466 				path->sendq_token.rsmpi_sendq_handle = NULL;
1467 				adapter = path->local_adapter;
1468 				mutex_exit(&path->mutex);
1469 
1470 				if (sqhdl != NULL) {
1471 					adapter->rsmpi_ops->rsm_sendq_destroy(
1472 						sqhdl);
1473 				}
1474 				mutex_enter(&path->mutex);
1475 			}
1476 			PATH_RELE_NOLOCK(path);
1477 
1478 			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1479 			    "rsmka_do_path_active done: path=%lx not UP\n",
1480 			    (uintptr_t)path));
1481 			return (error ? B_FALSE : B_TRUE);
1482 		}
1483 
1484 		if (error == RSM_SUCCESS) {
1485 			/* clear flag since sendq_create succeeded */
1486 			path->flags &= ~RSMKA_SQCREATE_PENDING;
1487 			path->state = RSMKA_PATH_ACTIVE;
1488 			/*
1489 			 * now that path is active we send the
1490 			 * RSMIPC_MSG_SQREADY to the remote endpoint
1491 			 */
1492 			path->procmsg_cnt = 0;
1493 			path->sendq_token.msgbuf_avail = 0;
1494 
1495 			/* Calculate local incarnation number */
1496 			gethrestime(&tv);
1497 			if (tv.tv_sec == RSM_UNKNOWN_INCN)
1498 				tv.tv_sec = 1;
1499 			path->local_incn = (int64_t)tv.tv_sec;
1500 
1501 			/*
1502 			 * if send fails here its due to some non-transient
1503 			 * error because QUEUE_FULL is not possible here since
1504 			 * we are the first message on this sendq. The error
1505 			 * will cause the path to go down anyways so ignore
1506 			 * the return value
1507 			 */
1508 			(void) rsmipc_send_controlmsg(path, RSMIPC_MSG_SQREADY);
1509 			/* wait for SQREADY_ACK message */
1510 			path->flags |= RSMKA_WAIT_FOR_SQACK;
1511 
1512 			DBG_PRINTF((category, RSM_DEBUG,
1513 			    "rsmka_do_path_active success\n"));
1514 		} else {
1515 			/*
1516 			 * sendq create failed possibly because
1517 			 * the remote end is not yet ready eg.
1518 			 * handler not registered, set a flag
1519 			 * so that when there is an indication
1520 			 * that the remote end is ready rsmka_do_path_active
1521 			 * will be retried.
1522 			 */
1523 			path->flags |= RSMKA_SQCREATE_PENDING;
1524 		}
1525 
1526 		PATH_RELE_NOLOCK(path);
1527 
1528 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1529 		    "rsmka_do_path_active done\n"));
1530 		return (error ? B_FALSE : B_TRUE);
1531 	}
1532 
1533 }
1534 
1535 /*
1536  * Called from rsm_path_up.
1537  * If the remote node state is "alive" then call rsmka_do_path_active
1538  * otherwise just transition path state to RSMKA_PATH_UP.
1539  */
1540 static boolean_t
1541 do_path_up(path_t *path, int flags)
1542 {
1543 	boolean_t	rval;
1544 	boolean_t	node_alive;
1545 
1546 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_up enter\n"));
1547 
1548 	ASSERT(MUTEX_HELD(&path->mutex));
1549 
1550 	/* path moved to ACTIVE by rsm_sqcreateop_callback - just return */
1551 	if (path->state == RSMKA_PATH_ACTIVE) {
1552 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1553 			"do_path_up done: already ACTIVE\n"));
1554 		PATH_RELE_NOLOCK(path);
1555 		return (B_TRUE);
1556 	}
1557 
1558 	path->state = RSMKA_PATH_UP;
1559 
1560 	/* initialize the receive msgbuf counters */
1561 	path->msgbuf_head = 0;
1562 	path->msgbuf_tail = RSMIPC_MAX_MESSAGES - 1;
1563 	path->msgbuf_cnt = 0;
1564 	path->procmsg_cnt = 0;
1565 	/*
1566 	 * rsmka_check_node_alive acquires ipc_info_lock, in order to maintain
1567 	 * correct lock ordering drop the path lock before calling it.
1568 	 */
1569 	mutex_exit(&path->mutex);
1570 
1571 	node_alive = rsmka_check_node_alive(path->remote_node);
1572 
1573 	mutex_enter(&path->mutex);
1574 	if (node_alive == B_TRUE)
1575 		rval = rsmka_do_path_active(path, flags);
1576 	else {
1577 		PATH_RELE_NOLOCK(path);
1578 		rval = B_TRUE;
1579 	}
1580 
1581 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_up done\n"));
1582 	return (rval);
1583 }
1584 
1585 
1586 
1587 /*
1588  * Called from rsm_remove_path, rsm_path_down, deferred_work.
1589  * Destroy the send queue on this path.
1590  * Disconnect segments being imported from the remote node
1591  * Disconnect segments being imported by the remote node
1592  *
1593  */
1594 static void
1595 do_path_down(path_t *path, int flags)
1596 {
1597 	work_token_t *up_token = &path->work_token[RSMKA_IPC_UP_INDEX];
1598 	work_token_t *down_token = &path->work_token[RSMKA_IPC_DOWN_INDEX];
1599 	boolean_t do_work = B_FALSE;
1600 
1601 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_down enter\n"));
1602 
1603 	ASSERT(MUTEX_HELD(&path->mutex));
1604 
1605 	if (flags & RSMKA_NO_SLEEP) {
1606 		mutex_enter(&work_queue.work_mutex);
1607 		DBG_PRINTF((category, RSM_DEBUG,
1608 		    "do_path_down: after work_mutex\n"));
1609 
1610 		/* if an up token is enqueued, remove it */
1611 		if (cancel_work(up_token)) {
1612 			PATH_RELE_NOLOCK(path);
1613 		}
1614 
1615 		/*
1616 		 * If the path is active and down work hasn't
1617 		 * already been setup then down work is needed.
1618 		 * else
1619 		 * if up work wasn't canceled because it was
1620 		 * already being processed then down work is needed
1621 		 */
1622 		if (path->state == RSMKA_PATH_ACTIVE) {
1623 			if (down_token->opcode == 0)
1624 				do_work = B_TRUE;
1625 		} else
1626 			if (up_token->opcode == RSMKA_IPC_UP)
1627 				do_work = B_TRUE;
1628 
1629 		if (do_work == B_TRUE) {
1630 			down_token->opcode = RSMKA_IPC_DOWN;
1631 			enqueue_work(down_token);
1632 		} else
1633 			PATH_RELE_NOLOCK(path);
1634 
1635 
1636 		mutex_exit(&work_queue.work_mutex);
1637 
1638 	} else {
1639 
1640 		/*
1641 		 * Change state of the path to RSMKA_PATH_GOING_DOWN and
1642 		 * release the path mutex. Any other thread referring
1643 		 * this path would cv_wait till the state of the path
1644 		 * remains RSMKA_PATH_GOING_DOWN.
1645 		 * On completing the path down processing, change the
1646 		 * state of RSMKA_PATH_DOWN indicating that the path
1647 		 * is indeed down.
1648 		 */
1649 		path->state = RSMKA_PATH_GOING_DOWN;
1650 
1651 		/*
1652 		 * clear the WAIT_FOR_SQACK flag since path is going down.
1653 		 */
1654 		path->flags &= ~RSMKA_WAIT_FOR_SQACK;
1655 
1656 		/*
1657 		 * this wakes up any thread waiting to receive credits
1658 		 * in rsmipc_send to tell it that the path is going down
1659 		 */
1660 		cv_broadcast(&path->sendq_token.sendq_cv);
1661 
1662 		mutex_exit(&path->mutex);
1663 
1664 		/*
1665 		 * drain the messages from the receive msgbuf, the
1666 		 * tasks in the taskq_thread acquire the path->mutex
1667 		 * so we drop the path mutex before taskq_wait.
1668 		 */
1669 		taskq_wait(path->recv_taskq);
1670 
1671 		/*
1672 		 * Disconnect segments being imported from the remote node
1673 		 * The path_importer_disconnect function needs to be called
1674 		 * only after releasing the mutex on the path. This is to
1675 		 * avoid a recursive mutex enter when doing the
1676 		 * rsmka_get_sendq_token.
1677 		 */
1678 		path_importer_disconnect(path);
1679 
1680 		/*
1681 		 * Get the path mutex, change the state of the path to
1682 		 * RSMKA_PATH_DOWN since the path down processing has
1683 		 * completed and cv_signal anyone who was waiting since
1684 		 * the state was RSMKA_PATH_GOING_DOWN.
1685 		 * NOTE: Do not do a mutex_exit here. We entered this
1686 		 * routine with the path lock held by the caller. The
1687 		 * caller eventually releases the path lock by doing a
1688 		 * mutex_exit.
1689 		 */
1690 		mutex_enter(&path->mutex);
1691 
1692 #ifdef DEBUG
1693 		/*
1694 		 * Some IPC messages left in the recv_buf,
1695 		 * they'll be dropped
1696 		 */
1697 		if (path->msgbuf_cnt != 0)
1698 			cmn_err(CE_NOTE, "path=%lx msgbuf_cnt != 0\n",
1699 			    (uintptr_t)path);
1700 #endif
1701 		while (path->sendq_token.ref_cnt != 0)
1702 			cv_wait(&path->sendq_token.sendq_cv,
1703 			    &path->mutex);
1704 
1705 		/* release the rsmpi handle */
1706 		if (path->sendq_token.rsmpi_sendq_handle != NULL)
1707 			path->local_adapter->rsmpi_ops->rsm_sendq_destroy(
1708 			    path->sendq_token.rsmpi_sendq_handle);
1709 
1710 		path->sendq_token.rsmpi_sendq_handle = NULL;
1711 
1712 		path->state = RSMKA_PATH_DOWN;
1713 
1714 		cv_signal(&path->hold_cv);
1715 
1716 	}
1717 
1718 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_down done\n"));
1719 
1720 }
1721 
1722 /*
1723  * Search through the list of imported segments for segments using this path
1724  * and unload the memory mappings for each one.  The application will
1725  * get an error return when a barrier close is invoked.
1726  * NOTE: This function has to be called only after releasing the mutex on
1727  * the path. This is to avoid any recursive mutex panics on the path mutex
1728  * since the path_importer_disconnect function would end up calling
1729  * rsmka_get_sendq_token which requires the path mutex.
1730  */
1731 
1732 static void
1733 path_importer_disconnect(path_t *path)
1734 {
1735 	int i;
1736 	adapter_t *adapter = path->local_adapter;
1737 	rsm_node_id_t remote_node = path->remote_node;
1738 	rsmresource_t		*p = NULL;
1739 	rsmseg_t *seg;
1740 
1741 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1742 	    "path_importer_disconnect enter\n"));
1743 
1744 	rw_enter(&rsm_import_segs.rsmhash_rw, RW_READER);
1745 
1746 	if (rsm_import_segs.bucket != NULL) {
1747 		for (i = 0; i < rsm_hash_size; i++) {
1748 			p = rsm_import_segs.bucket[i];
1749 			for (; p; p = p->rsmrc_next) {
1750 				if ((p->rsmrc_node == remote_node) &&
1751 				    (p->rsmrc_adapter == adapter)) {
1752 					seg = (rsmseg_t *)p;
1753 			/*
1754 			 * In order to make rsmseg_unload and
1755 			 * path_importer_disconnect thread safe, acquire the
1756 			 * segment lock here. rsmseg_unload is responsible for
1757 			 * releasing the lock. rsmseg_unload releases the lock
1758 			 * just before a call to rsmipc_send or in case of an
1759 			 * early exit which occurs if the segment was in the
1760 			 * state RSM_STATE_CONNECTING or RSM_STATE_NEW.
1761 			 */
1762 					rsmseglock_acquire(seg);
1763 					seg->s_flags |= RSM_FORCE_DISCONNECT;
1764 					rsmseg_unload(seg);
1765 				}
1766 			}
1767 		}
1768 	}
1769 	rw_exit(&rsm_import_segs.rsmhash_rw);
1770 
1771 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1772 	    "path_importer_disconnect done\n"));
1773 }
1774 
1775 
1776 
1777 
1778 /*
1779  *
1780  * ADAPTER UTILITY FUNCTIONS
1781  *
1782  */
1783 
1784 
1785 
1786 /*
1787  * Allocate new adapter list head structure and add it to the beginning of
1788  * the list of adapter list heads.  There is one list for each adapter
1789  * device name (or type).
1790  */
1791 static adapter_listhead_t *
1792 init_listhead(char *name)
1793 {
1794 	adapter_listhead_t *listhead;
1795 
1796 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_listhead enter\n"));
1797 
1798 	/* allocation and initialization */
1799 	listhead = kmem_zalloc(sizeof (adapter_listhead_t), KM_SLEEP);
1800 	mutex_init(&listhead->mutex, NULL, MUTEX_DEFAULT, NULL);
1801 	(void) strcpy(listhead->adapter_devname, name);
1802 
1803 	/* link into list of listheads */
1804 	mutex_enter(&adapter_listhead_base.listlock);
1805 	if (adapter_listhead_base.next == NULL) {
1806 		adapter_listhead_base.next = listhead;
1807 		listhead->next_listhead = NULL;
1808 	} else {
1809 		listhead->next_listhead = adapter_listhead_base.next;
1810 		adapter_listhead_base.next = listhead;
1811 	}
1812 	mutex_exit(&adapter_listhead_base.listlock);
1813 
1814 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_listhead done\n"));
1815 
1816 	return (listhead);
1817 }
1818 
1819 
1820 /*
1821  * Search the list of adapter list heads for a match on name.
1822  *
1823  */
1824 static adapter_listhead_t *
1825 lookup_adapter_listhead(char *name)
1826 {
1827 	adapter_listhead_t *listhead;
1828 
1829 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1830 	    "lookup_adapter_listhead enter\n"));
1831 
1832 	mutex_enter(&adapter_listhead_base.listlock);
1833 	listhead = adapter_listhead_base.next;
1834 	while (listhead != NULL) {
1835 		if (strcmp(name, listhead->adapter_devname) == 0)
1836 			break;
1837 		listhead = listhead->next_listhead;
1838 	}
1839 	mutex_exit(&adapter_listhead_base.listlock);
1840 
1841 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1842 	    "lookup_adapter_listhead done\n"));
1843 
1844 	return (listhead);
1845 }
1846 
1847 
1848 /*
1849  * Get the adapter list head corresponding to devname and search for
1850  * an adapter descriptor with a match on the instance number. If
1851  * successful, increment the descriptor reference count and return
1852  * the descriptor pointer to the caller.
1853  *
1854  */
1855 adapter_t *
1856 rsmka_lookup_adapter(char *devname, int instance)
1857 {
1858 	adapter_listhead_t *listhead;
1859 	adapter_t *current = NULL;
1860 
1861 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1862 	    "rsmka_lookup_adapter enter\n"));
1863 
1864 	listhead = lookup_adapter_listhead(devname);
1865 	if (listhead != NULL) {
1866 		mutex_enter(&listhead->mutex);
1867 
1868 		current = listhead->next_adapter;
1869 		while (current != NULL) {
1870 			if (current->instance == instance) {
1871 				ADAPTER_HOLD(current);
1872 				break;
1873 			} else
1874 				current = current->next;
1875 		}
1876 
1877 		mutex_exit(&listhead->mutex);
1878 	}
1879 
1880 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1881 	    "rsmka_lookup_adapter done\n"));
1882 
1883 	return (current);
1884 }
1885 
1886 /*
1887  * Called from rsmka_remove_adapter or rsmseg_free.
1888  * rsm_bind() and rsm_connect() store the adapter pointer returned
1889  * from rsmka_getadapter.  The pointer is kept in the segment descriptor.
1890  * When the segment is freed, this routine is called by rsmseg_free to decrement
1891  * the adapter descriptor reference count and possibly free the
1892  * descriptor.
1893  */
1894 void
1895 rsmka_release_adapter(adapter_t *adapter)
1896 {
1897 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1898 	    "rsmka_release_adapter enter\n"));
1899 
1900 	if (adapter == &loopback_adapter) {
1901 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1902 		    "rsmka_release_adapter done\n"));
1903 		return;
1904 	}
1905 
1906 	mutex_enter(&adapter->mutex);
1907 
1908 	/* decrement reference count */
1909 	ADAPTER_RELE_NOLOCK(adapter);
1910 
1911 	/*
1912 	 * if the adapter descriptor reference count is equal to the
1913 	 * initialization value of one, then the descriptor has been
1914 	 * unlinked and can now be freed.
1915 	 */
1916 	if (adapter->ref_cnt == 1) {
1917 		mutex_exit(&adapter->mutex);
1918 
1919 		mutex_destroy(&adapter->mutex);
1920 		kmem_free(adapter->hdlr_argp, sizeof (srv_handler_arg_t));
1921 		kmem_free(adapter, sizeof (adapter_t));
1922 	}
1923 	else
1924 		mutex_exit(&adapter->mutex);
1925 
1926 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
1927 	    "rsmka_release_adapter done\n"));
1928 
1929 }
1930 
1931 
1932 
1933 /*
1934  * Singly linked list. Add to the front.
1935  */
1936 static void
1937 link_adapter(adapter_t *adapter)
1938 {
1939 
1940 	adapter_listhead_t *listhead;
1941 	adapter_t *current;
1942 
1943 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_adapter enter\n"));
1944 
1945 	mutex_enter(&adapter_listhead_base.listlock);
1946 
1947 	mutex_enter(&adapter->listhead->mutex);
1948 
1949 	listhead = adapter->listhead;
1950 	current = listhead->next_adapter;
1951 	listhead->next_adapter = adapter;
1952 	adapter->next = current;
1953 	ADAPTER_HOLD(adapter);
1954 
1955 	adapter->listhead->adapter_count++;
1956 
1957 	mutex_exit(&adapter->listhead->mutex);
1958 
1959 	mutex_exit(&adapter_listhead_base.listlock);
1960 
1961 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_adapter done\n"));
1962 }
1963 
1964 
1965 /*
1966  * Return adapter descriptor
1967  *
1968  * lookup_adapter_listhead returns with the the list of adapter listheads
1969  * locked.  After adding the adapter descriptor, the adapter listhead list
1970  * lock is dropped.
1971  */
1972 static adapter_t *
1973 init_adapter(char *name, int instance, rsm_addr_t hwaddr,
1974     rsm_controller_handle_t handle, rsm_ops_t *ops,
1975     srv_handler_arg_t *hdlr_argp)
1976 {
1977 	adapter_t *adapter;
1978 	adapter_listhead_t *listhead;
1979 
1980 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_adapter enter\n"));
1981 
1982 	adapter = kmem_zalloc(sizeof (adapter_t), KM_SLEEP);
1983 	adapter->instance = instance;
1984 	adapter->hwaddr = hwaddr;
1985 	adapter->rsmpi_handle = handle;
1986 	adapter->rsmpi_ops = ops;
1987 	adapter->hdlr_argp = hdlr_argp;
1988 	mutex_init(&adapter->mutex, NULL, MUTEX_DEFAULT, NULL);
1989 	ADAPTER_HOLD(adapter);
1990 
1991 
1992 	listhead = lookup_adapter_listhead(name);
1993 	if (listhead == NULL)  {
1994 		listhead = init_listhead(name);
1995 	}
1996 
1997 	adapter->listhead = listhead;
1998 
1999 	link_adapter(adapter);
2000 
2001 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_adapter done\n"));
2002 
2003 	return (adapter);
2004 }
2005 
2006 /*
2007  *
2008  * PATH UTILITY FUNCTIONS
2009  *
2010  */
2011 
2012 
2013 /*
2014  * Search the per adapter path list for a match on remote node and
2015  * hwaddr.  The path ref_cnt must be greater than zero or the path
2016  * is in the process of being removed.
2017  *
2018  * Acquire the path lock and increment the path hold count.
2019  */
2020 static path_t *
2021 lookup_path(char *adapter_devname, int adapter_instance,
2022     rsm_node_id_t remote_node, rsm_addr_t hwaddr)
2023 {
2024 	path_t		*current;
2025 	adapter_t	*adapter;
2026 
2027 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_path enter\n"));
2028 
2029 	adapter = rsmka_lookup_adapter(adapter_devname, adapter_instance);
2030 	ASSERT(adapter != NULL);
2031 
2032 	mutex_enter(&adapter->listhead->mutex);
2033 
2034 	/* start at the list head */
2035 	current = adapter->next_path;
2036 
2037 	while (current != NULL) {
2038 		if ((current->remote_node == remote_node) &&
2039 		    (current->remote_hwaddr == hwaddr) &&
2040 		    (current->ref_cnt > 0))
2041 			break;
2042 		else
2043 			current = current->next_path;
2044 	}
2045 	if (current != NULL) {
2046 		mutex_enter(&current->mutex);
2047 		PATH_HOLD_NOLOCK(current);
2048 	}
2049 
2050 	mutex_exit(&adapter->listhead->mutex);
2051 	ADAPTER_RELE(adapter);
2052 
2053 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_path done\n"));
2054 
2055 	return (current);
2056 }
2057 
2058 /*
2059  * This interface is similar to lookup_path but takes only the local
2060  * adapter name, instance and remote adapters hwaddr to identify the
2061  * path. This is used in the interrupt handler routines where nodeid
2062  * is not always available.
2063  */
2064 path_t *
2065 rsm_find_path(char *adapter_devname, int adapter_instance, rsm_addr_t hwaddr)
2066 {
2067 	path_t		*current;
2068 	adapter_t	*adapter;
2069 
2070 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsm_find_path enter\n"));
2071 
2072 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2073 	    "rsm_find_path:adapter=%s:%d,rem=%llx\n",
2074 	    adapter_devname, adapter_instance, hwaddr));
2075 
2076 	adapter = rsmka_lookup_adapter(adapter_devname, adapter_instance);
2077 
2078 	/*
2079 	 * its possible that we are here due to an interrupt but the adapter
2080 	 * has been removed after we received the callback.
2081 	 */
2082 	if (adapter == NULL)
2083 		return (NULL);
2084 
2085 	mutex_enter(&adapter->listhead->mutex);
2086 
2087 	/* start at the list head */
2088 	current = adapter->next_path;
2089 
2090 	while (current != NULL) {
2091 		if ((current->remote_hwaddr == hwaddr) &&
2092 		    (current->ref_cnt > 0))
2093 			break;
2094 		else
2095 			current = current->next_path;
2096 	}
2097 	if (current != NULL) {
2098 		mutex_enter(&current->mutex);
2099 		PATH_HOLD_NOLOCK(current);
2100 	}
2101 
2102 	mutex_exit(&adapter->listhead->mutex);
2103 
2104 	rsmka_release_adapter(adapter);
2105 
2106 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsm_find_path done\n"));
2107 
2108 	return (current);
2109 }
2110 
2111 
2112 /*
2113  * Add the path to the head of the (per adapter) list of paths
2114  */
2115 static void
2116 link_path(path_t *path)
2117 {
2118 
2119 	adapter_t *adapter = path->local_adapter;
2120 	path_t *first_path;
2121 
2122 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_path enter\n"));
2123 
2124 	mutex_enter(&adapter_listhead_base.listlock);
2125 
2126 	mutex_enter(&adapter->listhead->mutex);
2127 
2128 	first_path = adapter->next_path;
2129 	adapter->next_path = path;
2130 	path->next_path = first_path;
2131 
2132 	adapter->listhead->path_count++;
2133 
2134 	mutex_exit(&adapter->listhead->mutex);
2135 
2136 	mutex_exit(&adapter_listhead_base.listlock);
2137 
2138 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_path done\n"));
2139 }
2140 
2141 /*
2142  * Search the per-adapter list of paths for the specified path, beginning
2143  * at the head of the list.  Unlink the path and free the descriptor
2144  * memory.
2145  */
2146 static void
2147 destroy_path(path_t *path)
2148 {
2149 
2150 	adapter_t *adapter = path->local_adapter;
2151 	path_t *prev, *current;
2152 
2153 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_path enter\n"));
2154 
2155 	mutex_enter(&adapter_listhead_base.listlock);
2156 
2157 	mutex_enter(&path->local_adapter->listhead->mutex);
2158 	ASSERT(path->ref_cnt == 0);
2159 
2160 	/* start at the list head */
2161 	prev = NULL;
2162 	current =  adapter->next_path;
2163 
2164 	while (current != NULL) {
2165 		if (path->remote_node == current->remote_node &&
2166 		    path->remote_hwaddr == current->remote_hwaddr)
2167 			break;
2168 		else {
2169 			prev = current;
2170 			current = current->next_path;
2171 		}
2172 	}
2173 
2174 	if (prev == NULL)
2175 		adapter->next_path = current->next_path;
2176 	else
2177 		prev->next_path = current->next_path;
2178 
2179 	path->local_adapter->listhead->path_count--;
2180 
2181 	mutex_exit(&path->local_adapter->listhead->mutex);
2182 
2183 	mutex_exit(&adapter_listhead_base.listlock);
2184 
2185 	taskq_destroy(path->recv_taskq);
2186 
2187 	kmem_free(path->msgbuf_queue,
2188 	    RSMIPC_MAX_MESSAGES * sizeof (msgbuf_elem_t));
2189 
2190 	mutex_destroy(&current->mutex);
2191 	cv_destroy(&current->sendq_token.sendq_cv);
2192 	cv_destroy(&path->hold_cv);
2193 	kmem_free(current, sizeof (path_t));
2194 
2195 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_path done\n"));
2196 }
2197 
2198 void
2199 rsmka_enqueue_msgbuf(path_t *path, void *data)
2200 {
2201 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2202 	    "rsmka_enqueue_msgbuf enter\n"));
2203 
2204 	ASSERT(MUTEX_HELD(&path->mutex));
2205 
2206 	ASSERT(path->msgbuf_cnt < RSMIPC_MAX_MESSAGES);
2207 
2208 	/* increment the count and advance the tail */
2209 
2210 	path->msgbuf_cnt++;
2211 
2212 	if (path->msgbuf_tail == RSMIPC_MAX_MESSAGES - 1) {
2213 		path->msgbuf_tail = 0;
2214 	} else {
2215 		path->msgbuf_tail++;
2216 	}
2217 
2218 	path->msgbuf_queue[path->msgbuf_tail].active = B_TRUE;
2219 
2220 	bcopy(data, &(path->msgbuf_queue[path->msgbuf_tail].msg),
2221 	    sizeof (rsmipc_request_t));
2222 
2223 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2224 	    "rsmka_enqueue_msgbuf done\n"));
2225 
2226 }
2227 
2228 /*
2229  * get the head of the queue using rsmka_gethead_msgbuf and then call
2230  * rsmka_dequeue_msgbuf to remove it.
2231  */
2232 void
2233 rsmka_dequeue_msgbuf(path_t *path)
2234 {
2235 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2236 	    "rsmka_dequeue_msgbuf enter\n"));
2237 
2238 	ASSERT(MUTEX_HELD(&path->mutex));
2239 
2240 	if (path->msgbuf_cnt == 0)
2241 		return;
2242 
2243 	path->msgbuf_cnt--;
2244 
2245 	path->msgbuf_queue[path->msgbuf_head].active = B_FALSE;
2246 
2247 	if (path->msgbuf_head == RSMIPC_MAX_MESSAGES - 1) {
2248 		path->msgbuf_head = 0;
2249 	} else {
2250 		path->msgbuf_head++;
2251 	}
2252 
2253 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2254 	    "rsmka_dequeue_msgbuf done\n"));
2255 
2256 }
2257 
2258 msgbuf_elem_t *
2259 rsmka_gethead_msgbuf(path_t *path)
2260 {
2261 	msgbuf_elem_t	*head;
2262 
2263 	ASSERT(MUTEX_HELD(&path->mutex));
2264 
2265 	if (path->msgbuf_cnt == 0)
2266 		return (NULL);
2267 
2268 	head = &path->msgbuf_queue[path->msgbuf_head];
2269 
2270 	return (head);
2271 
2272 }
2273 /*
2274  * Called by rsm_connect which needs the hardware address of the
2275  * remote adapter.  A search is done through the paths for the local
2276  * adapter for a match on the specified remote node.
2277  */
2278 rsm_node_id_t
2279 get_remote_nodeid(adapter_t *adapter, rsm_addr_t remote_hwaddr)
2280 {
2281 
2282 	rsm_node_id_t remote_node;
2283 	path_t	   *current = adapter->next_path;
2284 
2285 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_nodeid enter\n"));
2286 
2287 	mutex_enter(&adapter->listhead->mutex);
2288 	while (current != NULL) {
2289 		if (current->remote_hwaddr == remote_hwaddr) {
2290 			remote_node = current->remote_node;
2291 			break;
2292 		}
2293 		current = current->next_path;
2294 	}
2295 
2296 	if (current == NULL)
2297 		remote_node = (rsm_node_id_t)-1;
2298 
2299 	mutex_exit(&adapter->listhead->mutex);
2300 
2301 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_nodeid done\n"));
2302 
2303 	return (remote_node);
2304 }
2305 
2306 /*
2307  * Called by rsm_connect which needs the hardware address of the
2308  * remote adapter.  A search is done through the paths for the local
2309  * adapter for a match on the specified remote node.
2310  */
2311 rsm_addr_t
2312 get_remote_hwaddr(adapter_t *adapter, rsm_node_id_t remote_node)
2313 {
2314 
2315 	rsm_addr_t remote_hwaddr;
2316 	path_t	   *current = adapter->next_path;
2317 
2318 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_hwaddr enter\n"));
2319 
2320 	mutex_enter(&adapter->listhead->mutex);
2321 	while (current != NULL) {
2322 		if (current->remote_node == remote_node) {
2323 			remote_hwaddr = current->remote_hwaddr;
2324 			break;
2325 		}
2326 		current = current->next_path;
2327 	}
2328 	if (current == NULL)
2329 		remote_hwaddr = -1;
2330 	mutex_exit(&adapter->listhead->mutex);
2331 
2332 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_hwaddr done\n"));
2333 
2334 	return (remote_hwaddr);
2335 }
2336 /*
2337  * IPC UTILITY FUNCTIONS
2338  */
2339 
2340 
2341 /*
2342  * If an entry exists, return with the ipc_info_lock held
2343  */
2344 static ipc_info_t *
2345 lookup_ipc_info(rsm_node_id_t remote_node)
2346 {
2347 	ipc_info_t  *ipc_info;
2348 
2349 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_ipc_info enter\n"));
2350 
2351 	mutex_enter(&ipc_info_lock);
2352 
2353 	ipc_info = ipc_info_head;
2354 	if (ipc_info == NULL) {
2355 		mutex_exit(&ipc_info_lock);
2356 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2357 		    "lookup_ipc_info done: ipc_info is NULL\n"));
2358 		return (NULL);
2359 	}
2360 
2361 	while (ipc_info->remote_node != remote_node) {
2362 		ipc_info = ipc_info->next;
2363 		if (ipc_info == NULL) {
2364 			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2365 			    "lookup_ipc_info: ipc_info not found\n"));
2366 			mutex_exit(&ipc_info_lock);
2367 			break;
2368 		}
2369 	}
2370 
2371 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_ipc_info done\n"));
2372 
2373 	return (ipc_info);
2374 }
2375 
2376 /*
2377  * Create an ipc_info descriptor and return with ipc_info_lock held
2378  */
2379 static ipc_info_t *
2380 init_ipc_info(rsm_node_id_t remote_node, boolean_t state)
2381 {
2382 	ipc_info_t *ipc_info;
2383 
2384 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_ipc_info enter\n"));
2385 
2386 	/*
2387 	 * allocate an ipc_info descriptor and add it to a
2388 	 * singly linked list
2389 	 */
2390 
2391 	ipc_info = kmem_zalloc(sizeof (ipc_info_t), KM_SLEEP);
2392 	ipc_info->remote_node = remote_node;
2393 	ipc_info->node_is_alive = state;
2394 
2395 	mutex_enter(&ipc_info_lock);
2396 	if (ipc_info_head == NULL) {
2397 		DBG_PRINTF((category, RSM_DEBUG,
2398 		    "init_ipc_info:ipc_info_head = %lx\n", ipc_info));
2399 		ipc_info_head = ipc_info;
2400 		ipc_info->next = NULL;
2401 	} else {
2402 		ipc_info->next = ipc_info_head;
2403 		ipc_info_head = ipc_info;
2404 	}
2405 
2406 	ipc_info->remote_node = remote_node;
2407 
2408 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_ipc_info done\n"));
2409 
2410 	return (ipc_info);
2411 }
2412 
2413 static void
2414 destroy_ipc_info(ipc_info_t *ipc_info)
2415 {
2416 	ipc_info_t *current = ipc_info_head;
2417 	ipc_info_t *prev;
2418 
2419 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_ipc_info enter\n"));
2420 
2421 	ASSERT(MUTEX_HELD(&ipc_info_lock));
2422 
2423 	while (current != ipc_info) {
2424 		prev = current;
2425 		current = current->next;
2426 	}
2427 	ASSERT(current != NULL);
2428 
2429 	if (current != ipc_info_head)
2430 		prev->next = current->next;
2431 	else
2432 		ipc_info_head = current->next;
2433 
2434 	kmem_free(current, sizeof (ipc_info_t));
2435 
2436 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_ipc_info done\n"));
2437 
2438 }
2439 
2440 /*
2441  * Sendq tokens are kept on a circular list.  If tokens A, B, C, & D are
2442  * on the list headed by ipc_info, then ipc_info points to A, A points to
2443  * D, D to C, C to B, and B to A.
2444  */
2445 static void
2446 link_sendq_token(sendq_token_t *token, rsm_node_id_t remote_node)
2447 {
2448 	ipc_info_t *ipc_info;
2449 
2450 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_sendq_token enter\n"));
2451 
2452 	ipc_info = lookup_ipc_info(remote_node);
2453 	if (ipc_info == NULL) {
2454 		ipc_info = init_ipc_info(remote_node, B_FALSE);
2455 		DBG_PRINTF((category, RSM_DEBUG,
2456 		    "link_sendq_token: new ipc_info = %lx\n", ipc_info));
2457 	}
2458 	else
2459 		DBG_PRINTF((category, RSM_DEBUG,
2460 		    "link_sendq_token: ipc_info = %lx\n", ipc_info));
2461 
2462 	if (ipc_info->token_list == NULL) {
2463 		ipc_info->token_list = token;
2464 		ipc_info->current_token = token;
2465 		DBG_PRINTF((category, RSM_DEBUG,
2466 		    "link_sendq_token: current = %lx\n", token));
2467 		token->next = token;
2468 	} else {
2469 		DBG_PRINTF((category, RSM_DEBUG,
2470 		    "link_sendq_token: token = %lx\n", token));
2471 		token->next = ipc_info->token_list->next;
2472 		ipc_info->token_list->next = token;
2473 		ipc_info->token_list = token;
2474 	}
2475 
2476 
2477 	mutex_exit(&ipc_info_lock);
2478 
2479 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_sendq_token done\n"));
2480 
2481 }
2482 
2483 static void
2484 unlink_sendq_token(sendq_token_t *token, rsm_node_id_t remote_node)
2485 {
2486 	sendq_token_t *prev, *start,  *current;
2487 	ipc_info_t *ipc_info;
2488 	path_t *path = SQ_TOKEN_TO_PATH(token);
2489 
2490 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2491 	    "unlink_sendq_token enter\n"));
2492 
2493 	ASSERT(path->ref_cnt == 0);
2494 
2495 	ipc_info = lookup_ipc_info(remote_node);
2496 	if (ipc_info == NULL) {
2497 		DBG_PRINTF((category, RSM_DEBUG,
2498 		    "ipc_info for %d not found\n", remote_node));
2499 		return;
2500 	}
2501 
2502 	prev = ipc_info->token_list;
2503 	start = current = ipc_info->token_list->next;
2504 
2505 	for (;;) {
2506 		if (current == token) {
2507 			if (current->next != current) {
2508 				DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2509 				    "found token, removed it\n"));
2510 				prev->next = token->next;
2511 				if (ipc_info->token_list == token)
2512 					ipc_info->token_list = prev;
2513 				ipc_info->current_token = token->next;
2514 			} else {
2515 				/* list will be empty  */
2516 				DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2517 				    "removed token, list empty\n"));
2518 				ipc_info->token_list = NULL;
2519 				ipc_info->current_token = NULL;
2520 			}
2521 			break;
2522 		}
2523 		prev = current;
2524 		current = current->next;
2525 		if (current == start) {
2526 			DBG_PRINTF((category, RSM_DEBUG,
2527 			    "unlink_sendq_token: token not found\n"));
2528 			break;
2529 		}
2530 	}
2531 	mutex_exit(&ipc_info_lock);
2532 
2533 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "unlink_sendq_token done\n"));
2534 }
2535 
2536 
2537 void
2538 rele_sendq_token(sendq_token_t *token)
2539 {
2540 	path_t *path;
2541 
2542 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rele_sendq_token enter\n"));
2543 
2544 	path = SQ_TOKEN_TO_PATH(token);
2545 	mutex_enter(&path->mutex);
2546 	PATH_RELE_NOLOCK(path);
2547 	SENDQ_TOKEN_RELE(path);
2548 	mutex_exit(&path->mutex);
2549 
2550 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rele_sendq_token done\n"));
2551 
2552 }
2553 
2554 /*
2555  * A valid ipc token can only be returned if the remote node is alive.
2556  * Tokens are on a circular list.  Starting with the current token
2557  * search for a token with an endpoint in state RSM_PATH_ACTIVE.
2558  * rsmipc_send which calls rsmka_get_sendq_token expects that if there are
2559  * multiple paths available between a node-pair then consecutive calls from
2560  * a particular invocation of rsmipc_send will return a sendq that is
2561  * different from the one that was used in the previous iteration. When
2562  * prev_used is NULL it indicates that this is the first interation in a
2563  * specific rsmipc_send invocation.
2564  *
2565  * Updating the current token provides round robin selection and this
2566  * is done only in the first iteration ie. when prev_used is NULL
2567  */
2568 sendq_token_t *
2569 rsmka_get_sendq_token(rsm_node_id_t remote_node, sendq_token_t *prev_used)
2570 {
2571 	sendq_token_t *token, *first_token;
2572 	path_t *path;
2573 	ipc_info_t *ipc_info;
2574 
2575 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2576 	    "rsmka_get_sendq_token enter\n"));
2577 
2578 	ipc_info = lookup_ipc_info(remote_node);
2579 	if (ipc_info == NULL) {
2580 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2581 		    "rsmka_get_sendq_token done: ipc_info is NULL\n"));
2582 		return (NULL);
2583 	}
2584 
2585 	if (ipc_info->node_is_alive == B_TRUE) {
2586 		token = first_token = ipc_info->current_token;
2587 		if (token == NULL) {
2588 			mutex_exit(&ipc_info_lock);
2589 			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2590 			    "rsmka_get_sendq_token done: token=NULL\n"));
2591 			return (NULL);
2592 		}
2593 
2594 		for (;;) {
2595 			path = SQ_TOKEN_TO_PATH(token);
2596 			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2597 			    "path %lx\n", path));
2598 			mutex_enter(&path->mutex);
2599 			if (path->state != RSMKA_PATH_ACTIVE ||
2600 			    path->ref_cnt == 0) {
2601 				mutex_exit(&path->mutex);
2602 			} else {
2603 				if (token != prev_used) {
2604 					/* found a new token */
2605 					break;
2606 				}
2607 				mutex_exit(&path->mutex);
2608 			}
2609 
2610 			token = token->next;
2611 			if (token == first_token) {
2612 				/*
2613 				 * we didn't find a new token reuse prev_used
2614 				 * if the corresponding path is still up
2615 				 */
2616 				if (prev_used) {
2617 					path = SQ_TOKEN_TO_PATH(prev_used);
2618 					DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2619 					    "path %lx\n", path));
2620 					mutex_enter(&path->mutex);
2621 					if (path->state != RSMKA_PATH_ACTIVE ||
2622 					    path->ref_cnt == 0) {
2623 						mutex_exit(&path->mutex);
2624 					} else {
2625 						token = prev_used;
2626 						break;
2627 					}
2628 				}
2629 				mutex_exit(&ipc_info_lock);
2630 				DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2631 				    "rsmka_get_sendq_token: token=NULL\n"));
2632 				return (NULL);
2633 			}
2634 		}
2635 
2636 		PATH_HOLD_NOLOCK(path);
2637 		SENDQ_TOKEN_HOLD(path);
2638 		if (prev_used == NULL) {
2639 			/* change current_token only the first time */
2640 			ipc_info->current_token = token->next;
2641 		}
2642 
2643 		mutex_exit(&path->mutex);
2644 		mutex_exit(&ipc_info_lock);
2645 
2646 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2647 		    "rsmka_get_sendq_token done\n"));
2648 		return (token);
2649 	} else {
2650 		mutex_exit(&ipc_info_lock);
2651 		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
2652 		    "rsmka_get_sendq_token done\n"));
2653 		return (NULL);
2654 	}
2655 }
2656 
2657 
2658 
2659 /*
2660  */
2661 static int
2662 create_ipc_sendq(path_t *path)
2663 {
2664 	int		rval;
2665 	sendq_token_t	*token;
2666 	adapter_t 	*adapter;
2667 	int64_t		srvc_offset;
2668 
2669 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "create_ipc_sendq enter\n"));
2670 
2671 	DBG_PRINTF((category, RSM_DEBUG, "create_ipc_sendq: path = %lx\n",
2672 	    path));
2673 
2674 	adapter = path->local_adapter;
2675 	token = &path->sendq_token;
2676 
2677 	srvc_offset = path->remote_hwaddr;
2678 
2679 	DBG_PRINTF((category, RSM_DEBUG,
2680 	    "create_ipc_sendq: srvc_offset = %lld\n",
2681 	    srvc_offset));
2682 
2683 	rval = adapter->rsmpi_ops->rsm_sendq_create(adapter->rsmpi_handle,
2684 	    path->remote_hwaddr,
2685 	    (rsm_intr_service_t)(RSM_SERVICE+srvc_offset),
2686 	    (rsm_intr_pri_t)RSM_PRI, (size_t)RSM_QUEUE_SZ,
2687 	    RSM_INTR_SEND_Q_NO_FENCE,
2688 	    RSM_RESOURCE_SLEEP, NULL, &token->rsmpi_sendq_handle);
2689 	if (rval == RSM_SUCCESS) {
2690 		/* rsmipc_send() may be waiting for a sendq_token */
2691 		mutex_enter(&ipc_info_cvlock);
2692 		cv_broadcast(&ipc_info_cv);
2693 		mutex_exit(&ipc_info_cvlock);
2694 	}
2695 
2696 	DBG_PRINTF((category, RSM_DEBUG, "create_ipc_sendq: handle = %lx\n",
2697 	    token->rsmpi_sendq_handle));
2698 	DBG_PRINTF((category, RSM_DEBUG, "create_ipc_sendq: rval = %d\n",
2699 	    rval));
2700 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "create_ipc_sendq done\n"));
2701 
2702 	return (rval);
2703 }
2704 
2705 
2706 boolean_t
2707 rsmka_check_node_alive(rsm_node_id_t remote_node)
2708 {
2709 	ipc_info_t *ipc_info;
2710 
2711 	DBG_PRINTF((category, RSM_DEBUG, "rsmka_check_node_alive enter\n"));
2712 
2713 	ipc_info = lookup_ipc_info(remote_node);
2714 	if (ipc_info == NULL) {
2715 		DBG_PRINTF((category, RSM_DEBUG,
2716 		    "rsmka_check_node_alive done: ipc_info NULL\n"));
2717 		return (B_FALSE);
2718 	}
2719 
2720 	if (ipc_info->node_is_alive == B_TRUE) {
2721 		mutex_exit(&ipc_info_lock);
2722 		DBG_PRINTF((category, RSM_DEBUG,
2723 		    "rsmka_check_node_alive done: node is alive\n"));
2724 		return (B_TRUE);
2725 	} else {
2726 		mutex_exit(&ipc_info_lock);
2727 		DBG_PRINTF((category, RSM_DEBUG,
2728 		    "rsmka_check_node_alive done: node is not alive\n"));
2729 		return (B_FALSE);
2730 	}
2731 }
2732 
2733 
2734 
2735 
2736 /*
2737  *  TOPOLOGY IOCTL SUPPORT
2738  */
2739 
2740 static uint32_t
2741 get_topology_size(int mode)
2742 {
2743 	uint32_t	topology_size;
2744 	int		pointer_area_size;
2745 	adapter_listhead_t	*listhead;
2746 	int		total_num_of_adapters;
2747 	int		total_num_of_paths;
2748 
2749 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology_size enter\n"));
2750 
2751 	/*
2752 	 * Find the total number of adapters and paths by adding up the
2753 	 * individual adapter and path counts from all the listheads
2754 	 */
2755 	total_num_of_adapters = 0;
2756 	total_num_of_paths = 0;
2757 	listhead = adapter_listhead_base.next;
2758 	while (listhead != NULL) {
2759 		total_num_of_adapters += listhead->adapter_count;
2760 		total_num_of_paths += listhead->path_count;
2761 		listhead = listhead->next_listhead;
2762 	}
2763 
2764 #ifdef	_MULTI_DATAMODEL
2765 	if ((mode & DATAMODEL_MASK) == DATAMODEL_ILP32)
2766 		/*
2767 		 * Add extra 4-bytes to make sure connections header
2768 		 * is double-word aligned
2769 		 */
2770 		pointer_area_size =
2771 		    (total_num_of_adapters + total_num_of_adapters%2) *
2772 		    sizeof (caddr32_t);
2773 	else
2774 		pointer_area_size = total_num_of_adapters * sizeof (caddr_t);
2775 #else	/* _MULTI_DATAMODEL */
2776 	mode = mode;
2777 	pointer_area_size = total_num_of_adapters * sizeof (caddr_t);
2778 #endif	/* _MULTI_DATAMODEL */
2779 
2780 
2781 	topology_size = sizeof (rsmka_topology_hdr_t) +
2782 	    pointer_area_size +
2783 	    (total_num_of_adapters * sizeof (rsmka_connections_hdr_t)) +
2784 	    (total_num_of_paths * sizeof (rsmka_remote_cntlr_t));
2785 
2786 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology_size done\n"));
2787 
2788 	return (topology_size);
2789 }
2790 
2791 
2792 
2793 static void
2794 get_topology(caddr_t arg, char *bufp, int mode)
2795 {
2796 
2797 	rsmka_topology_t	*tp = (rsmka_topology_t *)bufp;
2798 	adapter_listhead_t	*listhead;
2799 	adapter_t		*adapter;
2800 	path_t			*path;
2801 	int			cntlr = 0;
2802 	rsmka_connections_t	*connection;
2803 	rsmka_remote_cntlr_t	*rem_cntlr;
2804 	int			total_num_of_adapters;
2805 
2806 #ifdef	_MULTI_DATAMODEL
2807 	rsmka_topology32_t	*tp32 = (rsmka_topology32_t *)bufp;
2808 #else
2809 	mode = mode;
2810 #endif	/* _MULTI_DATAMODEL */
2811 
2812 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology enter\n"));
2813 
2814 	/*
2815 	 * Find the total number of adapters by adding up the
2816 	 * individual adapter counts from all the listheads
2817 	 */
2818 	total_num_of_adapters = 0;
2819 	listhead = adapter_listhead_base.next;
2820 	while (listhead != NULL) {
2821 		total_num_of_adapters += listhead->adapter_count;
2822 		listhead = listhead->next_listhead;
2823 	}
2824 
2825 	/* fill topology header and adjust bufp */
2826 	tp->topology_hdr.local_nodeid = my_nodeid;
2827 	tp->topology_hdr.local_cntlr_count = total_num_of_adapters;
2828 	bufp = (char *)&tp->connections[0];
2829 
2830 	/* leave room for connection pointer area */
2831 #ifdef	_MULTI_DATAMODEL
2832 	if ((mode & DATAMODEL_MASK) == DATAMODEL_ILP32)
2833 		/* make sure bufp is double-word aligned */
2834 		bufp += (total_num_of_adapters + total_num_of_adapters%2) *
2835 		    sizeof (caddr32_t);
2836 	else
2837 		bufp += total_num_of_adapters * sizeof (caddr_t);
2838 #else	/* _MULTI_DATAMODEL */
2839 	bufp += total_num_of_adapters * sizeof (caddr_t);
2840 #endif	/* _MULTI_DATAMODEL */
2841 
2842 	/* fill topology from the adapter and path data */
2843 	listhead = adapter_listhead_base.next;
2844 	while (listhead != NULL) {
2845 		adapter = listhead->next_adapter;
2846 		while (adapter != NULL) {
2847 			/* fill in user based connection pointer */
2848 #ifdef	_MULTI_DATAMODEL
2849 			if ((mode & DATAMODEL_MASK) == DATAMODEL_ILP32) {
2850 				ulong_t delta = (ulong_t)bufp - (ulong_t)tp32;
2851 				caddr32_t userbase = (caddr32_t)((ulong_t)arg &
2852 				    0xffffffff);
2853 				tp32->connections[cntlr++] = userbase + delta;
2854 			} else {
2855 				tp->connections[cntlr++] = arg +
2856 				    (ulong_t)bufp -
2857 				    (ulong_t)tp;
2858 			}
2859 #else	/* _MULTI_DATAMODEL */
2860 				tp->connections[cntlr++] = arg +
2861 				    (ulong_t)bufp -
2862 				    (ulong_t)tp;
2863 #endif	/* _MULTI_DATAMODEL */
2864 			connection = (rsmka_connections_t *)bufp;
2865 			(void) snprintf(connection->hdr.cntlr_name,
2866 			    MAXNAMELEN, "%s%d",
2867 			    listhead->adapter_devname,
2868 			    adapter->instance);
2869 			connection->hdr.local_hwaddr = adapter->hwaddr;
2870 			connection->hdr.remote_cntlr_count = 0;
2871 			bufp += sizeof (rsmka_connections_hdr_t);
2872 			rem_cntlr = (rsmka_remote_cntlr_t *)bufp;
2873 			path = adapter->next_path;
2874 			while (path != NULL) {
2875 				connection->hdr.remote_cntlr_count++;
2876 				rem_cntlr->remote_nodeid = path->remote_node;
2877 				(void) snprintf(rem_cntlr->remote_cntlrname,
2878 				    MAXNAMELEN, "%s%d",
2879 				    listhead->adapter_devname,
2880 				    path->remote_devinst);
2881 				rem_cntlr->remote_hwaddr = path->remote_hwaddr;
2882 				rem_cntlr->connection_state = path->state;
2883 				++rem_cntlr;
2884 				path = path->next_path;
2885 			}
2886 			adapter = adapter->next;
2887 			bufp = (char *)rem_cntlr;
2888 		}
2889 		listhead = listhead->next_listhead;
2890 	}
2891 
2892 	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology done\n"));
2893 
2894 }
2895 
2896 
2897 /*
2898  * Called from rsm_ioctl() in rsm.c
2899  * Make sure there is no possiblity of blocking while holding
2900  * adapter_listhead_base.lock
2901  */
2902 int
2903 rsmka_topology_ioctl(caddr_t arg, int cmd, int mode)
2904 {
2905 	uint32_t	topology_size;
2906 	uint32_t 	request_size;
2907 	char		*bufp;
2908 	int		error = RSM_SUCCESS;
2909 	size_t		max_toposize;
2910 
2911 	DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
2912 	    "rsmka_topology_ioctl enter\n"));
2913 
2914 	switch (cmd) {
2915 	case RSM_IOCTL_TOPOLOGY_SIZE:
2916 		mutex_enter(&adapter_listhead_base.listlock);
2917 		topology_size = get_topology_size(mode);
2918 		mutex_exit(&adapter_listhead_base.listlock);
2919 		if (ddi_copyout((caddr_t)&topology_size,
2920 		    (caddr_t)arg, sizeof (uint32_t), mode))
2921 			error = RSMERR_BAD_ADDR;
2922 		break;
2923 	case RSM_IOCTL_TOPOLOGY_DATA:
2924 		/*
2925 		 * The size of the buffer which the caller has allocated
2926 		 * is passed in.  If the size needed for the topology data
2927 		 * is not sufficient, E2BIG is returned
2928 		 */
2929 		if (ddi_copyin(arg, &request_size, sizeof (uint32_t), mode)) {
2930 			DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
2931 			    "rsmka_topology_ioctl done: BAD_ADDR\n"));
2932 			return (RSMERR_BAD_ADDR);
2933 		}
2934 		/* calculate the max size of the topology structure */
2935 		max_toposize = sizeof (rsmka_topology_hdr_t) +
2936 		    RSM_MAX_CTRL * (sizeof (caddr_t) +
2937 			sizeof (rsmka_connections_hdr_t)) +
2938 		    RSM_MAX_NODE * sizeof (rsmka_remote_cntlr_t);
2939 
2940 		if (request_size > max_toposize) { /* validate request_size */
2941 			DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
2942 			    "rsmka_topology_ioctl done: size too large\n"));
2943 			return (EINVAL);
2944 		}
2945 		bufp = kmem_zalloc(request_size, KM_SLEEP);
2946 		mutex_enter(&adapter_listhead_base.listlock);
2947 		topology_size = get_topology_size(mode);
2948 		if (request_size < topology_size) {
2949 			kmem_free(bufp, request_size);
2950 			mutex_exit(&adapter_listhead_base.listlock);
2951 			DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
2952 			    "rsmka_topology_ioctl done: E2BIG\n"));
2953 			return (E2BIG);
2954 		}
2955 
2956 		/* get the topology data and copyout to the caller */
2957 		get_topology(arg, bufp, mode);
2958 		mutex_exit(&adapter_listhead_base.listlock);
2959 		if (ddi_copyout((caddr_t)bufp, (caddr_t)arg,
2960 		    topology_size, mode))
2961 			error = RSMERR_BAD_ADDR;
2962 
2963 		kmem_free(bufp, request_size);
2964 		break;
2965 	default:
2966 		DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG,
2967 		    "rsmka_topology_ioctl: cmd not supported\n"));
2968 		error = DDI_FAILURE;
2969 	}
2970 
2971 	DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
2972 	    "rsmka_topology_ioctl done: %d\n", error));
2973 	return (error);
2974 }
2975