xref: /titanic_41/usr/src/cmd/lvm/rpc.mdcommd/mdmn_commd_server.c (revision 2eaee53e5b3d4cd48a35cd651c0a8ae149d772c5)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <unistd.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <sys/statvfs.h>
33 #include <sys/uadmin.h>
34 #include <fcntl.h>
35 #include <stdio.h>
36 #include <thread.h>
37 #include <meta.h>
38 #include <sdssc.h>
39 #include <mdmn_changelog.h>
40 #include "mdmn_subr.h"
41 
42 /*
43  * This is the communication daemon for SVM Multi Node Disksets.
44  * It runs on every node and provides the following rpc services:
45  *  - mdmn_send_svc_1
46  *  - mdmn_work_svc_1
47  *  - mdmn_wakeup_initiator_svc_1
48  *  - mdmn_wakeup_master_svc_1
49  *  - mdmn_comm_lock_svc_1
50  *  - mdmn_comm_unlock_svc_1
51  *  - mdmn_comm_suspend_svc_1
52  *  - mdmn_comm_resume_svc_1
53  *  - mdmn_comm_reinit_set_svc_1
54  * where send, lock, unlock and reinit are meant for external use,
55  * work and the two wakeups are for internal use only.
56  *
57  * NOTE:
58  * On every node only one of those xxx_1 functions can be active at the
59  * same time because the daemon is single threaded.
60  *
61  *
62  * In case an event occurs that has to be propagated to all the nodes...
63  *
64  * One node (the initiator)
65  *	calls the libmeta function mdmn_send_message()
66  *	This function calls the local daemon thru mdmn_send_svc_1.
67  *
68  * On the initiator:
69  *	mdmn_send_svc_1()
70  *	    - starts a thread -> mdmn_send_to_work() and returns.
71  *	mdmn_send_to_work()
72  *	    - sends this message over to the master of the diskset.
73  *	      This is done by calling mdmn_work_svc_1 on the master.
74  *	    - registers to the initiator_table
75  *	    - exits without doing a svc_sendreply() for the call to
76  *	      mdmn_send_svc_1. This means that call is blocked until somebody
77  *	      (see end of this comment) does a svc_sendreply().
78  *	      This means mdmn_send_message() does not yet return.
79  *	    - A timeout surveillance is started at this point.
80  *	      This means in case the master doesn't reply at all in an
81  *	      appropriate time, an error condition is returned
82  *	      to the caller.
83  *
84  * On the master:
85  *	mdmn_work_svc_1()
86  *	    - starts a thread -> mdmn_master_process_msg() and returns
87  *	mdmn_master_process_msg()
88  *	    - logs the message to the change log
89  *	    - executes the message locally
90  *	    - flags the message in the change log
91  *	    - sends the message to mdmn_work_svc_1() on all the
92  *	      other nodes (slaves)
93  *	      after each call to mdmn_work_svc_1 the thread goes to sleep and
94  *	      will be woken up by mdmn_wakeup_master_svc_1() as soon as the
95  *	      slave node is done with this message.
96  *	    - In case the slave doesn't respond in an appropriate time, an error
97  *	      is assumed to ensure the master doesn't wait forever.
98  *
99  * On a slave:
100  *	mdmn_work_svc_1()
101  *	    - starts a thread -> mdmn_slave_process_msg() and returns
102  *	mdmn_slave_process_msg()
103  *	    - processes this message locally by calling the appropriate message
104  *	      handler, that creates some result.
105  *	    - sends that result thru a call to mdmn_wakeup_master_svc_1() to
106  *	      the master.
107  *
108  * Back on the master:
109  *	mdmn_wakeup_master_svc_1()
110  *	    - stores the result into the master_table.
111  *	    - signals the mdmn_master_process_msg-thread.
112  *	    - returns
113  *	mdmn_master_process_msg()
114  *	    - after getting the results from all nodes
115  *	    - sends them back to the initiating node thru a call to
116  *	      mdmn_wakeup_initiator_svc_1.
117  *
118  * Back on the initiator:
119  *	mdmn_wakeup_initiator_svc_1()
120  *	    - calls svc_sendreply() which makes the call to mdmn_send_svc_1()
121  *	      return.
122  *	      which allows the initial mdmn_send_message() call to return.
123  */
124 
FILE *commdout;		/* debug output stream; NULL when no file is open */
char *commdoutfile;	/* path name of the file behind commdout */
/* flush_fcout() warns once less than 10 MB are free for the debug file */
#define	MIN_FS_SPACE	(10LL * 1024 * 1024)

/*
 * Number of outstanding messages that were initiated by this node.
 * check_timeouts() blocks on check_timeout_cv while this is zero.
 */
uint_t	messages_on_their_way;
mutex_t	check_timeout_mutex;	/* protects messages_on_their_way */
cond_t	check_timeout_cv;	/* wakes up the check_timeouts thread */

/* high resolution time taken in global_init(), for printing time stamps */
hrtime_t __savetime;

/*
 * RPC client handles, indexed by set number and node id, and the per set
 * reader/writer locks that protect them.
 */
CLIENT	*client[MD_MAXSETS][NNODES];
rwlock_t client_rwlock[MD_MAXSETS];

/*
 * The descriptors of all possible sets, indexed by set number, and the
 * per set reader/writer locks that protect them.
 */
struct md_set_desc *set_descriptor[MD_MAXSETS];
rwlock_t set_desc_rwlock[MD_MAXSETS];

/*
 * The daemon to daemon communication has to time out quickly.
 * This value is installed with CLSET_TIMEOUT on the client handles
 * created in this file.
 */
static struct timeval FOUR_SECS = { 4, 0 };

/*
 * Per set bit mask of MDMN_SET_* flags; mdmn_init_set() ORs in the flag of
 * every initialization stage it has completed for that set.
 */
int md_mn_set_inited[MD_MAXSETS];

/*
 * For every set we have a message completion table (mapped from a file by
 * mdmn_init_set()) and one protecting mutex per message class.
 */
md_mn_mct_t *mct[MD_MAXSETS];
mutex_t	mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];

/* Flags describing the global status of the commd on one node */
#define	MD_CGS_INITED		0x0001	/* global_init() has completed */
#define	MD_CGS_ABORTED		0x0002	/* return everything with MDMNE_ABORT */
uint_t md_commd_global_state = 0;	/* No state when starting up */

/*
 * Global verbosity level for the daemon.
 * Set by setup_debug(); flush_fcout() resets it to MD_MMV_NULL when the
 * filesystem holding the debug file runs low on space.
 */
uint_t md_commd_global_verb;

/*
 * libmeta doesn't like multiple threads in metaget_setdesc().
 * So every call to it in this file is made with this global lock held.
 */
mutex_t get_setdesc_mutex;

/*
 * Need a way to block single message types,
 * hence an array with a status for every message type
 */
uint_t msgtype_lock_state[MD_MN_NMESSAGES];

/* for reading in the config file */
#define	MAX_LINE_SIZE 1024

/*
 * Accessors for the debug settings, defined outside this file.
 * setup_debug() uses them to obtain the debug file name and the
 * verbosity level.
 */
extern char *commd_get_outfile(void);
extern uint_t commd_get_verbosity(void);
187 /*
188  * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
189  * merely needs to call clnt_create_timed, and meta_client_create_retry
190  * will take care of the rest.
191  */
192 /* ARGSUSED */
193 static CLIENT *
194 mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
195 {
196 	md_mnnode_desc	*node = (md_mnnode_desc *)data;
197 
198 	return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, ONE, "tcp",
199 		time_out));
200 }
201 
/*
 * Push everything buffered for the debug file out to disk:
 * fflush() the stdio buffer, then fsync() the underlying descriptor.
 * Does nothing when no debug file is open (commdout is NULL).
 *
 * The body is wrapped in do { } while (0) so that "FLUSH_DEBUGFILE();"
 * is exactly one statement and can safely be used as the unbraced body
 * of an if that has an else branch.
 */
#define	FLUSH_DEBUGFILE() \
	do { \
		if (commdout != (FILE *)NULL) { \
			(void) fflush(commdout); \
			(void) fsync(fileno(commdout)); \
		} \
	} while (/* CONSTCOND */ 0)
207 
208 static void
209 panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
210     md_mn_result_t *slave_result)
211 {
212 	md_mn_commd_err_t	commd_err;
213 	md_error_t		mne = mdnullerror;
214 	char			*msg_buf;
215 
216 	msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
217 
218 	FLUSH_DEBUGFILE();
219 
220 	if (master_err != MDMNE_ACK) {
221 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on master "
222 			"when processing message type %d\n", type);
223 	} else if (slave_result == NULL) {
224 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on node "
225 			"%d when processing message type %d\n", nid, type);
226 	} else {
227 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: Inconsistent "
228 			"return value from node %d when processing message "
229 			"type %d. Master exitval = %d, Slave exitval = %d\n",
230 			nid, type, master_exitval, slave_result->mmr_exitval);
231 	}
232 	commd_err.size = strlen(msg_buf);
233 	commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
234 
235 	metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
236 	(void) uadmin(A_DUMP, AD_BOOT, NULL);
237 }
238 
239 static void
240 flush_fcout()
241 {
242 	struct statvfs64 vfsbuf;
243 	long long avail_bytes;
244 	int warned = 0;
245 
246 	for (; ; ) {
247 		sleep(10);
248 		/* No output file, nothing to do */
249 		if (commdout == (FILE *)NULL)
250 			continue;
251 
252 		/*
253 		 * stat the appropriate filesystem to check for available space.
254 		 */
255 		if (statvfs64(commdoutfile, &vfsbuf)) {
256 			continue;
257 		}
258 
259 		avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
260 		/*
261 		 * If we don't have enough space, we print out a warning.
262 		 * And we drop the verbosity level to NULL
263 		 * In case the condtion doesn't go away, we don't repeat
264 		 * the warning.
265 		 */
266 		if (avail_bytes < MIN_FS_SPACE) {
267 			if (warned) {
268 				continue;
269 			}
270 			commd_debug(MD_MMV_SYSLOG,
271 			    "NOT enough space available for logging\n");
272 			commd_debug(MD_MMV_SYSLOG,
273 			    "Have %lld bytes, need %lld bytes\n",
274 			    avail_bytes, MIN_FS_SPACE);
275 			warned = 1;
276 			md_commd_global_verb = MD_MMV_NULL;
277 		} else {
278 			warned = 0;
279 		}
280 
281 		fflush(commdout);
282 	}
283 }
284 
/*
 * Safer version of clnt_destroy. If clnt is NULL don't do anything.
 *
 * Wrapped in do { } while (0) so that "mdmn_clnt_destroy(x);" is exactly one
 * statement, also when it is the unbraced body of an if with an else.
 * Note that clnt is evaluated more than once, so it must not have side
 * effects.
 */
#define	mdmn_clnt_destroy(clnt)			\
	do {					\
		if ((clnt) != NULL)		\
			clnt_destroy(clnt);	\
	} while (/* CONSTCOND */ 0)
