/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * This module provides for the management of interconnect adapters,
 * inter-node connections (a.k.a. paths), and IPC.  Adapter descriptors are
 * maintained on a linked list; one list per adapter devname.  Each
 * adapter descriptor heads a linked list of path descriptors.  There is
 * also a linked list of ipc_info descriptors; one for each node.  Each
 * ipc_info descriptor heads a circular list of ipc tokens (the tokens are
 * embedded within a path descriptor). The tokens are used in round robin
 * fashion.
 *
 *
 * The exported interface consists of the following functions:
 *	- rsmka_add_adapter
 *	- rsmka_remove_adapter
 *
 *      [add_path and remove_path only called for current adapters]
 *	- rsmka_add_path
 *	- rsmka_remove_path	[a path down request is implicit]
 *
 *	- rsmka_path_up           [called at clock ipl for Sun Cluster]
 *	- rsmka_path_down         [called at clock ipl for Sun Cluster]
 *	- rsmka_disconnect_node   [called at clock ipl for Sun Cluster;
 *				treat like path-down for all node paths;
 *				can be before node_alive; always before
 *				node_died.]
 *
 *	[node_alive and node_died are always paired]
 *	- rsmka_node_alive   called after the first cluster path is up
 *                           for this node
 *	- rsmka_node_died
 *
 *      [set the local node id]
 *      - rsmka_set_my_nodeid    called to set the variable my_nodeid to the
 *                           local node id
 *
 * Processing for these functions is set up as a state machine supported
 * by the data structures described above.
 *
 * For Sun Cluster these are called from the Path-Manager/Kernel-Agent
 * Interface (rsmka_pm_interface.cc).
 *
 * The functions rsmka_path_up, rsmka_path_down, and rsmka_disconnect_node
 * are called at clock interrupt level from the Path-Manager/Kernel-Agent
 * Interface, which precludes sleeping; so these functions may (optionally)
 * defer processing to an independent thread running at normal ipl.
 *
 *
 * lock definitions:
 *
 *	(mutex) work_queue.work_mutex
 *			protects the linked list of work tokens and is used
 *			with cv_wait/cv_signal for thread synchronization.
 *			No other locks acquired when held.
 *
 *	(mutex) adapter_listhead_base.listlock
 *			protects linked list of adapter listheads
 *			Always acquired before listhead->mutex
 *
 *
 *	(mutex) ipc_info_lock
 *			protects ipc_info list and sendq token lists
 *			Always acquired before listhead->mutex
 *
 *      (mutex) listhead->mutex
 *			protects adapter listhead, linked list of
 *			adapters, and linked list of paths.
 *
 *      (mutex) path->mutex
 *			protects the path descriptor.
 *			work_queue.work_mutex may be acquired when holding
 *			this lock.
 *
 *	(mutex) adapter->mutex
 *			protects adapter descriptor contents.  Used
 *			mainly for ref_cnt updates.
 */
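
/*
 * Illustrative sketch (not a routine in this module): the ordering rules
 * above imply that a hypothetical walk over every path of every adapter
 * of a given devname must nest its locks as follows:
 *
 *	mutex_enter(&adapter_listhead_base.listlock);
 *	mutex_enter(&listhead->mutex);
 *	for (adapter = listhead->next_adapter; adapter != NULL;
 *	    adapter = adapter->next) {
 *		for (path = adapter->next_path; path != NULL;
 *		    path = path->next_path) {
 *			mutex_enter(&path->mutex);
 *			(examine the path descriptor)
 *			mutex_exit(&path->mutex);
 *		}
 *	}
 *	mutex_exit(&listhead->mutex);
 *	mutex_exit(&adapter_listhead_base.listlock);
 *
 * Taking listhead->mutex (or a path->mutex) first and then attempting
 * adapter_listhead_base.listlock would invert the documented order and
 * risk deadlock.
 */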

#include <sys/param.h>
#include <sys/kmem.h>
#include <sys/errno.h>
#include <sys/time.h>
#include <sys/devops.h>
#include <sys/ddi_impldefs.h>
#include <sys/cmn_err.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/proc.h>
#include <sys/thread.h>
#include <sys/taskq.h>
#include <sys/callb.h>

#include <sys/rsm/rsm.h>
#include <rsm_in.h>
#include <sys/rsm/rsmka_path_int.h>

extern void _cplpl_init();
extern void _cplpl_fini();
extern pri_t maxclsyspri;
extern int   rsm_hash_size;

extern rsm_node_id_t my_nodeid;
extern rsmhash_table_t rsm_import_segs;
extern rsm_intr_hand_ret_t rsm_srv_func();
extern void rsmseg_unload(rsmseg_t *);
extern void rsm_suspend_complete(rsm_node_id_t src_node, int flag);
extern int rsmipc_send_controlmsg(path_t *path, int msgtype);
extern void rsmka_path_monitor_initialize();
extern void rsmka_path_monitor_terminate();

extern adapter_t loopback_adapter;
/*
 * Lint errors and warnings are displayed; informational messages
 * are suppressed.
 */
/* lint -w2 */


/*
 * macros SQ_TOKEN_TO_PATH and WORK_TOKEN_TO_PATH use a null pointer
 * for computational purposes.  Ignore the lint warning.
 */
/* lint -save -e413 */
/* FUNCTION PROTOTYPES */
static adapter_t *init_adapter(char *, int, rsm_addr_t,
    rsm_controller_handle_t, rsm_ops_t *, srv_handler_arg_t *);
adapter_t *rsmka_lookup_adapter(char *, int);
static ipc_info_t *lookup_ipc_info(rsm_node_id_t);
static ipc_info_t *init_ipc_info(rsm_node_id_t, boolean_t);
static path_t *lookup_path(char *, int, rsm_node_id_t, rsm_addr_t);
static void pathup_to_pathactive(ipc_info_t *, rsm_node_id_t);
static void path_importer_disconnect(path_t *);
boolean_t rsmka_do_path_active(path_t *, int);
static boolean_t do_path_up(path_t *, int);
static void do_path_down(path_t *, int);
static void enqueue_work(work_token_t *);
static boolean_t cancel_work(work_token_t *);
static void link_path(path_t *);
static void destroy_path(path_t *);
static void link_sendq_token(sendq_token_t *, rsm_node_id_t);
static void unlink_sendq_token(sendq_token_t *, rsm_node_id_t);
boolean_t rsmka_check_node_alive(rsm_node_id_t);
static void do_deferred_work(caddr_t);
static int create_ipc_sendq(path_t *);
static void destroy_ipc_info(ipc_info_t *);
void rsmka_pathmanager_cleanup();
void rsmka_release_adapter(adapter_t *);

kt_did_t rsm_thread_id;
int rsmka_terminate_workthread_loop = 0;

static struct adapter_listhead_list adapter_listhead_base;
static work_queue_t work_queue;

/* protect ipc_info descriptor manipulation */
static kmutex_t ipc_info_lock;

static ipc_info_t *ipc_info_head = NULL;

static int category = RSM_PATH_MANAGER | RSM_KERNEL_AGENT;

/* for synchronization with rsmipc_send() in rsm.c */
kmutex_t ipc_info_cvlock;
kcondvar_t ipc_info_cv;



/*
 * RSMKA PATHMANAGER INITIALIZATION AND CLEANUP ROUTINES
 *
 */


/*
 * Called from the rsm module (rsm.c)  _init() routine
 */
void
rsmka_pathmanager_init()
{
	kthread_t *tp;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_pathmanager_init enter\n"));

	/* initialization for locks and condition variables  */
	mutex_init(&work_queue.work_mutex, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&ipc_info_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&ipc_info_cvlock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&adapter_listhead_base.listlock, NULL,
	    MUTEX_DEFAULT, NULL);

	cv_init(&work_queue.work_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&ipc_info_cv, NULL, CV_DEFAULT, NULL);

	tp = thread_create(NULL, 0, do_deferred_work, NULL, 0, &p0,
	    TS_RUN, maxclsyspri);
	rsm_thread_id = tp->t_did;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_pathmanager_init done\n"));
}

void
rsmka_pathmanager_cleanup()
{
	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_pathmanager_cleanup enter\n"));

	ASSERT(work_queue.head == NULL);

	/*
	 * In processing the remove path callbacks from the path monitor
	 * object, all deferred work will have been completed. So
	 * awaken the deferred work thread to give it a chance to exit
	 * the loop.
	 */
	mutex_enter(&work_queue.work_mutex);
	rsmka_terminate_workthread_loop++;
	cv_signal(&work_queue.work_cv);
	mutex_exit(&work_queue.work_mutex);

	/*
	 * Wait for the deferred work thread to exit before
	 * destroying the locks and cleaning up other data
	 * structures.
	 */
	if (rsm_thread_id)
		thread_join(rsm_thread_id);

	/*
	 * Destroy locks & condition variables
	 */
	mutex_destroy(&work_queue.work_mutex);
	cv_destroy(&work_queue.work_cv);

	mutex_enter(&ipc_info_lock);
	while (ipc_info_head)
		destroy_ipc_info(ipc_info_head);
	mutex_exit(&ipc_info_lock);

	mutex_destroy(&ipc_info_lock);

	mutex_destroy(&ipc_info_cvlock);
	cv_destroy(&ipc_info_cv);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_pathmanager_cleanup done\n"));

}

void
rsmka_set_my_nodeid(rsm_node_id_t local_nodeid)
{
	my_nodeid = local_nodeid;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsm: node %d \n", my_nodeid));

}

/*
 * DEFERRED WORK THREAD AND WORK QUEUE SUPPORT ROUTINES
 *
 */

/*
 * This function is the executable code of the thread which handles
 * deferred work.  Work is deferred when a function is called at
 * clock ipl and processing may require blocking.
 *
 *
 * The thread is created by a call to thread_create in
 * rsmka_pathmanager_init and begins executing this function
 * immediately.  It loops forever - blocked until work is enqueued from
 * rsmka_do_path_active, do_path_down, or rsmka_disconnect_node.
 * rsmka_pathmanager_cleanup (called from _fini) sets
 * rsmka_terminate_workthread_loop, after which the loop exits.
 */
/*ARGSUSED*/
static void
do_deferred_work(caddr_t arg)
{

	adapter_t 			*adapter;
	path_t				*path;
	work_token_t			*work_token;
	int				work_opcode;
	rsm_send_q_handle_t		sendq_handle;
	int				error;
	timespec_t			tv;
	callb_cpr_t			cprinfo;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_deferred_work enter\n"));

	CALLB_CPR_INIT(&cprinfo, &work_queue.work_mutex, callb_generic_cpr,
	    "rsm_deferred_work");

	for (;;) {
		mutex_enter(&work_queue.work_mutex);

		if (rsmka_terminate_workthread_loop) {
			goto exit;
		}

		/* When there is no work to do, block here */
		while (work_queue.head == NULL) {
			/* Since no work to do, Safe to CPR */
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(&work_queue.work_cv, &work_queue.work_mutex);
			CALLB_CPR_SAFE_END(&cprinfo, &work_queue.work_mutex);

			if (rsmka_terminate_workthread_loop) {
				goto exit;
			}
		}

		/*
		 * Remove a work token and begin work
		 */
		work_token = work_queue.head;
		work_queue.head = work_token->next;
		if (work_queue.tail == work_token)
			work_queue.tail = NULL;

		work_opcode = work_token->opcode;
		path = WORK_TOKEN_TO_PATH(work_token, work_opcode - 1);
		work_token->next = NULL;
		mutex_exit(&work_queue.work_mutex);


		switch (work_opcode) {
		case RSMKA_IPC_UP:
			DBG_PRINTF((category, RSM_DEBUG,
			    "do_deferred_work:up,  path = %lx\n", path));
			error = create_ipc_sendq(path);
			mutex_enter(&path->mutex);
			if (path->state != RSMKA_PATH_UP) {
				/*
				 * path state has changed, if sendq was created,
				 * destroy it and return. Don't need to worry
				 * about sendq ref_cnt since no one starts
				 * using the sendq till path state becomes
				 * active
				 */
				if (error == RSM_SUCCESS) {
					sendq_handle = path->sendq_token.
					    rsmpi_sendq_handle;
					path->sendq_token.rsmpi_sendq_handle =
					    NULL;
					adapter = path->local_adapter;
					mutex_exit(&path->mutex);

					if (sendq_handle != NULL) {
						adapter->rsmpi_ops->
						    rsm_sendq_destroy(
							    sendq_handle);
					}
					mutex_enter(&path->mutex);
				}
				/* free up work token */
				work_token->opcode = 0;

				/*
				 * decrement reference count for the path
				 * descriptor and signal for synchronization
				 * with rsmka_remove_path. PATH_HOLD_NOLOCK was
				 * done by rsmka_path_up.
				 */
				PATH_RELE_NOLOCK(path);
				mutex_exit(&path->mutex);
				break;
			}

			if (error == RSM_SUCCESS) {
				DBG_PRINTF((category, RSM_DEBUG,
				    "do_deferred_work:success on up\n"));
				/* clear flag since sendq_create succeeded */
				path->flags &= ~RSMKA_SQCREATE_PENDING;
				path->state = RSMKA_PATH_ACTIVE;

				/*
				 * now that path is active we send the
				 * RSMIPC_MSG_SQREADY to the remote endpoint
				 */
				path->procmsg_cnt = 0;
				path->sendq_token.msgbuf_avail = 0;

				/* Calculate local incarnation number */
				gethrestime(&tv);
				if (tv.tv_sec == RSM_UNKNOWN_INCN)
					tv.tv_sec = 1;
				path->local_incn = (int64_t)tv.tv_sec;

				/*
				 * if the send fails here it is due to some
				 * non-transient error because QUEUE_FULL is
				 * not possible here since we are the first
				 * message on this sendq. The error will cause
				 * the path to go down anyway, so ignore
				 * the return value.
				 */
				(void) rsmipc_send_controlmsg(path,
				    RSMIPC_MSG_SQREADY);
				/* wait for SQREADY_ACK message */
				path->flags |= RSMKA_WAIT_FOR_SQACK;
			} else {
				/*
				 * sendq create failed possibly because
				 * the remote end is not yet ready eg.
				 * handler not registered, set a flag
				 * so that when there is an indication
				 * that the remote end is ready
				 * rsmka_do_path_active will be retried.
				 */
				path->flags |= RSMKA_SQCREATE_PENDING;
			}

			/* free up work token */
			work_token->opcode = 0;

			/*
			 * decrement reference count for the path
			 * descriptor and signal for synchronization with
			 * rsmka_remove_path. PATH_HOLD_NOLOCK was done
			 * by rsmka_path_up.
			 */
			PATH_RELE_NOLOCK(path);
			mutex_exit(&path->mutex);

			break;
		case RSMKA_IPC_DOWN:
			DBG_PRINTF((category, RSM_DEBUG,
			    "do_deferred_work:down, path = %lx\n", path));

			/*
			 * Unlike the processing of path_down in the case
			 * where the RSMKA_NO_SLEEP flag is not set, here,
			 * the state of the path is changed directly to
			 * RSMKA_PATH_DOWN. This is because in this case
			 * where the RSMKA_NO_SLEEP flag is set, any other
			 * calls referring this path will just queue up
			 * and will be processed only after the path
			 * down processing has completed.
			 */
			mutex_enter(&path->mutex);
			path->state = RSMKA_PATH_DOWN;
			/*
			 * clear the WAIT_FOR_SQACK flag since path is down.
			 */
			path->flags &= ~RSMKA_WAIT_FOR_SQACK;

			/*
			 * this wakes up any thread waiting to receive credits
			 * in rsmipc_send to tell it that the path is down
			 * thus releasing the sendq.
			 */
			cv_broadcast(&path->sendq_token.sendq_cv);

			mutex_exit(&path->mutex);

			/* drain the messages from the receive msgbuf */
			taskq_wait(path->recv_taskq);

			/*
			 * The path_importer_disconnect function has to
			 * be called after releasing the mutex on the path
			 * in order to avoid any recursive mutex enter panics
			 */
			path_importer_disconnect(path);
			DBG_PRINTF((category, RSM_DEBUG,
			    "do_deferred_work: success on down\n"));
			/*
			 * decrement reference count for the path
			 * descriptor and signal for synchronization with
			 * rsmka_remove_path. PATH_HOLD_NOLOCK was done
			 * by rsmka_path_down.
			 */
			mutex_enter(&path->mutex);

#ifdef DEBUG
			/*
			 * Some IPC messages left in the recv_buf,
			 * they'll be dropped
			 */
			if (path->msgbuf_cnt != 0)
				cmn_err(CE_NOTE,
				    "path=%lx msgbuf_cnt != 0\n",
				    (uintptr_t)path);
#endif

			/*
			 * Don't want to destroy a send queue when a token
			 * has been acquired; so wait 'til the token is
			 * no longer referenced (with a cv_wait).
			 */
			while (path->sendq_token.ref_cnt != 0)
				cv_wait(&path->sendq_token.sendq_cv,
				    &path->mutex);

			sendq_handle = path->sendq_token.rsmpi_sendq_handle;
			path->sendq_token.rsmpi_sendq_handle = NULL;

			/* destroy the send queue and release the handle */
			if (sendq_handle != NULL) {
				adapter = path->local_adapter;
				adapter->rsmpi_ops->rsm_sendq_destroy(
				    sendq_handle);
			}

			work_token->opcode = 0;
			PATH_RELE_NOLOCK(path);
			mutex_exit(&path->mutex);
			break;
		default:
			DBG_PRINTF((category, RSM_DEBUG,
			    "do_deferred_work: bad work token opcode\n"));
			break;
		}
	}

exit:
	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_deferred_work done\n"));
	/*
	 * CALLB_CPR_EXIT does a mutex_exit for
	 * the work_queue.work_mutex
	 */
	CALLB_CPR_EXIT(&cprinfo);
}
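
/*
 * A note on the opcode arithmetic used above: each path embeds a
 * two-element work_token array indexed by RSMKA_IPC_UP_INDEX and
 * RSMKA_IPC_DOWN_INDEX, and WORK_TOKEN_TO_PATH(token, opcode - 1)
 * back-computes the path address from the token address.  This only
 * works if the opcode values map directly onto the array indices
 * (presumably RSMKA_IPC_UP - 1 == RSMKA_IPC_UP_INDEX and
 * RSMKA_IPC_DOWN - 1 == RSMKA_IPC_DOWN_INDEX, per the definitions in
 * rsmka_path_int.h); a new opcode would have to preserve that mapping.
 */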

/*
 * Work is inserted at the tail of the list and processed from the
 * head of the list.
 */
static void
enqueue_work(work_token_t *token)
{
	work_token_t	*tail_token;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "enqueue_work enter\n"));

	ASSERT(MUTEX_HELD(&work_queue.work_mutex));

	token->next = NULL;
	if (work_queue.head == NULL) {
		work_queue.head = work_queue.tail = token;
	} else {
		tail_token = work_queue.tail;
		work_queue.tail = tail_token->next = token;
	}

	/* wake up deferred work thread */
	cv_signal(&work_queue.work_cv);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "enqueue_work done\n"));
}


/*
 * If the work_token is found on the work queue, the work is cancelled
 * by removing the token from the work queue.
 *
 * Return B_TRUE if a work_token was found and cancelled; otherwise return
 * B_FALSE.
 *
 * Callers of enqueue_work increment the path refcnt to make sure that the
 * path doesn't go away; callers of cancel_work therefore need to decrement
 * the refcnt of the path to which this work_token belongs if a work_token
 * is found in the work_queue and cancelled, i.e. when the return value is
 * B_TRUE.
 */
static boolean_t
cancel_work(work_token_t *work_token)
{
	work_token_t	*current_token;
	work_token_t	*prev_token = NULL;
	boolean_t	cancelled = B_FALSE;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "cancel_work enter\n"));

	ASSERT(MUTEX_HELD(&work_queue.work_mutex));


	current_token = work_queue.head;
	while (current_token != NULL) {
		if (current_token == work_token) {
			if (work_token == work_queue.head)
				work_queue.head = work_token->next;
			else
				prev_token->next = work_token->next;
			if (work_token == work_queue.tail)
				work_queue.tail = prev_token;

			current_token->opcode = 0;
			current_token->next = NULL;
			/* found and cancelled work */
			cancelled = B_TRUE;
			DBG_PRINTF((category, RSM_DEBUG,
			    "cancelled_work = 0x%p\n", work_token));
			break;
		}
		prev_token = current_token;
		current_token = current_token->next;
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "cancel_work done\n"));
	return (cancelled);
}
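
/*
 * Illustrative sketch of the deferral protocol described above, modeled
 * on the usage in rsmka_remove_path: the enqueueing side holds the path
 * before stamping and queueing a token, and the side that successfully
 * cancels the token inherits the corresponding release:
 *
 *	mutex_enter(&path->mutex);
 *	PATH_HOLD_NOLOCK(path);
 *	mutex_enter(&work_queue.work_mutex);
 *	token->opcode = RSMKA_IPC_DOWN;
 *	enqueue_work(token);
 *	mutex_exit(&work_queue.work_mutex);
 *	mutex_exit(&path->mutex);
 *
 *	(later, possibly from another thread)
 *	mutex_enter(&path->mutex);
 *	mutex_enter(&work_queue.work_mutex);
 *	if (cancel_work(token))
 *		PATH_RELE_NOLOCK(path);
 *	mutex_exit(&work_queue.work_mutex);
 *	mutex_exit(&path->mutex);
 */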

/*
 * EXTERNAL INTERFACES
 *
 * For Galileo Clustering, these routines are called from
 * rsmka_pm_interface.cc
 *
 */

/*
 *
 * If the adapter is supported by rsmpi then initialize an adapter descriptor
 * and link it to the list of adapters.  The adapter attributes are obtained
 * from rsmpi and stored in the descriptor.  Finally, a service handler
 * for incoming ipc on this adapter is registered with rsmpi.
 * A pointer for the adapter descriptor is returned as a cookie to the
 * caller.  The cookie may be used with subsequent calls to save the time of
 * adapter descriptor lookup.
 *
 * The adapter descriptor maintains a reference count which is initialized
 * to 1 and incremented on lookups; when a cookie is used in place of
 * a lookup, an explicit ADAPTER_HOLD is required.
 */

void *
rsmka_add_adapter(char *name, int instance, rsm_addr_t hwaddr)
{
	adapter_t		*adapter;
	rsm_controller_object_t	rsmpi_adapter_object;
	rsm_controller_handle_t	rsmpi_adapter_handle;
	rsm_ops_t		*rsmpi_ops_vector;
	int			adapter_is_supported;
	rsm_controller_attr_t	*attr;
	srv_handler_arg_t	*srv_hdlr_argp;
	int result;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_adapter enter\n"));

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_add_adapter: name = %s instance = %d hwaddr = %llx \n",
	    name, instance, hwaddr));

	/* verify name length */
	if (strlen(name) >= MAXNAMELEN) {
		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "rsmka_add_adapter done: name too long\n"));
		return (NULL);
	}


	/* Check if rsmpi supports this adapter type */
	adapter_is_supported = rsm_get_controller(name, instance,
	    &rsmpi_adapter_object, RSM_VERSION);

	if (adapter_is_supported != RSM_SUCCESS) {
		DBG_PRINTF((category, RSM_ERR,
		    "rsmka_add_adapter done: adapter not supported\n"));
		return (NULL);
	}

	rsmpi_adapter_handle = rsmpi_adapter_object.handle;
	rsmpi_ops_vector = rsmpi_adapter_object.ops;

	/* Get adapter attributes */
	result = rsm_get_controller_attr(rsmpi_adapter_handle, &attr);
	if (result != RSM_SUCCESS) {
		DBG_PRINTF((category, RSM_ERR,
		    "rsm: get_controller_attr(%d) Failed %x\n",
		    instance, result));
		(void) rsm_release_controller(name, instance,
		    &rsmpi_adapter_object);
		return (NULL);
	}

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_add_adapter: register service offset = %llx\n", hwaddr));

	/*
	 * create a srv_handler_arg_t object, initialize it and register
	 * it along with rsm_srv_func. This get passed as the
	 * rsm_intr_hand_arg_t when the handler gets invoked.
	 */
	srv_hdlr_argp = kmem_zalloc(sizeof (srv_handler_arg_t), KM_SLEEP);

	(void) strcpy(srv_hdlr_argp->adapter_name, name);
	srv_hdlr_argp->adapter_instance = instance;
	srv_hdlr_argp->adapter_hwaddr = hwaddr;

	/* Have rsmpi register the ipc receive handler for this adapter */
	/*
	 * Currently, we need to pass in a separate service identifier for
	 * each adapter. In order to obtain a unique service identifier
	 * value for an adapter, we add the hardware address of the
	 * adapter to the base service identifier(RSM_SERVICE which is
	 * defined as RSM_INTR_T_KA as per the RSMPI specification).
	 * NOTE: This may result in using some of the service identifier
	 * values defined for RSM_INTR_T_XPORT(the Sun Cluster Transport).
	 */
	result = rsmpi_ops_vector->rsm_register_handler(
	    rsmpi_adapter_handle, &rsmpi_adapter_object,
	    RSM_SERVICE+(uint_t)hwaddr, rsm_srv_func,
	    (rsm_intr_hand_arg_t)srv_hdlr_argp, NULL, 0);

	if (result != RSM_SUCCESS) {
		DBG_PRINTF((category, RSM_ERR,
		    "rsmka_add_adapter done: rsm_register_handler "
		    "failed %d\n",
		    instance));
		kmem_free(srv_hdlr_argp, sizeof (srv_handler_arg_t));
		(void) rsm_release_controller(name, instance,
		    &rsmpi_adapter_object);
		return (NULL);
	}

	/* Initialize an adapter descriptor and add it to the adapter list */
	adapter = init_adapter(name, instance, hwaddr,
	    rsmpi_adapter_handle, rsmpi_ops_vector, srv_hdlr_argp);

	/* Copy over the attributes from the pointer returned to us */
	adapter->rsm_attr = *attr;

	/*
	 * With the addition of the topology obtainment interface, applications
	 * now get the local nodeid from the topology data structure.
	 *
	 * adapter->rsm_attr.attr_node_id = my_nodeid;
	 */
	DBG_PRINTF((category, RSM_ERR,
	    "rsmka_add_adapter: adapter = %lx\n", adapter));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_adapter done\n"));

	/* return adapter pointer as a cookie for later fast access */
	return ((void *)adapter);
}
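
/*
 * Illustrative caller-side sketch of the cookie convention (the local
 * variable names are hypothetical): the pointer returned above may be
 * passed back with the RSMKA_USE_COOKIE flag to skip the adapter lookup
 * in later calls:
 *
 *	void *cookie = rsmka_add_adapter(name, instance, hwaddr);
 *	...
 *	(void) rsmka_add_path(name, instance, remote_node, remote_hwaddr,
 *	    remote_instance, cookie, RSMKA_USE_COOKIE);
 *	...
 *	(void) rsmka_remove_adapter(name, instance, cookie,
 *	    RSMKA_USE_COOKIE);
 */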


/*
 * Unlink the adapter descriptor and call rsmka_release_adapter which
 * will decrement the reference count and possibly free the descriptor.
 */
boolean_t
rsmka_remove_adapter(char *name, uint_t instance, void *cookie, int flags)
{
	adapter_t		*adapter;
	adapter_listhead_t	*listhead;
	adapter_t		*prev, *current;
	rsm_controller_object_t	rsm_cntl_obj;


	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_remove_adapter enter\n"));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_remove_adapter: cookie = %lx\n", cookie));

	if (flags & RSMKA_USE_COOKIE) {
		adapter = (adapter_t *)cookie;
	} else {
		adapter = rsmka_lookup_adapter(name, instance);
		/*
		 * rsmka_lookup_adapter increments the ref_cnt; need
		 * to decrement here to get true count
		 */
		ADAPTER_RELE(adapter);
	}
	ASSERT(adapter->next_path == NULL);

	listhead = adapter->listhead;

	mutex_enter(&adapter_listhead_base.listlock);

	mutex_enter(&listhead->mutex);

	/* find the adapter in the list and remove it */
	prev = NULL;
	current = listhead->next_adapter;
	while (current != NULL) {
		if (adapter->instance == current->instance) {
			break;
		} else {
			prev = current;
			current = current->next;
		}
	}
	ASSERT(current != NULL);

	if (prev == NULL)
		listhead->next_adapter = current->next;
	else
		prev->next = current->next;

	listhead->adapter_count--;

	mutex_exit(&listhead->mutex);

	mutex_exit(&adapter_listhead_base.listlock);

	mutex_enter(&current->mutex);

	/*
	 * unregister the handler
	 */
	current->rsmpi_ops->rsm_unregister_handler(current->rsmpi_handle,
	    RSM_SERVICE+current->hwaddr, rsm_srv_func,
	    (rsm_intr_hand_arg_t)current->hdlr_argp);

	DBG_PRINTF((category, RSM_DEBUG, "rsmka_remove_adapter: unreg hdlr "
	    ":adapter=%lx, hwaddr=%lx\n", current, current->hwaddr));

	rsm_cntl_obj.handle = current->rsmpi_handle;
	rsm_cntl_obj.ops = current->rsmpi_ops;

	(void) rsm_release_controller(current->listhead->adapter_devname,
	    current->instance, &rsm_cntl_obj);

	mutex_exit(&current->mutex);

	rsmka_release_adapter(current);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_remove_adapter done\n"));

	return (B_TRUE);
}

/*
 * An adapter descriptor will exist from an earlier add_adapter. This
 * function does:
 *		initialize the path descriptor
 *		initialize the ipc descriptor (it may already exist)
 *		initialize and link a sendq token for this path
 */
void *
rsmka_add_path(char *adapter_name, int adapter_instance,
    rsm_node_id_t remote_node,
    rsm_addr_t remote_hwaddr, int rem_adapt_instance,
    void *cookie, int flags)
{

	path_t			*path;
	adapter_t		*adapter;
	char			tq_name[TASKQ_NAMELEN];

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_path enter\n"));

	/* allocate new path descriptor */
	path = kmem_zalloc(sizeof (path_t), KM_SLEEP);

	if (flags & RSMKA_USE_COOKIE) {
		adapter = (adapter_t *)cookie;
		ADAPTER_HOLD(adapter);
	} else {
		adapter = rsmka_lookup_adapter(adapter_name, adapter_instance);
	}

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_add_path: adapter = %lx\n", adapter));

	/*
	 * initialize path descriptor
	 * don't need to increment adapter reference count because
	 * it can't be removed if paths exist for it.
	 */
	mutex_init(&path->mutex, NULL, MUTEX_DEFAULT, NULL);

	PATH_HOLD(path);
	path->state = RSMKA_PATH_DOWN;
	path->remote_node = remote_node;
	path->remote_hwaddr = remote_hwaddr;
	path->remote_devinst = rem_adapt_instance;
	path->local_adapter = adapter;

	/* taskq is for sendq on adapter with remote_hwaddr on remote_node */
	(void) snprintf(tq_name, sizeof (tq_name), "%x_%llx",
	    remote_node, (unsigned long long) remote_hwaddr);

	path->recv_taskq = taskq_create_instance(tq_name, adapter_instance,
	    RSMKA_ONE_THREAD, maxclsyspri, RSMIPC_MAX_MESSAGES,
	    RSMIPC_MAX_MESSAGES, TASKQ_PREPOPULATE);

	/* allocate the message buffer array */
	path->msgbuf_queue = (msgbuf_elem_t *)kmem_zalloc(
	    RSMIPC_MAX_MESSAGES * sizeof (msgbuf_elem_t), KM_SLEEP);

	/*
	 * init cond variables for synch with rsmipc_send()
	 * and rsmka_remove_path
	 */
	cv_init(&path->sendq_token.sendq_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&path->hold_cv, NULL, CV_DEFAULT, NULL);

	/* link path descriptor on adapter path list */
	link_path(path);

	/* link the path sendq token on the ipc_info token list */
	link_sendq_token(&path->sendq_token, remote_node);

	/* ADAPTER_HOLD done above by rsmka_lookup_adapter */
	ADAPTER_RELE(adapter);

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_add_path: path = %lx\n", path));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_add_path done\n"));
	return ((void *)path);
}

/*
 * Wait for the path descriptor reference count to become zero then
 * directly call path down processing.  Finally, unlink the sendq token and
 * free the path descriptor memory.
 *
 * Note: lookup_path locks the path and increments the path hold count
 */
void
rsmka_remove_path(char *adapter_name, int instance, rsm_node_id_t remote_node,
    rsm_addr_t remote_hwaddr, void *path_cookie, int flags)
{
	path_t		*path;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_remove_path enter\n"));

	if (flags & RSMKA_USE_COOKIE) {
		path = (path_t *)path_cookie;
		mutex_enter(&path->mutex);
	} else {
		path = lookup_path(adapter_name, instance,  remote_node,
		    remote_hwaddr);

		/*
		 * remember, lookup_path increments the reference
		 * count - so decrement now so we can get to zero
		 */
		PATH_RELE_NOLOCK(path);
	}

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_remove_path: path = %lx\n", path));

	while (path->state == RSMKA_PATH_GOING_DOWN)
		cv_wait(&path->hold_cv, &path->mutex);

	/* attempt to cancel any possibly pending work */
	mutex_enter(&work_queue.work_mutex);
	if (cancel_work(&path->work_token[RSMKA_IPC_UP_INDEX])) {
		PATH_RELE_NOLOCK(path);
	}
	if (cancel_work(&path->work_token[RSMKA_IPC_DOWN_INDEX])) {
		PATH_RELE_NOLOCK(path);
	}
	mutex_exit(&work_queue.work_mutex);

	/*
	 * The path descriptor ref cnt was set to 1 initially when
	 * the path was added.  So we need to do a decrement here to
	 * balance that.
	 */
	PATH_RELE_NOLOCK(path);

	switch (path->state) {
	case RSMKA_PATH_UP:
		/* clear the flag */
		path->flags &= ~RSMKA_SQCREATE_PENDING;
		path->state = RSMKA_PATH_DOWN;
		break;
	case RSMKA_PATH_DOWN:
		break;

	case RSMKA_PATH_ACTIVE:
		/*
		 * rsmka_remove_path should not call do_path_down
		 * with the RSMKA_NO_SLEEP flag set since for
		 * this code path, the deferred work would
		 * incorrectly do a PATH_RELE_NOLOCK.
		 */
		do_path_down(path, 0);
		break;
	default:
		mutex_exit(&path->mutex);
		DBG_PRINTF((category, RSM_ERR,
		    "rsmka_remove_path: invalid path state %d\n",
		    path->state));
		return;

	}

	/*
	 * wait for all references to the path to be released. If a thread
	 * was waiting to receive credits do_path_down should wake it up
	 * since the path is going down and that will cause the sleeping
	 * thread to release its hold on the path.
	 */
	while (path->ref_cnt != 0) {
		cv_wait(&path->hold_cv, &path->mutex);
	}

	mutex_exit(&path->mutex);

	/*
	 * remove from ipc token list
	 * NOTE: use the remote_node value from the path structure
	 * since when RSMKA_USE_COOKIE is set, the remote_node
	 * value passed into rsmka_remove_path is 0.
	 */
	unlink_sendq_token(&path->sendq_token, path->remote_node);

	/* unlink from adapter path list and free path descriptor */
	destroy_path(path);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_remove_path done\n"));
}

/*
 *
 * LOCKING:
 * lookup_path locks the path and increments the path hold count. If the remote
 * node is not in the alive state, do_path_up will release the lock and
 * decrement the hold count.  Otherwise rsmka_do_path_active will release the
 * lock prior to waking up the work thread.
 *
 * REF_CNT:
 * The path descriptor ref_cnt is incremented here; it will be decremented
 * when path up processing is completed in do_path_up or by the work thread
 * if the path up is deferred.
 *
 */
boolean_t
rsmka_path_up(char *adapter_name, uint_t adapter_instance,
    rsm_node_id_t remote_node, rsm_addr_t remote_hwaddr,
    void *path_cookie, int flags)
{

	path_t			*path;
	boolean_t		rval = B_TRUE;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_up enter\n"));

	if (flags & RSMKA_USE_COOKIE) {
		path = (path_t *)path_cookie;
		mutex_enter(&path->mutex);
		PATH_HOLD_NOLOCK(path);
	} else {
		path = lookup_path(adapter_name, adapter_instance,
		    remote_node, remote_hwaddr);
	}

	while (path->state == RSMKA_PATH_GOING_DOWN)
		cv_wait(&path->hold_cv, &path->mutex);

	DBG_PRINTF((category, RSM_DEBUG, "rsmka_path_up: path = %lx\n", path));
	rval = do_path_up(path, flags);
	mutex_exit(&path->mutex);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_up done\n"));
	return (rval);
}

/*
 *
 * LOCKING:
 * lookup_path locks the path and increments the path hold count. If the
 * current state is ACTIVE the path lock is released prior to waking up
 * the work thread in do_path_down.  The work thread will decrement the
 * hold count when the work for this is finished.
 *
 *
 * REF_CNT:
 * The path descriptor ref_cnt is incremented here; it will be decremented
 * when path down processing is completed in do_path_down or by the work thread
 * if the path down is deferred.
 *
 */
boolean_t
rsmka_path_down(char *adapter_devname, int instance, rsm_node_id_t remote_node,
    rsm_addr_t remote_hwaddr,  void *path_cookie, int flags)
{
	path_t			*path;
	boolean_t		rval = B_TRUE;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_down enter\n"));

	if (flags & RSMKA_USE_COOKIE) {
		path = (path_t *)path_cookie;
		mutex_enter(&path->mutex);
		PATH_HOLD_NOLOCK(path);
	} else {
		path = lookup_path(adapter_devname, instance, remote_node,
		    remote_hwaddr);
	}

	while (path->state == RSMKA_PATH_GOING_DOWN)
		cv_wait(&path->hold_cv, &path->mutex);

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_path_down: path = %lx\n", path));

	switch (path->state) {
	case RSMKA_PATH_UP:
		/* clear the flag */
		path->flags &= ~RSMKA_SQCREATE_PENDING;
		path->state = RSMKA_PATH_GOING_DOWN;
		mutex_exit(&path->mutex);

		/*
		 * release path->mutex since enqueued tasks acquire it.
		 * Drain all the enqueued tasks.
		 */
		taskq_wait(path->recv_taskq);

		mutex_enter(&path->mutex);
		path->state = RSMKA_PATH_DOWN;
		PATH_RELE_NOLOCK(path);
		break;
	case RSMKA_PATH_DOWN:
		PATH_RELE_NOLOCK(path);
		break;
	case RSMKA_PATH_ACTIVE:
		do_path_down(path, flags);
		/*
		 * Need to release the path refcnt: with RSMKA_NO_SLEEP set
		 * this is done in do_path_down or do_deferred_work; with
		 * RSMKA_NO_SLEEP not set it has to be done here.
		 */
		if (!(flags & RSMKA_NO_SLEEP))
			PATH_RELE_NOLOCK(path);
		break;
	default:
		DBG_PRINTF((category, RSM_ERR,
		    "rsmka_path_down: invalid path state %d\n", path->state));
		rval = B_FALSE;
	}

	mutex_exit(&path->mutex);
	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_path_down done\n"));
	return (rval);
}
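
/*
 * Illustrative sketch: a caller running at clock interrupt level (for
 * example, the Sun Cluster path manager) must pass RSMKA_NO_SLEEP so
 * that the blocking portion of the work is deferred to the work thread:
 *
 *	(void) rsmka_path_down(NULL, 0, 0, 0, path_cookie,
 *	    RSMKA_USE_COOKIE | RSMKA_NO_SLEEP);
 *
 * Without RSMKA_NO_SLEEP, the taskq_wait and sendq teardown in
 * do_path_down run inline and may block.
 */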


/*
 * Paths cannot become active until node_is_alive is marked true
 * in the ipc_info descriptor for the node
 *
 * In the event this is called before any paths have been added,
 * init_ipc_info is called here.
 *
 */
boolean_t
rsmka_node_alive(rsm_node_id_t remote_node)
{
	ipc_info_t *ipc_info;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_alive enter\n"));

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_node_alive: remote_node = %x\n", remote_node));

	ipc_info = lookup_ipc_info(remote_node);

	if (ipc_info == NULL) {
		ipc_info = init_ipc_info(remote_node, B_TRUE);
		DBG_PRINTF((category, RSM_DEBUG,
		    "rsmka_node_alive: new ipc_info = %lx\n", ipc_info));
	} else {
		ASSERT(ipc_info->node_is_alive == B_FALSE);
		ipc_info->node_is_alive = B_TRUE;
	}

	pathup_to_pathactive(ipc_info, remote_node);

	mutex_exit(&ipc_info_lock);

	/* rsmipc_send() may be waiting for a sendq_token */
	mutex_enter(&ipc_info_cvlock);
	cv_broadcast(&ipc_info_cv);
	mutex_exit(&ipc_info_cvlock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_alive done\n"));

	return (B_TRUE);
}



/*
 * Paths cannot become active when node_is_alive is marked false
 * in the ipc_info descriptor for the node
 */
boolean_t
rsmka_node_died(rsm_node_id_t remote_node)
{
	ipc_info_t *ipc_info;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_died enter\n"));

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_node_died: remote_node = %x\n", remote_node));

	ipc_info = lookup_ipc_info(remote_node);
	if (ipc_info == NULL)
		return (B_FALSE);

	ASSERT(ipc_info->node_is_alive == B_TRUE);
	ipc_info->node_is_alive = B_FALSE;

	rsm_suspend_complete(remote_node, RSM_SUSPEND_NODEDEAD);

	mutex_exit(&ipc_info_lock);

	/* rsmipc_send() may be waiting for a sendq_token */
	mutex_enter(&ipc_info_cvlock);
	cv_broadcast(&ipc_info_cv);
	mutex_exit(&ipc_info_cvlock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsmka_node_died done\n"));

	return (B_TRUE);
}

/*
 * Treat like path_down for all paths for the specified remote node.
 * Always invoked before node died.
 *
 * NOTE: This routine is not called from the cluster path interface;
 * rsmka_path_down is called directly for each path.
 */
void
rsmka_disconnect_node(rsm_node_id_t remote_node, int flags)
{
	ipc_info_t	*ipc_info;
	path_t		*path;
	sendq_token_t	*sendq_token;
	work_token_t 	*up_token;
	work_token_t 	*down_token;
	boolean_t	do_work = B_FALSE;
	boolean_t	cancelled = B_FALSE;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_disconnect_node enter\n"));

	DBG_PRINTF((category, RSM_DEBUG,
	    "rsmka_disconnect_node: node = %d\n", remote_node));

	if (flags & RSMKA_NO_SLEEP) {
		ipc_info = lookup_ipc_info(remote_node);

		sendq_token = ipc_info->token_list;

		while (sendq_token != NULL) {
			path = SQ_TOKEN_TO_PATH(sendq_token);
			PATH_HOLD(path);
			/* reset the per-path decision from any prior pass */
			do_work = B_FALSE;
			up_token = &path->work_token[RSMKA_IPC_UP_INDEX];
			down_token = &path->work_token[RSMKA_IPC_DOWN_INDEX];

			mutex_enter(&work_queue.work_mutex);

			/* if an up token is enqueued, remove it */
			cancelled = cancel_work(up_token);

			/*
			 * If the path is active and down work hasn't
			 * already been set up then down work is needed.
			 * else
			 * if up work wasn't canceled because it was
			 * already being processed then down work is needed
			 */
			if (path->state == RSMKA_PATH_ACTIVE) {
				if (down_token->opcode == 0)
					do_work = B_TRUE;
			} else
				if (up_token->opcode == RSMKA_IPC_UP)
					do_work = B_TRUE;

			if (do_work == B_TRUE) {
				down_token->opcode = RSMKA_IPC_DOWN;
				enqueue_work(down_token);
			}
			mutex_exit(&work_queue.work_mutex);

			if (do_work == B_FALSE)
				PATH_RELE(path);

			if (cancelled) {
				PATH_RELE(path);
			}
			sendq_token = sendq_token->next;
			if (sendq_token == ipc_info->token_list)
				break;
		}

		/*
		 * Now that all the work is enqueued, wakeup the work
		 * thread.
		 */
		mutex_enter(&work_queue.work_mutex);
		cv_signal(&work_queue.work_cv);
		mutex_exit(&work_queue.work_mutex);

		IPCINFO_RELE_NOLOCK(ipc_info);
		mutex_exit(&ipc_info_lock);

	} else {
		/* get locked ipc_info descriptor */
		ipc_info = lookup_ipc_info(remote_node);

		sendq_token = ipc_info->token_list;
		while (sendq_token != NULL) {
			path = SQ_TOKEN_TO_PATH(sendq_token);
			DBG_PRINTF((category, RSM_DEBUG,
			    "rsmka_disconnect_node: path_down "
			    "for path = %lx\n",
			    path));
			(void) rsmka_path_down(0, 0, 0, 0,
			    path, RSMKA_USE_COOKIE);
			sendq_token = sendq_token->next;
			if (sendq_token == ipc_info->token_list)
				break;
		}
		mutex_exit(&ipc_info_lock);
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_disconnect_node done\n"));
}


/*
 * Called from rsmka_node_alive - if a path to a remote node is in
 * state RSMKA_PATH_UP, transition the state to RSMKA_PATH_ACTIVE with a
 * call to rsmka_do_path_active.
 *
 * REF_CNT:
 * The path descriptor ref_cnt is incremented here; it will be decremented
 * when path up processing is completed in rsmka_do_path_active or by the work
 * thread if the path up is deferred.
 */
static void
pathup_to_pathactive(ipc_info_t *ipc_info, rsm_node_id_t remote_node)
{
	path_t		*path;
	sendq_token_t	*token;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "pathup_to_pathactive enter\n"));

	remote_node = remote_node;	/* lint - unused argument */

	ASSERT(MUTEX_HELD(&ipc_info_lock));

	token = ipc_info->token_list;
	while (token != NULL) {
		path = SQ_TOKEN_TO_PATH(token);
		mutex_enter(&path->mutex);
		if (path->state == RSMKA_PATH_UP)  {
			PATH_HOLD_NOLOCK(path);
			(void) rsmka_do_path_active(path, 0);
		}
		mutex_exit(&path->mutex);
		token = token->next;
		if (token == ipc_info->token_list)
			break;
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "pathup_to_pathactive done\n"));
}

/*
 * Called from pathup_to_pathactive and do_path_up. The objective is to
 * create an ipc send queue and transition to state RSMKA_PATH_ACTIVE.
 * For the no sleep case we may need to defer the work using a token.
 *
 */
boolean_t
rsmka_do_path_active(path_t *path, int flags)
{
	work_token_t	*up_token = &path->work_token[RSMKA_IPC_UP_INDEX];
	work_token_t	*down_token = &path->work_token[RSMKA_IPC_DOWN_INDEX];
	boolean_t	do_work = B_FALSE;
	int		error;
	timespec_t	tv;
	adapter_t	*adapter;
	rsm_send_q_handle_t	sqhdl;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_do_path_active enter\n"));

	ASSERT(MUTEX_HELD(&path->mutex));

	if (flags & RSMKA_NO_SLEEP) {
		mutex_enter(&work_queue.work_mutex);

		/* if a down token is enqueued, remove it */
		if (cancel_work(down_token)) {
			PATH_RELE_NOLOCK(path);
		}

		/*
		 * If the path is not active and up work hasn't
		 * already been set up then up work is needed.
		 * else
		 * if down work wasn't canceled because it was
		 * already being processed then up work is needed
		 */
		if (path->state != RSMKA_PATH_ACTIVE) {
			if (up_token->opcode == 0)
				do_work = B_TRUE;
		} else
			if (down_token->opcode == RSMKA_IPC_DOWN)
				do_work = B_TRUE;

		if (do_work == B_TRUE) {
			up_token->opcode = RSMKA_IPC_UP;
			enqueue_work(up_token);
		} else
			PATH_RELE_NOLOCK(path);

		mutex_exit(&work_queue.work_mutex);
		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "rsmka_do_path_active done\n"));
		return (B_TRUE);
	} else {
		/*
		 * Drop the path lock before calling create_ipc_sendq; we
		 * shouldn't hold locks across calls to RSMPI routines.
		 */
		mutex_exit(&path->mutex);

		error = create_ipc_sendq(path);

		mutex_enter(&path->mutex);
		if (path->state != RSMKA_PATH_UP) {
			/*
			 * path state has changed, if sendq was created,
			 * destroy it and return
			 */
			if (error == RSM_SUCCESS) {
				sqhdl = path->sendq_token.rsmpi_sendq_handle;
				path->sendq_token.rsmpi_sendq_handle = NULL;
				adapter = path->local_adapter;
				mutex_exit(&path->mutex);

				if (sqhdl != NULL) {
					adapter->rsmpi_ops->rsm_sendq_destroy(
						sqhdl);
				}
				mutex_enter(&path->mutex);
			}
			PATH_RELE_NOLOCK(path);

			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
			    "rsmka_do_path_active done: path=%lx not UP\n",
			    (uintptr_t)path));
			return (error ? B_FALSE : B_TRUE);
		}

		if (error == RSM_SUCCESS) {
			/* clear flag since sendq_create succeeded */
			path->flags &= ~RSMKA_SQCREATE_PENDING;
			path->state = RSMKA_PATH_ACTIVE;
			/*
			 * now that path is active we send the
			 * RSMIPC_MSG_SQREADY to the remote endpoint
			 */
			path->procmsg_cnt = 0;
			path->sendq_token.msgbuf_avail = 0;

			/* Calculate local incarnation number */
			gethrestime(&tv);
			if (tv.tv_sec == RSM_UNKNOWN_INCN)
				tv.tv_sec = 1;
			path->local_incn = (int64_t)tv.tv_sec;

			/*
			 * if the send fails here it is due to some
			 * non-transient error because QUEUE_FULL is not
			 * possible here since we are the first message on
			 * this sendq. The error will cause the path to go
			 * down anyway, so ignore the return value.
			 */
			(void) rsmipc_send_controlmsg(path, RSMIPC_MSG_SQREADY);
			/* wait for SQREADY_ACK message */
			path->flags |= RSMKA_WAIT_FOR_SQACK;

			DBG_PRINTF((category, RSM_DEBUG,
			    "rsmka_do_path_active success\n"));
		} else {
			/*
			 * sendq create failed possibly because
			 * the remote end is not yet ready eg.
			 * handler not registered, set a flag
			 * so that when there is an indication
			 * that the remote end is ready rsmka_do_path_active
			 * will be retried.
			 */
			path->flags |= RSMKA_SQCREATE_PENDING;
		}

		PATH_RELE_NOLOCK(path);

		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "rsmka_do_path_active done\n"));
		return (error ? B_FALSE : B_TRUE);
	}

}

/*
 * Called from rsmka_path_up.
 * If the remote node state is "alive" then call rsmka_do_path_active
 * otherwise just transition path state to RSMKA_PATH_UP.
 */
static boolean_t
do_path_up(path_t *path, int flags)
{
	boolean_t	rval;
	boolean_t	node_alive;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_up enter\n"));

	ASSERT(MUTEX_HELD(&path->mutex));

	/* path moved to ACTIVE by rsm_sqcreateop_callback - just return */
	if (path->state == RSMKA_PATH_ACTIVE) {
		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "do_path_up done: already ACTIVE\n"));
		PATH_RELE_NOLOCK(path);
		return (B_TRUE);
	}

	path->state = RSMKA_PATH_UP;

	/* initialize the receive msgbuf counters */
	path->msgbuf_head = 0;
	path->msgbuf_tail = RSMIPC_MAX_MESSAGES - 1;
	path->msgbuf_cnt = 0;
	path->procmsg_cnt = 0;
	/*
	 * rsmka_check_node_alive acquires ipc_info_lock, in order to maintain
	 * correct lock ordering drop the path lock before calling it.
	 */
	mutex_exit(&path->mutex);

	node_alive = rsmka_check_node_alive(path->remote_node);

	mutex_enter(&path->mutex);
	if (node_alive == B_TRUE)
		rval = rsmka_do_path_active(path, flags);
	else {
		PATH_RELE_NOLOCK(path);
		rval = B_TRUE;
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_up done\n"));
	return (rval);
}



/*
 * Called from rsmka_remove_path, rsmka_path_down, and do_deferred_work.
 * Destroy the send queue on this path.
 * Disconnect segments being imported from the remote node
 * Disconnect segments being imported by the remote node
 *
 */
static void
do_path_down(path_t *path, int flags)
{
	work_token_t *up_token = &path->work_token[RSMKA_IPC_UP_INDEX];
	work_token_t *down_token = &path->work_token[RSMKA_IPC_DOWN_INDEX];
	boolean_t do_work = B_FALSE;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_down enter\n"));

	ASSERT(MUTEX_HELD(&path->mutex));

	if (flags & RSMKA_NO_SLEEP) {
		mutex_enter(&work_queue.work_mutex);
		DBG_PRINTF((category, RSM_DEBUG,
		    "do_path_down: after work_mutex\n"));

		/* if an up token is enqueued, remove it */
		if (cancel_work(up_token)) {
			PATH_RELE_NOLOCK(path);
		}

		/*
		 * If the path is active and down work hasn't
		 * already been set up then down work is needed.
		 * else
		 * if up work wasn't canceled because it was
		 * already being processed then down work is needed
		 */
		if (path->state == RSMKA_PATH_ACTIVE) {
			if (down_token->opcode == 0)
				do_work = B_TRUE;
		} else
			if (up_token->opcode == RSMKA_IPC_UP)
				do_work = B_TRUE;

		if (do_work == B_TRUE) {
			down_token->opcode = RSMKA_IPC_DOWN;
			enqueue_work(down_token);
		} else
			PATH_RELE_NOLOCK(path);


		mutex_exit(&work_queue.work_mutex);

	} else {

		/*
		 * Change the state of the path to RSMKA_PATH_GOING_DOWN and
		 * release the path mutex. Any other thread referring to
		 * this path will cv_wait while the state of the path
		 * remains RSMKA_PATH_GOING_DOWN.
		 * On completing the path down processing, change the
		 * state to RSMKA_PATH_DOWN, indicating that the path
		 * is indeed down.
		 */
		path->state = RSMKA_PATH_GOING_DOWN;

		/*
		 * clear the WAIT_FOR_SQACK flag since path is going down.
		 */
		path->flags &= ~RSMKA_WAIT_FOR_SQACK;

		/*
		 * this wakes up any thread waiting to receive credits
		 * in rsmipc_send to tell it that the path is going down
		 */
		cv_broadcast(&path->sendq_token.sendq_cv);

		mutex_exit(&path->mutex);

		/*
		 * drain the messages from the receive msgbuf, the
		 * tasks in the taskq_thread acquire the path->mutex
		 * so we drop the path mutex before taskq_wait.
		 */
		taskq_wait(path->recv_taskq);

		/*
		 * Disconnect segments being imported from the remote node
		 * The path_importer_disconnect function needs to be called
		 * only after releasing the mutex on the path. This is to
		 * avoid a recursive mutex enter when doing the
		 * rsmka_get_sendq_token.
		 */
		path_importer_disconnect(path);

		/*
		 * Get the path mutex, change the state of the path to
		 * RSMKA_PATH_DOWN since the path down processing has
		 * completed and cv_signal anyone who was waiting since
		 * the state was RSMKA_PATH_GOING_DOWN.
		 * NOTE: Do not do a mutex_exit here. We entered this
		 * routine with the path lock held by the caller. The
		 * caller eventually releases the path lock by doing a
		 * mutex_exit.
		 */
		mutex_enter(&path->mutex);

#ifdef DEBUG
		/*
		 * Some IPC messages left in the recv_buf,
		 * they'll be dropped
		 */
		if (path->msgbuf_cnt != 0)
			cmn_err(CE_NOTE, "path=%lx msgbuf_cnt != 0\n",
			    (uintptr_t)path);
#endif
		while (path->sendq_token.ref_cnt != 0)
			cv_wait(&path->sendq_token.sendq_cv,
			    &path->mutex);

		/* release the rsmpi handle */
		if (path->sendq_token.rsmpi_sendq_handle != NULL)
			path->local_adapter->rsmpi_ops->rsm_sendq_destroy(
			    path->sendq_token.rsmpi_sendq_handle);

		path->sendq_token.rsmpi_sendq_handle = NULL;

		path->state = RSMKA_PATH_DOWN;

		cv_signal(&path->hold_cv);

	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "do_path_down done\n"));

}
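
/*
 * Illustrative sketch of the RSMKA_PATH_GOING_DOWN handshake used above:
 * threads that need a stable path state wait on the hold condition
 * variable while down processing is in flight:
 *
 *	mutex_enter(&path->mutex);
 *	while (path->state == RSMKA_PATH_GOING_DOWN)
 *		cv_wait(&path->hold_cv, &path->mutex);
 *	(operate on the path)
 *	mutex_exit(&path->mutex);
 *
 * do_path_down's cv_signal(&path->hold_cv) releases such waiters once
 * the state reaches RSMKA_PATH_DOWN.
 */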

/*
 * Search through the list of imported segments for segments using this path
 * and unload the memory mappings for each one.  The application will
 * get an error return when a barrier close is invoked.
 * NOTE: This function has to be called only after releasing the mutex on
 * the path. This is to avoid any recursive mutex panics on the path mutex
 * since the path_importer_disconnect function would end up calling
 * rsmka_get_sendq_token which requires the path mutex.
 */

static void
path_importer_disconnect(path_t *path)
{
	int i;
	adapter_t *adapter = path->local_adapter;
	rsm_node_id_t remote_node = path->remote_node;
	rsmresource_t		*p = NULL;
	rsmseg_t *seg;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "path_importer_disconnect enter\n"));

	rw_enter(&rsm_import_segs.rsmhash_rw, RW_READER);

	if (rsm_import_segs.bucket != NULL) {
		for (i = 0; i < rsm_hash_size; i++) {
			p = rsm_import_segs.bucket[i];
			for (; p; p = p->rsmrc_next) {
				if ((p->rsmrc_node == remote_node) &&
				    (p->rsmrc_adapter == adapter)) {
					seg = (rsmseg_t *)p;
			/*
			 * In order to make rsmseg_unload and
			 * path_importer_disconnect thread safe, acquire the
			 * segment lock here. rsmseg_unload is responsible for
			 * releasing the lock. rsmseg_unload releases the lock
			 * just before a call to rsmipc_send or in case of an
			 * early exit which occurs if the segment was in the
			 * state RSM_STATE_CONNECTING or RSM_STATE_NEW.
			 */
					rsmseglock_acquire(seg);
					seg->s_flags |= RSM_FORCE_DISCONNECT;
					rsmseg_unload(seg);
				}
			}
		}
	}
	rw_exit(&rsm_import_segs.rsmhash_rw);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "path_importer_disconnect done\n"));
}




/*
 *
 * ADAPTER UTILITY FUNCTIONS
 *
 */



/*
 * Allocate new adapter list head structure and add it to the beginning of
 * the list of adapter list heads.  There is one list for each adapter
 * device name (or type).
 */
static adapter_listhead_t *
init_listhead(char *name)
{
	adapter_listhead_t *listhead;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_listhead enter\n"));

	/* allocation and initialization */
	listhead = kmem_zalloc(sizeof (adapter_listhead_t), KM_SLEEP);
	mutex_init(&listhead->mutex, NULL, MUTEX_DEFAULT, NULL);
	(void) strcpy(listhead->adapter_devname, name);

	/* link into list of listheads */
	mutex_enter(&adapter_listhead_base.listlock);
	if (adapter_listhead_base.next == NULL) {
		adapter_listhead_base.next = listhead;
		listhead->next_listhead = NULL;
	} else {
		listhead->next_listhead = adapter_listhead_base.next;
		adapter_listhead_base.next = listhead;
	}
	mutex_exit(&adapter_listhead_base.listlock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_listhead done\n"));

	return (listhead);
}


/*
 * Search the list of adapter list heads for a match on name.
 *
 */
static adapter_listhead_t *
lookup_adapter_listhead(char *name)
{
	adapter_listhead_t *listhead;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "lookup_adapter_listhead enter\n"));

	mutex_enter(&adapter_listhead_base.listlock);
	listhead = adapter_listhead_base.next;
	while (listhead != NULL) {
		if (strcmp(name, listhead->adapter_devname) == 0)
			break;
		listhead = listhead->next_listhead;
	}
	mutex_exit(&adapter_listhead_base.listlock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "lookup_adapter_listhead done\n"));

	return (listhead);
}


/*
 * Get the adapter list head corresponding to devname and search for
 * an adapter descriptor with a match on the instance number. If
 * successful, increment the descriptor reference count and return
 * the descriptor pointer to the caller.
 *
 */
adapter_t *
rsmka_lookup_adapter(char *devname, int instance)
{
	adapter_listhead_t *listhead;
	adapter_t *current = NULL;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_lookup_adapter enter\n"));

	listhead = lookup_adapter_listhead(devname);
	if (listhead != NULL) {
		mutex_enter(&listhead->mutex);

		current = listhead->next_adapter;
		while (current != NULL) {
			if (current->instance == instance) {
				ADAPTER_HOLD(current);
				break;
			} else
				current = current->next;
		}

		mutex_exit(&listhead->mutex);
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_lookup_adapter done\n"));

	return (current);
}

/*
 * Called from rsmka_remove_adapter or rsmseg_free.
 * rsm_bind() and rsm_connect() store the adapter pointer returned
 * from rsmka_getadapter.  The pointer is kept in the segment descriptor.
 * When the segment is freed, this routine is called by rsmseg_free to decrement
 * the adapter descriptor reference count and possibly free the
 * descriptor.
 */
void
rsmka_release_adapter(adapter_t *adapter)
{
	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_release_adapter enter\n"));

	if (adapter == &loopback_adapter) {
		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "rsmka_release_adapter done\n"));
		return;
	}

	mutex_enter(&adapter->mutex);

	/* decrement reference count */
	ADAPTER_RELE_NOLOCK(adapter);

	/*
	 * if the adapter descriptor reference count is equal to the
	 * initialization value of one, then the descriptor has been
	 * unlinked and can now be freed.
	 */
	if (adapter->ref_cnt == 1) {
		mutex_exit(&adapter->mutex);

		mutex_destroy(&adapter->mutex);
		kmem_free(adapter->hdlr_argp, sizeof (srv_handler_arg_t));
		kmem_free(adapter, sizeof (adapter_t));
	} else
		mutex_exit(&adapter->mutex);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_release_adapter done\n"));

}



/*
 * Singly linked list. Add to the front.
 */
static void
link_adapter(adapter_t *adapter)
{

	adapter_listhead_t *listhead;
	adapter_t *current;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_adapter enter\n"));

	mutex_enter(&adapter_listhead_base.listlock);

	mutex_enter(&adapter->listhead->mutex);

	listhead = adapter->listhead;
	current = listhead->next_adapter;
	listhead->next_adapter = adapter;
	adapter->next = current;
	ADAPTER_HOLD(adapter);

	adapter->listhead->adapter_count++;

	mutex_exit(&adapter->listhead->mutex);

	mutex_exit(&adapter_listhead_base.listlock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_adapter done\n"));
}


/*
 * Allocate and initialize an adapter descriptor, link it into the list
 * of adapters headed by the listhead for this device name (creating the
 * listhead first if one does not yet exist), and return a pointer to
 * the descriptor.
 */
static adapter_t *
init_adapter(char *name, int instance, rsm_addr_t hwaddr,
    rsm_controller_handle_t handle, rsm_ops_t *ops,
    srv_handler_arg_t *hdlr_argp)
{
	adapter_t *adapter;
	adapter_listhead_t *listhead;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_adapter enter\n"));

	adapter = kmem_zalloc(sizeof (adapter_t), KM_SLEEP);
	adapter->instance = instance;
	adapter->hwaddr = hwaddr;
	adapter->rsmpi_handle = handle;
	adapter->rsmpi_ops = ops;
	adapter->hdlr_argp = hdlr_argp;
	mutex_init(&adapter->mutex, NULL, MUTEX_DEFAULT, NULL);
	ADAPTER_HOLD(adapter);


	listhead = lookup_adapter_listhead(name);
	if (listhead == NULL)  {
		listhead = init_listhead(name);
	}

	adapter->listhead = listhead;

	link_adapter(adapter);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_adapter done\n"));

	return (adapter);
}

/*
 *
 * PATH UTILITY FUNCTIONS
 *
 */


/*
 * Search the per adapter path list for a match on remote node and
 * hwaddr.  The path ref_cnt must be greater than zero or the path
 * is in the process of being removed.
 *
 * Acquire the path lock and increment the path hold count.
 */
static path_t *
lookup_path(char *adapter_devname, int adapter_instance,
    rsm_node_id_t remote_node, rsm_addr_t hwaddr)
{
	path_t		*current;
	adapter_t	*adapter;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_path enter\n"));

	adapter = rsmka_lookup_adapter(adapter_devname, adapter_instance);
	ASSERT(adapter != NULL);

	mutex_enter(&adapter->listhead->mutex);

	/* start at the list head */
	current = adapter->next_path;

	while (current != NULL) {
		if ((current->remote_node == remote_node) &&
		    (current->remote_hwaddr == hwaddr) &&
		    (current->ref_cnt > 0))
			break;
		else
			current = current->next_path;
	}
	if (current != NULL) {
		mutex_enter(&current->mutex);
		PATH_HOLD_NOLOCK(current);
	}

	mutex_exit(&adapter->listhead->mutex);
	ADAPTER_RELE(adapter);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_path done\n"));

	return (current);
}
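
/*
 * Illustrative caller-side sketch for lookup_path: the path is returned
 * locked and held, so the caller is responsible for the matching
 * release and exit:
 *
 *	path = lookup_path(devname, instance, remote_node, hwaddr);
 *	if (path != NULL) {
 *		(use the path)
 *		PATH_RELE_NOLOCK(path);
 *		mutex_exit(&path->mutex);
 *	}
 */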

/*
 * This interface is similar to lookup_path but takes only the local
 * adapter name, instance, and remote adapter's hwaddr to identify the
 * path. This is used in the interrupt handler routines where the nodeid
 * is not always available.
 */
path_t *
rsm_find_path(char *adapter_devname, int adapter_instance, rsm_addr_t hwaddr)
{
	path_t		*current;
	adapter_t	*adapter;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsm_find_path enter\n"));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsm_find_path:adapter=%s:%d,rem=%llx\n",
	    adapter_devname, adapter_instance, hwaddr));

	adapter = rsmka_lookup_adapter(adapter_devname, adapter_instance);

	/*
	 * It's possible that we are here due to an interrupt but the
	 * adapter was removed after we received the callback.
	 */
	if (adapter == NULL)
		return (NULL);

	mutex_enter(&adapter->listhead->mutex);

	/* start at the list head */
	current = adapter->next_path;

	while (current != NULL) {
		if ((current->remote_hwaddr == hwaddr) &&
		    (current->ref_cnt > 0))
			break;
		else
			current = current->next_path;
	}
	if (current != NULL) {
		mutex_enter(&current->mutex);
		PATH_HOLD_NOLOCK(current);
	}

	mutex_exit(&adapter->listhead->mutex);

	rsmka_release_adapter(adapter);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rsm_find_path done\n"));

	return (current);
}


/*
 * Add the path to the head of the (per adapter) list of paths
 */
static void
link_path(path_t *path)
{

	adapter_t *adapter = path->local_adapter;
	path_t *first_path;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_path enter\n"));

	mutex_enter(&adapter_listhead_base.listlock);

	mutex_enter(&adapter->listhead->mutex);

	first_path = adapter->next_path;
	adapter->next_path = path;
	path->next_path = first_path;

	adapter->listhead->path_count++;

	mutex_exit(&adapter->listhead->mutex);

	mutex_exit(&adapter_listhead_base.listlock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_path done\n"));
}

/*
 * Search the per-adapter list of paths for the specified path, beginning
 * at the head of the list.  Unlink the path and free the descriptor
 * memory.
 */
static void
destroy_path(path_t *path)
{

	adapter_t *adapter = path->local_adapter;
	path_t *prev, *current;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_path enter\n"));

	mutex_enter(&adapter_listhead_base.listlock);

	mutex_enter(&path->local_adapter->listhead->mutex);
	ASSERT(path->ref_cnt == 0);

	/* start at the list head */
	prev = NULL;
	current =  adapter->next_path;

	while (current != NULL) {
		if (path->remote_node == current->remote_node &&
		    path->remote_hwaddr == current->remote_hwaddr)
			break;
		else {
			prev = current;
			current = current->next_path;
		}
	}
	ASSERT(current != NULL);

	if (prev == NULL)
		adapter->next_path = current->next_path;
	else
		prev->next_path = current->next_path;

	path->local_adapter->listhead->path_count--;

	mutex_exit(&path->local_adapter->listhead->mutex);

	mutex_exit(&adapter_listhead_base.listlock);

	/* at this point current == path, the descriptor being destroyed */
	taskq_destroy(path->recv_taskq);

	kmem_free(path->msgbuf_queue,
	    RSMIPC_MAX_MESSAGES * sizeof (msgbuf_elem_t));

	mutex_destroy(&path->mutex);
	cv_destroy(&path->sendq_token.sendq_cv);
	cv_destroy(&path->hold_cv);
	kmem_free(path, sizeof (path_t));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_path done\n"));
}

void
rsmka_enqueue_msgbuf(path_t *path, void *data)
{
	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_enqueue_msgbuf enter\n"));

	ASSERT(MUTEX_HELD(&path->mutex));

	ASSERT(path->msgbuf_cnt < RSMIPC_MAX_MESSAGES);

	/* increment the count and advance the tail */

	path->msgbuf_cnt++;

	if (path->msgbuf_tail == RSMIPC_MAX_MESSAGES - 1) {
		path->msgbuf_tail = 0;
	} else {
		path->msgbuf_tail++;
	}

	path->msgbuf_queue[path->msgbuf_tail].active = B_TRUE;

	bcopy(data, &(path->msgbuf_queue[path->msgbuf_tail].msg),
	    sizeof (rsmipc_request_t));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_enqueue_msgbuf done\n"));

}

/*
 * Callers get the head of the queue using rsmka_gethead_msgbuf and
 * then call rsmka_dequeue_msgbuf to remove it (see the sketch
 * following rsmka_gethead_msgbuf).
 */
void
rsmka_dequeue_msgbuf(path_t *path)
{
	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_dequeue_msgbuf enter\n"));

	ASSERT(MUTEX_HELD(&path->mutex));

	if (path->msgbuf_cnt == 0)
		return;

	path->msgbuf_cnt--;

	path->msgbuf_queue[path->msgbuf_head].active = B_FALSE;

	if (path->msgbuf_head == RSMIPC_MAX_MESSAGES - 1) {
		path->msgbuf_head = 0;
	} else {
		path->msgbuf_head++;
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_dequeue_msgbuf done\n"));

}

msgbuf_elem_t *
rsmka_gethead_msgbuf(path_t *path)
{
	msgbuf_elem_t	*head;

	ASSERT(MUTEX_HELD(&path->mutex));

	if (path->msgbuf_cnt == 0)
		return (NULL);

	head = &path->msgbuf_queue[path->msgbuf_head];

	return (head);

}
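
/*
 * Illustrative sketch (not part of this module): draining the message
 * buffer ring under path->mutex, peeking at the head element with
 * rsmka_gethead_msgbuf and removing it with rsmka_dequeue_msgbuf.
 * process_msg() is a hypothetical consumer of the buffered request.
 */
#if 0
static void
msgbuf_drain_example(path_t *path)
{
	msgbuf_elem_t *head;

	mutex_enter(&path->mutex);
	while ((head = rsmka_gethead_msgbuf(path)) != NULL) {
		process_msg(&head->msg);	/* hypothetical consumer */
		rsmka_dequeue_msgbuf(path);
	}
	mutex_exit(&path->mutex);
}
#endif
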
/*
 * Called by rsm_connect which needs the node id corresponding to the
 * hardware address of a remote adapter.  A search is done through the
 * paths for the local adapter for a match on the specified remote
 * hwaddr.
 */
rsm_node_id_t
get_remote_nodeid(adapter_t *adapter, rsm_addr_t remote_hwaddr)
{

	rsm_node_id_t remote_node;
	path_t	   *current;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_nodeid enter\n"));

	mutex_enter(&adapter->listhead->mutex);
	current = adapter->next_path;
	while (current != NULL) {
		if (current->remote_hwaddr == remote_hwaddr) {
			remote_node = current->remote_node;
			break;
		}
		current = current->next_path;
	}

	if (current == NULL)
		remote_node = (rsm_node_id_t)-1;

	mutex_exit(&adapter->listhead->mutex);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_nodeid done\n"));

	return (remote_node);
}

/*
 * Called by rsm_connect which needs the hardware address of the
 * remote adapter.  A search is done through the paths for the local
 * adapter for a match on the specified remote node.
 */
rsm_addr_t
get_remote_hwaddr(adapter_t *adapter, rsm_node_id_t remote_node)
{

	rsm_addr_t remote_hwaddr;
	rsm_addr_t remote_hwaddr;
	path_t	   *current;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_hwaddr enter\n"));

	mutex_enter(&adapter->listhead->mutex);
	current = adapter->next_path;
	while (current != NULL) {
		if (current->remote_node == remote_node) {
			remote_hwaddr = current->remote_hwaddr;
			break;
		}
		current = current->next_path;
	}
	if (current == NULL)
		remote_hwaddr = (rsm_addr_t)-1;

	mutex_exit(&adapter->listhead->mutex);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_remote_hwaddr done\n"));

	return (remote_hwaddr);
}

/*
 * IPC UTILITY FUNCTIONS
 */


/*
 * If an entry exists, return it with the ipc_info_lock held; if not,
 * the lock is dropped and NULL is returned (see the sketch below).
 */
static ipc_info_t *
lookup_ipc_info(rsm_node_id_t remote_node)
{
	ipc_info_t  *ipc_info;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_ipc_info enter\n"));

	mutex_enter(&ipc_info_lock);

	ipc_info = ipc_info_head;
	if (ipc_info == NULL) {
		mutex_exit(&ipc_info_lock);
		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "lookup_ipc_info done: ipc_info is NULL\n"));
		return (NULL);
	}

	while (ipc_info->remote_node != remote_node) {
		ipc_info = ipc_info->next;
		if (ipc_info == NULL) {
			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
			    "lookup_ipc_info: ipc_info not found\n"));
			mutex_exit(&ipc_info_lock);
			break;
		}
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "lookup_ipc_info done\n"));

	return (ipc_info);
}
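
/*
 * Illustrative sketch (not part of this module): the asymmetric lock
 * contract of lookup_ipc_info().  A non-NULL return means
 * ipc_info_lock is still held and must be dropped by the caller; a
 * NULL return means the lock has already been dropped.
 */
#if 0
static boolean_t
ipc_info_lookup_example(rsm_node_id_t node)
{
	ipc_info_t *ipc_info;
	boolean_t alive;

	ipc_info = lookup_ipc_info(node);
	if (ipc_info == NULL)
		return (B_FALSE);	/* ipc_info_lock already dropped */

	alive = ipc_info->node_is_alive;
	mutex_exit(&ipc_info_lock);
	return (alive);
}
#endif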

/*
 * Create an ipc_info descriptor and return with ipc_info_lock held
 */
static ipc_info_t *
init_ipc_info(rsm_node_id_t remote_node, boolean_t state)
{
	ipc_info_t *ipc_info;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_ipc_info enter\n"));

	/*
	 * allocate an ipc_info descriptor and add it to a
	 * singly linked list
	 */

	ipc_info = kmem_zalloc(sizeof (ipc_info_t), KM_SLEEP);
	ipc_info->remote_node = remote_node;
	ipc_info->node_is_alive = state;

	mutex_enter(&ipc_info_lock);
	if (ipc_info_head == NULL) {
		DBG_PRINTF((category, RSM_DEBUG,
		    "init_ipc_info:ipc_info_head = %lx\n", ipc_info));
		ipc_info_head = ipc_info;
		ipc_info->next = NULL;
	} else {
		ipc_info->next = ipc_info_head;
		ipc_info_head = ipc_info;
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "init_ipc_info done\n"));

	return (ipc_info);
}

static void
destroy_ipc_info(ipc_info_t *ipc_info)
{
	ipc_info_t *current = ipc_info_head;
	ipc_info_t *prev;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_ipc_info enter\n"));

	ASSERT(MUTEX_HELD(&ipc_info_lock));

	while (current != NULL && current != ipc_info) {
		prev = current;
		current = current->next;
	}
	ASSERT(current != NULL);

	if (current != ipc_info_head)
		prev->next = current->next;
	else
		ipc_info_head = current->next;

	kmem_free(current, sizeof (ipc_info_t));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "destroy_ipc_info done\n"));

}

/*
 * Sendq tokens are kept on a circular list; ipc_info->token_list
 * points at the most recently linked token, whose next pointer wraps
 * around to the earliest.  For example, if tokens D, C, B, & A are
 * linked in that order, then ipc_info points to A, A points to D,
 * D to C, C to B, and B to A.
 */
static void
link_sendq_token(sendq_token_t *token, rsm_node_id_t remote_node)
{
	ipc_info_t *ipc_info;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_sendq_token enter\n"));

	ipc_info = lookup_ipc_info(remote_node);
	if (ipc_info == NULL) {
		ipc_info = init_ipc_info(remote_node, B_FALSE);
		DBG_PRINTF((category, RSM_DEBUG,
		    "link_sendq_token: new ipc_info = %lx\n", ipc_info));
	} else {
		DBG_PRINTF((category, RSM_DEBUG,
		    "link_sendq_token: ipc_info = %lx\n", ipc_info));
	}

	if (ipc_info->token_list == NULL) {
		ipc_info->token_list = token;
		ipc_info->current_token = token;
		DBG_PRINTF((category, RSM_DEBUG,
		    "link_sendq_token: current = %lx\n", token));
		token->next = token;
	} else {
		DBG_PRINTF((category, RSM_DEBUG,
		    "link_sendq_token: token = %lx\n", token));
		token->next = ipc_info->token_list->next;
		ipc_info->token_list->next = token;
		ipc_info->token_list = token;
	}

	mutex_exit(&ipc_info_lock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "link_sendq_token done\n"));

}

static void
unlink_sendq_token(sendq_token_t *token, rsm_node_id_t remote_node)
{
	sendq_token_t *prev, *start,  *current;
	ipc_info_t *ipc_info;
	path_t *path = SQ_TOKEN_TO_PATH(token);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "unlink_sendq_token enter\n"));

	ASSERT(path->ref_cnt == 0);

	ipc_info = lookup_ipc_info(remote_node);
	if (ipc_info == NULL) {
		DBG_PRINTF((category, RSM_DEBUG,
		    "ipc_info for %d not found\n", remote_node));
		return;
	}

	prev = ipc_info->token_list;
	start = current = ipc_info->token_list->next;

	for (;;) {
		if (current == token) {
			if (current->next != current) {
				DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
				    "found token, removed it\n"));
				prev->next = token->next;
				if (ipc_info->token_list == token)
					ipc_info->token_list = prev;
				ipc_info->current_token = token->next;
			} else {
				/* list will be empty  */
				DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
				    "removed token, list empty\n"));
				ipc_info->token_list = NULL;
				ipc_info->current_token = NULL;
			}
			break;
		}
		prev = current;
		current = current->next;
		if (current == start) {
			DBG_PRINTF((category, RSM_DEBUG,
			    "unlink_sendq_token: token not found\n"));
			break;
		}
	}
	mutex_exit(&ipc_info_lock);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "unlink_sendq_token done\n"));
}


void
rele_sendq_token(sendq_token_t *token)
{
	path_t *path;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rele_sendq_token enter\n"));

	path = SQ_TOKEN_TO_PATH(token);
	mutex_enter(&path->mutex);
	PATH_RELE_NOLOCK(path);
	SENDQ_TOKEN_RELE(path);
	mutex_exit(&path->mutex);

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "rele_sendq_token done\n"));

}

/*
 * A valid ipc token can only be returned if the remote node is alive.
 * Tokens are on a circular list.  Starting with the current token,
 * search for a token whose path is in state RSMKA_PATH_ACTIVE.
 * rsmipc_send, which calls rsmka_get_sendq_token, expects that if
 * there are multiple paths available between a node-pair then
 * consecutive calls from a particular invocation of rsmipc_send will
 * return a sendq different from the one used in the previous
 * iteration.  When prev_used is NULL it indicates that this is the
 * first iteration in a specific rsmipc_send invocation (see the
 * sketch following this function).
 *
 * Updating the current token provides round-robin selection, and this
 * is done only in the first iteration, i.e. when prev_used is NULL.
 */
sendq_token_t *
rsmka_get_sendq_token(rsm_node_id_t remote_node, sendq_token_t *prev_used)
{
	sendq_token_t *token, *first_token;
	path_t *path;
	ipc_info_t *ipc_info;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
	    "rsmka_get_sendq_token enter\n"));

	ipc_info = lookup_ipc_info(remote_node);
	if (ipc_info == NULL) {
		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "rsmka_get_sendq_token done: ipc_info is NULL\n"));
		return (NULL);
	}

	if (ipc_info->node_is_alive == B_TRUE) {
		token = first_token = ipc_info->current_token;
		if (token == NULL) {
			mutex_exit(&ipc_info_lock);
			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
			    "rsmka_get_sendq_token done: token=NULL\n"));
			return (NULL);
		}

		for (;;) {
			path = SQ_TOKEN_TO_PATH(token);
			DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
			    "path %lx\n", path));
			mutex_enter(&path->mutex);
			if (path->state != RSMKA_PATH_ACTIVE ||
			    path->ref_cnt == 0) {
				mutex_exit(&path->mutex);
			} else {
				if (token != prev_used) {
					/* found a new token */
					break;
				}
				mutex_exit(&path->mutex);
			}

			token = token->next;
			if (token == first_token) {
				/*
				 * We didn't find a new token; reuse
				 * prev_used if the corresponding path
				 * is still up.
				 */
				if (prev_used) {
					path = SQ_TOKEN_TO_PATH(prev_used);
					DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
					    "path %lx\n", path));
					mutex_enter(&path->mutex);
					if (path->state != RSMKA_PATH_ACTIVE ||
					    path->ref_cnt == 0) {
						mutex_exit(&path->mutex);
					} else {
						token = prev_used;
						break;
					}
				}
				mutex_exit(&ipc_info_lock);
				DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
				    "rsmka_get_sendq_token: token=NULL\n"));
				return (NULL);
			}
		}

		PATH_HOLD_NOLOCK(path);
		SENDQ_TOKEN_HOLD(path);
		if (prev_used == NULL) {
			/* change current_token only the first time */
			ipc_info->current_token = token->next;
		}

		mutex_exit(&path->mutex);
		mutex_exit(&ipc_info_lock);

		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "rsmka_get_sendq_token done\n"));
		return (token);
	} else {
		mutex_exit(&ipc_info_lock);
		DBG_PRINTF((category, RSM_DEBUG_VERBOSE,
		    "rsmka_get_sendq_token done\n"));
		return (NULL);
	}
}
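
/*
 * Illustrative sketch (not part of this module): how a sender such as
 * rsmipc_send might cycle through sendq tokens.  Each retry passes the
 * previously used token as prev_used so that, when multiple paths
 * exist, a different path is tried next; every token is released with
 * rele_sendq_token after the attempt.  try_send(), the retry bound,
 * and the -1 failure value are hypothetical.
 */
#if 0
static int
sendq_retry_example(rsm_node_id_t node)
{
	sendq_token_t *token, *prev_used = NULL;
	int rval = -1;	/* hypothetical failure value */
	int tries;

	for (tries = 0; tries < 3; tries++) {	/* bounded retries */
		token = rsmka_get_sendq_token(node, prev_used);
		if (token == NULL)
			break;
		rval = try_send(token);	/* hypothetical send attempt */
		rele_sendq_token(token);
		if (rval == RSM_SUCCESS)
			break;
		prev_used = token;
	}
	return (rval);
}
#endif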



/*
 * Create the rsmpi send queue for the given path and, on success, wake
 * up any threads waiting in rsmipc_send for a sendq token.
 */
static int
create_ipc_sendq(path_t *path)
{
	int		rval;
	sendq_token_t	*token;
	adapter_t 	*adapter;
	int64_t		srvc_offset;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "create_ipc_sendq enter\n"));

	DBG_PRINTF((category, RSM_DEBUG, "create_ipc_sendq: path = %lx\n",
	    path));

	adapter = path->local_adapter;
	token = &path->sendq_token;

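	/*
	 * The service number is offset by the remote hwaddr, giving each
	 * remote adapter a distinct interrupt service (presumably so the
	 * interrupt handler can identify the sending path by hwaddr, as
	 * rsm_find_path does).
	 */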
	srvc_offset = path->remote_hwaddr;

	DBG_PRINTF((category, RSM_DEBUG,
	    "create_ipc_sendq: srvc_offset = %lld\n",
	    srvc_offset));

	rval = adapter->rsmpi_ops->rsm_sendq_create(adapter->rsmpi_handle,
	    path->remote_hwaddr,
	    (rsm_intr_service_t)(RSM_SERVICE + srvc_offset),
	    (rsm_intr_pri_t)RSM_PRI, (size_t)RSM_QUEUE_SZ,
	    RSM_INTR_SEND_Q_NO_FENCE,
	    RSM_RESOURCE_SLEEP, NULL, &token->rsmpi_sendq_handle);
	if (rval == RSM_SUCCESS) {
		/* rsmipc_send() may be waiting for a sendq_token */
		mutex_enter(&ipc_info_cvlock);
		cv_broadcast(&ipc_info_cv);
		mutex_exit(&ipc_info_cvlock);
	}

	DBG_PRINTF((category, RSM_DEBUG, "create_ipc_sendq: handle = %lx\n",
	    token->rsmpi_sendq_handle));
	DBG_PRINTF((category, RSM_DEBUG, "create_ipc_sendq: rval = %d\n",
	    rval));
	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "create_ipc_sendq done\n"));

	return (rval);
}


boolean_t
rsmka_check_node_alive(rsm_node_id_t remote_node)
{
	ipc_info_t *ipc_info;

	DBG_PRINTF((category, RSM_DEBUG, "rsmka_check_node_alive enter\n"));

	ipc_info = lookup_ipc_info(remote_node);
	if (ipc_info == NULL) {
		DBG_PRINTF((category, RSM_DEBUG,
		    "rsmka_check_node_alive done: ipc_info NULL\n"));
		return (B_FALSE);
	}

	if (ipc_info->node_is_alive == B_TRUE) {
		mutex_exit(&ipc_info_lock);
		DBG_PRINTF((category, RSM_DEBUG,
		    "rsmka_check_node_alive done: node is alive\n"));
		return (B_TRUE);
	} else {
		mutex_exit(&ipc_info_lock);
		DBG_PRINTF((category, RSM_DEBUG,
		    "rsmka_check_node_alive done: node is not alive\n"));
		return (B_FALSE);
	}
}




/*
 *  TOPOLOGY IOCTL SUPPORT
 */

static uint32_t
get_topology_size(int mode)
{
	uint32_t	topology_size;
	int		pointer_area_size;
	adapter_listhead_t	*listhead;
	int		total_num_of_adapters;
	int		total_num_of_paths;

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology_size enter\n"));

	/*
	 * Find the total number of adapters and paths by adding up the
	 * individual adapter and path counts from all the listheads
	 */
	total_num_of_adapters = 0;
	total_num_of_paths = 0;
	listhead = adapter_listhead_base.next;
	while (listhead != NULL) {
		total_num_of_adapters += listhead->adapter_count;
		total_num_of_paths += listhead->path_count;
		listhead = listhead->next_listhead;
	}

#ifdef	_MULTI_DATAMODEL
	if ((mode & DATAMODEL_MASK) == DATAMODEL_ILP32)
		/*
		 * Add an extra 4 bytes when the adapter count is odd to
		 * keep the connections header double-word aligned.
		 */
		pointer_area_size =
		    (total_num_of_adapters + total_num_of_adapters%2) *
		    sizeof (caddr32_t);
	else
		pointer_area_size = total_num_of_adapters * sizeof (caddr_t);
#else	/* _MULTI_DATAMODEL */
	mode = mode;
	pointer_area_size = total_num_of_adapters * sizeof (caddr_t);
#endif	/* _MULTI_DATAMODEL */


	topology_size = sizeof (rsmka_topology_hdr_t) +
	    pointer_area_size +
	    (total_num_of_adapters * sizeof (rsmka_connections_hdr_t)) +
	    (total_num_of_paths * sizeof (rsmka_remote_cntlr_t));

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology_size done\n"));

	return (topology_size);
}
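
/*
 * Worked example (illustrative): for an ILP32 caller with three
 * adapters and five paths, the pointer area is padded to an even
 * number of 32-bit slots so the first rsmka_connections_hdr_t stays
 * double-word aligned:
 *
 *	pointer_area_size = (3 + 3 % 2) * sizeof (caddr32_t) = 16
 *	topology_size = sizeof (rsmka_topology_hdr_t) + 16 +
 *	    3 * sizeof (rsmka_connections_hdr_t) +
 *	    5 * sizeof (rsmka_remote_cntlr_t)
 */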



static void
get_topology(caddr_t arg, char *bufp, int mode)
{

	rsmka_topology_t	*tp = (rsmka_topology_t *)bufp;
	adapter_listhead_t	*listhead;
	adapter_t		*adapter;
	path_t			*path;
	int			cntlr = 0;
	rsmka_connections_t	*connection;
	rsmka_remote_cntlr_t	*rem_cntlr;
	int			total_num_of_adapters;

#ifdef	_MULTI_DATAMODEL
	rsmka_topology32_t	*tp32 = (rsmka_topology32_t *)bufp;
#else
	mode = mode;
#endif	/* _MULTI_DATAMODEL */

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology enter\n"));

	/*
	 * Find the total number of adapters by adding up the
	 * individual adapter counts from all the listheads
	 */
	total_num_of_adapters = 0;
	listhead = adapter_listhead_base.next;
	while (listhead != NULL) {
		total_num_of_adapters += listhead->adapter_count;
		listhead = listhead->next_listhead;
	}

	/* fill topology header and adjust bufp */
	tp->topology_hdr.local_nodeid = my_nodeid;
	tp->topology_hdr.local_cntlr_count = total_num_of_adapters;
	bufp = (char *)&tp->connections[0];

	/* leave room for connection pointer area */
#ifdef	_MULTI_DATAMODEL
	if ((mode & DATAMODEL_MASK) == DATAMODEL_ILP32)
		/* make sure bufp is double-word aligned */
		bufp += (total_num_of_adapters + total_num_of_adapters%2) *
		    sizeof (caddr32_t);
	else
		bufp += total_num_of_adapters * sizeof (caddr_t);
#else	/* _MULTI_DATAMODEL */
	bufp += total_num_of_adapters * sizeof (caddr_t);
#endif	/* _MULTI_DATAMODEL */

	/* fill topology from the adapter and path data */
	listhead = adapter_listhead_base.next;
	while (listhead != NULL) {
		adapter = listhead->next_adapter;
		while (adapter != NULL) {
			/* fill in user based connection pointer */
#ifdef	_MULTI_DATAMODEL
			if ((mode & DATAMODEL_MASK) == DATAMODEL_ILP32) {
				ulong_t delta = (ulong_t)bufp - (ulong_t)tp32;
				caddr32_t userbase = (caddr32_t)((ulong_t)arg &
				    0xffffffff);
				tp32->connections[cntlr++] = userbase + delta;
			} else {
				tp->connections[cntlr++] = arg +
				    (ulong_t)bufp -
				    (ulong_t)tp;
			}
#else	/* _MULTI_DATAMODEL */
			tp->connections[cntlr++] = arg +
			    (ulong_t)bufp -
			    (ulong_t)tp;
#endif	/* _MULTI_DATAMODEL */
			connection = (rsmka_connections_t *)bufp;
			(void) snprintf(connection->hdr.cntlr_name,
			    MAXNAMELEN, "%s%d",
			    listhead->adapter_devname,
			    adapter->instance);
			connection->hdr.local_hwaddr = adapter->hwaddr;
			connection->hdr.remote_cntlr_count = 0;
			bufp += sizeof (rsmka_connections_hdr_t);
			rem_cntlr = (rsmka_remote_cntlr_t *)bufp;
			path = adapter->next_path;
			while (path != NULL) {
				connection->hdr.remote_cntlr_count++;
				rem_cntlr->remote_nodeid = path->remote_node;
				(void) snprintf(rem_cntlr->remote_cntlrname,
				    MAXNAMELEN, "%s%d",
				    listhead->adapter_devname,
				    path->remote_devinst);
				rem_cntlr->remote_hwaddr = path->remote_hwaddr;
				rem_cntlr->connection_state = path->state;
				++rem_cntlr;
				path = path->next_path;
			}
			adapter = adapter->next;
			bufp = (char *)rem_cntlr;
		}
		listhead = listhead->next_listhead;
	}

	DBG_PRINTF((category, RSM_DEBUG_VERBOSE, "get_topology done\n"));

}


/*
 * Called from rsm_ioctl() in rsm.c.
 * Make sure there is no possibility of blocking while holding
 * adapter_listhead_base.listlock.
 */
int
rsmka_topology_ioctl(caddr_t arg, int cmd, int mode)
{
	uint32_t	topology_size;
	uint32_t 	request_size;
	char		*bufp;
	int		error = RSM_SUCCESS;
	size_t		max_toposize;

	DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
	    "rsmka_topology_ioctl enter\n"));

	switch (cmd) {
	case RSM_IOCTL_TOPOLOGY_SIZE:
		mutex_enter(&adapter_listhead_base.listlock);
		topology_size = get_topology_size(mode);
		mutex_exit(&adapter_listhead_base.listlock);
		if (ddi_copyout((caddr_t)&topology_size,
		    (caddr_t)arg, sizeof (uint32_t), mode))
			error = RSMERR_BAD_ADDR;
		break;
	case RSM_IOCTL_TOPOLOGY_DATA:
		/*
		 * The size of the buffer which the caller has allocated
		 * is passed in.  If the buffer is too small for the
		 * topology data, E2BIG is returned.
		 */
		if (ddi_copyin(arg, &request_size, sizeof (uint32_t), mode)) {
			DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
			    "rsmka_topology_ioctl done: BAD_ADDR\n"));
			return (RSMERR_BAD_ADDR);
		}
		/* calculate the max size of the topology structure */
		max_toposize = sizeof (rsmka_topology_hdr_t) +
		    RSM_MAX_CTRL * (sizeof (caddr_t) +
			sizeof (rsmka_connections_hdr_t)) +
		    RSM_MAX_NODE * sizeof (rsmka_remote_cntlr_t);

		if (request_size > max_toposize) { /* validate request_size */
			DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
			    "rsmka_topology_ioctl done: size too large\n"));
			return (EINVAL);
		}
		bufp = kmem_zalloc(request_size, KM_SLEEP);
		mutex_enter(&adapter_listhead_base.listlock);
		topology_size = get_topology_size(mode);
		if (request_size < topology_size) {
			kmem_free(bufp, request_size);
			mutex_exit(&adapter_listhead_base.listlock);
			DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
			    "rsmka_topology_ioctl done: E2BIG\n"));
			return (E2BIG);
		}

		/* get the topology data and copyout to the caller */
		get_topology(arg, bufp, mode);
		mutex_exit(&adapter_listhead_base.listlock);
		if (ddi_copyout((caddr_t)bufp, (caddr_t)arg,
		    topology_size, mode))
			error = RSMERR_BAD_ADDR;

		kmem_free(bufp, request_size);
		break;
	default:
		DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG,
		    "rsmka_topology_ioctl: cmd not supported\n"));
		error = DDI_FAILURE;
	}

	DBG_PRINTF((category | RSM_IOCTL, RSM_DEBUG_VERBOSE,
	    "rsmka_topology_ioctl done: %d\n", error));
	return (error);
}
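
/*
 * Illustrative sketch (not part of this module): the two-step protocol
 * a user-level caller follows, per the comments above.  It first asks
 * for the topology size, then allocates a buffer, stores that buffer's
 * size in its first word (the in-band request_size read by the DATA
 * ioctl), and fetches the data.  The headers, the caller-supplied fd
 * (opened on the rsm device), and the error handling below are
 * assumptions.
 */
#if 0
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

static char *
read_topology(int fd)
{
	uint32_t size;
	char *buf;

	if (ioctl(fd, RSM_IOCTL_TOPOLOGY_SIZE, &size) < 0)
		return (NULL);

	if ((buf = malloc(size)) == NULL)
		return (NULL);

	*(uint32_t *)buf = size;	/* buffer size is passed in-band */
	if (ioctl(fd, RSM_IOCTL_TOPOLOGY_DATA, buf) < 0) {
		free(buf);
		return (NULL);
	}
	return (buf);
}
#endif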