xref: /titanic_41/usr/src/cmd/lvm/rpc.mdcommd/mdmn_commd_server.c (revision db2bae3047e71d795bde12e3baa621f4b6cc8930)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <unistd.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/statvfs.h>
32 #include <sys/uadmin.h>
33 #include <sys/resource.h>
34 #include <fcntl.h>
35 #include <stdio.h>
36 #include <thread.h>
37 #include <meta.h>
38 #include <sdssc.h>
39 #include <mdmn_changelog.h>
40 #include "mdmn_subr.h"
41 
42 /*
43  * This is the communication daemon for SVM Multi Node Disksets.
44  * It runs on every node and provides the following rpc services:
45  *  - mdmn_send_svc_1
46  *  - mdmn_work_svc_1
47  *  - mdmn_wakeup_initiator_svc_1
48  *  - mdmn_wakeup_master_svc_1
49  *  - mdmn_comm_lock_svc_1
50  *  - mdmn_comm_unlock_svc_1
51  *  - mdmn_comm_suspend_svc_1
52  *  - mdmn_comm_resume_svc_1
53  *  - mdmn_comm_reinit_set_svc_1
54  * where send, lock, unlock and reinit are meant for external use,
55  * work and the two wakeups are for internal use only.
56  *
57  * NOTE:
58  * On every node only one of those xxx_1 functions can be active at the
59  * same time because the daemon is single threaded.
60  *
61  *
62  * In case an event occurs that has to be propagated to all the nodes...
63  *
64  * One node (the initiator)
65  *	calls the libmeta function mdmn_send_message()
66  *	This function calls the local daemon thru mdmn_send_svc_1.
67  *
68  * On the initiator:
69  *	mdmn_send_svc_1()
70  *	    - starts a thread -> mdmn_send_to_work() and returns.
71  *	mdmn_send_to_work()
72  *	    - sends this message over to the master of the diskset.
73  *	      This is done by calling mdmn_work_svc_1 on the master.
74  *	    - registers to the initiator_table
75  *	    - exits without doing a svc_sendreply() for the call to
76  *	      mdmn_send_svc_1. This means that call is blocked until somebody
77  *	      (see end of this comment) does a svc_sendreply().
78  *	      This means mdmn_send_message() does not yet return.
79  *	    - A timeout surveillance is started at this point.
80  *	      This means in case the master doesn't reply at all in an
81  *	      appropriate time, an error condition is returned
82  *	      to the caller.
83  *
84  * On the master:
85  *	mdmn_work_svc_1()
86  *	    - starts a thread -> mdmn_master_process_msg() and returns
87  *	mdmn_master_process_msg()
88  *	    - logs the message to the change log
89  *	    - executes the message locally
90  *	    - flags the message in the change log
91  *	    - sends the message to mdmn_work_svc_1() on all the
92  *	      other nodes (slaves)
93  *	      after each call to mdmn_work_svc_1 the thread goes to sleep and
94  *	      will be woken up by mdmn_wakeup_master_svc_1() as soon as the
95  *	      slave node is done with this message.
96  *	    - In case the slave doesn't respond in an appropriate time, an error
97  *	      is assumed to ensure the master doesn't wait forever.
98  *
99  * On a slave:
100  *	mdmn_work_svc_1()
101  *	    - starts a thread -> mdmn_slave_process_msg() and returns
102  *	mdmn_slave_process_msg()
103  *	    - processes this message locally by calling the appropriate message
104  *	      handler, that creates some result.
105  *	    - sends that result thru a call to mdmn_wakeup_master_svc_1() to
106  *	      the master.
107  *
108  * Back on the master:
109  *	mdmn_wakeup_master_svc_1()
110  *	    - stores the result into the master_table.
111  *	    - signals the mdmn_master_process_msg-thread.
112  *	    - returns
113  *	mdmn_master_process_msg()
114  *	    - after getting the results from all nodes
115  *	    - sends them back to the initiating node thru a call to
116  *	      mdmn_wakeup_initiator_svc_1.
117  *
118  * Back on the initiator:
119  *	mdmn_wakeup_initiator_svc_1()
120  *	    - calls svc_sendreply() which makes the call to mdmn_send_svc_1()
121  *	      return.
122  *	      which allows the initial mdmn_send_message() call to return.
123  */
124 
125 FILE *commdout;		/* debug output stream; (re)opened by setup_debug() */
126 char *commdoutfile;	/* path of the file behind commdout */
127 /* flush_fcout() warns when the log filesystem has less than 10 MB free */
128 #define	MIN_FS_SPACE	(10LL * 1024 * 1024)
129 
130 /*
131  * Number of outstanding messages that were initiated by this node.
132  * While it is zero, check_timeouts() blocks on check_timeout_cv.
133  */
134 uint_t	messages_on_their_way;
135 mutex_t	check_timeout_mutex;	/* protects messages_on_their_way */
136 cond_t	check_timeout_cv;	/* wakes up check_timeouts() */
137 
138 /* reference time taken in global_init(); for printing out time stamps */
139 hrtime_t __savetime;
140 
141 /* RPC client handles, indexed [set][node id], and one rwlock per set */
142 CLIENT	*client[MD_MAXSETS][NNODES];
143 rwlock_t client_rwlock[MD_MAXSETS];
144 
145 /* the descriptors of all possible sets and one rwlock per set */
146 struct md_set_desc *set_descriptor[MD_MAXSETS];
147 rwlock_t set_desc_rwlock[MD_MAXSETS];
148 
149 /* daemon to daemon RPCs must time out quickly; applied via CLSET_TIMEOUT */
150 static struct timeval FOUR_SECS = { 4, 0 };
151 
152 /* Per-set MDMN_SET_* bits recording what mdmn_init_set() has set up */
153 int md_mn_set_inited[MD_MAXSETS];
154 
155 /* Per-set message completion table (mmap'ed in mdmn_init_set()) and mutexes */
156 md_mn_mct_t *mct[MD_MAXSETS];
157 mutex_t	mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];
158 
159 /* Bits describing the global status of the commd on one node */
160 #define	MD_CGS_INITED		0x0001	/* global_init() has completed */
161 #define	MD_CGS_ABORTED		0x0002	/* return everything with MDMNE_ABORT */
162 uint_t md_commd_global_state = 0;	/* No state when starting up */
163 
164 /*
165  * Global verbosity level for the daemon; loaded by setup_debug() and
166  * reset to MD_MMV_NULL by flush_fcout() when log space runs short.
167  */
168 uint_t md_commd_global_verb;
169 
170 /*
171  * libmeta doesn't like multiple threads in metaget_setdesc().
172  * So every call to it is made with this global lock held.
173  */
174 mutex_t get_setdesc_mutex;
175 
176 /*
177  * Need a way to block single message types,
178  * hence an array with a status for every message type
179  */
180 uint_t msgtype_lock_state[MD_MN_NMESSAGES];
181 
182 /* for reading in the config file */
183 #define	MAX_LINE_SIZE 1024
184 
185 extern char *commd_get_outfile(void);
186 extern uint_t commd_get_verbosity(void);
186 
187 /*
188  * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
189  * merely needs to call clnt_create_timed, and meta_client_create_retry
190  * will take care of the rest.
191  */
192 /* ARGSUSED */
193 static CLIENT *
194 mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
195 {
196 	md_mnnode_desc	*node = (md_mnnode_desc *)data;
197 
198 	return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, ONE, "tcp",
199 		time_out));
200 }
201 
/*
 * Push buffered debug output out to the file, if a debug file is open:
 * fflush() the stream, then fsync() its descriptor.
 * The body is wrapped in do { } while (0) so that the macro expands to
 * exactly one statement; "FLUSH_DEBUGFILE();" can then be used safely as
 * the body of an if/else without swallowing the else branch.
 */
#define	FLUSH_DEBUGFILE() \
	do { \
		if (commdout != (FILE *)NULL) { \
			(void) fflush(commdout); \
			(void) fsync(fileno(commdout)); \
		} \
	} while (0)
207 
208 static void
209 panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
210     md_mn_result_t *slave_result)
211 {
212 	md_mn_commd_err_t	commd_err;
213 	md_error_t		mne = mdnullerror;
214 	char			*msg_buf;
215 
216 	msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
217 
218 	FLUSH_DEBUGFILE();
219 
220 	if (master_err != MDMNE_ACK) {
221 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on master "
222 			"when processing message type %d\n", type);
223 	} else if (slave_result == NULL) {
224 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on node "
225 			"%d when processing message type %d\n", nid, type);
226 	} else {
227 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: Inconsistent "
228 			"return value from node %d when processing message "
229 			"type %d. Master exitval = %d, Slave exitval = %d\n",
230 			nid, type, master_exitval, slave_result->mmr_exitval);
231 	}
232 	commd_err.size = strlen(msg_buf);
233 	commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
234 
235 	metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
236 	(void) uadmin(A_DUMP, AD_BOOT, NULL);
237 }
238 
239 static void
240 flush_fcout()
241 {
242 	struct statvfs64 vfsbuf;
243 	long long avail_bytes;
244 	int warned = 0;
245 
246 	for (; ; ) {
247 		sleep(10);
248 		/* No output file, nothing to do */
249 		if (commdout == (FILE *)NULL)
250 			continue;
251 
252 		/*
253 		 * stat the appropriate filesystem to check for available space.
254 		 */
255 		if (statvfs64(commdoutfile, &vfsbuf)) {
256 			continue;
257 		}
258 
259 		avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
260 		/*
261 		 * If we don't have enough space, we print out a warning.
262 		 * And we drop the verbosity level to NULL
263 		 * In case the condtion doesn't go away, we don't repeat
264 		 * the warning.
265 		 */
266 		if (avail_bytes < MIN_FS_SPACE) {
267 			if (warned) {
268 				continue;
269 			}
270 			commd_debug(MD_MMV_SYSLOG,
271 			    "NOT enough space available for logging\n");
272 			commd_debug(MD_MMV_SYSLOG,
273 			    "Have %lld bytes, need %lld bytes\n",
274 			    avail_bytes, MIN_FS_SPACE);
275 			warned = 1;
276 			md_commd_global_verb = MD_MMV_NULL;
277 		} else {
278 			warned = 0;
279 		}
280 
281 		fflush(commdout);
282 	}
283 }
284 
/*
 * NULL-tolerant front end for clnt_destroy(): a NULL handle is skipped,
 * any other handle is passed on to clnt_destroy().
 */