290 
291 /*
292  * Own version of svc_sendreply that checks the integrity of the transport
293  * handle and so prevents us from core dumps in the real svc_sendreply()
294  */
295 void
296 mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
297 {
298 	if (SVC_STAT(transp) == XPRT_DIED) {
299 		commd_debug(MD_MMV_MISC,
300 		    "mdmn_svc_sendreply: XPRT_DIED\n");
301 		return;
302 	}
303 	(void) svc_sendreply(transp, xdr, data);
304 }
305 
306 /*
307  * timeout_initiator(set, class)
308  *
309  * Alas, I sent a message and didn't get a response back in aproppriate time.
310  *
311  * timeout_initiator() takes care for doing the needed svc_sendreply() to the
312  * calling mdmn_send_message, so that guy doesn't wait forever
313  * What is done here is pretty much the same as what is done in
314  * wakeup initiator. The difference is that we cannot provide for any results,
315  * of course and we set the comm_state to MDMNE_TIMEOUT.
316  *
317  * By doing so, mdmn_send_message can decide if a retry would make sense or not.
318  * It's not our's to decide that here.
319  */
320 void
321 timeout_initiator(set_t setno, md_mn_msgclass_t class)
322 {
323 	SVCXPRT		*transp;
324 	md_mn_msgid_t	mid;
325 	md_mn_result_t *resultp;
326 
327 	resultp = Zalloc(sizeof (md_mn_result_t));
328 	resultp->mmr_comm_state	= MDMNE_TIMEOUT;
329 
330 	commd_debug(MD_MMV_MISC,
331 	    "timeout_initiator set = %d, class = %d\n", setno, class);
332 
333 	transp = mdmn_get_initiator_table_transp(setno, class);
334 	mdmn_get_initiator_table_id(setno, class, &mid);
335 
336 	commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
337 	    MSGID_ELEMS(mid));
338 
339 	/* return to mdmn_send_message() and let it deal with the situation */
340 	mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
341 
342 	free(resultp);
343 	commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
344 	mdmn_unregister_initiator_table(setno, class);
345 }
346 
347 
348 /*
349  * check_timeouts - thread
350  *
351  * This implements a timeout surveillance for messages sent from the
352  * initiator to the master.
353  *
354  * If a message is started, this thread is triggered thru
355  * cond_signal(&check_timeout_cv) and we keep track of the numbers of
356  * messages that are outstanding (messages_on_their_way).
357  *
358  * As long as there are messages on their way, this thread never goes to sleep.
359  * It'll keep checking all class/set combinations for outstanding messages.
360  * If one is found, it's checked if this message is overdue. In that case,
361  * timeout_initiator() is called to wakeup the calling mdmn_send_message and
362  * to clean up the mess.
363  *
364  * If the result from the master arrives later, this message is considered
365  * to be unsolicited. And will be ignored.
366  */
367 
368 void
369 check_timeouts()
370 {
371 	set_t			setno;
372 	time_t			now, then;
373 	mutex_t			*mx;
374 	md_mn_msgclass_t	class;
375 
376 	for (; ; ) {
377 		now = time((time_t *)NULL);
378 		for (setno = 1; setno < MD_MAXSETS; setno++) {
379 			if (md_mn_set_inited[setno] != MDMN_SET_READY) {
380 				continue;
381 			}
382 			for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
383 			    class++) {
384 				mx = mdmn_get_initiator_table_mx(setno, class);
385 				mutex_lock(mx);
386 
387 				/* then is the registered time */
388 				then =
389 				    mdmn_get_initiator_table_time(setno, class);
390 				if ((then != 0) && (now > then)) {
391 					timeout_initiator(setno, class);
392 				}
393 				mutex_unlock(mx);
394 			}
395 		}
396 		/* it's ok to check only once per second */
397 		sleep(1);
398 
399 		/* is there work to do? */
400 		mutex_lock(&check_timeout_mutex);
401 		if (messages_on_their_way == 0) {
402 			cond_wait(&check_timeout_cv, &check_timeout_mutex);
403 		}
404 		mutex_unlock(&check_timeout_mutex);
405 	}
406 }
407 
408 void
409 setup_debug(void)
410 {
411 	char	*tmp_dir;
412 
413 	/* Read in the debug-controlling tokens from runtime.cf */
414 	md_commd_global_verb = commd_get_verbosity();
415 	/*
416 	 * If the user didn't specify a verbosity level in runtime.cf
417 	 * we can safely return here. As we don't intend to printout
418 	 * debug messages, we don't need to check for the output file.
419 	 */
420 	if (md_commd_global_verb == 0) {
421 		return;
422 	}
423 
424 	/* if commdout is non-NULL it is an open FILE, we'd better close it */
425 	if (commdout != (FILE *)NULL) {
426 		fclose(commdout);
427 	}
428 
429 	commdoutfile = commd_get_outfile();
430 
431 	/* setup the debug output */
432 	if (commdoutfile == (char *)NULL) {
433 		/* if no valid file was specified, use the default */
434 		commdoutfile = "/var/run/commd.out";
435 		commdout = fopen(commdoutfile, "a");
436 	} else {
437 		/* check if the directory exists and is writable */
438 		tmp_dir = strdup(commdoutfile);
439 		if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
440 		    ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
441 			syslog(LOG_ERR,
442 			    "Can't write to specified output file %s,\n"
443 			    "using /var/run/commd.out instead\n", commdoutfile);
444 			free(commdoutfile);
445 			commdoutfile = "/var/run/commd.out";
446 			commdout = fopen(commdoutfile, "a");
447 		}
448 		free(tmp_dir);
449 	}
450 
451 	if (commdout == (FILE *)NULL) {
452 		syslog(LOG_ERR, "Can't write to debug output file %s\n",
453 		    commdoutfile);
454 	}
455 }
456 
457 /*
458  * mdmn_is_node_dead checks to see if a node is dead using
459  * the SunCluster infrastructure which is a stable interface.
460  * If unable to contact SunCuster the node is assumed to be alive.
461  * Return values:
462  *	1 - node is dead
463  *	0 - node is alive
464  */
465 int
466 mdmn_is_node_dead(md_mnnode_desc *node)
467 {
468 	char	*fmt = "/usr/cluster/bin/scha_cluster_get -O NODESTATE_NODE ";
469 	char	*cmd;
470 	size_t	size;
471 	char	buf[10];
472 	FILE	*ptr;
473 	int	retval = 0;
474 
475 	/* I know that I'm alive */
476 	if (strcmp(node->nd_nodename, mynode()) == 0)
477 		return (retval);
478 
479 	size = strlen(fmt) + strlen(node->nd_nodename) + 1;
480 	cmd = Zalloc(size);
481 	(void) strlcat(cmd, fmt, size);
482 	(void) strlcat(cmd, node->nd_nodename, size);
483 
484 	if ((ptr = popen(cmd, "r")) != NULL) {
485 		if (fgets(buf, sizeof (buf), ptr) != NULL) {
486 			/* If scha_cluster_get returned DOWN - return dead */
487 			if (strncmp(buf, "DOWN", 4) == 0)
488 				retval = 1;
489 		}
490 		(void) pclose(ptr);
491 	}
492 	Free(cmd);
493 	return (retval);
494 }
495 
496 /*
497  * global_init()
498  *
499  * Perform some global initializations.
500  *
501  * the following routines have to call this before operation can start:
502  *  - mdmn_send_svc_1
503  *  - mdmn_work_svc_1
504  *  - mdmn_comm_lock_svc_1
505  *  - mdmn_comm_unlock_svc_1
506  *  - mdmn_comm_suspend_svc_1
507  *  - mdmn_comm_resume_svc_1
508  *  - mdmn_comm_reinit_set_svc_1
509  *
510  * This is a single threaded daemon, so it can only be in one of the above
511  * routines at the same time.
512  * This means, global_init() cannot be called more than once at the same time.
513  * Hence, no lock is needed.
514  */
515 void
516 global_init(void)
517 {
518 	set_t			set;
519 	md_mn_msgclass_t	class;
520 	struct sigaction	sighandler;
521 	time_t			clock_val;
522 
523 	/* Do these global initializations only once */
524 	if (md_commd_global_state & MD_CGS_INITED) {
525 		return;
526 	}
527 	(void) sdssc_bind_library();
528 
529 	/* setup the debug options from the config file */
530 	setup_debug();
531 
532 	/* Make setup_debug() be the action in case of SIGHUP */
533 	sighandler.sa_flags = 0;
534 	sigfillset(&sighandler.sa_mask);
535 	sighandler.sa_handler = (void (*)(int)) setup_debug;
536 	sigaction(SIGHUP, &sighandler, NULL);
537 
538 	__savetime = gethrtime();
539 	(void) time(&clock_val);
540 	commd_debug(MD_MMV_MISC, "global init called %s\n",
541 			ctime(&clock_val));
542 
543 	/* start a thread that flushes out the debug on a regular basis */
544 	thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
545 	    (void *) NULL, THR_DETACHED, NULL);
546 
547 	/* global rwlock's / mutex's / cond_t's go here */
548 	mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
549 	cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
550 	mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
551 
552 	/* Make sure the initiator table is initialized correctly */
553 	for (set = 0; set < MD_MAXSETS; set++) {
554 		for (class = 0; class < MD_MN_NCLASSES; class++) {
555 			mdmn_unregister_initiator_table(set, class);
556 		}
557 	}
558 
559 
560 	/* setup the check for timeouts */
561 	thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
562 	    (void *) NULL, THR_DETACHED, NULL);
563 
564 	md_commd_global_state |= MD_CGS_INITED;
565 }
566 
567 
568 /*
569  * mdmn_init_client(setno, nodeid)
570  * called if client[setno][nodeid] is NULL
571  *
572  * NOTE: Must be called with set_desc_rwlock held as a reader
573  * NOTE: Must be called with client_rwlock held as a writer
574  *
575  * If the rpc client for this node has not been setup for any set, we do it now.
576  *
577  * Returns	0 on success (node found in set, rpc client setup)
578  *		-1 if metaget_setdesc failed,
579  *		-2 if node not part of set
580  *		-3 if clnt_create fails
581  */
582 static int
583 mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
584 {
585 	md_error_t	ep = mdnullerror;
586 	md_mnnode_desc	*node;
587 	md_set_desc	*sd;	/* just an abbr for set_descriptor[setno] */
588 
589 	sd = set_descriptor[setno];
590 
591 	/*
592 	 * Is the appropriate set_descriptor already initialized ?
593 	 * Can't think of a scenario where this is not the case, but we'd better
594 	 * check for it anyway.
595 	 */
596 	if (sd == NULL) {
597 		mdsetname_t	*sp;
598 
599 		rw_unlock(&set_desc_rwlock[setno]); /* readlock -> writelock */
600 		rw_wrlock(&set_desc_rwlock[setno]);
601 		sp = metasetnosetname(setno, &ep);
602 		/* Only one thread is supposed to be in metaget_setdesc() */
603 		mutex_lock(&get_setdesc_mutex);
604 		sd = metaget_setdesc(sp, &ep);
605 		mutex_unlock(&get_setdesc_mutex);
606 		if (sd == NULL) {
607 			rw_unlock(&set_desc_rwlock[setno]); /* back to ... */
608 			rw_rdlock(&set_desc_rwlock[setno]); /* ... readlock */
609 			return (-1);
610 		}
611 		set_descriptor[setno] = sd;
612 		rw_unlock(&set_desc_rwlock[setno]); /* back to readlock */
613 		rw_rdlock(&set_desc_rwlock[setno]);
614 	}
615 
616 	/* first we have to find the node name for this node id */
617 	for (node = sd->sd_nodelist; node; node = node->nd_next) {
618 		if (node->nd_nodeid == nid)
619 			break; /* we found our node in this set */
620 	}
621 
622 
623 	if (node == (md_mnnode_desc *)NULL) {
624 		commd_debug(MD_MMV_SYSLOG,
625 		    "FATAL: node %d not found in set %d\n", nid, setno);
626 		rw_unlock(&set_desc_rwlock[setno]);
627 		return (-2);
628 	}
629 
630 	commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
631 	    node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
632 
633 	/* Did this node join the diskset?  */
634 	if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
635 		commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
636 		    node->nd_nodename ? node->nd_nodename : "NULL", setno);
637 		rw_unlock(&set_desc_rwlock[setno]);
638 		return (-2);
639 	}
640 
641 	/* if clnt_create has not been done for that node, do it now */
642 	if (client[setno][nid] == (CLIENT *) NULL) {
643 		time_t	tout = 0;
644 
645 		/*
646 		 * While trying to create a connection to a node,
647 		 * periodically check to see if the node has been marked
648 		 * dead by the SunCluster infrastructure.
649 		 * This periodic check is needed since a non-responsive
650 		 * rpc.mdcommd (while it is attempting to create a connection
651 		 * to a dead node) can lead to large delays and/or failures
652 		 * in the reconfig steps.
653 		 */
654 		while ((client[setno][nid] == (CLIENT *) NULL) &&
655 		    (tout < MD_CLNT_CREATE_TOUT)) {
656 			client[setno][nid] = meta_client_create_retry
657 				(node->nd_nodename, mdmn_clnt_create,
658 				(void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
659 			/* Is the node dead? */
660 			if (mdmn_is_node_dead(node) == 1) {
661 				commd_debug(MD_MMV_SYSLOG,
662 				    "rpc.mdcommd: no client for dead node %s\n",
663 				    node->nd_nodename);
664 				break;
665 			} else
666 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
667 		}
668 
669 		if (client[setno][nid] == (CLIENT *) NULL) {
670 			clnt_pcreateerror(node->nd_nodename);
671 			rw_unlock(&set_desc_rwlock[setno]);
672 			return (-3);
673 		}
674 		/* this node has the license to send */
675 		commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
676 		add_license(node);
677 
678 		/* set the timeout value */
679 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
680 		    (char *)&FOUR_SECS);
681 
682 	}
683 	rw_unlock(&set_desc_rwlock[setno]);
684 	return (0);
685 }
686 
687 /*
688  * check_client(setno, nodeid)
689  *
690  * must be called with reader lock held for set_desc_rwlock[setno]
691  * and must be called with reader lock held for client_rwlock[setno]
692  * Checks if the client for this set/node combination is already setup
693  * if not it upgrades the lock to a writer lock
694  * and tries to initialize the client.
695  * Finally it's checked if the client nulled out again due to some race
696  *
697  * returns 0 if there is a usable client
698  * returns MDMNE_RPC_FAIL otherwise
699  */
700 static int
701 check_client(set_t setno, md_mn_nodeid_t nodeid)
702 {
703 	int ret = 0;
704 
705 	while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
706 		rw_unlock(&client_rwlock[setno]); /* upgrade reader ... */
707 		rw_wrlock(&client_rwlock[setno]); /* ... to writer lock. */
708 		if (mdmn_init_client(setno, nodeid) != 0) {
709 			ret = MDMNE_RPC_FAIL;
710 		}
711 		rw_unlock(&client_rwlock[setno]); /* downgrade writer ... */
712 		rw_rdlock(&client_rwlock[setno]); /* ... back to reader lock. */
713 	}
714 	return (ret);
715 }
716 
717 /*
718  * mdmn_init_set(setno, todo)
719  * setno is the number of the set to be initialized.
720  * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
721  * If called with MDMN_SET_READY everything is initialized.
722  *
723  * If the set mutexes are already initialized, the caller has to hold
724  * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
725  * calling mdmn_init_set()
726  */
727 int
728 mdmn_init_set(set_t setno, int todo)
729 {
730 	int class;
731 	md_mnnode_desc	*node;
732 	md_set_desc	*sd; /* just an abbr for set_descriptor[setno] */
733 	mdsetname_t	*sp;
734 	md_error_t	ep = mdnullerror;
735 	md_mn_nodeid_t	nid;
736 
737 	/*
738 	 * Check if we are told to setup the mutexes and
739 	 * if these are not yet setup
740 	 */
741 	if ((todo & MDMN_SET_MUTEXES) &&
742 	    ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
743 		mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
744 		cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
745 		rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
746 		rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
747 
748 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
749 			mutex_init(mdmn_get_master_table_mx(setno, class),
750 			    USYNC_THREAD, NULL);
751 			cond_init(mdmn_get_master_table_cv(setno, class),
752 			    USYNC_THREAD, NULL);
753 			mutex_init(mdmn_get_initiator_table_mx(setno, class),
754 			    USYNC_THREAD, NULL);
755 		}
756 		md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
757 	}
758 	if ((todo & MDMN_SET_MCT) &&
759 	    ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
760 		int	fd;
761 		size_t	filesize;
762 		caddr_t	addr;
763 		char table_name[32];
764 
765 		filesize = (sizeof (md_mn_mct_t));
766 		(void) snprintf(table_name, sizeof (table_name), "%s%d",
767 		    MD_MN_MSG_COMP_TABLE, setno);
768 		/*
769 		 * If the mct file exists we map it into memory.
770 		 * Otherwise we create an empty file of appropriate
771 		 * size and map that into memory.
772 		 * The mapped areas are stored in mct[setno].
773 		 */
774 		fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
775 		if (fd < 0) {
776 			commd_debug(MD_MMV_MISC,
777 			    "init_set: Can't open MCT\n");
778 			return (-1);
779 		}
780 		/*
781 		 * To ensure that the file has the appropriate size,
782 		 * we write a byte at the end of the file.
783 		 */
784 		lseek(fd, filesize + 1, SEEK_SET);
785 		write(fd, "\0", 1);
786 
787 		/* at this point we have a file in place that we can mmap */
788 		addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
789 		    MAP_SHARED, fd, (off_t)0);
790 		if (addr == MAP_FAILED) {
791 			commd_debug(MD_MMV_INIT,
792 			    "init_set: mmap mct error %d\n",
793 			    errno);
794 			return (-1);
795 		}
796 		/* LINTED pointer alignment */
797 		mct[setno] = (md_mn_mct_t *)addr;
798 
799 		/* finally we initialize the mutexes that protect the mct */
800 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
801 			mutex_init(&(mct_mutex[setno][class]),
802 			    USYNC_THREAD, NULL);
803 		}
804 
805 		md_mn_set_inited[setno] |= MDMN_SET_MCT;
806 	}
807 	/*
808 	 * Check if we are told to setup the nodes and
809 	 * if these are not yet setup
810 	 * (Attention: negative logic here compared to above!)
811 	 */
812 	if (((todo & MDMN_SET_NODES) == 0) ||
813 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
814 		return (0); /* success */
815 	}
816 
817 	if ((sp = metasetnosetname(setno, &ep)) == NULL) {
818 		commd_debug(MD_MMV_SYSLOG,
819 		    "metasetnosetname(%d) returned NULL\n", setno);
820 		return (MDMNE_NOT_JOINED);
821 	}
822 
823 	/* flush local copy of rpc.metad data */
824 	metaflushsetname(sp);
825 
826 	mutex_lock(&get_setdesc_mutex);
827 	sd = metaget_setdesc(sp, &ep);
828 	mutex_unlock(&get_setdesc_mutex);
829 
830 	if (sd == NULL) {
831 		commd_debug(MD_MMV_SYSLOG,
832 		    "metaget_setdesc(%d) returned NULL\n", setno);
833 		return (MDMNE_NOT_JOINED);
834 	}
835 
836 	/*
837 	 * if this set is not a multinode set or
838 	 * this node didn't join yet the diskset, better don't do anything
839 	 */
840 	if ((MD_MNSET_DESC(sd) == 0) ||
841 	    (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
842 		commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
843 		return (MDMNE_NOT_JOINED);
844 	}
845 
846 	for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
847 		time_t	tout = 0;
848 		nid = node->nd_nodeid;
849 
850 		commd_debug(MD_MMV_INIT,
851 		    "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
852 		    node->nd_nodename ? node->nd_nodename : "NULL",
853 		    node->nd_priv_ic ? node->nd_priv_ic : "NULL",
854 		    node->nd_flags);
855 
856 		if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
857 			commd_debug(MD_MMV_INIT,
858 			    "init: %s didn't join set %d\n",
859 			    node->nd_nodename ? node->nd_nodename : "NULL",
860 			    setno);
861 			continue;
862 		}
863 
864 		if (client[setno][nid] != (CLIENT *) NULL) {
865 			/* already inited */
866 			commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
867 			    node->nd_nodename ? node->nd_nodename : "NULL");
868 			continue;
869 		}
870 
871 		/*
872 		 * While trying to create a connection to a node,
873 		 * periodically check to see if the node has been marked
874 		 * dead by the SunCluster infrastructure.
875 		 * This periodic check is needed since a non-responsive
876 		 * rpc.mdcommd (while it is attempting to create a connection
877 		 * to a dead node) can lead to large delays and/or failures
878 		 * in the reconfig steps.
879 		 */
880 		while ((client[setno][nid] == (CLIENT *) NULL) &&
881 		    (tout < MD_CLNT_CREATE_TOUT)) {
882 			client[setno][nid] = meta_client_create_retry
883 				(node->nd_nodename, mdmn_clnt_create,
884 				(void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
885 			/* Is the node dead? */
886 			if (mdmn_is_node_dead(node) == 1) {
887 				commd_debug(MD_MMV_SYSLOG,
888 				    "rpc.mdcommd: no client for dead node %s\n",
889 				    node->nd_nodename);
890 				break;
891 			} else
892 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
893 		}
894 
895 		if (client[setno][nid] == (CLIENT *) NULL) {
896 			clnt_pcreateerror(node->nd_nodename);
897 			/*
898 			 * If we cannot connect to a single node
899 			 * (maybe because it is down) we mark this node as not
900 			 * owned and continue with the next node in the list.
901 			 * This is better than failing the entire starting up
902 			 * of the commd system.
903 			 */
904 			node->nd_flags &= ~MD_MN_NODE_OWN;
905 			commd_debug(MD_MMV_SYSLOG,
906 			    "WARNING couldn't create client for %s\n"
907 			    "Reconfig cycle required\n",
908 			    node->nd_nodename);
909 			commd_debug(MD_MMV_INIT,
910 			    "WARNING couldn't create client for %s\n"
911 			    "Reconfig cycle required\n",
912 			    node->nd_nodename);
913 			continue;
914 		}
915 		/* this node has the license to send */
916 		commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
917 		add_license(node);
918 
919 		/* set the timeout value */
920 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
921 		    (char *)&FOUR_SECS);
922 
923 		commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
924 		    node->nd_nodename ? node->nd_nodename : "NULL");
925 	}
926 
927 	set_descriptor[setno] = sd;
928 	md_mn_set_inited[setno] |= MDMN_SET_NODES;
929 	return (0); /* success */
930 }
931 
932 void *
933 mdmn_send_to_work(void *arg)
934 {
935 	int			*rpc_err;
936 	int			success;
937 	int			try_master;
938 	set_t			setno;
939 	mutex_t			*mx;	/* protection for initiator_table */
940 	SVCXPRT			*transp;
941 	md_mn_msg_t		*msg;
942 	md_mn_nodeid_t		set_master;
943 	md_mn_msgclass_t	class;
944 	md_mn_msg_and_transp_t	*matp = (md_mn_msg_and_transp_t *)arg;
945 
946 	msg			= matp->mat_msg;
947 	transp			= matp->mat_transp;
948 
949 	/* the alloc was done in mdmn_send_svc_1 */
950 	free(matp);
951 
952 	class = mdmn_get_message_class(msg->msg_type);
953 	setno = msg->msg_setno;
954 
955 	/* set the sender, so the master knows who to send the results */
956 	rw_rdlock(&set_desc_rwlock[setno]);
957 	msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
958 	set_master	= set_descriptor[setno]->sd_mn_master_nodeid;
959 
960 	mx = mdmn_get_initiator_table_mx(setno, class);
961 	mutex_lock(mx);
962 
963 	/*
964 	 * Here we check, if the initiator table slot for this set/class
965 	 * combination is free to use.
966 	 * If this is not the case, we return CLASS_BUSY forcing the
967 	 * initiating send_message call to retry
968 	 */
969 	success = mdmn_check_initiator_table(setno, class);
970 	if (success == MDMNE_CLASS_BUSY) {
971 		md_mn_msgid_t		active_mid;
972 
973 		mdmn_get_initiator_table_id(setno, class,
974 		&active_mid);
975 
976 		commd_debug(MD_MMV_SEND,
977 		    "send_to_work: received but locally busy "
978 		    "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
979 		    "active msg=(%d, 0x%llx-%d)\n",
980 		    MSGID_ELEMS(msg->msg_msgid), setno, class,
981 		    msg->msg_type, MSGID_ELEMS(active_mid));
982 	} else {
983 		commd_debug(MD_MMV_SEND,
984 		    "send_to_work: received (%d, 0x%llx-%d), "
985 		    "set=%d, class=%d, type=%d\n",
986 		    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
987 	}
988 
989 	try_master = 2; /* return failure after two retries */
990 	while ((success == MDMNE_ACK) && (try_master--)) {
991 		rw_rdlock(&client_rwlock[setno]);
992 		/* is the rpc client to the master still around ? */
993 		if (check_client(setno, set_master)) {
994 			success = MDMNE_RPC_FAIL;
995 			FLUSH_DEBUGFILE();
996 			rw_unlock(&client_rwlock[setno]);
997 			break; /* out of try_master-loop */
998 		}
999 
1000 		/*
1001 		 * Send the request to the work function on the master
1002 		 * this call will return immediately
1003 		 */
1004 		rpc_err = mdmn_work_1(msg, client[setno][set_master]);
1005 
1006 		/* Everything's Ok? */
1007 		if (rpc_err == NULL) {
1008 			success = MDMNE_RPC_FAIL;
1009 			/*
1010 			 * Probably something happened to the daemon on the
1011 			 * master. Kill the client, and try again...
1012 			 */
1013 			rw_unlock(&client_rwlock[setno]);
1014 			rw_wrlock(&client_rwlock[setno]);
1015 			mdmn_clnt_destroy(client[setno][set_master]);
1016 			if (client[setno][set_master] != (CLIENT *)NULL) {
1017 				client[setno][set_master] = (CLIENT *)NULL;
1018 			}
1019 			rw_unlock(&client_rwlock[setno]);
1020 			continue;
1021 
1022 		} else  if (*rpc_err != MDMNE_ACK) {
1023 			/* something went wrong, break out */
1024 			success = *rpc_err;
1025 			free(rpc_err);
1026 			rw_unlock(&client_rwlock[setno]);
1027 			break; /* out of try_master-loop */
1028 		}
1029 
1030 		rw_unlock(&client_rwlock[setno]);
1031 		free(rpc_err);
1032 
1033 		/*
1034 		 * If we are here, we sucessfully delivered the message.
1035 		 * We register the initiator_table, so that
1036 		 * wakeup_initiator_1  can do the sendreply with the
1037 		 * results for us.
1038 		 */
1039 		success = MDMNE_ACK;
1040 		mdmn_register_initiator_table(setno, class, msg, transp);
1041 
1042 		/* tell check_timeouts, there's work to do */
1043 		mutex_lock(&check_timeout_mutex);
1044 		messages_on_their_way++;
1045 		cond_signal(&check_timeout_cv);
1046 		mutex_unlock(&check_timeout_mutex);
1047 		break; /* out of try_master-loop */
1048 	}
1049 
1050 	rw_unlock(&set_desc_rwlock[setno]);
1051 
1052 	if (success == MDMNE_ACK) {
1053 		commd_debug(MD_MMV_SEND,
1054 		    "send_to_work: registered (%d, 0x%llx-%d)\n",
1055 		    MSGID_ELEMS(msg->msg_msgid));
1056 	} else {
1057 		/* In case of failure do the sendreply now */
1058 		md_mn_result_t *resultp;
1059 		resultp = Zalloc(sizeof (md_mn_result_t));
1060 		resultp->mmr_comm_state = success;
1061 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
1062 		commd_debug(MD_MMV_SEND,
1063 		    "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
1064 		    MSGID_ELEMS(msg->msg_msgid), success);
1065 		free_result(resultp);
1066 
1067 	}
1068 
1069 	free_msg(msg);
1070 	mutex_unlock(mx);
1071 	return (NULL);
1072 
1073 }
1074 
1075 /*
1076  * do_message_locally(msg, result)
1077  * Process a message locally on the master
1078  * Lookup the MCT if the message has already been processed.
1079  * If not, call the handler and store the result
1080  * If yes, retrieve the result from the MCT.
1081  * Return:
1082  *	MDMNE_ACK in case of success
1083  *	MDMNE_LOG_FAIL if the MCT could not be checked
1084  */
1085 static int
1086 do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1087 {
1088 	int			completed;
1089 	set_t			setno;
1090 	md_mn_msgtype_t		msgtype = msg->msg_type;
1091 	md_mn_msgclass_t	class;
1092 
1093 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1094 
1095 	handler = mdmn_get_handler(msgtype);
1096 	if (handler == NULL) {
1097 		result->mmr_exitval = 0;
1098 		/* let the sender decide if this is an error or not */
1099 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1100 		return (MDMNE_NO_HANDLER);
1101 	}
1102 
1103 	class = mdmn_get_message_class(msg->msg_type);
1104 	setno = msg->msg_setno;
1105 
1106 	result->mmr_msgtype	= msgtype;
1107 	result->mmr_flags	= msg->msg_flags;
1108 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1109 
1110 	mutex_lock(&mct_mutex[setno][class]);
1111 	completed = mdmn_check_completion(msg, result);
1112 	if (completed == MDMN_MCT_NOT_DONE) {
1113 		/* message not yet processed locally */
1114 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1115 		    "calling handler for (%d,0x%llx-%d) type %d\n",
1116 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1117 
1118 		/*
1119 		 * Mark the message as being currently processed,
1120 		 * so we won't start a second handler for it
1121 		 */
1122 		(void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1123 		mutex_unlock(&mct_mutex[setno][class]);
1124 
1125 		/* here we actually process the message on the master */
1126 		(*handler)(msg, MD_MSGF_ON_MASTER, result);
1127 
1128 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1129 		    "finished handler for (%d,0x%llx-%d) type %d\n",
1130 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1131 
1132 		/* Mark the message as fully processed, store the result */
1133 		mutex_lock(&mct_mutex[setno][class]);
1134 		(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1135 	} else if (completed == MDMN_MCT_DONE) {
1136 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1137 		    "result for (%d, 0x%llx-%d) from MCT\n",
1138 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1139 	} else if (completed == MDMN_MCT_IN_PROGRESS) {
1140 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1141 		    "(%d, 0x%llx-%d) is currently being processed\n",
1142 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1143 	} else {
1144 		/* MCT error occurred (should never happen) */
1145 		mutex_unlock(&mct_mutex[setno][class]);
1146 		result->mmr_comm_state = MDMNE_LOG_FAIL;
1147 		commd_debug(MD_MMV_SYSLOG, "WARNING "
1148 		    "mdmn_check_completion returned %d "
1149 		    "for (%d,0x%llx-%d)\n", completed,
1150 		    MSGID_ELEMS(msg->msg_msgid));
1151 		return (MDMNE_LOG_FAIL);
1152 	}
1153 	mutex_unlock(&mct_mutex[setno][class]);
1154 	return (MDMNE_ACK);
1155 
1156 }
1157 
1158 /*
1159  * do_send_message(msg, node)
1160  *
1161  * Send a message to a given node and wait for a acknowledgment, that the
1162  * message has arrived on the remote node.
1163  * Make sure that the client for the set is setup correctly.
1164  * If no ACK arrives, destroy and recreate the RPC client and retry the
1165  * message one time
1166  * After actually sending wait no longer than the appropriate number of
1167  * before timing out the message.
1168  *
1169  * Note must be called with set_desc_wrlock held in reader mode
1170  */
1171 static int
1172 do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1173 {
1174 	int			err;
1175 	int			rpc_retries;
1176 	int			timeout_retries = 0;
1177 	int			*ret = NULL;
1178 	set_t			setno;
1179 	cond_t			*cv;	/* see mdmn_wakeup_master_svc_1 */
1180 	mutex_t			*mx;	/* protection for class_busy */
1181 	timestruc_t		timeout; /* surveillance for remote daemon */
1182 	md_mn_nodeid_t		nid;
1183 	md_mn_msgtype_t		msgtype;
1184 	md_mn_msgclass_t	class;
1185 
1186 	nid	= node->nd_nodeid;
1187 	msgtype = msg->msg_type;
1188 	setno	= msg->msg_setno;
1189 	class	= mdmn_get_message_class(msgtype);
1190 	mx	= mdmn_get_master_table_mx(setno, class);
1191 	cv	= mdmn_get_master_table_cv(setno, class);
1192 
1193 retry_rpc:
1194 
1195 	/* We try two times to send the message */
1196 	rpc_retries = 2;
1197 
1198 	/*
1199 	 * if sending the message doesn't succeed the first time due to a
1200 	 * RPC problem, we retry one time
1201 	 */
1202 	while ((rpc_retries != 0) && (ret == NULL)) {
1203 		/*  in abort state, we error out immediately */
1204 		if (md_commd_global_state & MD_CGS_ABORTED) {
1205 			return (MDMNE_ABORT);
1206 		}
1207 
1208 		rw_rdlock(&client_rwlock[setno]);
1209 		/* unable to create client? Ignore it */
1210 		if (check_client(setno, nid)) {
1211 			/*
1212 			 * In case we cannot establish an RPC client, we
1213 			 * take this node out of our considerations.
1214 			 * This will be reset by a reconfig
1215 			 * cycle that should come pretty soon.
1216 			 * MNISSUE: Should a reconfig cycle
1217 			 * be forced on SunCluster?
1218 			 */
1219 			node->nd_flags &= ~MD_MN_NODE_OWN;
1220 			commd_debug(MD_MMV_SYSLOG,
1221 			    "WARNING couldn't create client for %s\n"
1222 			    "Reconfig cycle required\n",
1223 			    node->nd_nodename);
1224 			commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1225 			    "WARNING couldn't create client for %s\n",
1226 			    MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1227 			rw_unlock(&client_rwlock[setno]);
1228 			return (MDMNE_IGNORE_NODE);
1229 		}
1230 		/* let's be paranoid and check again before sending */
1231 		if (client[setno][nid] == NULL) {
1232 			/*
1233 			 * if this is true, strange enough, we catch our breath,
1234 			 * and then continue, so that the client is set up
1235 			 * once again.
1236 			 */
1237 			commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1238 			rw_unlock(&client_rwlock[setno]);
1239 			sleep(1);
1240 			continue;
1241 		}
1242 
1243 		/* send it over, it will return immediately */
1244 		ret = mdmn_work_1(msg, client[setno][nid]);
1245 
1246 		rw_unlock(&client_rwlock[setno]);
1247 
1248 		if (ret != NULL) {
1249 			commd_debug(MD_MMV_PROC_M,
1250 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1251 			    " 0x%x\n",
1252 			    MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1253 		} else {
1254 			commd_debug(MD_MMV_PROC_M,
1255 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1256 			    " NULL \n",
1257 			    MSGID_ELEMS(msg->msg_msgid), nid);
1258 		}
1259 
1260 		if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1261 		    (*ret == MDMNE_THR_CREATE_FAIL)) {
1262 			/*
1263 			 * Something happened to the daemon on the other side.
1264 			 * Kill the client, and try again.
1265 			 * check_client() will create a new client
1266 			 */
1267 			rw_wrlock(&client_rwlock[setno]);
1268 			mdmn_clnt_destroy(client[setno][nid]);
1269 			if (client[setno][nid] != (CLIENT *)NULL) {
1270 				client[setno][nid] = (CLIENT *)NULL;
1271 			}
1272 			rw_unlock(&client_rwlock[setno]);
1273 
1274 			/* ... but don't try infinitely */
1275 			--rpc_retries;
1276 			continue;
1277 		}
1278 		/*
1279 		 * If the class is locked on the other node, keep trying.
1280 		 * This situation will go away automatically,
1281 		 * if we wait long enough
1282 		 */
1283 		if (*ret == MDMNE_CLASS_LOCKED) {
1284 			sleep(1);
1285 			free(ret);
1286 			ret = NULL;
1287 			continue;
1288 		}
1289 	}
1290 	if (ret == NULL) {
1291 		return (MDMNE_RPC_FAIL);
1292 	}
1293 
1294 
1295 	/* if the slave is in abort state, we just ignore it. */
1296 	if (*ret == MDMNE_ABORT) {
1297 		commd_debug(MD_MMV_PROC_M,
1298 		    "proc_mas: work(%d,0x%llx-%d) returned "
1299 		    "MDMNE_ABORT\n",
1300 		    MSGID_ELEMS(msg->msg_msgid));
1301 		free(ret);
1302 		return (MDMNE_IGNORE_NODE);
1303 	}
1304 
1305 	/* Did the remote processing succeed? */
1306 	if (*ret != MDMNE_ACK) {
1307 		/*
1308 		 * Some commd failure in the middle of sending the msg
1309 		 * to the nodes. We don't continue here.
1310 		 */
1311 		commd_debug(MD_MMV_PROC_M,
1312 		    "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1313 		    MSGID_ELEMS(msg->msg_msgid), *ret);
1314 		free(ret);
1315 		return (MDMNE_RPC_FAIL);
1316 	}
1317 	free(ret);
1318 	ret = NULL;
1319 
1320 	/*
1321 	 * When we are here, we have sent the message to the other node and
1322 	 * we know that node has accepted it.
1323 	 * We go to sleep and have trust to be woken up by wakeup.
1324 	 * If we wakeup due to a timeout, or a signal, no result has been
1325 	 * placed in the appropriate slot.
1326 	 * If we timeout, it is likely that this is because the node has
1327 	 * gone away, so we will destroy the client and try it again in the
1328 	 * expectation that the rpc will fail and we will return
1329 	 * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1330 	 * be being processed on the slave. In this case just timeout for 4
1331 	 * more seconds and then return RPC_FAIL if the message is not complete.
1332 	 */
1333 	timeout.tv_nsec = 0;
1334 	timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1335 	    FOUR_SECS.tv_sec;
1336 	err = cond_reltimedwait(cv, mx, &timeout);
1337 
1338 	if (err == 0) {
1339 		/* everything's fine, return success */
1340 		return (MDMNE_ACK);
1341 	}
1342 
1343 	if (err == ETIME) {
1344 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1345 		    "timeout occured, set=%d, class=%d, "
1346 		    "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1347 		    setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1348 		if (timeout_retries == 0) {
1349 			timeout_retries++;
1350 			/*
1351 			 * Destroy the client and try the rpc call again
1352 			 */
1353 			rw_wrlock(&client_rwlock[setno]);
1354 			mdmn_clnt_destroy(client[setno][nid]);
1355 			client[setno][nid] = (CLIENT *)NULL;
1356 			rw_unlock(&client_rwlock[setno]);
1357 			goto retry_rpc;
1358 		}
1359 	} else if (err == EINTR) {
1360 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1361 		    "commd signalled, set=%d, class=%d, "
1362 		    "msgid=(%d, 0x%llx-%d)\n",
1363 		    setno, class, MSGID_ELEMS(msg->msg_msgid));
1364 	} else {
1365 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1366 		    "cond_reltimedwait err=%d, set=%d, "
1367 		    "class=%d, msgid=(%d, 0x%llx-%d)\n",
1368 		    err, setno, class,
1369 		    MSGID_ELEMS(msg->msg_msgid));
1370 	}
1371 
1372 	/* some failure happened */
1373 	return (MDMNE_RPC_FAIL);
1374 }
1375 
1376 /*
1377  * before we return we have to
1378  * free_msg(msg); because we are working on a copied message
1379  */
1380 void
1381 mdmn_master_process_msg(md_mn_msg_t *msg)
1382 {
1383 	int		*ret;
1384 	int		err;
1385 	int		nmsgs;		/* total number of msgs */
1386 	int		curmsg;		/* index of current msg */
1387 	set_t		setno;
1388 	uint_t		inherit_flags = 0;
1389 	uint_t		secdiff, usecdiff; /* runtime of this message */
1390 	md_error_t	mde = mdnullerror;
1391 	md_mn_msg_t	*msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1392 	md_mn_msg_t	*cmsg;		/* current msg */
1393 	md_mn_msgid_t	dummyid;
1394 	md_mn_result_t	*result;
1395 	md_mn_result_t	*slave_result;
1396 	md_mn_nodeid_t	sender;
1397 	md_mn_nodeid_t	set_master;
1398 	md_mnnode_desc	*node;
1399 	md_mn_msgtype_t	orig_type;	/* type of the original message */
1400 	md_mn_msgtype_t	msgtype;	/* type of the current message */
1401 	md_mn_msgclass_t orig_class;	/* class of the original message */
1402 	md_mn_msgclass_t class;		/* class of the current message */
1403 
1404 	int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1405 
1406 	orig_type = msgtype = msg->msg_type;
1407 	sender	= msg->msg_sender;
1408 	setno	= msg->msg_setno;
1409 
1410 	result = Zalloc(sizeof (md_mn_result_t));
1411 	result->mmr_setno	= setno;
1412 	result->mmr_msgtype	= msgtype;
1413 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1414 
1415 	orig_class = mdmn_get_message_class(msgtype);
1416 
1417 	commd_debug(MD_MMV_PROC_M,
1418 	    "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1419 	    MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1420 
1421 	rw_rdlock(&set_desc_rwlock[setno]);
1422 	set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1423 	result->mmr_sender	= set_master;
1424 	/*
1425 	 * Put message into the change log unless told otherwise
1426 	 * Note that we only log original messages.
1427 	 * If they are generated by some smgen, we don't log them!
1428 	 * Replay messages aren't logged either.
1429 	 * Note, that replay messages are unlogged on completion.
1430 	 */
1431 	if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1432 		commd_debug(MD_MMV_PROC_M,
1433 		    "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1434 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1435 		err = mdmn_log_msg(msg);
1436 		if (err == MDMNE_NULL) {
1437 			/* msg logged successfully */
1438 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1439 			    "done log_msg for (%d,0x%llx-%d) type %d\n",
1440 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1441 			goto proceed;
1442 		}
1443 		if (err == MDMNE_ACK) {
1444 			/* Same msg in the slot, proceed */
1445 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1446 			    "already logged (%d,0x%llx-%d) type %d\n",
1447 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1448 			goto proceed;
1449 		}
1450 		if (err == MDMNE_LOG_FAIL) {
1451 			/* Oh, bad, the log is non functional. */
1452 			result->mmr_comm_state = MDMNE_LOG_FAIL;
1453 			/*
1454 			 * Note that the mark_busy was already done by
1455 			 * mdmn_work_svc_1()
1456 			 */
1457 			mutex_lock(&mdmn_busy_mutex[setno]);
1458 			mdmn_mark_class_unbusy(setno, orig_class);
1459 			mutex_unlock(&mdmn_busy_mutex[setno]);
1460 
1461 		}
1462 		if (err == MDMNE_CLASS_BUSY) {
1463 			/*
1464 			 * The log is occupied with a different message
1465 			 * that needs to be played first.
1466 			 * We reject the current message with MDMNE_CLASS_BUSY
1467 			 * to the initiator and do not unbusy the set/class,
1468 			 * because we will proceed with the logged message,
1469 			 * which has the same set/class combination
1470 			 */
1471 			result->mmr_comm_state = MDMNE_CLASS_BUSY;
1472 		}
1473 		ret = (int *)NULL;
1474 		rw_rdlock(&client_rwlock[setno]);
1475 
1476 		if (check_client(setno, sender)) {
1477 			commd_debug(MD_MMV_SYSLOG,
1478 			    "proc_mas: No client for initiator \n");
1479 		} else {
1480 			ret = mdmn_wakeup_initiator_1(result,
1481 			    client[setno][sender]);
1482 		}
1483 		rw_unlock(&client_rwlock[setno]);
1484 
1485 		if (ret == (int *)NULL) {
1486 			commd_debug(MD_MMV_SYSLOG,
1487 			    "proc_mas: couldn't wakeup_initiator \n");
1488 		} else {
1489 			if (*ret != MDMNE_ACK) {
1490 				commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1491 				    "wakeup_initiator returned %d\n", *ret);
1492 			}
1493 			free(ret);
1494 		}
1495 		free_msg(msg);
1496 
1497 		if (err == MDMNE_LOG_FAIL) {
1498 			/* we can't proceed here */
1499 			free_result(result);
1500 			rw_unlock(&set_desc_rwlock[setno]);
1501 			return;
1502 		} else if (err == MDMNE_CLASS_BUSY) {
1503 			mdmn_changelog_record_t *lr;
1504 			lr = mdmn_get_changelogrec(setno, orig_class);
1505 			assert(lr != NULL);
1506 
1507 			/* proceed with the logged message */
1508 			msg = copy_msg(&(lr->lr_msg), NULL);
1509 
1510 			/*
1511 			 * The logged message has to have the same class but
1512 			 * type and sender can be different
1513 			 */
1514 			orig_type = msgtype = msg->msg_type;
1515 			sender	= msg->msg_sender;
1516 
1517 			commd_debug(MD_MMV_PROC_M,
1518 			    "proc_mas: Got new message from change log: "
1519 			    "(%d,0x%llx-%d) type %d\n",
1520 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1521 
1522 			/* continue normal operation with this message */
1523 		}
1524 	}
1525 
1526 proceed:
1527 	smgen = mdmn_get_submessage_generator(msgtype);
1528 	if (smgen == NULL) {
1529 		/* no submessages to create, just use the original message */
1530 		msglist[0] = msg;
1531 		nmsgs = 1;
1532 	} else {
1533 		/* some bits are passed on to submessages */
1534 		inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1535 
1536 		nmsgs = smgen(msg, msglist);
1537 
1538 		/* some settings for the submessages */
1539 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1540 			cmsg    = msglist[curmsg];
1541 
1542 			/* Apply the inherited flags */
1543 			cmsg->msg_flags |= inherit_flags;
1544 
1545 			/*
1546 			 * Make sure the submessage ID is set correctly
1547 			 * Note: first submessage has mid_smid of 1 (not 0)
1548 			 */
1549 			cmsg->msg_msgid.mid_smid = curmsg + 1;
1550 
1551 			/* need the original class set in msgID (for MCT) */
1552 			cmsg->msg_msgid.mid_oclass = orig_class;
1553 		}
1554 
1555 		commd_debug(MD_MMV_PROC_M,
1556 		    "smgen generated %d submsgs, origclass = %d\n",
1557 		    nmsgs, orig_class);
1558 	}
1559 	/*
1560 	 * This big loop does the following.
1561 	 * For all messages:
1562 	 *	process message on the master first (a message completion
1563 	 *		table MCT ensures a message is not processed twice)
1564 	 *	in case of an error break out of message loop
1565 	 *	for all nodes -- unless MD_MSGF_NO_BCAST is set --
1566 	 *		send message to node until that succeeds
1567 	 *		merge result -- not yet implemented
1568 	 *		respect MD_MSGF_STOP_ON_ERROR
1569 	 */
1570 	for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1571 		int	break_msg_loop = 0;
1572 		mutex_t	*mx;		/* protection for class_busy */
1573 		int	master_err;
1574 		int	master_exitval = -1;
1575 
1576 		cmsg	= msglist[curmsg];
1577 		msgtype = cmsg->msg_type;
1578 		class	= mdmn_get_message_class(msgtype);
1579 		node	= NULL;
1580 		mx	= mdmn_get_master_table_mx(setno, class);
1581 
1582 		/* If we are in the abort state, we error out immediately */
1583 		if (md_commd_global_state & MD_CGS_ABORTED) {
1584 			break; /* out of the message loop */
1585 		}
1586 
1587 		commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1588 		    class, orig_class);
1589 		/*
1590 		 * If the current class is different from the original class,
1591 		 * we have to lock it down.
1592 		 * The original class is already marked busy.
1593 		 * At this point we cannot refuse the message because the
1594 		 * class is busy right now, so we wait until the class becomes
1595 		 * available again. As soon as something changes for this set
1596 		 * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1597 		 *
1598 		 * Granularity could be finer (setno/class)
1599 		 */
1600 		if (class != orig_class) {
1601 			mutex_lock(&mdmn_busy_mutex[setno]);
1602 			while (mdmn_mark_class_busy(setno, class) == FALSE) {
1603 				cond_wait(&mdmn_busy_cv[setno],
1604 				    &mdmn_busy_mutex[setno]);
1605 			}
1606 			mutex_unlock(&mdmn_busy_mutex[setno]);
1607 		}
1608 
1609 		master_err = do_message_locally(cmsg, result);
1610 
1611 		if ((master_err != MDMNE_ACK) ||
1612 		    ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1613 			result->mmr_failing_node = set_master;
1614 			if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1615 				/*
1616 				 * if appropriate, unbusy the class and
1617 				 * break out of the message loop
1618 				 */
1619 				if (class != orig_class) {
1620 					mutex_lock(&mdmn_busy_mutex[setno]);
1621 					mdmn_mark_class_unbusy(setno, class);
1622 					mutex_unlock(&mdmn_busy_mutex[setno]);
1623 				}
1624 				break;
1625 			}
1626 		}
1627 
1628 		if (master_err == MDMNE_ACK)
1629 			master_exitval = result->mmr_exitval;
1630 
1631 		/* No broadcast? => next message */
1632 		if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1633 			/* if appropriate, unbusy the class */
1634 			if (class != orig_class) {
1635 				mutex_lock(&mdmn_busy_mutex[setno]);
1636 				mdmn_mark_class_unbusy(setno, class);
1637 				mutex_unlock(&mdmn_busy_mutex[setno]);
1638 			}
1639 			continue;
1640 		}
1641 
1642 
1643 		/* fake sender, so we get notified when the results are avail */
1644 		cmsg->msg_sender = set_master;
1645 		/*
1646 		 * register to the master_table. It's needed by wakeup_master to
1647 		 * wakeup the sleeping thread.
1648 		 * Access is protected by the class lock: mdmn_mark_class_busy()
1649 		 */
1650 		mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1651 
1652 
1653 
1654 		rw_rdlock(&set_desc_rwlock[setno]);
1655 		/* Send the message  to all other nodes */
1656 		for (node = set_descriptor[setno]->sd_nodelist; node;
1657 		    node = node->nd_next) {
1658 			md_mn_nodeid_t nid = node->nd_nodeid;
1659 
1660 			/* We are master and have already processed the msg */
1661 			if (node == set_descriptor[setno]->sd_mn_masternode) {
1662 				continue;
1663 			}
1664 
1665 			/* If this node didn't join the disk set, ignore it */
1666 			if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1667 				continue;
1668 			}
1669 
1670 			mutex_lock(mx);
1671 			/*
1672 			 * Register the node that is addressed,
1673 			 * so we can detect unsolicited messages
1674 			 */
1675 			mdmn_set_master_table_addr(setno, class, nid);
1676 			slave_result = (md_mn_result_t *)NULL;
1677 
1678 			/*
1679 			 * Now send it. do_send_message() will return if
1680 			 *	a failure occurs or
1681 			 *	the results are available
1682 			 */
1683 			err = do_send_message(cmsg, node);
1684 
1685 			/*  in abort state, we error out immediately */
1686 			if (md_commd_global_state & MD_CGS_ABORTED) {
1687 				break;
1688 			}
1689 
1690 			if (err == MDMNE_ACK) {
1691 				slave_result =
1692 				    mdmn_get_master_table_res(setno, class);
1693 				commd_debug(MD_MMV_PROC_M,
1694 				    "proc_mas: got result for (%d,0x%llx-%d)\n",
1695 				    MSGID_ELEMS(cmsg->msg_msgid));
1696 			} else if (err == MDMNE_IGNORE_NODE) {
1697 				mutex_unlock(mx);
1698 				continue; /* send to next node */
1699 			}
1700 			mutex_unlock(mx);
1701 
1702 
1703 			/*
1704 			 * If the result is NULL, or err doesn't show success,
1705 			 * something went wrong with this RPC call.
1706 			 */
1707 			if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1708 				/*
1709 				 * If PANIC_WHEN_INCONSISTENT set,
1710 				 * panic if the master succeeded while
1711 				 * this node failed
1712 				 */
1713 				if ((cmsg->msg_flags &
1714 				    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1715 				    (master_err == MDMNE_ACK))
1716 					panic_system(nid, cmsg->msg_type,
1717 					    master_err, master_exitval,
1718 					    slave_result);
1719 
1720 				result->mmr_failing_node = nid;
1721 				/* are we supposed to stop in case of error? */
1722 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1723 					result->mmr_exitval = MDMNE_RPC_FAIL;
1724 					commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1725 					    "result (%d,0x%llx-%d) is NULL\n",
1726 					    MSGID_ELEMS(cmsg->msg_msgid));
1727 					FLUSH_DEBUGFILE();
1728 					break_msg_loop = 1;
1729 					break; /* out of node loop first */
1730 				} else {
1731 					/* send msg to the next node */
1732 					continue;
1733 				}
1734 
1735 			}
1736 
1737 			/*
1738 			 * Message processed on remote node.
1739 			 * If PANIC_WHEN_INCONSISTENT set, panic if the
1740 			 * result is different on this node from the result
1741 			 * on the master
1742 			 */
1743 			if ((cmsg->msg_flags &
1744 			    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1745 			    ((master_err != MDMNE_ACK) ||
1746 			    (slave_result->mmr_exitval != master_exitval)))
1747 				panic_system(nid, cmsg->msg_type, master_err,
1748 				    master_exitval, slave_result);
1749 
1750 			/*
1751 			 * At this point we know we have a message that was
1752 			 * processed on the remote node.
1753 			 * We now check if the exitval is non zero.
1754 			 * In that case we discard the previous result and
1755 			 * rather use the current.
1756 			 * This means: If a message fails on no node,
1757 			 * the result from the master will be returned.
1758 			 * There's currently no such thing as merge of results
1759 			 * If additionally STOP_ON_ERROR is set, we bail out
1760 			 */
1761 			if (slave_result->mmr_exitval != 0) {
1762 				/* throw away the previously allocated result */
1763 				free_result(result);
1764 
1765 				/* copy_result() allocates new memory */
1766 				result = copy_result(slave_result);
1767 				free_result(slave_result);
1768 
1769 				dump_result(MD_MMV_PROC_M, "proc_mas", result);
1770 
1771 				result->mmr_failing_node = nid;
1772 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1773 					break_msg_loop = 1;
1774 					break; /* out of node loop */
1775 				}
1776 				continue; /* try next node */
1777 
1778 			} else {
1779 				/*
1780 				 * MNIssue: may want to merge the results
1781 				 * from all slaves.  Currently only report
1782 				 * the results from the master.
1783 				 */
1784 				free_result(slave_result);
1785 			}
1786 
1787 		} /* End of loop over the nodes */
1788 		rw_unlock(&set_desc_rwlock[setno]);
1789 
1790 
1791 		/* release the current class again */
1792 		if (class != orig_class) {
1793 			mutex_lock(&mdmn_busy_mutex[setno]);
1794 			mdmn_mark_class_unbusy(setno, class);
1795 			mutex_unlock(&mdmn_busy_mutex[setno]);
1796 		}
1797 
1798 		/* are we supposed to quit entirely ? */
1799 		if (break_msg_loop ||
1800 		    (md_commd_global_state & MD_CGS_ABORTED)) {
1801 			break; /* out of msg loop */
1802 		}
1803 
1804 	} /* End of loop over the messages */
1805 	/*
1806 	 * If we are here, there's two possibilities:
1807 	 * 	- we processed all messages on all nodes without an error.
1808 	 *	    In this case we return the result from the master.
1809 	 *	    (to be implemented: return the merged result)
1810 	 *	- we encountered an error in which case result has been
1811 	 *	    set accordingly already.
1812 	 */
1813 
1814 	if (md_commd_global_state & MD_CGS_ABORTED) {
1815 		result->mmr_comm_state = MDMNE_ABORT;
1816 	}
1817 
1818 	/*
1819 	 * This message has been processed completely.
1820 	 * Remove it from the changelog.
1821 	 * Do this for replay messages too.
1822 	 * Note that the message is unlogged before waking up the
1823 	 * initiator.  This is done for two reasons.
1824 	 * 1. Remove a race condition that occurs when back to back
1825 	 *   messages are sent for the same class, the registeration is
1826 	 *   is lost.
1827 	 * 2. If the initiator died but the action was completed on all the
1828 	 *   the nodes, we want that to be marked "done" quickly.
1829 	 */
1830 
1831 	if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1832 		commd_debug(MD_MMV_PROC_M,
1833 		    "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1834 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1835 		mdmn_unlog_msg(msg);
1836 		commd_debug(MD_MMV_PROC_M,
1837 		    "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1838 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1839 	}
1840 
1841 	/*
1842 	 * In case of submessages, we increased the submessage ID in the
1843 	 * result structure. We restore the message ID to the value that
1844 	 * the initiator is waiting for.
1845 	 */
1846 	result->mmr_msgid.mid_smid	= 0;
1847 	result->mmr_msgtype		= orig_type;
1848 	result->mmr_sender		= set_master;
1849 
1850 	/* if we have an inited client, send result */
1851 	ret = (int *)NULL;
1852 
1853 	rw_rdlock(&client_rwlock[setno]);
1854 	if (check_client(setno, sender)) {
1855 		commd_debug(MD_MMV_SYSLOG,
1856 		    "proc_mas: unable to create client for initiator\n");
1857 	} else {
1858 		ret = mdmn_wakeup_initiator_1(result, client[setno][sender]);
1859 	}
1860 	rw_unlock(&client_rwlock[setno]);
1861 
1862 	if (ret == (int *)NULL) {
1863 		commd_debug(MD_MMV_PROC_M,
1864 		    "proc_mas: couldn't wakeup initiator\n");
1865 	} else {
1866 		if (*ret != MDMNE_ACK) {
1867 			commd_debug(MD_MMV_PROC_M,
1868 			    "proc_mas: wakeup_initiator returned %d\n",
1869 			    *ret);
1870 		}
1871 		free(ret);
1872 	}
1873 
1874 	rw_unlock(&set_desc_rwlock[setno]);
1875 	/* Free all submessages, if there were any */
1876 	if (nmsgs > 1) {
1877 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1878 			free_msg(msglist[curmsg]);
1879 		}
1880 	}
1881 	/* Free the result */
1882 	free_result(result);
1883 
1884 	mutex_lock(&mdmn_busy_mutex[setno]);
1885 	mdmn_mark_class_unbusy(setno, orig_class);
1886 	mutex_unlock(&mdmn_busy_mutex[setno]);
1887 
1888 
1889 	/*
1890 	 * We use this ioctl just to get the time in the same format as used in
1891 	 * the messageID. If it fails, all we get is a bad runtime output.
1892 	 */
1893 	(void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1894 	secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1895 	usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1896 
1897 	/* catching possible overflow */
1898 	if (usecdiff >= 1000000) {
1899 		usecdiff -= 1000000;
1900 		secdiff++;
1901 	}
1902 
1903 
1904 	commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1905 	    "%5d.%06d secs runtime\n",
1906 	    MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1907 
1908 	/* Free the original message */
1909 	free_msg(msg);
1910 }
1911 
1912 void
1913 mdmn_slave_process_msg(md_mn_msg_t *msg)
1914 {
1915 	int			*ret = NULL;
1916 	int			completed;
1917 	int			retries;
1918 	int			successfully_returned;
1919 	set_t			setno;
1920 	md_mn_result_t		*result;
1921 	md_mn_nodeid_t		sender;
1922 	md_mn_nodeid_t		whoami;
1923 	md_mn_msgtype_t		msgtype;
1924 	md_mn_msgclass_t	class;
1925 
1926 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1927 
1928 	setno	= msg->msg_setno;
1929 	sender	= msg->msg_sender; /* this is always the master of the set */
1930 	msgtype	= msg->msg_type;
1931 
1932 	rw_rdlock(&set_desc_rwlock[setno]);
1933 	whoami		= set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1934 	rw_unlock(&set_desc_rwlock[setno]);
1935 
1936 	result = Zalloc(sizeof (md_mn_result_t));
1937 	result->mmr_flags	= msg->msg_flags;
1938 	result->mmr_setno	= setno;
1939 	result->mmr_msgtype	= msgtype;
1940 	result->mmr_sender	= whoami;
1941 	result->mmr_comm_state	= MDMNE_ACK; /* Ok state */
1942 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1943 	class = mdmn_get_message_class(msgtype);
1944 
1945 	commd_debug(MD_MMV_PROC_S,
1946 	    "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1947 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
1948 
1949 	handler = mdmn_get_handler(msgtype);
1950 
1951 	if (handler == NULL) {
1952 		result->mmr_exitval = 0;
1953 		/* let the sender decide if this is an error or not */
1954 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1955 		commd_debug(MD_MMV_PROC_S,
1956 		    "proc_sla: No handler for (%d, 0x%llx-%d)\n",
1957 		    MSGID_ELEMS(msg->msg_msgid));
1958 	} else {
1959 
1960 		/* Did we already process this message ? */
1961 		mutex_lock(&mct_mutex[setno][class]);
1962 		completed = mdmn_check_completion(msg, result);
1963 
1964 		if (completed == MDMN_MCT_NOT_DONE) {
1965 			/* message not yet processed locally */
1966 			commd_debug(MD_MMV_PROC_S,
1967 			    "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
1968 			    MSGID_ELEMS(msg->msg_msgid));
1969 
1970 			/*
1971 			 * Mark the message as being currently processed,
1972 			 * so we won't start a second handler for it
1973 			 */
1974 			(void) mdmn_mark_completion(msg, NULL,
1975 			    MDMN_MCT_IN_PROGRESS);
1976 
1977 			mutex_unlock(&mct_mutex[setno][class]);
1978 			(*handler)(msg, MD_MSGF_ON_SLAVE, result);
1979 
1980 			commd_debug(MD_MMV_PROC_S,
1981 			    "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
1982 			    MSGID_ELEMS(msg->msg_msgid));
1983 
1984 			mutex_lock(&mct_mutex[setno][class]);
1985 			/* Mark the message as fully done, store the result */
1986 			(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1987 
1988 		} else if (completed == MDMN_MCT_DONE) {
1989 			/* message processed previously, got result from MCT */
1990 			commd_debug(MD_MMV_PROC_S,
1991 			    "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
1992 			    MSGID_ELEMS(msg->msg_msgid));
1993 		} else if (completed == MDMN_MCT_IN_PROGRESS) {
1994 			/*
1995 			 * If the message is curruntly being processed,
1996 			 * we can return here, without sending a result back.
1997 			 * This will be done by the initial message handling
1998 			 * thread
1999 			 */
2000 			mutex_unlock(&mct_mutex[setno][class]);
2001 			commd_debug(MD_MMV_PROC_M, "proc_sla: "
2002 			    "(%d, 0x%llx-%d) is currently being processed\n",
2003 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
2004 
2005 			free_msg(msg);
2006 			free_result(result);
2007 			return;
2008 		} else {
2009 			/* MCT error occurred (should never happen) */
2010 			result->mmr_comm_state = MDMNE_LOG_FAIL;
2011 			commd_debug(MD_MMV_PROC_S,
2012 			    "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
2013 			    MSGID_ELEMS(msg->msg_msgid));
2014 		}
2015 		mutex_unlock(&mct_mutex[setno][class]);
2016 	}
2017 
2018 	/*
2019 	 * At this point we have a result (even in an error case)
2020 	 * that we return to the master.
2021 	 */
2022 	rw_rdlock(&set_desc_rwlock[setno]);
2023 	retries = 2; /* we will try two times to send the results */
2024 	successfully_returned = 0;
2025 
2026 	while (!successfully_returned && (retries != 0)) {
2027 		ret = (int *)NULL;
2028 		rw_rdlock(&client_rwlock[setno]);
2029 		if (check_client(setno, sender)) {
2030 			/*
2031 			 * If we cannot setup the rpc connection to the master,
2032 			 * we can't do anything besides logging this fact.
2033 			 */
2034 			commd_debug(MD_MMV_SYSLOG,
2035 			    "proc_mas: unable to create client for master\n");
2036 			rw_unlock(&client_rwlock[setno]);
2037 			break;
2038 		} else {
2039 			ret = mdmn_wakeup_master_1(result,
2040 			    client[setno][sender]);
2041 			/*
2042 			 * if mdmn_wakeup_master_1 returns NULL, it can be that
2043 			 * the master (or the commd on the master) had died.
2044 			 * In that case, we destroy the client to the master
2045 			 * and retry.
2046 			 * If mdmn_wakeup_master_1 doesn't return MDMNE_ACK,
2047 			 * the commd on the master is alive but
2048 			 * something else is wrong,
2049 			 * in that case a retry doesn't make sense => break out
2050 			 */
2051 			if (ret == (int *)NULL) {
2052 				commd_debug(MD_MMV_PROC_S,
2053 				    "proc_sla: wakeup_master returned NULL\n");
2054 				/* release reader lock, grab writer lock */
2055 				rw_unlock(&client_rwlock[setno]);
2056 				rw_wrlock(&client_rwlock[setno]);
2057 				mdmn_clnt_destroy(client[setno][sender]);
2058 				if (client[setno][sender] != (CLIENT *)NULL) {
2059 					client[setno][sender] = (CLIENT *)NULL;
2060 				}
2061 				rw_unlock(&client_rwlock[setno]);
2062 				retries--;
2063 				commd_debug(MD_MMV_PROC_S,
2064 				    "retries = %d\n", retries);
2065 				continue;
2066 			}
2067 			if (*ret != MDMNE_ACK) {
2068 				commd_debug(MD_MMV_PROC_S, "proc_sla: "
2069 				    "wakeup_master returned %d\n", *ret);
2070 				rw_unlock(&client_rwlock[setno]);
2071 				break;
2072 			} else { /* Good case */
2073 				successfully_returned = 1;
2074 				rw_unlock(&client_rwlock[setno]);
2075 			}
2076 		}
2077 	}
2078 
2079 	rw_unlock(&set_desc_rwlock[setno]);
2080 	commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
2081 	    MSGID_ELEMS(msg->msg_msgid));
2082 
2083 	if (ret != (int *)NULL)
2084 		free(ret);
2085 	free_msg(msg);
2086 	free_result(result);
2087 }
2088 
2089 
2090 md_mn_result_t *
2091 mdmn_send_svc_1(md_mn_msg_t *omsg, struct svc_req *rqstp)
2092 {
2093 	int			err;
2094 	set_t			setno;
2095 	SVCXPRT			*transp = rqstp->rq_xprt;
2096 	md_mn_msg_t		*msg;
2097 	md_mn_result_t		*resultp;
2098 	md_mn_msgclass_t	class;
2099 	md_mn_msg_and_transp_t	*matp;
2100 
2101 	msg = copy_msg(omsg, NULL);
2102 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2103 
2104 	setno = msg->msg_setno;
2105 	class = mdmn_get_message_class(msg->msg_type);
2106 
2107 	/* If we are in the abort state, we error out immediately */
2108 	if (md_commd_global_state & MD_CGS_ABORTED) {
2109 		resultp = Zalloc(sizeof (md_mn_result_t));
2110 		resultp->mmr_comm_state = MDMNE_ABORT;
2111 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2112 		free_result(resultp);
2113 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2114 		return (NULL);
2115 	}
2116 
2117 	/* check if the global initialization is done */
2118 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2119 		global_init();
2120 	}
2121 
2122 	commd_debug(MD_MMV_SEND,
2123 	    "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2124 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2125 
2126 	/* Check for verbosity related message */
2127 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2128 		md_mn_verbose_t *d;
2129 
2130 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2131 		md_commd_global_verb = d->mmv_what;
2132 		/* everytime the bitmask is set, we reset the timer */
2133 		__savetime = gethrtime();
2134 		/*
2135 		 * If local-only-flag is set, we are done here,
2136 		 * otherwise we pass that message on to the master.
2137 		 */
2138 		if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2139 			resultp = Zalloc(sizeof (md_mn_result_t));
2140 			resultp->mmr_comm_state = MDMNE_ACK;
2141 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2142 			    (char *)resultp);
2143 			free_result(resultp);
2144 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2145 			return (NULL);
2146 		}
2147 	}
2148 
2149 	/*
2150 	 * Are we entering the abort state?
2151 	 * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2152 	 * this message cannot be distributed anyway.
2153 	 * So, it's safe to return immediately.
2154 	 */
2155 	if (msg->msg_type == MD_MN_MSG_ABORT) {
2156 		md_commd_global_state |= MD_CGS_ABORTED;
2157 		resultp = Zalloc(sizeof (md_mn_result_t));
2158 		resultp->mmr_comm_state = MDMNE_ACK;
2159 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2160 		free_result(resultp);
2161 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2162 		return (NULL);
2163 	}
2164 
2165 
2166 	/*
2167 	 * Is this message type blocked?
2168 	 * If so we return MDMNE_CLASS_LOCKED, immediately
2169 	 */
2170 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2171 		resultp = Zalloc(sizeof (md_mn_result_t));
2172 		resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2173 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2174 		free_result(resultp);
2175 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2176 		commd_debug(MD_MMV_SEND,
2177 			"send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2178 			"type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2179 			msg->msg_type);
2180 		return (NULL);
2181 	}
2182 
2183 
2184 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2185 		/* Can only use the appropriate mutexes if they are inited */
2186 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2187 			rw_wrlock(&set_desc_rwlock[setno]);
2188 			rw_wrlock(&client_rwlock[setno]);
2189 			err = mdmn_init_set(setno, MDMN_SET_READY);
2190 			rw_unlock(&client_rwlock[setno]);
2191 			rw_unlock(&set_desc_rwlock[setno]);
2192 		} else {
2193 			err = mdmn_init_set(setno, MDMN_SET_READY);
2194 		}
2195 
2196 		if (err) {
2197 			/* couldn't initialize connections, cannot proceed */
2198 			resultp = Zalloc(sizeof (md_mn_result_t));
2199 			resultp->mmr_comm_state = err;
2200 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2201 			    (char *)resultp);
2202 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2203 			free_result(resultp);
2204 			commd_debug(MD_MMV_SEND,
2205 			    "send: init err = %d\n", err);
2206 			return (NULL);
2207 		}
2208 	}
2209 
2210 	mutex_lock(&mdmn_busy_mutex[setno]);
2211 	if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2212 	    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2213 		mutex_unlock(&mdmn_busy_mutex[setno]);
2214 		resultp = Zalloc(sizeof (md_mn_result_t));
2215 		resultp->mmr_comm_state = MDMNE_SUSPENDED;
2216 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2217 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2218 		free_result(resultp);
2219 		commd_debug(MD_MMV_SEND,
2220 			"send: class suspended (%d, 0x%llx-%d), set=%d, "
2221 			"class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2222 			setno, class, msg->msg_type);
2223 		return (NULL);
2224 	}
2225 	mutex_unlock(&mdmn_busy_mutex[setno]);
2226 
2227 	/* is this rpc request coming from the local node? */
2228 	if (check_license(rqstp, 0) == FALSE) {
2229 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2230 		commd_debug(MD_MMV_SEND,
2231 			"send: check licence fail(%d, 0x%llx-%d), set=%d, "
2232 			"class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2233 			setno, class, msg->msg_type);
2234 		return (NULL);
2235 	}
2236 
2237 
2238 	/*
2239 	 * We allocate a structure that can take two pointers in order to pass
2240 	 * both the message and the transp into thread_create.
2241 	 * The free for this alloc is done in mdmn_send_to_work()
2242 	 */
2243 	matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2244 	matp->mat_msg = msg;
2245 	matp->mat_transp = transp;
2246 
2247 	/*
2248 	 * create a thread here that calls work on the master.
2249 	 * If we are already on the master, this would block if running
2250 	 * in the same context. (our service is single threaded)(
2251 	 * Make it a detached thread because it will not communicate with
2252 	 * anybody thru thr_* mechanisms
2253 	 */
2254 	thr_create(NULL, 0, mdmn_send_to_work, (void *) matp, THR_DETACHED,
2255 	    NULL);
2256 
2257 	commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2258 	    MSGID_ELEMS(msg->msg_msgid));
2259 	/*
2260 	 * We return here without sending results. This will be done by
2261 	 * mdmn_wakeup_initiator_svc_1() as soon as the results are available.
2262 	 * Until then the calling send_message will be blocked, while we
2263 	 * are able to take calls.
2264 	 */
2265 
2266 	return (NULL);
2267 }
2268 
2269 /* ARGSUSED */
2270 int *
2271 mdmn_work_svc_1(md_mn_msg_t *omsg, struct svc_req *rqstp)
2272 {
2273 	int		err;
2274 	set_t		setno;
2275 	thread_t	tid;
2276 	int		*retval;
2277 	md_mn_msg_t	*msg;
2278 	md_mn_msgclass_t class;
2279 
2280 	retval = Malloc(sizeof (int));
2281 
2282 	/* If we are in the abort state, we error out immediately */
2283 	if (md_commd_global_state & MD_CGS_ABORTED) {
2284 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2285 		*retval = MDMNE_ABORT;
2286 		return (retval);
2287 	}
2288 
2289 	msg = copy_msg(omsg, NULL);
2290 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2291 
2292 	/*
2293 	 * Is this message type blocked?
2294 	 * If so we return MDMNE_CLASS_LOCKED, immediately.
2295 	 * This check is performed on master and slave.
2296 	 */
2297 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2298 		*retval = MDMNE_CLASS_LOCKED;
2299 		return (retval);
2300 	}
2301 
2302 	/* check if the global initialization is done */
2303 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2304 		global_init();
2305 	}
2306 
2307 	class = mdmn_get_message_class(msg->msg_type);
2308 	setno = msg->msg_setno;
2309 
2310 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2311 		/* Can only use the appropriate mutexes if they are inited */
2312 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2313 			rw_wrlock(&set_desc_rwlock[setno]);
2314 			rw_wrlock(&client_rwlock[setno]);
2315 			err = mdmn_init_set(setno, MDMN_SET_READY);
2316 			rw_unlock(&client_rwlock[setno]);
2317 			rw_unlock(&set_desc_rwlock[setno]);
2318 		} else {
2319 			err = mdmn_init_set(setno, MDMN_SET_READY);
2320 		}
2321 
2322 		if (err) {
2323 			*retval = MDMNE_CANNOT_CONNECT;
2324 			free_msg(msg);
2325 			return (retval);
2326 		}
2327 	}
2328 
2329 	/* is this rpc request coming from a licensed node? */
2330 	if (check_license(rqstp, msg->msg_sender) == FALSE) {
2331 		free_msg(msg);
2332 		*retval = MDMNE_RPC_FAIL;
2333 		return (retval);
2334 	}
2335 
2336 	commd_debug(MD_MMV_WORK,
2337 	    "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2338 	    "flags=0x%x\n",
2339 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2340 	    msg->msg_flags);
2341 
2342 	/* Check for various CLASS0 message types */
2343 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2344 		md_mn_verbose_t *d;
2345 
2346 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2347 		/* for now we ignore set / class in md_mn_verbose_t */
2348 		md_commd_global_verb = d->mmv_what;
2349 		/* everytime the bitmask is set, we reset the timer */
2350 		__savetime = gethrtime();
2351 	}
2352 
2353 	mutex_lock(&mdmn_busy_mutex[setno]);
2354 
2355 	/* check if class is locked via a call to mdmn_comm_lock_svc_1 */
2356 	if (mdmn_is_class_locked(setno, class) == TRUE) {
2357 		mutex_unlock(&mdmn_busy_mutex[setno]);
2358 		*retval = MDMNE_CLASS_LOCKED;
2359 		free_msg(msg);
2360 		return (retval);
2361 	}
2362 	mutex_unlock(&mdmn_busy_mutex[setno]);
2363 
2364 	/* Check if the class is busy right now. Do it only on the master */
2365 	rw_rdlock(&set_desc_rwlock[setno]);
2366 	if (set_descriptor[setno]->sd_mn_am_i_master) {
2367 		rw_unlock(&set_desc_rwlock[setno]);
2368 		/*
2369 		 * If the class is currently suspended, don't accept new
2370 		 * messages, unless they are flagged with an override bit.
2371 		 */
2372 		mutex_lock(&mdmn_busy_mutex[setno]);
2373 		if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2374 		    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2375 			mutex_unlock(&mdmn_busy_mutex[setno]);
2376 			*retval = MDMNE_SUSPENDED;
2377 			commd_debug(MD_MMV_SEND,
2378 			    "send: set %d is suspended\n", setno);
2379 			free_msg(msg);
2380 			return (retval);
2381 		}
2382 		if (mdmn_mark_class_busy(setno, class) == FALSE) {
2383 			mutex_unlock(&mdmn_busy_mutex[setno]);
2384 			*retval = MDMNE_CLASS_BUSY;
2385 			free_msg(msg);
2386 			return (retval);
2387 		}
2388 		mutex_unlock(&mdmn_busy_mutex[setno]);
2389 		/*
2390 		 * Because the real processing of the message takes time we
2391 		 * create a thread for it. So the master thread can continue
2392 		 * to run and accept further messages.
2393 		 */
2394 		*retval = thr_create(NULL, 0,
2395 		    (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2396 		    THR_DETACHED|THR_SUSPENDED, &tid);
2397 	} else {
2398 		rw_unlock(&set_desc_rwlock[setno]);
2399 		*retval = thr_create(NULL, 0,
2400 		    (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2401 		    THR_DETACHED|THR_SUSPENDED, &tid);
2402 	}
2403 
2404 	if (*retval != 0) {
2405 		*retval = MDMNE_THR_CREATE_FAIL;
2406 		free_msg(msg);
2407 		return (retval);
2408 	}
2409 
2410 	/* Now run the new thread */
2411 	thr_continue(tid);
2412 
2413 	commd_debug(MD_MMV_WORK,
2414 	    "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2415 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2416 
2417 	*retval = MDMNE_ACK; /* this means success */
2418 	return (retval);
2419 }
2420 
2421 /* ARGSUSED */
2422 int *
2423 mdmn_wakeup_initiator_svc_1(md_mn_result_t *res, struct svc_req *rqstp)
2424 {
2425 
2426 	int		*retval;
2427 	int		err;
2428 	set_t		setno;
2429 	mutex_t		*mx;   /* protection of initiator_table */
2430 	SVCXPRT		*transp;
2431 	md_mn_msgid_t	initiator_table_id;
2432 	md_mn_msgclass_t class;
2433 
2434 	retval = Malloc(sizeof (int));
2435 
2436 	/* check if the global initialization is done */
2437 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2438 		global_init();
2439 	}
2440 
2441 	setno	= res->mmr_setno;
2442 
2443 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2444 		/* set not ready means we just crashed are restarted now */
2445 		/* Can only use the appropriate mutexes if they are inited */
2446 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2447 			rw_wrlock(&set_desc_rwlock[setno]);
2448 			rw_wrlock(&client_rwlock[setno]);
2449 			err = mdmn_init_set(setno, MDMN_SET_READY);
2450 			rw_unlock(&client_rwlock[setno]);
2451 			rw_unlock(&set_desc_rwlock[setno]);
2452 		} else {
2453 			err = mdmn_init_set(setno, MDMN_SET_READY);
2454 		}
2455 
2456 		if (err) {
2457 			*retval = MDMNE_CANNOT_CONNECT;
2458 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2459 			return (retval);
2460 		}
2461 	}
2462 
2463 	/* is this rpc request coming from a licensed node? */
2464 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2465 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2466 		*retval = MDMNE_RPC_FAIL;
2467 		return (retval);
2468 	}
2469 
2470 
2471 	class	= mdmn_get_message_class(res->mmr_msgtype);
2472 	mx	= mdmn_get_initiator_table_mx(setno, class);
2473 
2474 	commd_debug(MD_MMV_WAKE_I,
2475 	    "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2476 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2477 
2478 	mutex_lock(mx);
2479 
2480 	/*
2481 	 * Search the initiator wakeup table.
2482 	 * If we find an entry here (which should always be true)
2483 	 * we are on the initiating node and we wakeup the original
2484 	 * local rpc call
2485 	 */
2486 	mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2487 
2488 	if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2489 		transp = mdmn_get_initiator_table_transp(setno, class);
2490 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2491 		mdmn_unregister_initiator_table(setno, class);
2492 		*retval = MDMNE_ACK;
2493 
2494 		commd_debug(MD_MMV_WAKE_I,
2495 		    "wake_ini: replied (%d, 0x%llx-%d)\n",
2496 		    MSGID_ELEMS(res->mmr_msgid));
2497 	} else {
2498 		commd_debug(MD_MMV_WAKE_I,
2499 		    "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2500 		    MSGID_ELEMS(res->mmr_msgid));
2501 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2502 	}
2503 	mutex_unlock(mx);
2504 	/* less work for check_timeouts */
2505 	mutex_lock(&check_timeout_mutex);
2506 	if (messages_on_their_way == 0) {
2507 		commd_debug(MD_MMV_WAKE_I,
2508 		    "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2509 		    MSGID_ELEMS(res->mmr_msgid));
2510 	} else {
2511 		messages_on_their_way--;
2512 	}
2513 	mutex_unlock(&check_timeout_mutex);
2514 	xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2515 
2516 	return (retval);
2517 }
2518 
2519 
2520 /*
2521  * res must be free'd by the thread we wake up
2522  */
2523 /* ARGSUSED */
2524 int *
2525 mdmn_wakeup_master_svc_1(md_mn_result_t *ores, struct svc_req *rqstp)
2526 {
2527 
2528 	int		*retval;
2529 	int		err;
2530 	set_t		setno;
2531 	cond_t		*cv;
2532 	mutex_t		*mx;
2533 	md_mn_msgid_t	master_table_id;
2534 	md_mn_nodeid_t	sender;
2535 	md_mn_result_t	*res;
2536 	md_mn_msgclass_t class;
2537 
2538 	retval = Malloc(sizeof (int));
2539 
2540 	/* check if the global initialization is done */
2541 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2542 		global_init();
2543 	}
2544 
2545 	/* Need to copy the results here, as they are static for RPC */
2546 	res = copy_result(ores);
2547 	xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2548 
2549 	class = mdmn_get_message_class(res->mmr_msgtype);
2550 	setno = res->mmr_setno;
2551 
2552 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2553 		/* set not ready means we just crashed are restarted now */
2554 		/* Can only use the appropriate mutexes if they are inited */
2555 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2556 			rw_wrlock(&set_desc_rwlock[setno]);
2557 			rw_wrlock(&client_rwlock[setno]);
2558 			err = mdmn_init_set(setno, MDMN_SET_READY);
2559 			rw_unlock(&client_rwlock[setno]);
2560 			rw_unlock(&set_desc_rwlock[setno]);
2561 		} else {
2562 			err = mdmn_init_set(setno, MDMN_SET_READY);
2563 		}
2564 
2565 		if (err) {
2566 			*retval = MDMNE_CANNOT_CONNECT;
2567 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2568 			return (retval);
2569 		}
2570 	}
2571 
2572 	/* is this rpc request coming from a licensed node? */
2573 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2574 		*retval = MDMNE_RPC_FAIL;
2575 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2576 		return (retval);
2577 	}
2578 
2579 
2580 	commd_debug(MD_MMV_WAKE_M,
2581 	    "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2582 	    "from %d\n",
2583 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2584 	    res->mmr_sender);
2585 	/*
2586 	 * The mutex and cv are needed for waking up the thread
2587 	 * sleeping in mdmn_master_process_msg()
2588 	 */
2589 	mx = mdmn_get_master_table_mx(setno, class);
2590 	cv = mdmn_get_master_table_cv(setno, class);
2591 
2592 	/*
2593 	 * lookup the master wakeup table
2594 	 * If we find our message, we are on the master and
2595 	 * called by a slave that finished processing a message.
2596 	 * We store the results in the appropriate slot and
2597 	 * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2598 	 */
2599 	mutex_lock(mx);
2600 	mdmn_get_master_table_id(setno, class, &master_table_id);
2601 	sender = mdmn_get_master_table_addr(setno, class);
2602 
2603 	if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2604 		if (sender == res->mmr_sender) {
2605 			mdmn_set_master_table_res(setno, class, res);
2606 			cond_signal(cv);
2607 			*retval = MDMNE_ACK;
2608 		} else {
2609 			/* id is correct but wrong sender (I smell a timeout) */
2610 			commd_debug(MD_MMV_WAKE_M,
2611 			    "wakeup master got unsolicited message: "
2612 			    "(%d, 0x%llx-%d) from %d\n",
2613 			    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2614 			free_result(res);
2615 			*retval = MDMNE_TIMEOUT;
2616 		}
2617 	} else {
2618 		/* id is wrong, smells like a very late timeout */
2619 		commd_debug(MD_MMV_WAKE_M,
2620 		    "wakeup master got unsolicited message: "
2621 		    "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2622 		    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2623 		    MSGID_ELEMS(master_table_id));
2624 		free_result(res);
2625 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2626 	}
2627 
2628 	mutex_unlock(mx);
2629 
2630 	return (retval);
2631 }
2632 
2633 /*
2634  * Lock a set/class combination.
2635  * This is mainly done for debug purpose.
2636  * This set/class combination immediately is blocked,
2637  * even in the middle of sending messages to multiple slaves.
2638  * This remains until the user issues a mdmn_comm_unlock_svc_1 for the same
2639  * set/class combination.
2640  *
2641  * Special messages of class MD_MSG_CLASS0 can never be locked.
2642  * 	e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2643  *
2644  * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2645  * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2646  *
2647  * set must be between 1 and MD_MAXSETS
2648  * class can be:
2649  *	MD_MSG_CLASS0 which means all other classes in this case
2650  *	or one specific class (< MD_MN_NCLASSES)
2651  *
2652  * Returns:
2653  *	MDMNE_ACK on sucess (locking a locked class is Ok)
2654  *	MDMNE_EINVAL if a parameter is out of range
2655  */
2656 
2657 /* ARGSUSED */
2658 int *
2659 mdmn_comm_lock_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2660 {
2661 	int			*retval;
2662 	set_t			setno = msc->msc_set;
2663 	md_mn_msgclass_t	class = msc->msc_class;
2664 
2665 	retval = Malloc(sizeof (int));
2666 
2667 	/* check if the global initialization is done */
2668 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2669 		global_init();
2670 	}
2671 
2672 	/* is this rpc request coming from the local node ? */
2673 	if (check_license(rqstp, 0) == FALSE) {
2674 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2675 		*retval = MDMNE_RPC_FAIL;
2676 		return (retval);
2677 	}
2678 
2679 	/* Perform some range checking */
2680 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2681 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2682 		*retval = MDMNE_EINVAL;
2683 		return (retval);
2684 	}
2685 
2686 	commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2687 	mutex_lock(&mdmn_busy_mutex[setno]);
2688 	if (class != MD_MSG_CLASS0) {
2689 		mdmn_mark_class_locked(setno, class);
2690 	} else {
2691 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2692 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2693 			mdmn_mark_class_locked(setno, class);
2694 		}
2695 	}
2696 	mutex_unlock(&mdmn_busy_mutex[setno]);
2697 
2698 	*retval = MDMNE_ACK;
2699 	return (retval);
2700 }
2701 
2702 /*
2703  * Unlock a set/class combination.
2704  * set must be between 1 and MD_MAXSETS
2705  * class can be:
2706  *	MD_MSG_CLASS0 which means all other classes in this case (like above)
2707  *	or one specific class (< MD_MN_NCLASSES)
2708  *
2709  * Returns:
2710  *	MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2711  *	MDMNE_EINVAL if a parameter is out of range
2712  */
2713 /* ARGSUSED */
2714 int *
2715 mdmn_comm_unlock_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2716 {
2717 	int			*retval;
2718 	set_t			setno  = msc->msc_set;
2719 	md_mn_msgclass_t	class  = msc->msc_class;
2720 
2721 	retval = Malloc(sizeof (int));
2722 
2723 	/* check if the global initialization is done */
2724 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2725 		global_init();
2726 	}
2727 
2728 	/* is this rpc request coming from the local node ? */
2729 	if (check_license(rqstp, 0) == FALSE) {
2730 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2731 		*retval = MDMNE_RPC_FAIL;
2732 		return (retval);
2733 	}
2734 
2735 	/* Perform some range checking */
2736 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2737 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2738 		*retval = MDMNE_EINVAL;
2739 		return (retval);
2740 	}
2741 	commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2742 
2743 	mutex_lock(&mdmn_busy_mutex[setno]);
2744 	if (class != MD_MSG_CLASS0) {
2745 		mdmn_mark_class_unlocked(setno, class);
2746 	} else {
2747 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2748 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2749 			mdmn_mark_class_unlocked(setno, class);
2750 		}
2751 	}
2752 	mutex_unlock(&mdmn_busy_mutex[setno]);
2753 
2754 	*retval = MDMNE_ACK;
2755 	return (retval);
2756 }
2757 
2758 /*
2759  * mdmn_comm_suspend_svc_1(setno, class)
2760  *
2761  * Drain all outstanding messages for a given set/class combination
2762  * and don't allow new messages to be processed.
2763  *
2764  * Special messages of class MD_MSG_CLASS0 can never be locked.
2765  * 	e.g. MD_MN_MSG_VERBOSITY
2766  *
2767  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2768  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2769  *
2770  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2771  * one class as being suspended.
2772  * If messages for this class are currently on their way,
2773  * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2774  *
2775  * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2776  * Messages must be generated in ascending order.
2777  * This means, a message cannot create submessages with the same or lower class.
2778  * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2779  * generate a hanging situation here.
2780  * We mark class 1 as being suspended.
2781  * if the class is not busy, we proceed with class 2
2782  * and so on
2783  * if a class *is* busy, we cannot continue here, but return
2784  * MDMNE_SET_NOT_DRAINED.
2785  * We expect the caller to hold on for some seconds and try again.
2786  * When the message that held the class busy is done in
2787  * mdmn_master_process_msg(), mdmn_mark_class_unbusy() is called.
2788  * There it is checked if the class is about to drain.
2789  * In that case it tries to drain all higher classes there.
2790  *
2791  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2792  * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2793  * completely drained.
2794  *
2795  * Returns:
2796  *	MDMNE_ACK on success (set is drained, no outstanding messages)
2797  *	MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2798  *		still outstanding messages for this set(s)
2799  *	MDMNE_EINVAL if setno is out of range
2800  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2801  */
2802 
2803 /* ARGSUSED */
2804 int *
2805 mdmn_comm_suspend_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2806 {
2807 	int			*retval;
2808 	int			failure = 0;
2809 	set_t			startset, endset;
2810 	set_t			setno  = msc->msc_set;
2811 	md_mn_msgclass_t	oclass = msc->msc_class;
2812 #ifdef NOT_YET_NEEDED
2813 	uint_t			flags  = msc->msc_flags;
2814 #endif /* NOT_YET_NEEDED */
2815 	md_mn_msgclass_t	class;
2816 
2817 	retval = Malloc(sizeof (int));
2818 
2819 	/* check if the global initialization is done */
2820 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2821 		global_init();
2822 	}
2823 
2824 	/* is this rpc request coming from the local node ? */
2825 	if (check_license(rqstp, 0) == FALSE) {
2826 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2827 		*retval = MDMNE_RPC_FAIL;
2828 		return (retval);
2829 	}
2830 
2831 	commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2832 	    setno, oclass);
2833 
2834 	/* Perform some range checking */
2835 	if (setno >= MD_MAXSETS) {
2836 		*retval = MDMNE_EINVAL;
2837 		commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2838 		return (retval);
2839 	}
2840 
2841 	/*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2842 	if (setno == MD_COMM_ALL_SETS) {
2843 		startset = 1;
2844 		endset = MD_MAXSETS - 1;
2845 	} else {
2846 		startset = setno;
2847 		endset = setno;
2848 	}
2849 
2850 	for (setno = startset; setno <= endset; setno++) {
2851 		/* Here we need the mutexes for the set to be setup */
2852 		if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2853 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2854 		}
2855 
2856 		mutex_lock(&mdmn_busy_mutex[setno]);
2857 		/* shall we drain all classes of this set? */
2858 		if (oclass == MD_COMM_ALL_CLASSES) {
2859 			for (class = 1; class < MD_MN_NCLASSES; class ++) {
2860 				commd_debug(MD_MMV_MISC,
2861 				    "suspend: suspending set %d, class %d\n",
2862 				    setno, class);
2863 				*retval = mdmn_mark_class_suspended(setno,
2864 				    class, MDMN_SUSPEND_ALL);
2865 				if (*retval == MDMNE_SET_NOT_DRAINED) {
2866 					failure++;
2867 				}
2868 			}
2869 		} else {
2870 			/* only drain one specific class */
2871 			commd_debug(MD_MMV_MISC,
2872 			    "suspend: suspending set=%d class=%d\n",
2873 			    setno, oclass);
2874 			*retval = mdmn_mark_class_suspended(setno, oclass,
2875 			    MDMN_SUSPEND_1);
2876 			if (*retval == MDMNE_SET_NOT_DRAINED) {
2877 				failure++;
2878 			}
2879 		}
2880 		mutex_unlock(&mdmn_busy_mutex[setno]);
2881 	}
2882 	/* If one or more sets are not entirely drained, failure is non-zero */
2883 	if (failure != 0) {
2884 		*retval = MDMNE_SET_NOT_DRAINED;
2885 		commd_debug(MD_MMV_MISC,
2886 		    "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2887 	} else {
2888 		*retval = MDMNE_ACK;
2889 	}
2890 
2891 	return (retval);
2892 }
2893 
2894 /*
2895  * mdmn_comm_resume_svc_1(setno, class)
2896  *
2897  * Resume processing messages for a given set.
2898  * This incorporates the repeal of a previous suspend operation.
2899  *
2900  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2901  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2902  *
2903  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2904  * one class as being resumed.
2905  *
2906  * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
2907  *
2908  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2909  *
2910  * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
2911  * reset any ABORT flag from the global state.
2912  *
2913  * Returns:
2914  *	MDMNE_ACK on success (resuming an unlocked set is Ok)
2915  *	MDMNE_EINVAL if setno is out of range
2916  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2917  */
2918 /* ARGSUSED */
2919 int *
2920 mdmn_comm_resume_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2921 {
2922 	int			*retval;
2923 	set_t			startset, endset;
2924 	set_t			setno  = msc->msc_set;
2925 	md_mn_msgclass_t	oclass = msc->msc_class;
2926 	uint_t			flags  = msc->msc_flags;
2927 	md_mn_msgclass_t	class;
2928 
2929 	retval = Malloc(sizeof (int));
2930 
2931 	/* check if the global initialization is done */
2932 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2933 		global_init();
2934 	}
2935 
2936 	/* is this rpc request coming from the local node ? */
2937 	if (check_license(rqstp, 0) == FALSE) {
2938 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2939 		*retval = MDMNE_RPC_FAIL;
2940 		return (retval);
2941 	}
2942 
2943 	commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
2944 	    setno, oclass);
2945 
2946 	/* Perform some range checking */
2947 	if (setno > MD_MAXSETS) {
2948 		*retval = MDMNE_EINVAL;
2949 		return (retval);
2950 	}
2951 
2952 	if (setno == MD_COMM_ALL_SETS) {
2953 		startset = 1;
2954 		endset = MD_MAXSETS - 1;
2955 		if (oclass == MD_COMM_ALL_CLASSES) {
2956 			/* This is the point where we "unabort" the commd */
2957 			commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
2958 			md_commd_global_state &= ~MD_CGS_ABORTED;
2959 		}
2960 	} else {
2961 		startset = setno;
2962 		endset = setno;
2963 	}
2964 
2965 	for (setno = startset; setno <= endset; setno++) {
2966 
2967 		/* Here we need the mutexes for the set to be setup */
2968 		if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
2969 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2970 		}
2971 
2972 		mutex_lock(&mdmn_busy_mutex[setno]);
2973 
2974 		if (oclass == MD_COMM_ALL_CLASSES) {
2975 			int end_class = 1;
2976 			/*
2977 			 * When SUSPENDing all classes, we go
2978 			 * from 1 to MD_MN_NCLASSES-1
2979 			 * The correct reverse action is RESUMing
2980 			 * from MD_MN_NCLASSES-1 to 1 (or 2)
2981 			 */
2982 
2983 			if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
2984 				end_class = 2;
2985 			}
2986 
2987 			/*
2988 			 * Then mark all classes of this set as no longer
2989 			 * suspended. This supersedes any previous suspend(1)
2990 			 * calls and resumes the set entirely.
2991 			 */
2992 			for (class = MD_MN_NCLASSES - 1; class >= end_class;
2993 			    class --) {
2994 				commd_debug(MD_MMV_MISC,
2995 				    "resume: resuming set=%d class=%d\n",
2996 				    setno, class);
2997 				mdmn_mark_class_resumed(setno, class,
2998 				    (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
2999 			}
3000 		} else {
3001 			/*
3002 			 * In this case only one class is marked as not
3003 			 * suspended. If a suspend(all) is currently active for
3004 			 * this set, this class will still be suspended.
3005 			 * That state will be cleared by a suspend(all)
3006 			 * (see above)
3007 			 */
3008 			commd_debug(MD_MMV_MISC,
3009 			    "resume: resuming set=%d class=%d\n",
3010 			    setno, oclass);
3011 			mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
3012 		}
3013 
3014 		mutex_unlock(&mdmn_busy_mutex[setno]);
3015 	}
3016 
3017 	*retval = MDMNE_ACK;
3018 	return (retval);
3019 }
3020 /* ARGSUSED */
3021 int *
3022 mdmn_comm_reinit_set_svc_1(set_t *setnop, struct svc_req *rqstp)
3023 {
3024 	int		*retval;
3025 	md_mnnode_desc	*node;
3026 	set_t		 setno = *setnop;
3027 
3028 	retval = Malloc(sizeof (int));
3029 
3030 	/* check if the global initialization is done */
3031 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3032 		global_init();
3033 	}
3034 
3035 	/* is this rpc request coming from the local node ? */
3036 	if (check_license(rqstp, 0) == FALSE) {
3037 		xdr_free(xdr_set_t, (caddr_t)setnop);
3038 		*retval = MDMNE_RPC_FAIL;
3039 		return (retval);
3040 	}
3041 
3042 	commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
3043 
3044 	rw_rdlock(&set_desc_rwlock[setno]);
3045 	/*
3046 	 * We assume, that all messages have been suspended previously.
3047 	 *
3048 	 * As we are modifying lots of clients here we grab the client_rwlock
3049 	 * in writer mode. This ensures, no new messages come in.
3050 	 */
3051 	rw_wrlock(&client_rwlock[setno]);
3052 	/* This set is no longer initialized */
3053 
3054 	if ((set_descriptor[setno] != NULL) &&
3055 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
3056 		/* destroy all rpc clients from this set */
3057 		for (node = set_descriptor[setno]->sd_nodelist; node;
3058 		    node = node->nd_next) {
3059 			mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
3060 			if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
3061 				client[setno][node->nd_nodeid] = (CLIENT *)NULL;
3062 			}
3063 		}
3064 	md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
3065 	}
3066 
3067 	commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
3068 
3069 	rw_unlock(&client_rwlock[setno]);
3070 	rw_unlock(&set_desc_rwlock[setno]);
3071 	*retval = MDMNE_ACK;
3072 	return (retval);
3073 }
3074 
3075 /*
3076  * This is just an interface for testing purpose.
3077  * Here we can disable single message types.
3078  * If we block a message type, this is valid for all MN sets.
3079  * If a message arrives later and its message type is blocked, it will
3080  * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
3081  * resend this message over and over again.
3082  */
3083 
3084 /* ARGSUSED */
3085 int *
3086 mdmn_comm_msglock_svc_1(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3087 {
3088 	int			*retval;
3089 	md_mn_msgtype_t		type = mmtl->mmtl_type;
3090 	uint_t			lock = mmtl->mmtl_lock;
3091 
3092 	retval = Malloc(sizeof (int));
3093 
3094 	/* check if the global initialization is done */
3095 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3096 		global_init();
3097 	}
3098 
3099 	/* is this rpc request coming from the local node ? */
3100 	if (check_license(rqstp, 0) == FALSE) {
3101 		xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3102 		*retval = MDMNE_RPC_FAIL;
3103 		return (retval);
3104 	}
3105 
3106 	/* Perform some range checking */
3107 	if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3108 		*retval = MDMNE_EINVAL;
3109 		return (retval);
3110 	}
3111 
3112 	commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3113 	msgtype_lock_state[type] = lock;
3114 
3115 	*retval = MDMNE_ACK;
3116 	return (retval);
3117 }
3118