#define	mdmn_clnt_destroy(clnt) {		\
	if ((clnt) != NULL)			\
		clnt_destroy(clnt);		\
}
290 
291 /*
292  * Own version of svc_sendreply that checks the integrity of the transport
293  * handle and so prevents us from core dumps in the real svc_sendreply()
294  */
295 void
296 mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
297 {
298 	if (SVC_STAT(transp) == XPRT_DIED) {
299 		commd_debug(MD_MMV_MISC,
300 		    "mdmn_svc_sendreply: XPRT_DIED\n");
301 		return;
302 	}
303 	(void) svc_sendreply(transp, xdr, data);
304 }
305 
306 /*
307  * timeout_initiator(set, class)
308  *
309  * Alas, I sent a message and didn't get a response back in aproppriate time.
310  *
311  * timeout_initiator() takes care for doing the needed svc_sendreply() to the
312  * calling mdmn_send_message, so that guy doesn't wait forever
313  * What is done here is pretty much the same as what is done in
314  * wakeup initiator. The difference is that we cannot provide for any results,
315  * of course and we set the comm_state to MDMNE_TIMEOUT.
316  *
317  * By doing so, mdmn_send_message can decide if a retry would make sense or not.
318  * It's not our's to decide that here.
319  */
320 void
321 timeout_initiator(set_t setno, md_mn_msgclass_t class)
322 {
323 	SVCXPRT		*transp;
324 	md_mn_msgid_t	mid;
325 	md_mn_result_t *resultp;
326 
327 	resultp = Zalloc(sizeof (md_mn_result_t));
328 	resultp->mmr_comm_state	= MDMNE_TIMEOUT;
329 
330 	commd_debug(MD_MMV_MISC,
331 	    "timeout_initiator set = %d, class = %d\n", setno, class);
332 
333 	transp = mdmn_get_initiator_table_transp(setno, class);
334 	mdmn_get_initiator_table_id(setno, class, &mid);
335 
336 	commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
337 	    MSGID_ELEMS(mid));
338 
339 	/* return to mdmn_send_message() and let it deal with the situation */
340 	mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
341 
342 	free(resultp);
343 	commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
344 	mdmn_unregister_initiator_table(setno, class);
345 }
346 
347 
348 /*
349  * check_timeouts - thread
350  *
351  * This implements a timeout surveillance for messages sent from the
352  * initiator to the master.
353  *
354  * If a message is started, this thread is triggered thru
355  * cond_signal(&check_timeout_cv) and we keep track of the numbers of
356  * messages that are outstanding (messages_on_their_way).
357  *
358  * As long as there are messages on their way, this thread never goes to sleep.
359  * It'll keep checking all class/set combinations for outstanding messages.
360  * If one is found, it's checked if this message is overdue. In that case,
361  * timeout_initiator() is called to wakeup the calling mdmn_send_message and
362  * to clean up the mess.
363  *
364  * If the result from the master arrives later, this message is considered
365  * to be unsolicited. And will be ignored.
366  */
367 
368 void
369 check_timeouts()
370 {
371 	set_t			setno;
372 	time_t			now, then;
373 	mutex_t			*mx;
374 	md_mn_msgclass_t	class;
375 
376 	for (; ; ) {
377 		now = time((time_t *)NULL);
378 		for (setno = 1; setno < MD_MAXSETS; setno++) {
379 			if (md_mn_set_inited[setno] != MDMN_SET_READY) {
380 				continue;
381 			}
382 			for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
383 			    class++) {
384 				mx = mdmn_get_initiator_table_mx(setno, class);
385 				mutex_lock(mx);
386 
387 				/* then is the registered time */
388 				then =
389 				    mdmn_get_initiator_table_time(setno, class);
390 				if ((then != 0) && (now > then)) {
391 					timeout_initiator(setno, class);
392 				}
393 				mutex_unlock(mx);
394 			}
395 		}
396 		/* it's ok to check only once per second */
397 		sleep(1);
398 
399 		/* is there work to do? */
400 		mutex_lock(&check_timeout_mutex);
401 		if (messages_on_their_way == 0) {
402 			cond_wait(&check_timeout_cv, &check_timeout_mutex);
403 		}
404 		mutex_unlock(&check_timeout_mutex);
405 	}
406 }
407 
408 void
409 setup_debug(void)
410 {
411 	char	*tmp_dir;
412 
413 	/* Read in the debug-controlling tokens from runtime.cf */
414 	md_commd_global_verb = commd_get_verbosity();
415 	/*
416 	 * If the user didn't specify a verbosity level in runtime.cf
417 	 * we can safely return here. As we don't intend to printout
418 	 * debug messages, we don't need to check for the output file.
419 	 */
420 	if (md_commd_global_verb == 0) {
421 		return;
422 	}
423 
424 	/* if commdout is non-NULL it is an open FILE, we'd better close it */
425 	if (commdout != (FILE *)NULL) {
426 		fclose(commdout);
427 	}
428 
429 	commdoutfile = commd_get_outfile();
430 
431 	/* setup the debug output */
432 	if (commdoutfile == (char *)NULL) {
433 		/* if no valid file was specified, use the default */
434 		commdoutfile = "/var/run/commd.out";
435 		commdout = fopen(commdoutfile, "a");
436 	} else {
437 		/* check if the directory exists and is writable */
438 		tmp_dir = strdup(commdoutfile);
439 		if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
440 		    ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
441 			syslog(LOG_ERR,
442 			    "Can't write to specified output file %s,\n"
443 			    "using /var/run/commd.out instead\n", commdoutfile);
444 			free(commdoutfile);
445 			commdoutfile = "/var/run/commd.out";
446 			commdout = fopen(commdoutfile, "a");
447 		}
448 		free(tmp_dir);
449 	}
450 
451 	if (commdout == (FILE *)NULL) {
452 		syslog(LOG_ERR, "Can't write to debug output file %s\n",
453 		    commdoutfile);
454 	}
455 }
456 
457 /*
458  * mdmn_is_node_dead checks to see if a node is dead using
459  * the SunCluster infrastructure which is a stable interface.
460  * If unable to contact SunCuster the node is assumed to be alive.
461  * Return values:
462  *	1 - node is dead
463  *	0 - node is alive
464  */
465 int
466 mdmn_is_node_dead(md_mnnode_desc *node)
467 {
468 	char	*fmt = "/usr/cluster/bin/scha_cluster_get -O NODESTATE_NODE ";
469 	char	*cmd;
470 	size_t	size;
471 	char	buf[10];
472 	FILE	*ptr;
473 	int	retval = 0;
474 
475 	/* I know that I'm alive */
476 	if (strcmp(node->nd_nodename, mynode()) == 0)
477 		return (retval);
478 
479 	size = strlen(fmt) + strlen(node->nd_nodename) + 1;
480 	cmd = Zalloc(size);
481 	(void) strlcat(cmd, fmt, size);
482 	(void) strlcat(cmd, node->nd_nodename, size);
483 
484 	if ((ptr = popen(cmd, "r")) != NULL) {
485 		if (fgets(buf, sizeof (buf), ptr) != NULL) {
486 			/* If scha_cluster_get returned DOWN - return dead */
487 			if (strncmp(buf, "DOWN", 4) == 0)
488 				retval = 1;
489 		}
490 		(void) pclose(ptr);
491 	}
492 	Free(cmd);
493 	return (retval);
494 }
495 
496 /*
497  * global_init()
498  *
499  * Perform some global initializations.
500  *
501  * the following routines have to call this before operation can start:
502  *  - mdmn_send_svc_1
503  *  - mdmn_work_svc_1
504  *  - mdmn_comm_lock_svc_1
505  *  - mdmn_comm_unlock_svc_1
506  *  - mdmn_comm_suspend_svc_1
507  *  - mdmn_comm_resume_svc_1
508  *  - mdmn_comm_reinit_set_svc_1
509  *
510  * This is a single threaded daemon, so it can only be in one of the above
511  * routines at the same time.
512  * This means, global_init() cannot be called more than once at the same time.
513  * Hence, no lock is needed.
514  */
515 void
516 global_init(void)
517 {
518 	set_t			set;
519 	md_mn_msgclass_t	class;
520 	struct sigaction	sighandler;
521 	time_t			clock_val;
522 	struct rlimit		commd_limit;
523 
524 
525 
526 	/* Do these global initializations only once */
527 	if (md_commd_global_state & MD_CGS_INITED) {
528 		return;
529 	}
530 	(void) sdssc_bind_library();
531 
532 	/* setup the debug options from the config file */
533 	setup_debug();
534 
535 	/* make sure that we don't run out of file descriptors */
536 	commd_limit.rlim_cur = commd_limit.rlim_max = RLIM_INFINITY;
537 	if (setrlimit(RLIMIT_NOFILE, &commd_limit) != 0) {
538 		syslog(LOG_WARNING, gettext("setrlimit failed."
539 		    "Could not increase the max file descriptors"));
540 	}
541 
542 	/* Make setup_debug() be the action in case of SIGHUP */
543 	sighandler.sa_flags = 0;
544 	sigfillset(&sighandler.sa_mask);
545 	sighandler.sa_handler = (void (*)(int)) setup_debug;
546 	sigaction(SIGHUP, &sighandler, NULL);
547 
548 	__savetime = gethrtime();
549 	(void) time(&clock_val);
550 	commd_debug(MD_MMV_MISC, "global init called %s\n",
551 			ctime(&clock_val));
552 
553 	/* start a thread that flushes out the debug on a regular basis */
554 	thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
555 	    (void *) NULL, THR_DETACHED, NULL);
556 
557 	/* global rwlock's / mutex's / cond_t's go here */
558 	mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
559 	cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
560 	mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
561 
562 	/* Make sure the initiator table is initialized correctly */
563 	for (set = 0; set < MD_MAXSETS; set++) {
564 		for (class = 0; class < MD_MN_NCLASSES; class++) {
565 			mdmn_unregister_initiator_table(set, class);
566 		}
567 	}
568 
569 
570 	/* setup the check for timeouts */
571 	thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
572 	    (void *) NULL, THR_DETACHED, NULL);
573 
574 	md_commd_global_state |= MD_CGS_INITED;
575 }
576 
577 
578 /*
579  * mdmn_init_client(setno, nodeid)
580  * called if client[setno][nodeid] is NULL
581  *
582  * NOTE: Must be called with set_desc_rwlock held as a reader
583  * NOTE: Must be called with client_rwlock held as a writer
584  *
585  * If the rpc client for this node has not been setup for any set, we do it now.
586  *
587  * Returns	0 on success (node found in set, rpc client setup)
588  *		-1 if metaget_setdesc failed,
589  *		-2 if node not part of set
590  *		-3 if clnt_create fails
591  */
592 static int
593 mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
594 {
595 	md_error_t	ep = mdnullerror;
596 	md_mnnode_desc	*node;
597 	md_set_desc	*sd;	/* just an abbr for set_descriptor[setno] */
598 
599 	sd = set_descriptor[setno];
600 
601 	/*
602 	 * Is the appropriate set_descriptor already initialized ?
603 	 * Can't think of a scenario where this is not the case, but we'd better
604 	 * check for it anyway.
605 	 */
606 	if (sd == NULL) {
607 		mdsetname_t	*sp;
608 
609 		rw_unlock(&set_desc_rwlock[setno]); /* readlock -> writelock */
610 		rw_wrlock(&set_desc_rwlock[setno]);
611 		sp = metasetnosetname(setno, &ep);
612 		/* Only one thread is supposed to be in metaget_setdesc() */
613 		mutex_lock(&get_setdesc_mutex);
614 		sd = metaget_setdesc(sp, &ep);
615 		mutex_unlock(&get_setdesc_mutex);
616 		if (sd == NULL) {
617 			rw_unlock(&set_desc_rwlock[setno]); /* back to ... */
618 			rw_rdlock(&set_desc_rwlock[setno]); /* ... readlock */
619 			return (-1);
620 		}
621 		set_descriptor[setno] = sd;
622 		rw_unlock(&set_desc_rwlock[setno]); /* back to readlock */
623 		rw_rdlock(&set_desc_rwlock[setno]);
624 	}
625 
626 	/* first we have to find the node name for this node id */
627 	for (node = sd->sd_nodelist; node; node = node->nd_next) {
628 		if (node->nd_nodeid == nid)
629 			break; /* we found our node in this set */
630 	}
631 
632 
633 	if (node == (md_mnnode_desc *)NULL) {
634 		commd_debug(MD_MMV_SYSLOG,
635 		    "FATAL: node %d not found in set %d\n", nid, setno);
636 		rw_unlock(&set_desc_rwlock[setno]);
637 		return (-2);
638 	}
639 
640 	commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
641 	    node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
642 
643 	/* Did this node join the diskset?  */
644 	if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
645 		commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
646 		    node->nd_nodename ? node->nd_nodename : "NULL", setno);
647 		rw_unlock(&set_desc_rwlock[setno]);
648 		return (-2);
649 	}
650 
651 	/* if clnt_create has not been done for that node, do it now */
652 	if (client[setno][nid] == (CLIENT *) NULL) {
653 		time_t	tout = 0;
654 
655 		/*
656 		 * While trying to create a connection to a node,
657 		 * periodically check to see if the node has been marked
658 		 * dead by the SunCluster infrastructure.
659 		 * This periodic check is needed since a non-responsive
660 		 * rpc.mdcommd (while it is attempting to create a connection
661 		 * to a dead node) can lead to large delays and/or failures
662 		 * in the reconfig steps.
663 		 */
664 		while ((client[setno][nid] == (CLIENT *) NULL) &&
665 		    (tout < MD_CLNT_CREATE_TOUT)) {
666 			client[setno][nid] = meta_client_create_retry
667 				(node->nd_nodename, mdmn_clnt_create,
668 				(void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
669 			/* Is the node dead? */
670 			if (mdmn_is_node_dead(node) == 1) {
671 				commd_debug(MD_MMV_SYSLOG,
672 				    "rpc.mdcommd: no client for dead node %s\n",
673 				    node->nd_nodename);
674 				break;
675 			} else
676 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
677 		}
678 
679 		if (client[setno][nid] == (CLIENT *) NULL) {
680 			clnt_pcreateerror(node->nd_nodename);
681 			rw_unlock(&set_desc_rwlock[setno]);
682 			return (-3);
683 		}
684 		/* this node has the license to send */
685 		commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
686 		add_license(node);
687 
688 		/* set the timeout value */
689 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
690 		    (char *)&FOUR_SECS);
691 
692 	}
693 	rw_unlock(&set_desc_rwlock[setno]);
694 	return (0);
695 }
696 
697 /*
698  * check_client(setno, nodeid)
699  *
700  * must be called with reader lock held for set_desc_rwlock[setno]
701  * and must be called with reader lock held for client_rwlock[setno]
702  * Checks if the client for this set/node combination is already setup
703  * if not it upgrades the lock to a writer lock
704  * and tries to initialize the client.
705  * Finally it's checked if the client nulled out again due to some race
706  *
707  * returns 0 if there is a usable client
708  * returns MDMNE_RPC_FAIL otherwise
709  */
710 static int
711 check_client(set_t setno, md_mn_nodeid_t nodeid)
712 {
713 	int ret = 0;
714 
715 	while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
716 		rw_unlock(&client_rwlock[setno]); /* upgrade reader ... */
717 		rw_wrlock(&client_rwlock[setno]); /* ... to writer lock. */
718 		if (mdmn_init_client(setno, nodeid) != 0) {
719 			ret = MDMNE_RPC_FAIL;
720 		}
721 		rw_unlock(&client_rwlock[setno]); /* downgrade writer ... */
722 		rw_rdlock(&client_rwlock[setno]); /* ... back to reader lock. */
723 	}
724 	return (ret);
725 }
726 
727 /*
728  * mdmn_init_set(setno, todo)
729  * setno is the number of the set to be initialized.
730  * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
731  * If called with MDMN_SET_READY everything is initialized.
732  *
733  * If the set mutexes are already initialized, the caller has to hold
734  * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
735  * calling mdmn_init_set()
736  */
737 int
738 mdmn_init_set(set_t setno, int todo)
739 {
740 	int class;
741 	md_mnnode_desc	*node;
742 	md_set_desc	*sd; /* just an abbr for set_descriptor[setno] */
743 	mdsetname_t	*sp;
744 	md_error_t	ep = mdnullerror;
745 	md_mn_nodeid_t	nid;
746 
747 	/*
748 	 * Check if we are told to setup the mutexes and
749 	 * if these are not yet setup
750 	 */
751 	if ((todo & MDMN_SET_MUTEXES) &&
752 	    ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
753 		mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
754 		cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
755 		rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
756 		rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
757 
758 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
759 			mutex_init(mdmn_get_master_table_mx(setno, class),
760 			    USYNC_THREAD, NULL);
761 			cond_init(mdmn_get_master_table_cv(setno, class),
762 			    USYNC_THREAD, NULL);
763 			mutex_init(mdmn_get_initiator_table_mx(setno, class),
764 			    USYNC_THREAD, NULL);
765 		}
766 		md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
767 	}
768 	if ((todo & MDMN_SET_MCT) &&
769 	    ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
770 		int	fd;
771 		size_t	filesize;
772 		caddr_t	addr;
773 		char table_name[32];
774 
775 		filesize = (sizeof (md_mn_mct_t));
776 		(void) snprintf(table_name, sizeof (table_name), "%s%d",
777 		    MD_MN_MSG_COMP_TABLE, setno);
778 		/*
779 		 * If the mct file exists we map it into memory.
780 		 * Otherwise we create an empty file of appropriate
781 		 * size and map that into memory.
782 		 * The mapped areas are stored in mct[setno].
783 		 */
784 		fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
785 		if (fd < 0) {
786 			commd_debug(MD_MMV_MISC,
787 			    "init_set: Can't open MCT\n");
788 			return (-1);
789 		}
790 		/*
791 		 * To ensure that the file has the appropriate size,
792 		 * we write a byte at the end of the file.
793 		 */
794 		lseek(fd, filesize + 1, SEEK_SET);
795 		write(fd, "\0", 1);
796 
797 		/* at this point we have a file in place that we can mmap */
798 		addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
799 		    MAP_SHARED, fd, (off_t)0);
800 		if (addr == MAP_FAILED) {
801 			commd_debug(MD_MMV_INIT,
802 			    "init_set: mmap mct error %d\n",
803 			    errno);
804 			return (-1);
805 		}
806 		/* LINTED pointer alignment */
807 		mct[setno] = (md_mn_mct_t *)addr;
808 
809 		/* finally we initialize the mutexes that protect the mct */
810 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
811 			mutex_init(&(mct_mutex[setno][class]),
812 			    USYNC_THREAD, NULL);
813 		}
814 
815 		md_mn_set_inited[setno] |= MDMN_SET_MCT;
816 	}
817 	/*
818 	 * Check if we are told to setup the nodes and
819 	 * if these are not yet setup
820 	 * (Attention: negative logic here compared to above!)
821 	 */
822 	if (((todo & MDMN_SET_NODES) == 0) ||
823 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
824 		return (0); /* success */
825 	}
826 
827 	if ((sp = metasetnosetname(setno, &ep)) == NULL) {
828 		commd_debug(MD_MMV_SYSLOG,
829 		    "metasetnosetname(%d) returned NULL\n", setno);
830 		return (MDMNE_NOT_JOINED);
831 	}
832 
833 	/* flush local copy of rpc.metad data */
834 	metaflushsetname(sp);
835 
836 	mutex_lock(&get_setdesc_mutex);
837 	sd = metaget_setdesc(sp, &ep);
838 	mutex_unlock(&get_setdesc_mutex);
839 
840 	if (sd == NULL) {
841 		commd_debug(MD_MMV_SYSLOG,
842 		    "metaget_setdesc(%d) returned NULL\n", setno);
843 		return (MDMNE_NOT_JOINED);
844 	}
845 
846 	/*
847 	 * if this set is not a multinode set or
848 	 * this node didn't join yet the diskset, better don't do anything
849 	 */
850 	if ((MD_MNSET_DESC(sd) == 0) ||
851 	    (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
852 		commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
853 		return (MDMNE_NOT_JOINED);
854 	}
855 
856 	for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
857 		time_t	tout = 0;
858 		nid = node->nd_nodeid;
859 
860 		commd_debug(MD_MMV_INIT,
861 		    "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
862 		    node->nd_nodename ? node->nd_nodename : "NULL",
863 		    node->nd_priv_ic ? node->nd_priv_ic : "NULL",
864 		    node->nd_flags);
865 
866 		if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
867 			commd_debug(MD_MMV_INIT,
868 			    "init: %s didn't join set %d\n",
869 			    node->nd_nodename ? node->nd_nodename : "NULL",
870 			    setno);
871 			continue;
872 		}
873 
874 		if (client[setno][nid] != (CLIENT *) NULL) {
875 			/* already inited */
876 			commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
877 			    node->nd_nodename ? node->nd_nodename : "NULL");
878 			continue;
879 		}
880 
881 		/*
882 		 * While trying to create a connection to a node,
883 		 * periodically check to see if the node has been marked
884 		 * dead by the SunCluster infrastructure.
885 		 * This periodic check is needed since a non-responsive
886 		 * rpc.mdcommd (while it is attempting to create a connection
887 		 * to a dead node) can lead to large delays and/or failures
888 		 * in the reconfig steps.
889 		 */
890 		while ((client[setno][nid] == (CLIENT *) NULL) &&
891 		    (tout < MD_CLNT_CREATE_TOUT)) {
892 			client[setno][nid] = meta_client_create_retry
893 				(node->nd_nodename, mdmn_clnt_create,
894 				(void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
895 			/* Is the node dead? */
896 			if (mdmn_is_node_dead(node) == 1) {
897 				commd_debug(MD_MMV_SYSLOG,
898 				    "rpc.mdcommd: no client for dead node %s\n",
899 				    node->nd_nodename);
900 				break;
901 			} else
902 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
903 		}
904 
905 		if (client[setno][nid] == (CLIENT *) NULL) {
906 			clnt_pcreateerror(node->nd_nodename);
907 			/*
908 			 * If we cannot connect to a single node
909 			 * (maybe because it is down) we mark this node as not
910 			 * owned and continue with the next node in the list.
911 			 * This is better than failing the entire starting up
912 			 * of the commd system.
913 			 */
914 			node->nd_flags &= ~MD_MN_NODE_OWN;
915 			commd_debug(MD_MMV_SYSLOG,
916 			    "WARNING couldn't create client for %s\n"
917 			    "Reconfig cycle required\n",
918 			    node->nd_nodename);
919 			commd_debug(MD_MMV_INIT,
920 			    "WARNING couldn't create client for %s\n"
921 			    "Reconfig cycle required\n",
922 			    node->nd_nodename);
923 			continue;
924 		}
925 		/* this node has the license to send */
926 		commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
927 		add_license(node);
928 
929 		/* set the timeout value */
930 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
931 		    (char *)&FOUR_SECS);
932 
933 		commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
934 		    node->nd_nodename ? node->nd_nodename : "NULL");
935 	}
936 
937 	set_descriptor[setno] = sd;
938 	md_mn_set_inited[setno] |= MDMN_SET_NODES;
939 	return (0); /* success */
940 }
941 
942 void *
943 mdmn_send_to_work(void *arg)
944 {
945 	int			*rpc_err;
946 	int			success;
947 	int			try_master;
948 	set_t			setno;
949 	mutex_t			*mx;	/* protection for initiator_table */
950 	SVCXPRT			*transp;
951 	md_mn_msg_t		*msg;
952 	md_mn_nodeid_t		set_master;
953 	md_mn_msgclass_t	class;
954 	md_mn_msg_and_transp_t	*matp = (md_mn_msg_and_transp_t *)arg;
955 
956 	msg			= matp->mat_msg;
957 	transp			= matp->mat_transp;
958 
959 	/* the alloc was done in mdmn_send_svc_1 */
960 	free(matp);
961 
962 	class = mdmn_get_message_class(msg->msg_type);
963 	setno = msg->msg_setno;
964 
965 	/* set the sender, so the master knows who to send the results */
966 	rw_rdlock(&set_desc_rwlock[setno]);
967 	msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
968 	set_master	= set_descriptor[setno]->sd_mn_master_nodeid;
969 
970 	mx = mdmn_get_initiator_table_mx(setno, class);
971 	mutex_lock(mx);
972 
973 	/*
974 	 * Here we check, if the initiator table slot for this set/class
975 	 * combination is free to use.
976 	 * If this is not the case, we return CLASS_BUSY forcing the
977 	 * initiating send_message call to retry
978 	 */
979 	success = mdmn_check_initiator_table(setno, class);
980 	if (success == MDMNE_CLASS_BUSY) {
981 		md_mn_msgid_t		active_mid;
982 
983 		mdmn_get_initiator_table_id(setno, class,
984 		&active_mid);
985 
986 		commd_debug(MD_MMV_SEND,
987 		    "send_to_work: received but locally busy "
988 		    "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
989 		    "active msg=(%d, 0x%llx-%d)\n",
990 		    MSGID_ELEMS(msg->msg_msgid), setno, class,
991 		    msg->msg_type, MSGID_ELEMS(active_mid));
992 	} else {
993 		commd_debug(MD_MMV_SEND,
994 		    "send_to_work: received (%d, 0x%llx-%d), "
995 		    "set=%d, class=%d, type=%d\n",
996 		    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
997 	}
998 
999 	try_master = 2; /* return failure after two retries */
1000 	while ((success == MDMNE_ACK) && (try_master--)) {
1001 		rw_rdlock(&client_rwlock[setno]);
1002 		/* is the rpc client to the master still around ? */
1003 		if (check_client(setno, set_master)) {
1004 			success = MDMNE_RPC_FAIL;
1005 			FLUSH_DEBUGFILE();
1006 			rw_unlock(&client_rwlock[setno]);
1007 			break; /* out of try_master-loop */
1008 		}
1009 
1010 		/*
1011 		 * Send the request to the work function on the master
1012 		 * this call will return immediately
1013 		 */
1014 		rpc_err = mdmn_work_1(msg, client[setno][set_master]);
1015 
1016 		/* Everything's Ok? */
1017 		if (rpc_err == NULL) {
1018 			success = MDMNE_RPC_FAIL;
1019 			/*
1020 			 * Probably something happened to the daemon on the
1021 			 * master. Kill the client, and try again...
1022 			 */
1023 			rw_unlock(&client_rwlock[setno]);
1024 			rw_wrlock(&client_rwlock[setno]);
1025 			mdmn_clnt_destroy(client[setno][set_master]);
1026 			if (client[setno][set_master] != (CLIENT *)NULL) {
1027 				client[setno][set_master] = (CLIENT *)NULL;
1028 			}
1029 			rw_unlock(&client_rwlock[setno]);
1030 			continue;
1031 
1032 		} else  if (*rpc_err != MDMNE_ACK) {
1033 			/* something went wrong, break out */
1034 			success = *rpc_err;
1035 			free(rpc_err);
1036 			rw_unlock(&client_rwlock[setno]);
1037 			break; /* out of try_master-loop */
1038 		}
1039 
1040 		rw_unlock(&client_rwlock[setno]);
1041 		free(rpc_err);
1042 
1043 		/*
1044 		 * If we are here, we sucessfully delivered the message.
1045 		 * We register the initiator_table, so that
1046 		 * wakeup_initiator_1  can do the sendreply with the
1047 		 * results for us.
1048 		 */
1049 		success = MDMNE_ACK;
1050 		mdmn_register_initiator_table(setno, class, msg, transp);
1051 
1052 		/* tell check_timeouts, there's work to do */
1053 		mutex_lock(&check_timeout_mutex);
1054 		messages_on_their_way++;
1055 		cond_signal(&check_timeout_cv);
1056 		mutex_unlock(&check_timeout_mutex);
1057 		break; /* out of try_master-loop */
1058 	}
1059 
1060 	rw_unlock(&set_desc_rwlock[setno]);
1061 
1062 	if (success == MDMNE_ACK) {
1063 		commd_debug(MD_MMV_SEND,
1064 		    "send_to_work: registered (%d, 0x%llx-%d)\n",
1065 		    MSGID_ELEMS(msg->msg_msgid));
1066 	} else {
1067 		/* In case of failure do the sendreply now */
1068 		md_mn_result_t *resultp;
1069 		resultp = Zalloc(sizeof (md_mn_result_t));
1070 		resultp->mmr_comm_state = success;
1071 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
1072 		commd_debug(MD_MMV_SEND,
1073 		    "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
1074 		    MSGID_ELEMS(msg->msg_msgid), success);
1075 		free_result(resultp);
1076 
1077 	}
1078 
1079 	free_msg(msg);
1080 	mutex_unlock(mx);
1081 	return (NULL);
1082 
1083 }
1084 
1085 /*
1086  * do_message_locally(msg, result)
1087  * Process a message locally on the master
1088  * Lookup the MCT if the message has already been processed.
1089  * If not, call the handler and store the result
1090  * If yes, retrieve the result from the MCT.
1091  * Return:
1092  *	MDMNE_ACK in case of success
1093  *	MDMNE_LOG_FAIL if the MCT could not be checked
1094  */
1095 static int
1096 do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1097 {
1098 	int			completed;
1099 	set_t			setno;
1100 	md_mn_msgtype_t		msgtype = msg->msg_type;
1101 	md_mn_msgclass_t	class;
1102 
1103 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1104 
1105 	handler = mdmn_get_handler(msgtype);
1106 	if (handler == NULL) {
1107 		result->mmr_exitval = 0;
1108 		/* let the sender decide if this is an error or not */
1109 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1110 		return (MDMNE_NO_HANDLER);
1111 	}
1112 
1113 	class = mdmn_get_message_class(msg->msg_type);
1114 	setno = msg->msg_setno;
1115 
1116 	result->mmr_msgtype	= msgtype;
1117 	result->mmr_flags	= msg->msg_flags;
1118 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1119 
1120 	mutex_lock(&mct_mutex[setno][class]);
1121 	completed = mdmn_check_completion(msg, result);
1122 	if (completed == MDMN_MCT_NOT_DONE) {
1123 		/* message not yet processed locally */
1124 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1125 		    "calling handler for (%d,0x%llx-%d) type %d\n",
1126 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1127 
1128 		/*
1129 		 * Mark the message as being currently processed,
1130 		 * so we won't start a second handler for it
1131 		 */
1132 		(void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1133 		mutex_unlock(&mct_mutex[setno][class]);
1134 
1135 		/* here we actually process the message on the master */
1136 		(*handler)(msg, MD_MSGF_ON_MASTER, result);
1137 
1138 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1139 		    "finished handler for (%d,0x%llx-%d) type %d\n",
1140 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1141 
1142 		/* Mark the message as fully processed, store the result */
1143 		mutex_lock(&mct_mutex[setno][class]);
1144 		(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1145 	} else if (completed == MDMN_MCT_DONE) {
1146 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1147 		    "result for (%d, 0x%llx-%d) from MCT\n",
1148 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1149 	} else if (completed == MDMN_MCT_IN_PROGRESS) {
1150 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1151 		    "(%d, 0x%llx-%d) is currently being processed\n",
1152 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1153 	} else {
1154 		/* MCT error occurred (should never happen) */
1155 		mutex_unlock(&mct_mutex[setno][class]);
1156 		result->mmr_comm_state = MDMNE_LOG_FAIL;
1157 		commd_debug(MD_MMV_SYSLOG, "WARNING "
1158 		    "mdmn_check_completion returned %d "
1159 		    "for (%d,0x%llx-%d)\n", completed,
1160 		    MSGID_ELEMS(msg->msg_msgid));
1161 		return (MDMNE_LOG_FAIL);
1162 	}
1163 	mutex_unlock(&mct_mutex[setno][class]);
1164 	return (MDMNE_ACK);
1165 
1166 }
1167 
1168 /*
1169  * do_send_message(msg, node)
1170  *
1171  * Send a message to a given node and wait for a acknowledgment, that the
1172  * message has arrived on the remote node.
1173  * Make sure that the client for the set is setup correctly.
1174  * If no ACK arrives, destroy and recreate the RPC client and retry the
1175  * message one time
1176  * After actually sending wait no longer than the appropriate number of
1177  * before timing out the message.
1178  *
1179  * Note must be called with set_desc_wrlock held in reader mode
1180  */
1181 static int
1182 do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1183 {
1184 	int			err;
1185 	int			rpc_retries;
1186 	int			timeout_retries = 0;
1187 	int			*ret = NULL;
1188 	set_t			setno;
1189 	cond_t			*cv;	/* see mdmn_wakeup_master_svc_1 */
1190 	mutex_t			*mx;	/* protection for class_busy */
1191 	timestruc_t		timeout; /* surveillance for remote daemon */
1192 	md_mn_nodeid_t		nid;
1193 	md_mn_msgtype_t		msgtype;
1194 	md_mn_msgclass_t	class;
1195 
1196 	nid	= node->nd_nodeid;
1197 	msgtype = msg->msg_type;
1198 	setno	= msg->msg_setno;
1199 	class	= mdmn_get_message_class(msgtype);
1200 	mx	= mdmn_get_master_table_mx(setno, class);
1201 	cv	= mdmn_get_master_table_cv(setno, class);
1202 
1203 retry_rpc:
1204 
1205 	/* We try two times to send the message */
1206 	rpc_retries = 2;
1207 
1208 	/*
1209 	 * if sending the message doesn't succeed the first time due to a
1210 	 * RPC problem, we retry one time
1211 	 */
1212 	while ((rpc_retries != 0) && (ret == NULL)) {
1213 		/*  in abort state, we error out immediately */
1214 		if (md_commd_global_state & MD_CGS_ABORTED) {
1215 			return (MDMNE_ABORT);
1216 		}
1217 
1218 		rw_rdlock(&client_rwlock[setno]);
1219 		/* unable to create client? Ignore it */
1220 		if (check_client(setno, nid)) {
1221 			/*
1222 			 * In case we cannot establish an RPC client, we
1223 			 * take this node out of our considerations.
1224 			 * This will be reset by a reconfig
1225 			 * cycle that should come pretty soon.
1226 			 * MNISSUE: Should a reconfig cycle
1227 			 * be forced on SunCluster?
1228 			 */
1229 			node->nd_flags &= ~MD_MN_NODE_OWN;
1230 			commd_debug(MD_MMV_SYSLOG,
1231 			    "WARNING couldn't create client for %s\n"
1232 			    "Reconfig cycle required\n",
1233 			    node->nd_nodename);
1234 			commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1235 			    "WARNING couldn't create client for %s\n",
1236 			    MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1237 			rw_unlock(&client_rwlock[setno]);
1238 			return (MDMNE_IGNORE_NODE);
1239 		}
1240 		/* let's be paranoid and check again before sending */
1241 		if (client[setno][nid] == NULL) {
1242 			/*
1243 			 * if this is true, strange enough, we catch our breath,
1244 			 * and then continue, so that the client is set up
1245 			 * once again.
1246 			 */
1247 			commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1248 			rw_unlock(&client_rwlock[setno]);
1249 			sleep(1);
1250 			continue;
1251 		}
1252 
1253 		/* send it over, it will return immediately */
1254 		ret = mdmn_work_1(msg, client[setno][nid]);
1255 
1256 		rw_unlock(&client_rwlock[setno]);
1257 
1258 		if (ret != NULL) {
1259 			commd_debug(MD_MMV_PROC_M,
1260 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1261 			    " 0x%x\n",
1262 			    MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1263 		} else {
1264 			commd_debug(MD_MMV_PROC_M,
1265 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1266 			    " NULL \n",
1267 			    MSGID_ELEMS(msg->msg_msgid), nid);
1268 		}
1269 
1270 		if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1271 		    (*ret == MDMNE_THR_CREATE_FAIL)) {
1272 			/*
1273 			 * Something happened to the daemon on the other side.
1274 			 * Kill the client, and try again.
1275 			 * check_client() will create a new client
1276 			 */
1277 			rw_wrlock(&client_rwlock[setno]);
1278 			mdmn_clnt_destroy(client[setno][nid]);
1279 			if (client[setno][nid] != (CLIENT *)NULL) {
1280 				client[setno][nid] = (CLIENT *)NULL;
1281 			}
1282 			rw_unlock(&client_rwlock[setno]);
1283 
1284 			/* ... but don't try infinitely */
1285 			--rpc_retries;
1286 			continue;
1287 		}
1288 		/*
1289 		 * If the class is locked on the other node, keep trying.
1290 		 * This situation will go away automatically,
1291 		 * if we wait long enough
1292 		 */
1293 		if (*ret == MDMNE_CLASS_LOCKED) {
1294 			sleep(1);
1295 			free(ret);
1296 			ret = NULL;
1297 			continue;
1298 		}
1299 	}
1300 	if (ret == NULL) {
1301 		return (MDMNE_RPC_FAIL);
1302 	}
1303 
1304 
1305 	/* if the slave is in abort state, we just ignore it. */
1306 	if (*ret == MDMNE_ABORT) {
1307 		commd_debug(MD_MMV_PROC_M,
1308 		    "proc_mas: work(%d,0x%llx-%d) returned "
1309 		    "MDMNE_ABORT\n",
1310 		    MSGID_ELEMS(msg->msg_msgid));
1311 		free(ret);
1312 		return (MDMNE_IGNORE_NODE);
1313 	}
1314 
1315 	/* Did the remote processing succeed? */
1316 	if (*ret != MDMNE_ACK) {
1317 		/*
1318 		 * Some commd failure in the middle of sending the msg
1319 		 * to the nodes. We don't continue here.
1320 		 */
1321 		commd_debug(MD_MMV_PROC_M,
1322 		    "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1323 		    MSGID_ELEMS(msg->msg_msgid), *ret);
1324 		free(ret);
1325 		return (MDMNE_RPC_FAIL);
1326 	}
1327 	free(ret);
1328 	ret = NULL;
1329 
1330 	/*
1331 	 * When we are here, we have sent the message to the other node and
1332 	 * we know that node has accepted it.
1333 	 * We go to sleep and have trust to be woken up by wakeup.
1334 	 * If we wakeup due to a timeout, or a signal, no result has been
1335 	 * placed in the appropriate slot.
1336 	 * If we timeout, it is likely that this is because the node has
1337 	 * gone away, so we will destroy the client and try it again in the
1338 	 * expectation that the rpc will fail and we will return
1339 	 * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1340 	 * be being processed on the slave. In this case just timeout for 4
1341 	 * more seconds and then return RPC_FAIL if the message is not complete.
1342 	 */
1343 	timeout.tv_nsec = 0;
1344 	timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1345 	    FOUR_SECS.tv_sec;
1346 	err = cond_reltimedwait(cv, mx, &timeout);
1347 
1348 	if (err == 0) {
1349 		/* everything's fine, return success */
1350 		return (MDMNE_ACK);
1351 	}
1352 
1353 	if (err == ETIME) {
1354 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1355 		    "timeout occured, set=%d, class=%d, "
1356 		    "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1357 		    setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1358 		if (timeout_retries == 0) {
1359 			timeout_retries++;
1360 			/*
1361 			 * Destroy the client and try the rpc call again
1362 			 */
1363 			rw_wrlock(&client_rwlock[setno]);
1364 			mdmn_clnt_destroy(client[setno][nid]);
1365 			client[setno][nid] = (CLIENT *)NULL;
1366 			rw_unlock(&client_rwlock[setno]);
1367 			goto retry_rpc;
1368 		}
1369 	} else if (err == EINTR) {
1370 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1371 		    "commd signalled, set=%d, class=%d, "
1372 		    "msgid=(%d, 0x%llx-%d)\n",
1373 		    setno, class, MSGID_ELEMS(msg->msg_msgid));
1374 	} else {
1375 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1376 		    "cond_reltimedwait err=%d, set=%d, "
1377 		    "class=%d, msgid=(%d, 0x%llx-%d)\n",
1378 		    err, setno, class,
1379 		    MSGID_ELEMS(msg->msg_msgid));
1380 	}
1381 
1382 	/* some failure happened */
1383 	return (MDMNE_RPC_FAIL);
1384 }
1385 
1386 /*
1387  * before we return we have to
1388  * free_msg(msg); because we are working on a copied message
1389  */
1390 void
1391 mdmn_master_process_msg(md_mn_msg_t *msg)
1392 {
1393 	int		*ret;
1394 	int		err;
1395 	int		nmsgs;		/* total number of msgs */
1396 	int		curmsg;		/* index of current msg */
1397 	set_t		setno;
1398 	uint_t		inherit_flags = 0;
1399 	uint_t		secdiff, usecdiff; /* runtime of this message */
1400 	md_error_t	mde = mdnullerror;
1401 	md_mn_msg_t	*msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1402 	md_mn_msg_t	*cmsg;		/* current msg */
1403 	md_mn_msgid_t	dummyid;
1404 	md_mn_result_t	*result;
1405 	md_mn_result_t	*slave_result;
1406 	md_mn_nodeid_t	sender;
1407 	md_mn_nodeid_t	set_master;
1408 	md_mnnode_desc	*node;
1409 	md_mn_msgtype_t	orig_type;	/* type of the original message */
1410 	md_mn_msgtype_t	msgtype;	/* type of the current message */
1411 	md_mn_msgclass_t orig_class;	/* class of the original message */
1412 	md_mn_msgclass_t class;		/* class of the current message */
1413 
1414 	int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1415 
1416 	orig_type = msgtype = msg->msg_type;
1417 	sender	= msg->msg_sender;
1418 	setno	= msg->msg_setno;
1419 
1420 	result = Zalloc(sizeof (md_mn_result_t));
1421 	result->mmr_setno	= setno;
1422 	result->mmr_msgtype	= msgtype;
1423 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1424 
1425 	orig_class = mdmn_get_message_class(msgtype);
1426 
1427 	commd_debug(MD_MMV_PROC_M,
1428 	    "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1429 	    MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1430 
1431 	rw_rdlock(&set_desc_rwlock[setno]);
1432 	set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1433 	result->mmr_sender	= set_master;
1434 	/*
1435 	 * Put message into the change log unless told otherwise
1436 	 * Note that we only log original messages.
1437 	 * If they are generated by some smgen, we don't log them!
1438 	 * Replay messages aren't logged either.
1439 	 * Note, that replay messages are unlogged on completion.
1440 	 */
1441 	if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1442 		commd_debug(MD_MMV_PROC_M,
1443 		    "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1444 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1445 		err = mdmn_log_msg(msg);
1446 		if (err == MDMNE_NULL) {
1447 			/* msg logged successfully */
1448 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1449 			    "done log_msg for (%d,0x%llx-%d) type %d\n",
1450 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1451 			goto proceed;
1452 		}
1453 		if (err == MDMNE_ACK) {
1454 			/* Same msg in the slot, proceed */
1455 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1456 			    "already logged (%d,0x%llx-%d) type %d\n",
1457 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1458 			goto proceed;
1459 		}
1460 		if (err == MDMNE_LOG_FAIL) {
1461 			/* Oh, bad, the log is non functional. */
1462 			result->mmr_comm_state = MDMNE_LOG_FAIL;
1463 			/*
1464 			 * Note that the mark_busy was already done by
1465 			 * mdmn_work_svc_1()
1466 			 */
1467 			mutex_lock(&mdmn_busy_mutex[setno]);
1468 			mdmn_mark_class_unbusy(setno, orig_class);
1469 			mutex_unlock(&mdmn_busy_mutex[setno]);
1470 
1471 		}
1472 		if (err == MDMNE_CLASS_BUSY) {
1473 			/*
1474 			 * The log is occupied with a different message
1475 			 * that needs to be played first.
1476 			 * We reject the current message with MDMNE_CLASS_BUSY
1477 			 * to the initiator and do not unbusy the set/class,
1478 			 * because we will proceed with the logged message,
1479 			 * which has the same set/class combination
1480 			 */
1481 			result->mmr_comm_state = MDMNE_CLASS_BUSY;
1482 		}
1483 		ret = (int *)NULL;
1484 		rw_rdlock(&client_rwlock[setno]);
1485 
1486 		if (check_client(setno, sender)) {
1487 			commd_debug(MD_MMV_SYSLOG,
1488 			    "proc_mas: No client for initiator \n");
1489 		} else {
1490 			ret = mdmn_wakeup_initiator_1(result,
1491 			    client[setno][sender]);
1492 		}
1493 		rw_unlock(&client_rwlock[setno]);
1494 
1495 		if (ret == (int *)NULL) {
1496 			commd_debug(MD_MMV_SYSLOG,
1497 			    "proc_mas: couldn't wakeup_initiator \n");
1498 		} else {
1499 			if (*ret != MDMNE_ACK) {
1500 				commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1501 				    "wakeup_initiator returned %d\n", *ret);
1502 			}
1503 			free(ret);
1504 		}
1505 		free_msg(msg);
1506 
1507 		if (err == MDMNE_LOG_FAIL) {
1508 			/* we can't proceed here */
1509 			free_result(result);
1510 			rw_unlock(&set_desc_rwlock[setno]);
1511 			return;
1512 		} else if (err == MDMNE_CLASS_BUSY) {
1513 			mdmn_changelog_record_t *lr;
1514 			lr = mdmn_get_changelogrec(setno, orig_class);
1515 			assert(lr != NULL);
1516 
1517 			/* proceed with the logged message */
1518 			msg = copy_msg(&(lr->lr_msg), NULL);
1519 
1520 			/*
1521 			 * The logged message has to have the same class but
1522 			 * type and sender can be different
1523 			 */
1524 			orig_type = msgtype = msg->msg_type;
1525 			sender	= msg->msg_sender;
1526 
1527 			commd_debug(MD_MMV_PROC_M,
1528 			    "proc_mas: Got new message from change log: "
1529 			    "(%d,0x%llx-%d) type %d\n",
1530 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1531 
1532 			/* continue normal operation with this message */
1533 		}
1534 	}
1535 
1536 proceed:
1537 	smgen = mdmn_get_submessage_generator(msgtype);
1538 	if (smgen == NULL) {
1539 		/* no submessages to create, just use the original message */
1540 		msglist[0] = msg;
1541 		nmsgs = 1;
1542 	} else {
1543 		/* some bits are passed on to submessages */
1544 		inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1545 
1546 		nmsgs = smgen(msg, msglist);
1547 
1548 		/* some settings for the submessages */
1549 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1550 			cmsg    = msglist[curmsg];
1551 
1552 			/* Apply the inherited flags */
1553 			cmsg->msg_flags |= inherit_flags;
1554 
1555 			/*
1556 			 * Make sure the submessage ID is set correctly
1557 			 * Note: first submessage has mid_smid of 1 (not 0)
1558 			 */
1559 			cmsg->msg_msgid.mid_smid = curmsg + 1;
1560 
1561 			/* need the original class set in msgID (for MCT) */
1562 			cmsg->msg_msgid.mid_oclass = orig_class;
1563 		}
1564 
1565 		commd_debug(MD_MMV_PROC_M,
1566 		    "smgen generated %d submsgs, origclass = %d\n",
1567 		    nmsgs, orig_class);
1568 	}
1569 	/*
1570 	 * This big loop does the following.
1571 	 * For all messages:
1572 	 *	process message on the master first (a message completion
1573 	 *		table MCT ensures a message is not processed twice)
1574 	 *	in case of an error break out of message loop
1575 	 *	for all nodes -- unless MD_MSGF_NO_BCAST is set --
1576 	 *		send message to node until that succeeds
1577 	 *		merge result -- not yet implemented
1578 	 *		respect MD_MSGF_STOP_ON_ERROR
1579 	 */
1580 	for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1581 		int	break_msg_loop = 0;
1582 		mutex_t	*mx;		/* protection for class_busy */
1583 		int	master_err;
1584 		int	master_exitval = -1;
1585 
1586 		cmsg	= msglist[curmsg];
1587 		msgtype = cmsg->msg_type;
1588 		class	= mdmn_get_message_class(msgtype);
1589 		node	= NULL;
1590 		mx	= mdmn_get_master_table_mx(setno, class);
1591 
1592 		/* If we are in the abort state, we error out immediately */
1593 		if (md_commd_global_state & MD_CGS_ABORTED) {
1594 			break; /* out of the message loop */
1595 		}
1596 
1597 		commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1598 		    class, orig_class);
1599 		/*
1600 		 * If the current class is different from the original class,
1601 		 * we have to lock it down.
1602 		 * The original class is already marked busy.
1603 		 * At this point we cannot refuse the message because the
1604 		 * class is busy right now, so we wait until the class becomes
1605 		 * available again. As soon as something changes for this set
1606 		 * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1607 		 *
1608 		 * Granularity could be finer (setno/class)
1609 		 */
1610 		if (class != orig_class) {
1611 			mutex_lock(&mdmn_busy_mutex[setno]);
1612 			while (mdmn_mark_class_busy(setno, class) == FALSE) {
1613 				cond_wait(&mdmn_busy_cv[setno],
1614 				    &mdmn_busy_mutex[setno]);
1615 			}
1616 			mutex_unlock(&mdmn_busy_mutex[setno]);
1617 		}
1618 
1619 		master_err = do_message_locally(cmsg, result);
1620 
1621 		if ((master_err != MDMNE_ACK) ||
1622 		    ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1623 			result->mmr_failing_node = set_master;
1624 			if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1625 				/*
1626 				 * if appropriate, unbusy the class and
1627 				 * break out of the message loop
1628 				 */
1629 				if (class != orig_class) {
1630 					mutex_lock(&mdmn_busy_mutex[setno]);
1631 					mdmn_mark_class_unbusy(setno, class);
1632 					mutex_unlock(&mdmn_busy_mutex[setno]);
1633 				}
1634 				break;
1635 			}
1636 		}
1637 
1638 		if (master_err == MDMNE_ACK)
1639 			master_exitval = result->mmr_exitval;
1640 
1641 		/* No broadcast? => next message */
1642 		if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1643 			/* if appropriate, unbusy the class */
1644 			if (class != orig_class) {
1645 				mutex_lock(&mdmn_busy_mutex[setno]);
1646 				mdmn_mark_class_unbusy(setno, class);
1647 				mutex_unlock(&mdmn_busy_mutex[setno]);
1648 			}
1649 			continue;
1650 		}
1651 
1652 
1653 		/* fake sender, so we get notified when the results are avail */
1654 		cmsg->msg_sender = set_master;
1655 		/*
1656 		 * register to the master_table. It's needed by wakeup_master to
1657 		 * wakeup the sleeping thread.
1658 		 * Access is protected by the class lock: mdmn_mark_class_busy()
1659 		 */
1660 		mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1661 
1662 
1663 
1664 		rw_rdlock(&set_desc_rwlock[setno]);
1665 		/* Send the message  to all other nodes */
1666 		for (node = set_descriptor[setno]->sd_nodelist; node;
1667 		    node = node->nd_next) {
1668 			md_mn_nodeid_t nid = node->nd_nodeid;
1669 
1670 			/* We are master and have already processed the msg */
1671 			if (node == set_descriptor[setno]->sd_mn_masternode) {
1672 				continue;
1673 			}
1674 
1675 			/* If this node didn't join the disk set, ignore it */
1676 			if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1677 				continue;
1678 			}
1679 
1680 			mutex_lock(mx);
1681 			/*
1682 			 * Register the node that is addressed,
1683 			 * so we can detect unsolicited messages
1684 			 */
1685 			mdmn_set_master_table_addr(setno, class, nid);
1686 			slave_result = (md_mn_result_t *)NULL;
1687 
1688 			/*
1689 			 * Now send it. do_send_message() will return if
1690 			 *	a failure occurs or
1691 			 *	the results are available
1692 			 */
1693 			err = do_send_message(cmsg, node);
1694 
1695 			/*  in abort state, we error out immediately */
1696 			if (md_commd_global_state & MD_CGS_ABORTED) {
1697 				break;
1698 			}
1699 
1700 			if (err == MDMNE_ACK) {
1701 				slave_result =
1702 				    mdmn_get_master_table_res(setno, class);
1703 				commd_debug(MD_MMV_PROC_M,
1704 				    "proc_mas: got result for (%d,0x%llx-%d)\n",
1705 				    MSGID_ELEMS(cmsg->msg_msgid));
1706 			} else if (err == MDMNE_IGNORE_NODE) {
1707 				mutex_unlock(mx);
1708 				continue; /* send to next node */
1709 			}
1710 			mutex_unlock(mx);
1711 
1712 
1713 			/*
1714 			 * If the result is NULL, or err doesn't show success,
1715 			 * something went wrong with this RPC call.
1716 			 */
1717 			if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1718 				/*
1719 				 * If PANIC_WHEN_INCONSISTENT set,
1720 				 * panic if the master succeeded while
1721 				 * this node failed
1722 				 */
1723 				if ((cmsg->msg_flags &
1724 				    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1725 				    (master_err == MDMNE_ACK))
1726 					panic_system(nid, cmsg->msg_type,
1727 					    master_err, master_exitval,
1728 					    slave_result);
1729 
1730 				result->mmr_failing_node = nid;
1731 				/* are we supposed to stop in case of error? */
1732 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1733 					result->mmr_exitval = MDMNE_RPC_FAIL;
1734 					commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1735 					    "result (%d,0x%llx-%d) is NULL\n",
1736 					    MSGID_ELEMS(cmsg->msg_msgid));
1737 					FLUSH_DEBUGFILE();
1738 					break_msg_loop = 1;
1739 					break; /* out of node loop first */
1740 				} else {
1741 					/* send msg to the next node */
1742 					continue;
1743 				}
1744 
1745 			}
1746 
1747 			/*
1748 			 * Message processed on remote node.
1749 			 * If PANIC_WHEN_INCONSISTENT set, panic if the
1750 			 * result is different on this node from the result
1751 			 * on the master
1752 			 */
1753 			if ((cmsg->msg_flags &
1754 			    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1755 			    ((master_err != MDMNE_ACK) ||
1756 			    (slave_result->mmr_exitval != master_exitval)))
1757 				panic_system(nid, cmsg->msg_type, master_err,
1758 				    master_exitval, slave_result);
1759 
1760 			/*
1761 			 * At this point we know we have a message that was
1762 			 * processed on the remote node.
1763 			 * We now check if the exitval is non zero.
1764 			 * In that case we discard the previous result and
1765 			 * rather use the current.
1766 			 * This means: If a message fails on no node,
1767 			 * the result from the master will be returned.
1768 			 * There's currently no such thing as merge of results
1769 			 * If additionally STOP_ON_ERROR is set, we bail out
1770 			 */
1771 			if (slave_result->mmr_exitval != 0) {
1772 				/* throw away the previously allocated result */
1773 				free_result(result);
1774 
1775 				/* copy_result() allocates new memory */
1776 				result = copy_result(slave_result);
1777 				free_result(slave_result);
1778 
1779 				dump_result(MD_MMV_PROC_M, "proc_mas", result);
1780 
1781 				result->mmr_failing_node = nid;
1782 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1783 					break_msg_loop = 1;
1784 					break; /* out of node loop */
1785 				}
1786 				continue; /* try next node */
1787 
1788 			} else {
1789 				/*
1790 				 * MNIssue: may want to merge the results
1791 				 * from all slaves.  Currently only report
1792 				 * the results from the master.
1793 				 */
1794 				free_result(slave_result);
1795 			}
1796 
1797 		} /* End of loop over the nodes */
1798 		rw_unlock(&set_desc_rwlock[setno]);
1799 
1800 
1801 		/* release the current class again */
1802 		if (class != orig_class) {
1803 			mutex_lock(&mdmn_busy_mutex[setno]);
1804 			mdmn_mark_class_unbusy(setno, class);
1805 			mutex_unlock(&mdmn_busy_mutex[setno]);
1806 		}
1807 
1808 		/* are we supposed to quit entirely ? */
1809 		if (break_msg_loop ||
1810 		    (md_commd_global_state & MD_CGS_ABORTED)) {
1811 			break; /* out of msg loop */
1812 		}
1813 
1814 	} /* End of loop over the messages */
1815 	/*
1816 	 * If we are here, there's two possibilities:
1817 	 * 	- we processed all messages on all nodes without an error.
1818 	 *	    In this case we return the result from the master.
1819 	 *	    (to be implemented: return the merged result)
1820 	 *	- we encountered an error in which case result has been
1821 	 *	    set accordingly already.
1822 	 */
1823 
1824 	if (md_commd_global_state & MD_CGS_ABORTED) {
1825 		result->mmr_comm_state = MDMNE_ABORT;
1826 	}
1827 
1828 	/*
1829 	 * This message has been processed completely.
1830 	 * Remove it from the changelog.
1831 	 * Do this for replay messages too.
1832 	 * Note that the message is unlogged before waking up the
1833 	 * initiator.  This is done for two reasons.
1834 	 * 1. Remove a race condition that occurs when back to back
1835 	 *   messages are sent for the same class, the registeration is
1836 	 *   is lost.
1837 	 * 2. If the initiator died but the action was completed on all the
1838 	 *   the nodes, we want that to be marked "done" quickly.
1839 	 */
1840 
1841 	if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1842 		commd_debug(MD_MMV_PROC_M,
1843 		    "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1844 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1845 		mdmn_unlog_msg(msg);
1846 		commd_debug(MD_MMV_PROC_M,
1847 		    "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1848 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1849 	}
1850 
1851 	/*
1852 	 * In case of submessages, we increased the submessage ID in the
1853 	 * result structure. We restore the message ID to the value that
1854 	 * the initiator is waiting for.
1855 	 */
1856 	result->mmr_msgid.mid_smid	= 0;
1857 	result->mmr_msgtype		= orig_type;
1858 	result->mmr_sender		= set_master;
1859 
1860 	/* if we have an inited client, send result */
1861 	ret = (int *)NULL;
1862 
1863 	rw_rdlock(&client_rwlock[setno]);
1864 	if (check_client(setno, sender)) {
1865 		commd_debug(MD_MMV_SYSLOG,
1866 		    "proc_mas: unable to create client for initiator\n");
1867 	} else {
1868 		ret = mdmn_wakeup_initiator_1(result, client[setno][sender]);
1869 	}
1870 	rw_unlock(&client_rwlock[setno]);
1871 
1872 	if (ret == (int *)NULL) {
1873 		commd_debug(MD_MMV_PROC_M,
1874 		    "proc_mas: couldn't wakeup initiator\n");
1875 	} else {
1876 		if (*ret != MDMNE_ACK) {
1877 			commd_debug(MD_MMV_PROC_M,
1878 			    "proc_mas: wakeup_initiator returned %d\n",
1879 			    *ret);
1880 		}
1881 		free(ret);
1882 	}
1883 
1884 	rw_unlock(&set_desc_rwlock[setno]);
1885 	/* Free all submessages, if there were any */
1886 	if (nmsgs > 1) {
1887 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1888 			free_msg(msglist[curmsg]);
1889 		}
1890 	}
1891 	/* Free the result */
1892 	free_result(result);
1893 
1894 	mutex_lock(&mdmn_busy_mutex[setno]);
1895 	mdmn_mark_class_unbusy(setno, orig_class);
1896 	mutex_unlock(&mdmn_busy_mutex[setno]);
1897 
1898 
1899 	/*
1900 	 * We use this ioctl just to get the time in the same format as used in
1901 	 * the messageID. If it fails, all we get is a bad runtime output.
1902 	 */
1903 	(void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1904 	secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1905 	usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1906 
1907 	/* catching possible overflow */
1908 	if (usecdiff >= 1000000) {
1909 		usecdiff -= 1000000;
1910 		secdiff++;
1911 	}
1912 
1913 
1914 	commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1915 	    "%5d.%06d secs runtime\n",
1916 	    MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1917 
1918 	/* Free the original message */
1919 	free_msg(msg);
1920 }
1921 
1922 void
1923 mdmn_slave_process_msg(md_mn_msg_t *msg)
1924 {
1925 	int			*ret = NULL;
1926 	int			completed;
1927 	int			retries;
1928 	int			successfully_returned;
1929 	set_t			setno;
1930 	md_mn_result_t		*result;
1931 	md_mn_nodeid_t		sender;
1932 	md_mn_nodeid_t		whoami;
1933 	md_mn_msgtype_t		msgtype;
1934 	md_mn_msgclass_t	class;
1935 
1936 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1937 
1938 	setno	= msg->msg_setno;
1939 	sender	= msg->msg_sender; /* this is always the master of the set */
1940 	msgtype	= msg->msg_type;
1941 
1942 	rw_rdlock(&set_desc_rwlock[setno]);
1943 	whoami		= set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1944 	rw_unlock(&set_desc_rwlock[setno]);
1945 
1946 	result = Zalloc(sizeof (md_mn_result_t));
1947 	result->mmr_flags	= msg->msg_flags;
1948 	result->mmr_setno	= setno;
1949 	result->mmr_msgtype	= msgtype;
1950 	result->mmr_sender	= whoami;
1951 	result->mmr_comm_state	= MDMNE_ACK; /* Ok state */
1952 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1953 	class = mdmn_get_message_class(msgtype);
1954 
1955 	commd_debug(MD_MMV_PROC_S,
1956 	    "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1957 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
1958 
1959 	handler = mdmn_get_handler(msgtype);
1960 
1961 	if (handler == NULL) {
1962 		result->mmr_exitval = 0;
1963 		/* let the sender decide if this is an error or not */
1964 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1965 		commd_debug(MD_MMV_PROC_S,
1966 		    "proc_sla: No handler for (%d, 0x%llx-%d)\n",
1967 		    MSGID_ELEMS(msg->msg_msgid));
1968 	} else {
1969 
1970 		/* Did we already process this message ? */
1971 		mutex_lock(&mct_mutex[setno][class]);
1972 		completed = mdmn_check_completion(msg, result);
1973 
1974 		if (completed == MDMN_MCT_NOT_DONE) {
1975 			/* message not yet processed locally */
1976 			commd_debug(MD_MMV_PROC_S,
1977 			    "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
1978 			    MSGID_ELEMS(msg->msg_msgid));
1979 
1980 			/*
1981 			 * Mark the message as being currently processed,
1982 			 * so we won't start a second handler for it
1983 			 */
1984 			(void) mdmn_mark_completion(msg, NULL,
1985 			    MDMN_MCT_IN_PROGRESS);
1986 
1987 			mutex_unlock(&mct_mutex[setno][class]);
1988 			(*handler)(msg, MD_MSGF_ON_SLAVE, result);
1989 
1990 			commd_debug(MD_MMV_PROC_S,
1991 			    "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
1992 			    MSGID_ELEMS(msg->msg_msgid));
1993 
1994 			mutex_lock(&mct_mutex[setno][class]);
1995 			/* Mark the message as fully done, store the result */
1996 			(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1997 
1998 		} else if (completed == MDMN_MCT_DONE) {
1999 			/* message processed previously, got result from MCT */
2000 			commd_debug(MD_MMV_PROC_S,
2001 			    "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
2002 			    MSGID_ELEMS(msg->msg_msgid));
2003 		} else if (completed == MDMN_MCT_IN_PROGRESS) {
2004 			/*
2005 			 * If the message is curruntly being processed,
2006 			 * we can return here, without sending a result back.
2007 			 * This will be done by the initial message handling
2008 			 * thread
2009 			 */
2010 			mutex_unlock(&mct_mutex[setno][class]);
2011 			commd_debug(MD_MMV_PROC_M, "proc_sla: "
2012 			    "(%d, 0x%llx-%d) is currently being processed\n",
2013 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
2014 
2015 			free_msg(msg);
2016 			free_result(result);
2017 			return;
2018 		} else {
2019 			/* MCT error occurred (should never happen) */
2020 			result->mmr_comm_state = MDMNE_LOG_FAIL;
2021 			commd_debug(MD_MMV_PROC_S,
2022 			    "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
2023 			    MSGID_ELEMS(msg->msg_msgid));
2024 		}
2025 		mutex_unlock(&mct_mutex[setno][class]);
2026 	}
2027 
2028 	/*
2029 	 * At this point we have a result (even in an error case)
2030 	 * that we return to the master.
2031 	 */
2032 	rw_rdlock(&set_desc_rwlock[setno]);
2033 	retries = 2; /* we will try two times to send the results */
2034 	successfully_returned = 0;
2035 
2036 	while (!successfully_returned && (retries != 0)) {
2037 		ret = (int *)NULL;
2038 		rw_rdlock(&client_rwlock[setno]);
2039 		if (check_client(setno, sender)) {
2040 			/*
2041 			 * If we cannot setup the rpc connection to the master,
2042 			 * we can't do anything besides logging this fact.
2043 			 */
2044 			commd_debug(MD_MMV_SYSLOG,
2045 			    "proc_mas: unable to create client for master\n");
2046 			rw_unlock(&client_rwlock[setno]);
2047 			break;
2048 		} else {
2049 			ret = mdmn_wakeup_master_1(result,
2050 			    client[setno][sender]);
2051 			/*
2052 			 * if mdmn_wakeup_master_1 returns NULL, it can be that
2053 			 * the master (or the commd on the master) had died.
2054 			 * In that case, we destroy the client to the master
2055 			 * and retry.
2056 			 * If mdmn_wakeup_master_1 doesn't return MDMNE_ACK,
2057 			 * the commd on the master is alive but
2058 			 * something else is wrong,
2059 			 * in that case a retry doesn't make sense => break out
2060 			 */
2061 			if (ret == (int *)NULL) {
2062 				commd_debug(MD_MMV_PROC_S,
2063 				    "proc_sla: wakeup_master returned NULL\n");
2064 				/* release reader lock, grab writer lock */
2065 				rw_unlock(&client_rwlock[setno]);
2066 				rw_wrlock(&client_rwlock[setno]);
2067 				mdmn_clnt_destroy(client[setno][sender]);
2068 				if (client[setno][sender] != (CLIENT *)NULL) {
2069 					client[setno][sender] = (CLIENT *)NULL;
2070 				}
2071 				rw_unlock(&client_rwlock[setno]);
2072 				retries--;
2073 				commd_debug(MD_MMV_PROC_S,
2074 				    "retries = %d\n", retries);
2075 				continue;
2076 			}
2077 			if (*ret != MDMNE_ACK) {
2078 				commd_debug(MD_MMV_PROC_S, "proc_sla: "
2079 				    "wakeup_master returned %d\n", *ret);
2080 				rw_unlock(&client_rwlock[setno]);
2081 				break;
2082 			} else { /* Good case */
2083 				successfully_returned = 1;
2084 				rw_unlock(&client_rwlock[setno]);
2085 			}
2086 		}
2087 	}
2088 
2089 	rw_unlock(&set_desc_rwlock[setno]);
2090 	commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
2091 	    MSGID_ELEMS(msg->msg_msgid));
2092 
2093 	if (ret != (int *)NULL)
2094 		free(ret);
2095 	free_msg(msg);
2096 	free_result(result);
2097 }
2098 
2099 
2100 md_mn_result_t *
2101 mdmn_send_svc_1(md_mn_msg_t *omsg, struct svc_req *rqstp)
2102 {
2103 	int			err;
2104 	set_t			setno;
2105 	SVCXPRT			*transp = rqstp->rq_xprt;
2106 	md_mn_msg_t		*msg;
2107 	md_mn_result_t		*resultp;
2108 	md_mn_msgclass_t	class;
2109 	md_mn_msg_and_transp_t	*matp;
2110 
2111 	msg = copy_msg(omsg, NULL);
2112 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2113 
2114 	setno = msg->msg_setno;
2115 	class = mdmn_get_message_class(msg->msg_type);
2116 
2117 	/* If we are in the abort state, we error out immediately */
2118 	if (md_commd_global_state & MD_CGS_ABORTED) {
2119 		resultp = Zalloc(sizeof (md_mn_result_t));
2120 		resultp->mmr_comm_state = MDMNE_ABORT;
2121 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2122 		free_result(resultp);
2123 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2124 		return (NULL);
2125 	}
2126 
2127 	/* check if the global initialization is done */
2128 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2129 		global_init();
2130 	}
2131 
2132 	commd_debug(MD_MMV_SEND,
2133 	    "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2134 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2135 
2136 	/* Check for verbosity related message */
2137 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2138 		md_mn_verbose_t *d;
2139 
2140 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2141 		md_commd_global_verb = d->mmv_what;
2142 		/* everytime the bitmask is set, we reset the timer */
2143 		__savetime = gethrtime();
2144 		/*
2145 		 * If local-only-flag is set, we are done here,
2146 		 * otherwise we pass that message on to the master.
2147 		 */
2148 		if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2149 			resultp = Zalloc(sizeof (md_mn_result_t));
2150 			resultp->mmr_comm_state = MDMNE_ACK;
2151 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2152 			    (char *)resultp);
2153 			free_result(resultp);
2154 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2155 			return (NULL);
2156 		}
2157 	}
2158 
2159 	/*
2160 	 * Are we entering the abort state?
2161 	 * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2162 	 * this message cannot be distributed anyway.
2163 	 * So, it's safe to return immediately.
2164 	 */
2165 	if (msg->msg_type == MD_MN_MSG_ABORT) {
2166 		md_commd_global_state |= MD_CGS_ABORTED;
2167 		resultp = Zalloc(sizeof (md_mn_result_t));
2168 		resultp->mmr_comm_state = MDMNE_ACK;
2169 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2170 		free_result(resultp);
2171 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2172 		return (NULL);
2173 	}
2174 
2175 
2176 	/*
2177 	 * Is this message type blocked?
2178 	 * If so we return MDMNE_CLASS_LOCKED, immediately
2179 	 */
2180 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2181 		resultp = Zalloc(sizeof (md_mn_result_t));
2182 		resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2183 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2184 		free_result(resultp);
2185 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2186 		commd_debug(MD_MMV_SEND,
2187 			"send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2188 			"type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2189 			msg->msg_type);
2190 		return (NULL);
2191 	}
2192 
2193 
2194 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2195 		/* Can only use the appropriate mutexes if they are inited */
2196 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2197 			rw_wrlock(&set_desc_rwlock[setno]);
2198 			rw_wrlock(&client_rwlock[setno]);
2199 			err = mdmn_init_set(setno, MDMN_SET_READY);
2200 			rw_unlock(&client_rwlock[setno]);
2201 			rw_unlock(&set_desc_rwlock[setno]);
2202 		} else {
2203 			err = mdmn_init_set(setno, MDMN_SET_READY);
2204 		}
2205 
2206 		if (err) {
2207 			/* couldn't initialize connections, cannot proceed */
2208 			resultp = Zalloc(sizeof (md_mn_result_t));
2209 			resultp->mmr_comm_state = err;
2210 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2211 			    (char *)resultp);
2212 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2213 			free_result(resultp);
2214 			commd_debug(MD_MMV_SEND,
2215 			    "send: init err = %d\n", err);
2216 			return (NULL);
2217 		}
2218 	}
2219 
2220 	mutex_lock(&mdmn_busy_mutex[setno]);
2221 	if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2222 	    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2223 		mutex_unlock(&mdmn_busy_mutex[setno]);
2224 		resultp = Zalloc(sizeof (md_mn_result_t));
2225 		resultp->mmr_comm_state = MDMNE_SUSPENDED;
2226 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2227 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2228 		free_result(resultp);
2229 		commd_debug(MD_MMV_SEND,
2230 			"send: class suspended (%d, 0x%llx-%d), set=%d, "
2231 			"class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2232 			setno, class, msg->msg_type);
2233 		return (NULL);
2234 	}
2235 	mutex_unlock(&mdmn_busy_mutex[setno]);
2236 
2237 	/* is this rpc request coming from the local node? */
2238 	if (check_license(rqstp, 0) == FALSE) {
2239 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2240 		commd_debug(MD_MMV_SEND,
2241 			"send: check licence fail(%d, 0x%llx-%d), set=%d, "
2242 			"class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2243 			setno, class, msg->msg_type);
2244 		return (NULL);
2245 	}
2246 
2247 
2248 	/*
2249 	 * We allocate a structure that can take two pointers in order to pass
2250 	 * both the message and the transp into thread_create.
2251 	 * The free for this alloc is done in mdmn_send_to_work()
2252 	 */
2253 	matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2254 	matp->mat_msg = msg;
2255 	matp->mat_transp = transp;
2256 
2257 	/*
2258 	 * create a thread here that calls work on the master.
2259 	 * If we are already on the master, this would block if running
2260 	 * in the same context. (our service is single threaded)(
2261 	 * Make it a detached thread because it will not communicate with
2262 	 * anybody thru thr_* mechanisms
2263 	 */
2264 	thr_create(NULL, 0, mdmn_send_to_work, (void *) matp, THR_DETACHED,
2265 	    NULL);
2266 
2267 	commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2268 	    MSGID_ELEMS(msg->msg_msgid));
2269 	/*
2270 	 * We return here without sending results. This will be done by
2271 	 * mdmn_wakeup_initiator_svc_1() as soon as the results are available.
2272 	 * Until then the calling send_message will be blocked, while we
2273 	 * are able to take calls.
2274 	 */
2275 
2276 	return (NULL);
2277 }
2278 
2279 /* ARGSUSED */
2280 int *
2281 mdmn_work_svc_1(md_mn_msg_t *omsg, struct svc_req *rqstp)
2282 {
2283 	int		err;
2284 	set_t		setno;
2285 	thread_t	tid;
2286 	int		*retval;
2287 	md_mn_msg_t	*msg;
2288 	md_mn_msgclass_t class;
2289 
2290 	retval = Malloc(sizeof (int));
2291 
2292 	/* If we are in the abort state, we error out immediately */
2293 	if (md_commd_global_state & MD_CGS_ABORTED) {
2294 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2295 		*retval = MDMNE_ABORT;
2296 		return (retval);
2297 	}
2298 
2299 	msg = copy_msg(omsg, NULL);
2300 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2301 
2302 	/*
2303 	 * Is this message type blocked?
2304 	 * If so we return MDMNE_CLASS_LOCKED, immediately.
2305 	 * This check is performed on master and slave.
2306 	 */
2307 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2308 		*retval = MDMNE_CLASS_LOCKED;
2309 		return (retval);
2310 	}
2311 
2312 	/* check if the global initialization is done */
2313 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2314 		global_init();
2315 	}
2316 
2317 	class = mdmn_get_message_class(msg->msg_type);
2318 	setno = msg->msg_setno;
2319 
2320 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2321 		/* Can only use the appropriate mutexes if they are inited */
2322 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2323 			rw_wrlock(&set_desc_rwlock[setno]);
2324 			rw_wrlock(&client_rwlock[setno]);
2325 			err = mdmn_init_set(setno, MDMN_SET_READY);
2326 			rw_unlock(&client_rwlock[setno]);
2327 			rw_unlock(&set_desc_rwlock[setno]);
2328 		} else {
2329 			err = mdmn_init_set(setno, MDMN_SET_READY);
2330 		}
2331 
2332 		if (err) {
2333 			*retval = MDMNE_CANNOT_CONNECT;
2334 			free_msg(msg);
2335 			return (retval);
2336 		}
2337 	}
2338 
2339 	/* is this rpc request coming from a licensed node? */
2340 	if (check_license(rqstp, msg->msg_sender) == FALSE) {
2341 		free_msg(msg);
2342 		*retval = MDMNE_RPC_FAIL;
2343 		return (retval);
2344 	}
2345 
2346 	commd_debug(MD_MMV_WORK,
2347 	    "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2348 	    "flags=0x%x\n",
2349 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2350 	    msg->msg_flags);
2351 
2352 	/* Check for various CLASS0 message types */
2353 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2354 		md_mn_verbose_t *d;
2355 
2356 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2357 		/* for now we ignore set / class in md_mn_verbose_t */
2358 		md_commd_global_verb = d->mmv_what;
2359 		/* everytime the bitmask is set, we reset the timer */
2360 		__savetime = gethrtime();
2361 	}
2362 
2363 	mutex_lock(&mdmn_busy_mutex[setno]);
2364 
2365 	/* check if class is locked via a call to mdmn_comm_lock_svc_1 */
2366 	if (mdmn_is_class_locked(setno, class) == TRUE) {
2367 		mutex_unlock(&mdmn_busy_mutex[setno]);
2368 		*retval = MDMNE_CLASS_LOCKED;
2369 		free_msg(msg);
2370 		return (retval);
2371 	}
2372 	mutex_unlock(&mdmn_busy_mutex[setno]);
2373 
2374 	/* Check if the class is busy right now. Do it only on the master */
2375 	rw_rdlock(&set_desc_rwlock[setno]);
2376 	if (set_descriptor[setno]->sd_mn_am_i_master) {
2377 		rw_unlock(&set_desc_rwlock[setno]);
2378 		/*
2379 		 * If the class is currently suspended, don't accept new
2380 		 * messages, unless they are flagged with an override bit.
2381 		 */
2382 		mutex_lock(&mdmn_busy_mutex[setno]);
2383 		if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2384 		    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2385 			mutex_unlock(&mdmn_busy_mutex[setno]);
2386 			*retval = MDMNE_SUSPENDED;
2387 			commd_debug(MD_MMV_SEND,
2388 			    "send: set %d is suspended\n", setno);
2389 			free_msg(msg);
2390 			return (retval);
2391 		}
2392 		if (mdmn_mark_class_busy(setno, class) == FALSE) {
2393 			mutex_unlock(&mdmn_busy_mutex[setno]);
2394 			*retval = MDMNE_CLASS_BUSY;
2395 			free_msg(msg);
2396 			return (retval);
2397 		}
2398 		mutex_unlock(&mdmn_busy_mutex[setno]);
2399 		/*
2400 		 * Because the real processing of the message takes time we
2401 		 * create a thread for it. So the master thread can continue
2402 		 * to run and accept further messages.
2403 		 */
2404 		*retval = thr_create(NULL, 0,
2405 		    (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2406 		    THR_DETACHED|THR_SUSPENDED, &tid);
2407 	} else {
2408 		rw_unlock(&set_desc_rwlock[setno]);
2409 		*retval = thr_create(NULL, 0,
2410 		    (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2411 		    THR_DETACHED|THR_SUSPENDED, &tid);
2412 	}
2413 
2414 	if (*retval != 0) {
2415 		*retval = MDMNE_THR_CREATE_FAIL;
2416 		free_msg(msg);
2417 		return (retval);
2418 	}
2419 
2420 	/* Now run the new thread */
2421 	thr_continue(tid);
2422 
2423 	commd_debug(MD_MMV_WORK,
2424 	    "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2425 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2426 
2427 	*retval = MDMNE_ACK; /* this means success */
2428 	return (retval);
2429 }
2430 
2431 /* ARGSUSED */
2432 int *
2433 mdmn_wakeup_initiator_svc_1(md_mn_result_t *res, struct svc_req *rqstp)
2434 {
2435 
2436 	int		*retval;
2437 	int		err;
2438 	set_t		setno;
2439 	mutex_t		*mx;   /* protection of initiator_table */
2440 	SVCXPRT		*transp;
2441 	md_mn_msgid_t	initiator_table_id;
2442 	md_mn_msgclass_t class;
2443 
2444 	retval = Malloc(sizeof (int));
2445 
2446 	/* check if the global initialization is done */
2447 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2448 		global_init();
2449 	}
2450 
2451 	setno	= res->mmr_setno;
2452 
2453 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2454 		/* set not ready means we just crashed are restarted now */
2455 		/* Can only use the appropriate mutexes if they are inited */
2456 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2457 			rw_wrlock(&set_desc_rwlock[setno]);
2458 			rw_wrlock(&client_rwlock[setno]);
2459 			err = mdmn_init_set(setno, MDMN_SET_READY);
2460 			rw_unlock(&client_rwlock[setno]);
2461 			rw_unlock(&set_desc_rwlock[setno]);
2462 		} else {
2463 			err = mdmn_init_set(setno, MDMN_SET_READY);
2464 		}
2465 
2466 		if (err) {
2467 			*retval = MDMNE_CANNOT_CONNECT;
2468 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2469 			return (retval);
2470 		}
2471 	}
2472 
2473 	/* is this rpc request coming from a licensed node? */
2474 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2475 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2476 		*retval = MDMNE_RPC_FAIL;
2477 		return (retval);
2478 	}
2479 
2480 
2481 	class	= mdmn_get_message_class(res->mmr_msgtype);
2482 	mx	= mdmn_get_initiator_table_mx(setno, class);
2483 
2484 	commd_debug(MD_MMV_WAKE_I,
2485 	    "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2486 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2487 
2488 	mutex_lock(mx);
2489 
2490 	/*
2491 	 * Search the initiator wakeup table.
2492 	 * If we find an entry here (which should always be true)
2493 	 * we are on the initiating node and we wakeup the original
2494 	 * local rpc call
2495 	 */
2496 	mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2497 
2498 	if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2499 		transp = mdmn_get_initiator_table_transp(setno, class);
2500 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2501 		mdmn_unregister_initiator_table(setno, class);
2502 		*retval = MDMNE_ACK;
2503 
2504 		commd_debug(MD_MMV_WAKE_I,
2505 		    "wake_ini: replied (%d, 0x%llx-%d)\n",
2506 		    MSGID_ELEMS(res->mmr_msgid));
2507 	} else {
2508 		commd_debug(MD_MMV_WAKE_I,
2509 		    "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2510 		    MSGID_ELEMS(res->mmr_msgid));
2511 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2512 	}
2513 	mutex_unlock(mx);
2514 	/* less work for check_timeouts */
2515 	mutex_lock(&check_timeout_mutex);
2516 	if (messages_on_their_way == 0) {
2517 		commd_debug(MD_MMV_WAKE_I,
2518 		    "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2519 		    MSGID_ELEMS(res->mmr_msgid));
2520 	} else {
2521 		messages_on_their_way--;
2522 	}
2523 	mutex_unlock(&check_timeout_mutex);
2524 	xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2525 
2526 	return (retval);
2527 }
2528 
2529 
2530 /*
2531  * res must be free'd by the thread we wake up
2532  */
2533 /* ARGSUSED */
2534 int *
2535 mdmn_wakeup_master_svc_1(md_mn_result_t *ores, struct svc_req *rqstp)
2536 {
2537 
2538 	int		*retval;
2539 	int		err;
2540 	set_t		setno;
2541 	cond_t		*cv;
2542 	mutex_t		*mx;
2543 	md_mn_msgid_t	master_table_id;
2544 	md_mn_nodeid_t	sender;
2545 	md_mn_result_t	*res;
2546 	md_mn_msgclass_t class;
2547 
2548 	retval = Malloc(sizeof (int));
2549 
2550 	/* check if the global initialization is done */
2551 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2552 		global_init();
2553 	}
2554 
2555 	/* Need to copy the results here, as they are static for RPC */
2556 	res = copy_result(ores);
2557 	xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2558 
2559 	class = mdmn_get_message_class(res->mmr_msgtype);
2560 	setno = res->mmr_setno;
2561 
2562 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2563 		/* set not ready means we just crashed are restarted now */
2564 		/* Can only use the appropriate mutexes if they are inited */
2565 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2566 			rw_wrlock(&set_desc_rwlock[setno]);
2567 			rw_wrlock(&client_rwlock[setno]);
2568 			err = mdmn_init_set(setno, MDMN_SET_READY);
2569 			rw_unlock(&client_rwlock[setno]);
2570 			rw_unlock(&set_desc_rwlock[setno]);
2571 		} else {
2572 			err = mdmn_init_set(setno, MDMN_SET_READY);
2573 		}
2574 
2575 		if (err) {
2576 			*retval = MDMNE_CANNOT_CONNECT;
2577 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2578 			return (retval);
2579 		}
2580 	}
2581 
2582 	/* is this rpc request coming from a licensed node? */
2583 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2584 		*retval = MDMNE_RPC_FAIL;
2585 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2586 		return (retval);
2587 	}
2588 
2589 
2590 	commd_debug(MD_MMV_WAKE_M,
2591 	    "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2592 	    "from %d\n",
2593 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2594 	    res->mmr_sender);
2595 	/*
2596 	 * The mutex and cv are needed for waking up the thread
2597 	 * sleeping in mdmn_master_process_msg()
2598 	 */
2599 	mx = mdmn_get_master_table_mx(setno, class);
2600 	cv = mdmn_get_master_table_cv(setno, class);
2601 
2602 	/*
2603 	 * lookup the master wakeup table
2604 	 * If we find our message, we are on the master and
2605 	 * called by a slave that finished processing a message.
2606 	 * We store the results in the appropriate slot and
2607 	 * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2608 	 */
2609 	mutex_lock(mx);
2610 	mdmn_get_master_table_id(setno, class, &master_table_id);
2611 	sender = mdmn_get_master_table_addr(setno, class);
2612 
2613 	if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2614 		if (sender == res->mmr_sender) {
2615 			mdmn_set_master_table_res(setno, class, res);
2616 			cond_signal(cv);
2617 			*retval = MDMNE_ACK;
2618 		} else {
2619 			/* id is correct but wrong sender (I smell a timeout) */
2620 			commd_debug(MD_MMV_WAKE_M,
2621 			    "wakeup master got unsolicited message: "
2622 			    "(%d, 0x%llx-%d) from %d\n",
2623 			    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2624 			free_result(res);
2625 			*retval = MDMNE_TIMEOUT;
2626 		}
2627 	} else {
2628 		/* id is wrong, smells like a very late timeout */
2629 		commd_debug(MD_MMV_WAKE_M,
2630 		    "wakeup master got unsolicited message: "
2631 		    "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2632 		    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2633 		    MSGID_ELEMS(master_table_id));
2634 		free_result(res);
2635 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2636 	}
2637 
2638 	mutex_unlock(mx);
2639 
2640 	return (retval);
2641 }
2642 
2643 /*
2644  * Lock a set/class combination.
2645  * This is mainly done for debug purpose.
2646  * This set/class combination immediately is blocked,
2647  * even in the middle of sending messages to multiple slaves.
2648  * This remains until the user issues a mdmn_comm_unlock_svc_1 for the same
2649  * set/class combination.
2650  *
2651  * Special messages of class MD_MSG_CLASS0 can never be locked.
2652  * 	e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2653  *
2654  * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2655  * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2656  *
2657  * set must be between 1 and MD_MAXSETS
2658  * class can be:
2659  *	MD_MSG_CLASS0 which means all other classes in this case
2660  *	or one specific class (< MD_MN_NCLASSES)
2661  *
2662  * Returns:
2663  *	MDMNE_ACK on sucess (locking a locked class is Ok)
2664  *	MDMNE_EINVAL if a parameter is out of range
2665  */
2666 
2667 /* ARGSUSED */
2668 int *
2669 mdmn_comm_lock_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2670 {
2671 	int			*retval;
2672 	set_t			setno = msc->msc_set;
2673 	md_mn_msgclass_t	class = msc->msc_class;
2674 
2675 	retval = Malloc(sizeof (int));
2676 
2677 	/* check if the global initialization is done */
2678 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2679 		global_init();
2680 	}
2681 
2682 	/* is this rpc request coming from the local node ? */
2683 	if (check_license(rqstp, 0) == FALSE) {
2684 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2685 		*retval = MDMNE_RPC_FAIL;
2686 		return (retval);
2687 	}
2688 
2689 	/* Perform some range checking */
2690 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2691 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2692 		*retval = MDMNE_EINVAL;
2693 		return (retval);
2694 	}
2695 
2696 	commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2697 	mutex_lock(&mdmn_busy_mutex[setno]);
2698 	if (class != MD_MSG_CLASS0) {
2699 		mdmn_mark_class_locked(setno, class);
2700 	} else {
2701 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2702 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2703 			mdmn_mark_class_locked(setno, class);
2704 		}
2705 	}
2706 	mutex_unlock(&mdmn_busy_mutex[setno]);
2707 
2708 	*retval = MDMNE_ACK;
2709 	return (retval);
2710 }
2711 
2712 /*
2713  * Unlock a set/class combination.
2714  * set must be between 1 and MD_MAXSETS
2715  * class can be:
2716  *	MD_MSG_CLASS0 which means all other classes in this case (like above)
2717  *	or one specific class (< MD_MN_NCLASSES)
2718  *
2719  * Returns:
2720  *	MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2721  *	MDMNE_EINVAL if a parameter is out of range
2722  */
2723 /* ARGSUSED */
2724 int *
2725 mdmn_comm_unlock_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2726 {
2727 	int			*retval;
2728 	set_t			setno  = msc->msc_set;
2729 	md_mn_msgclass_t	class  = msc->msc_class;
2730 
2731 	retval = Malloc(sizeof (int));
2732 
2733 	/* check if the global initialization is done */
2734 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2735 		global_init();
2736 	}
2737 
2738 	/* is this rpc request coming from the local node ? */
2739 	if (check_license(rqstp, 0) == FALSE) {
2740 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2741 		*retval = MDMNE_RPC_FAIL;
2742 		return (retval);
2743 	}
2744 
2745 	/* Perform some range checking */
2746 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2747 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2748 		*retval = MDMNE_EINVAL;
2749 		return (retval);
2750 	}
2751 	commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2752 
2753 	mutex_lock(&mdmn_busy_mutex[setno]);
2754 	if (class != MD_MSG_CLASS0) {
2755 		mdmn_mark_class_unlocked(setno, class);
2756 	} else {
2757 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2758 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2759 			mdmn_mark_class_unlocked(setno, class);
2760 		}
2761 	}
2762 	mutex_unlock(&mdmn_busy_mutex[setno]);
2763 
2764 	*retval = MDMNE_ACK;
2765 	return (retval);
2766 }
2767 
2768 /*
2769  * mdmn_comm_suspend_svc_1(setno, class)
2770  *
2771  * Drain all outstanding messages for a given set/class combination
2772  * and don't allow new messages to be processed.
2773  *
2774  * Special messages of class MD_MSG_CLASS0 can never be locked.
2775  * 	e.g. MD_MN_MSG_VERBOSITY
2776  *
2777  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2778  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2779  *
2780  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2781  * one class as being suspended.
2782  * If messages for this class are currently on their way,
2783  * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2784  *
2785  * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2786  * Messages must be generated in ascending order.
2787  * This means, a message cannot create submessages with the same or lower class.
2788  * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2789  * generate a hanging situation here.
2790  * We mark class 1 as being suspended.
2791  * if the class is not busy, we proceed with class 2
2792  * and so on
2793  * if a class *is* busy, we cannot continue here, but return
2794  * MDMNE_SET_NOT_DRAINED.
2795  * We expect the caller to hold on for some seconds and try again.
2796  * When that message, that held the class busy is done in
2797  * mdmn_master_process_msg(), mdmn_mark_class_unbusy() called.
2798  * There it is checked if the class is about to drain.
2799  * In that case it tries to drain all higher classes there.
2800  *
2801  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2802  * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2803  * completely drained.
2804  *
2805  * Returns:
2806  *	MDMNE_ACK on sucess (set is drained, no outstanding messages)
2807  *	MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2808  *		still outstanding messages for this set(s)
2809  *	MDMNE_EINVAL if setno is out of range
2810  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2811  */
2812 
2813 /* ARGSUSED */
2814 int *
2815 mdmn_comm_suspend_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2816 {
2817 	int			*retval;
2818 	int			failure = 0;
2819 	set_t			startset, endset;
2820 	set_t			setno  = msc->msc_set;
2821 	md_mn_msgclass_t	oclass = msc->msc_class;
2822 #ifdef NOT_YET_NEEDED
2823 	uint_t			flags  = msc->msc_flags;
2824 #endif /* NOT_YET_NEEDED */
2825 	md_mn_msgclass_t	class;
2826 
2827 	retval = Malloc(sizeof (int));
2828 
2829 	/* check if the global initialization is done */
2830 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2831 		global_init();
2832 	}
2833 
2834 	/* is this rpc request coming from the local node ? */
2835 	if (check_license(rqstp, 0) == FALSE) {
2836 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2837 		*retval = MDMNE_RPC_FAIL;
2838 		return (retval);
2839 	}
2840 
2841 	commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2842 	    setno, oclass);
2843 
2844 	/* Perform some range checking */
2845 	if (setno >= MD_MAXSETS) {
2846 		*retval = MDMNE_EINVAL;
2847 		commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2848 		return (retval);
2849 	}
2850 
2851 	/*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2852 	if (setno == MD_COMM_ALL_SETS) {
2853 		startset = 1;
2854 		endset = MD_MAXSETS - 1;
2855 	} else {
2856 		startset = setno;
2857 		endset = setno;
2858 	}
2859 
2860 	for (setno = startset; setno <= endset; setno++) {
2861 		/* Here we need the mutexes for the set to be setup */
2862 		if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2863 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2864 		}
2865 
2866 		mutex_lock(&mdmn_busy_mutex[setno]);
2867 		/* shall we drain all classes of this set? */
2868 		if (oclass == MD_COMM_ALL_CLASSES) {
2869 			for (class = 1; class < MD_MN_NCLASSES; class ++) {
2870 				commd_debug(MD_MMV_MISC,
2871 				    "suspend: suspending set %d, class %d\n",
2872 				    setno, class);
2873 				*retval = mdmn_mark_class_suspended(setno,
2874 				    class, MDMN_SUSPEND_ALL);
2875 				if (*retval == MDMNE_SET_NOT_DRAINED) {
2876 					failure++;
2877 				}
2878 			}
2879 		} else {
2880 			/* only drain one specific class */
2881 			commd_debug(MD_MMV_MISC,
2882 			    "suspend: suspending set=%d class=%d\n",
2883 			    setno, oclass);
2884 			*retval = mdmn_mark_class_suspended(setno, oclass,
2885 			    MDMN_SUSPEND_1);
2886 			if (*retval == MDMNE_SET_NOT_DRAINED) {
2887 				failure++;
2888 			}
2889 		}
2890 		mutex_unlock(&mdmn_busy_mutex[setno]);
2891 	}
2892 	/* If one or more sets are not entirely drained, failure is non-zero */
2893 	if (failure != 0) {
2894 		*retval = MDMNE_SET_NOT_DRAINED;
2895 		commd_debug(MD_MMV_MISC,
2896 		    "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2897 	} else {
2898 		*retval = MDMNE_ACK;
2899 	}
2900 
2901 	return (retval);
2902 }
2903 
2904 /*
2905  * mdmn_comm_resume_svc_1(setno, class)
2906  *
2907  * Resume processing messages for a given set.
2908  * This incorporates the repeal of a previous suspend operation.
2909  *
2910  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2911  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2912  *
2913  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2914  * one class as being resumed.
2915  *
2916  * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
2917  *
2918  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2919  *
2920  * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
2921  * reset any ABORT flag from the global state.
2922  *
2923  * Returns:
2924  *	MDMNE_ACK on sucess (resuming an unlocked set is Ok)
2925  *	MDMNE_EINVAL if setno is out of range
2926  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2927  */
2928 /* ARGSUSED */
2929 int *
2930 mdmn_comm_resume_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2931 {
2932 	int			*retval;
2933 	set_t			startset, endset;
2934 	set_t			setno  = msc->msc_set;
2935 	md_mn_msgclass_t	oclass = msc->msc_class;
2936 	uint_t			flags  = msc->msc_flags;
2937 	md_mn_msgclass_t	class;
2938 
2939 	retval = Malloc(sizeof (int));
2940 
2941 	/* check if the global initialization is done */
2942 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2943 		global_init();
2944 	}
2945 
2946 	/* is this rpc request coming from the local node ? */
2947 	if (check_license(rqstp, 0) == FALSE) {
2948 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2949 		*retval = MDMNE_RPC_FAIL;
2950 		return (retval);
2951 	}
2952 
2953 	commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
2954 	    setno, oclass);
2955 
2956 	/* Perform some range checking */
2957 	if (setno > MD_MAXSETS) {
2958 		*retval = MDMNE_EINVAL;
2959 		return (retval);
2960 	}
2961 
2962 	if (setno == MD_COMM_ALL_SETS) {
2963 		startset = 1;
2964 		endset = MD_MAXSETS - 1;
2965 		if (oclass == MD_COMM_ALL_CLASSES) {
2966 			/* This is the point where we "unabort" the commd */
2967 			commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
2968 			md_commd_global_state &= ~MD_CGS_ABORTED;
2969 		}
2970 	} else {
2971 		startset = setno;
2972 		endset = setno;
2973 	}
2974 
2975 	for (setno = startset; setno <= endset; setno++) {
2976 
2977 		/* Here we need the mutexes for the set to be setup */
2978 		if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
2979 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2980 		}
2981 
2982 		mutex_lock(&mdmn_busy_mutex[setno]);
2983 
2984 		if (oclass == MD_COMM_ALL_CLASSES) {
2985 			int end_class = 1;
2986 			/*
2987 			 * When SUSPENDing all classes, we go
2988 			 * from 1 to MD_MN_NCLASSES-1
2989 			 * The correct reverse action is RESUMing
2990 			 * from MD_MN_NCLASSES-1 to 1 (or 2)
2991 			 */
2992 
2993 			if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
2994 				end_class = 2;
2995 			}
2996 
2997 			/*
2998 			 * Then mark all classes of this set as no longer
2999 			 * suspended. This supersedes any previous suspend(1)
3000 			 * calls and resumes the set entirely.
3001 			 */
3002 			for (class = MD_MN_NCLASSES - 1; class >= end_class;
3003 			    class --) {
3004 				commd_debug(MD_MMV_MISC,
3005 				    "resume: resuming set=%d class=%d\n",
3006 				    setno, class);
3007 				mdmn_mark_class_resumed(setno, class,
3008 				    (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
3009 			}
3010 		} else {
3011 			/*
3012 			 * In this case only one class is marked as not
3013 			 * suspended. If a suspend(all) is currently active for
3014 			 * this set, this class will still be suspended.
3015 			 * That state will be cleared by a suspend(all)
3016 			 * (see above)
3017 			 */
3018 			commd_debug(MD_MMV_MISC,
3019 			    "resume: resuming set=%d class=%d\n",
3020 			    setno, oclass);
3021 			mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
3022 		}
3023 
3024 		mutex_unlock(&mdmn_busy_mutex[setno]);
3025 	}
3026 
3027 	*retval = MDMNE_ACK;
3028 	return (retval);
3029 }
3030 /* ARGSUSED */
3031 int *
3032 mdmn_comm_reinit_set_svc_1(set_t *setnop, struct svc_req *rqstp)
3033 {
3034 	int		*retval;
3035 	md_mnnode_desc	*node;
3036 	set_t		 setno = *setnop;
3037 
3038 	retval = Malloc(sizeof (int));
3039 
3040 	/* check if the global initialization is done */
3041 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3042 		global_init();
3043 	}
3044 
3045 	/* is this rpc request coming from the local node ? */
3046 	if (check_license(rqstp, 0) == FALSE) {
3047 		xdr_free(xdr_set_t, (caddr_t)setnop);
3048 		*retval = MDMNE_RPC_FAIL;
3049 		return (retval);
3050 	}
3051 
3052 	commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
3053 
3054 	rw_rdlock(&set_desc_rwlock[setno]);
3055 	/*
3056 	 * We assume, that all messages have been suspended previously.
3057 	 *
3058 	 * As we are modifying lots of clients here we grab the client_rwlock
3059 	 * in writer mode. This ensures, no new messages come in.
3060 	 */
3061 	rw_wrlock(&client_rwlock[setno]);
3062 	/* This set is no longer initialized */
3063 
3064 	if ((set_descriptor[setno] != NULL) &&
3065 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
3066 		/* destroy all rpc clients from this set */
3067 		for (node = set_descriptor[setno]->sd_nodelist; node;
3068 		    node = node->nd_next) {
3069 			mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
3070 			if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
3071 				client[setno][node->nd_nodeid] = (CLIENT *)NULL;
3072 			}
3073 		}
3074 	md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
3075 	}
3076 
3077 	commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
3078 
3079 	rw_unlock(&client_rwlock[setno]);
3080 	rw_unlock(&set_desc_rwlock[setno]);
3081 	*retval = MDMNE_ACK;
3082 	return (retval);
3083 }
3084 
3085 /*
3086  * This is just an interface for testing purpose.
3087  * Here we can disable single message types.
3088  * If we block a message type, this is valid for all MN sets.
3089  * If a message arrives later, and  it's message type is blocked, it will
3090  * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
3091  * resend this message over and over again.
3092  */
3093 
3094 /* ARGSUSED */
3095 int *
3096 mdmn_comm_msglock_svc_1(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3097 {
3098 	int			*retval;
3099 	md_mn_msgtype_t		type = mmtl->mmtl_type;
3100 	uint_t			lock = mmtl->mmtl_lock;
3101 
3102 	retval = Malloc(sizeof (int));
3103 
3104 	/* check if the global initialization is done */
3105 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3106 		global_init();
3107 	}
3108 
3109 	/* is this rpc request coming from the local node ? */
3110 	if (check_license(rqstp, 0) == FALSE) {
3111 		xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3112 		*retval = MDMNE_RPC_FAIL;
3113 		return (retval);
3114 	}
3115 
3116 	/* Perform some range checking */
3117 	if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3118 		*retval = MDMNE_EINVAL;
3119 		return (retval);
3120 	}
3121 
3122 	commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3123 	msgtype_lock_state[type] = lock;
3124 
3125 	*retval = MDMNE_ACK;
3126 	return (retval);
3127 }
3128