xref: /titanic_41/usr/src/cmd/lvm/rpc.mdcommd/mdmn_commd_server.c (revision 2e02daeede04af58a9d4f18f8dfed1fda3ececa1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <sys/statvfs.h>
31 #include <sys/uadmin.h>
32 #include <sys/resource.h>
33 #include <fcntl.h>
34 #include <stdio.h>
35 #include <thread.h>
36 #include <meta.h>
37 #include <sdssc.h>
38 #include <mdmn_changelog.h>
39 #include "mdmn_subr.h"
40 
41 /*
42  * This is the communication daemon for SVM Multi Node Disksets.
43  * It runs on every node and provides the following rpc services:
44  *  - mdmn_send_svc_2
45  *  - mdmn_work_svc_2
46  *  - mdmn_wakeup_initiator_svc_2
47  *  - mdmn_wakeup_master_svc_2
48  *  - mdmn_comm_lock_svc_2
49  *  - mdmn_comm_unlock_svc_2
50  *  - mdmn_comm_suspend_svc_2
51  *  - mdmn_comm_resume_svc_2
52  *  - mdmn_comm_reinit_set_svc_2
53  * where send, lock, unlock and reinit are meant for external use,
54  * work and the two wakeups are for internal use only.
55  *
56  * NOTE:
57  * On every node only one of those xxx_2 functions can be active at the
58  * same time because the daemon is single threaded.
59  *
60  * (not quite true, as mdmn_send_svc_2 and mdmn_work_svc_2 do thr_create()s
61  * as part of their handlers, so those aspects are multi-threaded)
62  *
63  * In case an event occurs that has to be propagated to all the nodes...
64  *
65  * One node (the initiator)
66  *	calls the libmeta function mdmn_send_message()
67  *	This function calls the local daemon thru mdmn_send_svc_2.
68  *
69  * On the initiator:
70  *	mdmn_send_svc_2()
71  *	    - starts a thread -> mdmn_send_to_work() and returns.
72  *	mdmn_send_to_work()
73  *	    - sends this message over to the master of the diskset.
74  *	      This is done by calling mdmn_work_svc_2 on the master.
75  *	    - registers to the initiator_table
76  *	    - exits without doing a svc_sendreply() for the call to
77  *	      mdmn_send_svc_2. This means that call is blocked until somebody
78  *	      (see end of this comment) does a svc_sendreply().
79  *	      This means mdmn_send_message() does not yet return.
80  *	    - A timeout surveillance is started at this point.
81  *	      This means in case the master doesn't reply at all in an
82  *	      appropriate time, an error condition is returned
83  *	      to the caller.
84  *
85  * On the master:
86  *	mdmn_work_svc_2()
87  *	    - starts a thread -> mdmn_master_process_msg() and returns
88  *	mdmn_master_process_msg()
89  *	    - logs the message to the change log
90  *	    - executes the message locally
91  *	    - flags the message in the change log
92  *	    - sends the message to mdmn_work_svc_2() on all the
93  *	      other nodes (slaves)
94  *	      after each call to mdmn_work_svc_2 the thread goes to sleep and
95  *	      will be woken up by mdmn_wakeup_master_svc_2() as soon as the
96  *	      slave node is done with this message.
97  *	    - In case the slave doesn't respond in an appropriate time, an
98  *	      error is assumed to ensure the master doesn't wait forever.
99  *
100  * On a slave:
101  *	mdmn_work_svc_2()
102  *	    - starts a thread -> mdmn_slave_process_msg() and returns
103  *	mdmn_slave_process_msg()
104  *	    - processes this message locally by calling the appropriate message
105  *	      handler, that creates some result.
106  *	    - sends that result thru a call to mdmn_wakeup_master_svc_2() to
107  *	      the master.
108  *
109  * Back on the master:
110  *	mdmn_wakeup_master_svc_2()
111  *	    - stores the result into the master_table.
112  *	    - signals the mdmn_master_process_msg-thread.
113  *	    - returns
114  *	mdmn_master_process_msg()
115  *	    - after getting the results from all nodes
116  *	    - sends them back to the initiating node thru a call to
117  *	      mdmn_wakeup_initiator_svc_2.
118  *
119  * Back on the initiator:
120  *	mdmn_wakeup_initiator_svc_2()
121  *	    - calls svc_sendreply() which makes the call to mdmn_send_svc_2()
122  *	      return.
123  *	      which allows the initial mdmn_send_message() call to return.
124  */
125 
126 FILE *commdout;		/* debug output for the commd */
127 char *commdoutfile;	/* file name for the above output */
128 /* want at least 10 MB free space when logging into a file */
129 #define	MIN_FS_SPACE	(10LL * 1024 * 1024)
130 
131 /*
132  * Number of outstanding messages that were initiated by this node.
133  * If zero, check_timeouts goes to sleep
134  */
135 uint_t	messages_on_their_way;
136 mutex_t	check_timeout_mutex;	/* need mutex to protect above */
137 cond_t	check_timeout_cv;	/* trigger for check_timeouts */
138 
139 /* for printing out time stamps */
140 hrtime_t __savetime;
141 
142 /* RPC clients for every set and every node and their protecting locks */
143 CLIENT	*client[MD_MAXSETS][NNODES];
144 rwlock_t client_rwlock[MD_MAXSETS];
145 
146 /* the descriptors of all possible sets and their protectors */
147 struct md_set_desc *set_descriptor[MD_MAXSETS];
148 rwlock_t set_desc_rwlock[MD_MAXSETS];
149 
150 /* the daemon to daemon communication has to timeout quickly */
151 static struct timeval FOUR_SECS = { 4, 0 };
152 
153 /* These indicate if a set has already been setup */
154 int md_mn_set_inited[MD_MAXSETS];
155 
156 /* For every set we have a message completion table and protecting mutexes */
157 md_mn_mct_t *mct[MD_MAXSETS];
158 mutex_t	mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];
159 
160 /* Stuff to describe the global status of the commd on one node */
161 #define	MD_CGS_INITED		0x0001
162 #define	MD_CGS_ABORTED		0x0002	/* return everything with MDMNE_ABORT */
163 uint_t md_commd_global_state = 0;	/* No state when starting up */
164 
165 /*
166  * Global verbosity level for the daemon
167  */
168 uint_t md_commd_global_verb;
169 
170 /*
171  * libmeta doesn't like multiple threads in metaget_setdesc().
172  * So we must protect access to it with a global lock
173  */
174 mutex_t get_setdesc_mutex;
175 
176 /*
177  * Need a way to block single message types,
178  * hence an array with a status for every message type
179  */
180 uint_t msgtype_lock_state[MD_MN_NMESSAGES];
181 
182 /* for reading in the config file */
183 #define	MAX_LINE_SIZE 1024
184 
185 extern char *commd_get_outfile(void);
186 extern uint_t commd_get_verbosity(void);
187 
188 /*
189  * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
190  * merely needs to call clnt_create_timed, and meta_client_create_retry
191  * will take care of the rest.
192  */
193 /* ARGSUSED */
194 static CLIENT *
195 mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
196 {
197 	md_mnnode_desc	*node = (md_mnnode_desc *)data;
198 
199 	return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, TWO, "tcp",
200 	    time_out));
201 }
202 
/*
 * Flush the debug stream and fsync its file descriptor, if a debug file
 * is open at all.
 *
 * Wrapped in do { } while (0) so that "FLUSH_DEBUGFILE();" is exactly one
 * statement and can safely be used as the body of an if/else.
 */
#define	FLUSH_DEBUGFILE() \
	do { \
		if (commdout != (FILE *)NULL) { \
			(void) fflush(commdout); \
			(void) fsync(fileno(commdout)); \
		} \
	} while (/* CONSTCOND */ 0)
208 
209 static void
210 panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
211     md_mn_result_t *slave_result)
212 {
213 	md_mn_commd_err_t	commd_err;
214 	md_error_t		mne = mdnullerror;
215 	char			*msg_buf;
216 
217 	msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
218 
219 	FLUSH_DEBUGFILE();
220 
221 	if (master_err != MDMNE_ACK) {
222 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on master "
223 		    "when processing message type %d\n", type);
224 	} else if (slave_result == NULL) {
225 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on node "
226 		    "%d when processing message type %d\n", nid, type);
227 	} else {
228 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: Inconsistent "
229 		    "return value from node %d when processing message "
230 		    "type %d. Master exitval = %d, Slave exitval = %d\n",
231 		    nid, type, master_exitval, slave_result->mmr_exitval);
232 	}
233 	commd_err.size = strlen(msg_buf);
234 	commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
235 
236 	metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
237 	(void) uadmin(A_DUMP, AD_BOOT, NULL);
238 }
239 
240 static void
241 flush_fcout()
242 {
243 	struct statvfs64 vfsbuf;
244 	long long avail_bytes;
245 	int warned = 0;
246 
247 	for (; ; ) {
248 		sleep(10);
249 		/* No output file, nothing to do */
250 		if (commdout == (FILE *)NULL)
251 			continue;
252 
253 		/*
254 		 * stat the appropriate filesystem to check for available space.
255 		 */
256 		if (statvfs64(commdoutfile, &vfsbuf)) {
257 			continue;
258 		}
259 
260 		avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
261 		/*
262 		 * If we don't have enough space, we print out a warning.
263 		 * And we drop the verbosity level to NULL
264 		 * In case the condtion doesn't go away, we don't repeat
265 		 * the warning.
266 		 */
267 		if (avail_bytes < MIN_FS_SPACE) {
268 			if (warned) {
269 				continue;
270 			}
271 			commd_debug(MD_MMV_SYSLOG,
272 			    "NOT enough space available for logging\n");
273 			commd_debug(MD_MMV_SYSLOG,
274 			    "Have %lld bytes, need %lld bytes\n",
275 			    avail_bytes, MIN_FS_SPACE);
276 			warned = 1;
277 			md_commd_global_verb = MD_MMV_NULL;
278 		} else {
279 			warned = 0;
280 		}
281 
282 		fflush(commdout);
283 	}
284 }
285 
/*
 * Safer version of clnt_destroy: a NULL client handle is silently ignored.
 *
 * Wrapped in do { } while (0) so that "mdmn_clnt_destroy(c);" is exactly
 * one statement and can be used as the body of an if/else.  The argument
 * is still evaluated twice, so it must be free of side effects.
 */
#define	mdmn_clnt_destroy(clnt) \
	do { \
		if ((clnt) != NULL) \
			clnt_destroy(clnt); \
	} while (/* CONSTCOND */ 0)
291 
292 /*
293  * Own version of svc_sendreply that checks the integrity of the transport
294  * handle and so prevents us from core dumps in the real svc_sendreply()
295  */
296 void
297 mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
298 {
299 	if (SVC_STAT(transp) == XPRT_DIED) {
300 		commd_debug(MD_MMV_MISC,
301 		    "mdmn_svc_sendreply: XPRT_DIED\n");
302 		return;
303 	}
304 	(void) svc_sendreply(transp, xdr, data);
305 }
306 
307 /*
308  * timeout_initiator(set, class)
309  *
310  * Alas, I sent a message and didn't get a response back in aproppriate time.
311  *
312  * timeout_initiator() takes care for doing the needed svc_sendreply() to the
313  * calling mdmn_send_message, so that guy doesn't wait forever
314  * What is done here is pretty much the same as what is done in
315  * wakeup initiator. The difference is that we cannot provide for any results,
316  * of course and we set the comm_state to MDMNE_TIMEOUT.
317  *
318  * By doing so, mdmn_send_message can decide if a retry would make sense or not.
319  * It's not our's to decide that here.
320  */
321 void
322 timeout_initiator(set_t setno, md_mn_msgclass_t class)
323 {
324 	SVCXPRT		*transp;
325 	md_mn_msgid_t	mid;
326 	md_mn_result_t *resultp;
327 
328 	resultp = Zalloc(sizeof (md_mn_result_t));
329 	resultp->mmr_comm_state	= MDMNE_TIMEOUT;
330 
331 	commd_debug(MD_MMV_MISC,
332 	    "timeout_initiator set = %d, class = %d\n", setno, class);
333 
334 	transp = mdmn_get_initiator_table_transp(setno, class);
335 	mdmn_get_initiator_table_id(setno, class, &mid);
336 
337 	commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
338 	    MSGID_ELEMS(mid));
339 	/*
340 	 * Give the result the corresponding msgid from the failed message.
341 	 */
342 	MSGID_COPY(&mid, &(resultp->mmr_msgid));
343 
344 	/* return to mdmn_send_message() and let it deal with the situation */
345 	mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
346 
347 	free(resultp);
348 	commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
349 	svc_done(transp);
350 	mdmn_unregister_initiator_table(setno, class);
351 }
352 
353 
354 /*
355  * check_timeouts - thread
356  *
357  * This implements a timeout surveillance for messages sent from the
358  * initiator to the master.
359  *
360  * If a message is started, this thread is triggered thru
361  * cond_signal(&check_timeout_cv) and we keep track of the numbers of
362  * messages that are outstanding (messages_on_their_way).
363  *
364  * As long as there are messages on their way, this thread never goes to sleep.
365  * It'll keep checking all class/set combinations for outstanding messages.
366  * If one is found, it's checked if this message is overdue. In that case,
367  * timeout_initiator() is called to wakeup the calling mdmn_send_message and
368  * to clean up the mess.
369  *
370  * If the result from the master arrives later, this message is considered
371  * to be unsolicited. And will be ignored.
372  */
373 
374 void
375 check_timeouts()
376 {
377 	set_t			setno;
378 	time_t			now, then;
379 	mutex_t			*mx;
380 	md_mn_msgclass_t	class;
381 
382 	for (; ; ) {
383 		now = time((time_t *)NULL);
384 		for (setno = 1; setno < MD_MAXSETS; setno++) {
385 			if (md_mn_set_inited[setno] != MDMN_SET_READY) {
386 				continue;
387 			}
388 			for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
389 			    class++) {
390 				mx = mdmn_get_initiator_table_mx(setno, class);
391 				mutex_lock(mx);
392 
393 				/* then is the registered time */
394 				then =
395 				    mdmn_get_initiator_table_time(setno, class);
396 				if ((then != 0) && (now > then)) {
397 					timeout_initiator(setno, class);
398 				}
399 				mutex_unlock(mx);
400 			}
401 		}
402 		/* it's ok to check only once per second */
403 		sleep(1);
404 
405 		/* is there work to do? */
406 		mutex_lock(&check_timeout_mutex);
407 		if (messages_on_their_way == 0) {
408 			cond_wait(&check_timeout_cv, &check_timeout_mutex);
409 		}
410 		mutex_unlock(&check_timeout_mutex);
411 	}
412 }
413 
414 void
415 setup_debug(void)
416 {
417 	char	*tmp_dir;
418 
419 	/* Read in the debug-controlling tokens from runtime.cf */
420 	md_commd_global_verb = commd_get_verbosity();
421 	/*
422 	 * If the user didn't specify a verbosity level in runtime.cf
423 	 * we can safely return here. As we don't intend to printout
424 	 * debug messages, we don't need to check for the output file.
425 	 */
426 	if (md_commd_global_verb == 0) {
427 		return;
428 	}
429 
430 	/* if commdout is non-NULL it is an open FILE, we'd better close it */
431 	if (commdout != (FILE *)NULL) {
432 		fclose(commdout);
433 	}
434 
435 	commdoutfile = commd_get_outfile();
436 
437 	/* setup the debug output */
438 	if (commdoutfile == (char *)NULL) {
439 		/* if no valid file was specified, use the default */
440 		commdoutfile = "/var/run/commd.out";
441 		commdout = fopen(commdoutfile, "a");
442 	} else {
443 		/* check if the directory exists and is writable */
444 		tmp_dir = strdup(commdoutfile);
445 		if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
446 		    ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
447 			syslog(LOG_ERR,
448 			    "Can't write to specified output file %s,\n"
449 			    "using /var/run/commd.out instead\n", commdoutfile);
450 			free(commdoutfile);
451 			commdoutfile = "/var/run/commd.out";
452 			commdout = fopen(commdoutfile, "a");
453 		}
454 		free(tmp_dir);
455 	}
456 
457 	if (commdout == (FILE *)NULL) {
458 		syslog(LOG_ERR, "Can't write to debug output file %s\n",
459 		    commdoutfile);
460 	}
461 }
462 
463 /*
464  * mdmn_is_node_dead checks to see if a node is dead using
465  * the SunCluster infrastructure which is a stable interface.
466  * If unable to contact SunCuster the node is assumed to be alive.
467  * Return values:
468  *	1 - node is dead
469  *	0 - node is alive
470  */
471 int
472 mdmn_is_node_dead(md_mnnode_desc *node)
473 {
474 	char	*fmt = "/usr/cluster/bin/scha_cluster_get -O NODESTATE_NODE ";
475 	char	*cmd;
476 	size_t	size;
477 	char	buf[10];
478 	FILE	*ptr;
479 	int	retval = 0;
480 
481 	/* I know that I'm alive */
482 	if (strcmp(node->nd_nodename, mynode()) == 0)
483 		return (retval);
484 
485 	size = strlen(fmt) + strlen(node->nd_nodename) + 1;
486 	cmd = Zalloc(size);
487 	(void) strlcat(cmd, fmt, size);
488 	(void) strlcat(cmd, node->nd_nodename, size);
489 
490 	if ((ptr = popen(cmd, "r")) != NULL) {
491 		if (fgets(buf, sizeof (buf), ptr) != NULL) {
492 			/* If scha_cluster_get returned DOWN - return dead */
493 			if (strncmp(buf, "DOWN", 4) == 0)
494 				retval = 1;
495 		}
496 		(void) pclose(ptr);
497 	}
498 	Free(cmd);
499 	return (retval);
500 }
501 
502 /*
503  * global_init()
504  *
505  * Perform some global initializations.
506  *
507  * the following routines have to call this before operation can start:
508  *  - mdmn_send_svc_2
509  *  - mdmn_work_svc_2
510  *  - mdmn_comm_lock_svc_2
511  *  - mdmn_comm_unlock_svc_2
512  *  - mdmn_comm_suspend_svc_2
513  *  - mdmn_comm_resume_svc_2
514  *  - mdmn_comm_reinit_set_svc_2
515  *
516  * This is a single threaded daemon, so it can only be in one of the above
517  * routines at the same time.
518  * This means, global_init() cannot be called more than once at the same time.
519  * Hence, no lock is needed.
520  */
521 void
522 global_init(void)
523 {
524 	set_t			set;
525 	md_mn_msgclass_t	class;
526 	struct sigaction	sighandler;
527 	time_t			clock_val;
528 	struct rlimit		commd_limit;
529 
530 
531 
532 	/* Do these global initializations only once */
533 	if (md_commd_global_state & MD_CGS_INITED) {
534 		return;
535 	}
536 	(void) sdssc_bind_library();
537 
538 	/* setup the debug options from the config file */
539 	setup_debug();
540 
541 	/* make sure that we don't run out of file descriptors */
542 	commd_limit.rlim_cur = commd_limit.rlim_max = RLIM_INFINITY;
543 	if (setrlimit(RLIMIT_NOFILE, &commd_limit) != 0) {
544 		syslog(LOG_WARNING, gettext("setrlimit failed."
545 		    "Could not increase the max file descriptors"));
546 	}
547 
548 	/* Make setup_debug() be the action in case of SIGHUP */
549 	sighandler.sa_flags = 0;
550 	sigfillset(&sighandler.sa_mask);
551 	sighandler.sa_handler = (void (*)(int)) setup_debug;
552 	sigaction(SIGHUP, &sighandler, NULL);
553 
554 	__savetime = gethrtime();
555 	(void) time(&clock_val);
556 	commd_debug(MD_MMV_MISC, "global init called %s\n", ctime(&clock_val));
557 
558 	/* start a thread that flushes out the debug on a regular basis */
559 	thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
560 	    (void *) NULL, THR_DETACHED, NULL);
561 
562 	/* global rwlock's / mutex's / cond_t's go here */
563 	mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
564 	cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
565 	mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
566 
567 	/* Make sure the initiator table is initialized correctly */
568 	for (set = 0; set < MD_MAXSETS; set++) {
569 		for (class = 0; class < MD_MN_NCLASSES; class++) {
570 			mdmn_unregister_initiator_table(set, class);
571 		}
572 	}
573 
574 
575 	/* setup the check for timeouts */
576 	thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
577 	    (void *) NULL, THR_DETACHED, NULL);
578 
579 	md_commd_global_state |= MD_CGS_INITED;
580 }
581 
582 
583 /*
584  * mdmn_init_client(setno, nodeid)
585  * called if client[setno][nodeid] is NULL
586  *
587  * NOTE: Must be called with set_desc_rwlock held as a reader
588  * NOTE: Must be called with client_rwlock held as a writer
589  *
590  * If the rpc client for this node has not been setup for any set, we do it now.
591  *
592  * Returns	0 on success (node found in set, rpc client setup)
593  *		-1 if metaget_setdesc failed,
594  *		-2 if node not part of set
595  *		-3 if clnt_create fails
596  */
597 static int
598 mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
599 {
600 	md_error_t	ep = mdnullerror;
601 	md_mnnode_desc	*node;
602 	md_set_desc	*sd;	/* just an abbr for set_descriptor[setno] */
603 
604 	sd = set_descriptor[setno];
605 
606 	/*
607 	 * Is the appropriate set_descriptor already initialized ?
608 	 * Can't think of a scenario where this is not the case, but we'd better
609 	 * check for it anyway.
610 	 */
611 	if (sd == NULL) {
612 		mdsetname_t	*sp;
613 
614 		rw_unlock(&set_desc_rwlock[setno]); /* readlock -> writelock */
615 		rw_wrlock(&set_desc_rwlock[setno]);
616 		sp = metasetnosetname(setno, &ep);
617 		/* Only one thread is supposed to be in metaget_setdesc() */
618 		mutex_lock(&get_setdesc_mutex);
619 		sd = metaget_setdesc(sp, &ep);
620 		mutex_unlock(&get_setdesc_mutex);
621 		if (sd == NULL) {
622 			rw_unlock(&set_desc_rwlock[setno]); /* back to ... */
623 			rw_rdlock(&set_desc_rwlock[setno]); /* ... readlock */
624 			return (-1);
625 		}
626 		set_descriptor[setno] = sd;
627 		rw_unlock(&set_desc_rwlock[setno]); /* back to readlock */
628 		rw_rdlock(&set_desc_rwlock[setno]);
629 	}
630 
631 	/* first we have to find the node name for this node id */
632 	for (node = sd->sd_nodelist; node; node = node->nd_next) {
633 		if (node->nd_nodeid == nid)
634 			break; /* we found our node in this set */
635 	}
636 
637 
638 	if (node == (md_mnnode_desc *)NULL) {
639 		commd_debug(MD_MMV_SYSLOG,
640 		    "FATAL: node %d not found in set %d\n", nid, setno);
641 		rw_unlock(&set_desc_rwlock[setno]);
642 		return (-2);
643 	}
644 
645 	commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
646 	    node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
647 
648 	/* Did this node join the diskset?  */
649 	if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
650 		commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
651 		    node->nd_nodename ? node->nd_nodename : "NULL", setno);
652 		rw_unlock(&set_desc_rwlock[setno]);
653 		return (-2);
654 	}
655 
656 	/* if clnt_create has not been done for that node, do it now */
657 	if (client[setno][nid] == (CLIENT *) NULL) {
658 		time_t	tout = 0;
659 
660 		/*
661 		 * While trying to create a connection to a node,
662 		 * periodically check to see if the node has been marked
663 		 * dead by the SunCluster infrastructure.
664 		 * This periodic check is needed since a non-responsive
665 		 * rpc.mdcommd (while it is attempting to create a connection
666 		 * to a dead node) can lead to large delays and/or failures
667 		 * in the reconfig steps.
668 		 */
669 		while ((client[setno][nid] == (CLIENT *) NULL) &&
670 		    (tout < MD_CLNT_CREATE_TOUT)) {
671 			client[setno][nid] = meta_client_create_retry(
672 			    node->nd_nodename, mdmn_clnt_create,
673 			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
674 			/* Is the node dead? */
675 			if (mdmn_is_node_dead(node) == 1) {
676 				commd_debug(MD_MMV_SYSLOG,
677 				    "rpc.mdcommd: no client for dead node %s\n",
678 				    node->nd_nodename);
679 				break;
680 			} else
681 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
682 		}
683 
684 		if (client[setno][nid] == (CLIENT *) NULL) {
685 			clnt_pcreateerror(node->nd_nodename);
686 			rw_unlock(&set_desc_rwlock[setno]);
687 			return (-3);
688 		}
689 		/* this node has the license to send */
690 		commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
691 		add_license(node);
692 
693 		/* set the timeout value */
694 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
695 		    (char *)&FOUR_SECS);
696 
697 	}
698 	rw_unlock(&set_desc_rwlock[setno]);
699 	return (0);
700 }
701 
702 /*
703  * check_client(setno, nodeid)
704  *
705  * must be called with reader lock held for set_desc_rwlock[setno]
706  * and must be called with reader lock held for client_rwlock[setno]
707  * Checks if the client for this set/node combination is already setup
708  * if not it upgrades the lock to a writer lock
709  * and tries to initialize the client.
710  * Finally it's checked if the client nulled out again due to some race
711  *
712  * returns 0 if there is a usable client
713  * returns MDMNE_RPC_FAIL otherwise
714  */
715 static int
716 check_client(set_t setno, md_mn_nodeid_t nodeid)
717 {
718 	int ret = 0;
719 
720 	while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
721 		rw_unlock(&client_rwlock[setno]); /* upgrade reader ... */
722 		rw_wrlock(&client_rwlock[setno]); /* ... to writer lock. */
723 		if (mdmn_init_client(setno, nodeid) != 0) {
724 			ret = MDMNE_RPC_FAIL;
725 		}
726 		rw_unlock(&client_rwlock[setno]); /* downgrade writer ... */
727 		rw_rdlock(&client_rwlock[setno]); /* ... back to reader lock. */
728 	}
729 	return (ret);
730 }
731 
732 /*
733  * mdmn_init_set(setno, todo)
734  * setno is the number of the set to be initialized.
735  * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
736  * If called with MDMN_SET_READY everything is initialized.
737  *
738  * If the set mutexes are already initialized, the caller has to hold
739  * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
740  * calling mdmn_init_set()
741  */
742 int
743 mdmn_init_set(set_t setno, int todo)
744 {
745 	int class;
746 	md_mnnode_desc	*node;
747 	md_set_desc	*sd; /* just an abbr for set_descriptor[setno] */
748 	mdsetname_t	*sp;
749 	md_error_t	ep = mdnullerror;
750 	md_mn_nodeid_t	nid;
751 
752 	/*
753 	 * Check if we are told to setup the mutexes and
754 	 * if these are not yet setup
755 	 */
756 	if ((todo & MDMN_SET_MUTEXES) &&
757 	    ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
758 		mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
759 		cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
760 		rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
761 		rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
762 
763 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
764 			mutex_init(mdmn_get_master_table_mx(setno, class),
765 			    USYNC_THREAD, NULL);
766 			cond_init(mdmn_get_master_table_cv(setno, class),
767 			    USYNC_THREAD, NULL);
768 			mutex_init(mdmn_get_initiator_table_mx(setno, class),
769 			    USYNC_THREAD, NULL);
770 		}
771 		md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
772 	}
773 	if ((todo & MDMN_SET_MCT) &&
774 	    ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
775 		int	fd;
776 		size_t	filesize;
777 		caddr_t	addr;
778 		char table_name[32];
779 
780 		filesize = (sizeof (md_mn_mct_t));
781 		(void) snprintf(table_name, sizeof (table_name), "%s%d",
782 		    MD_MN_MSG_COMP_TABLE, setno);
783 		/*
784 		 * If the mct file exists we map it into memory.
785 		 * Otherwise we create an empty file of appropriate
786 		 * size and map that into memory.
787 		 * The mapped areas are stored in mct[setno].
788 		 */
789 		fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
790 		if (fd < 0) {
791 			commd_debug(MD_MMV_MISC,
792 			    "init_set: Can't open MCT\n");
793 			return (-1);
794 		}
795 		/*
796 		 * To ensure that the file has the appropriate size,
797 		 * we write a byte at the end of the file.
798 		 */
799 		lseek(fd, filesize + 1, SEEK_SET);
800 		write(fd, "\0", 1);
801 
802 		/* at this point we have a file in place that we can mmap */
803 		addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
804 		    MAP_SHARED, fd, (off_t)0);
805 		if (addr == MAP_FAILED) {
806 			commd_debug(MD_MMV_INIT,
807 			    "init_set: mmap mct error %d\n",
808 			    errno);
809 			return (-1);
810 		}
811 		/* LINTED pointer alignment */
812 		mct[setno] = (md_mn_mct_t *)addr;
813 
814 		/* finally we initialize the mutexes that protect the mct */
815 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
816 			mutex_init(&(mct_mutex[setno][class]),
817 			    USYNC_THREAD, NULL);
818 		}
819 
820 		md_mn_set_inited[setno] |= MDMN_SET_MCT;
821 	}
822 	/*
823 	 * Check if we are told to setup the nodes and
824 	 * if these are not yet setup
825 	 * (Attention: negative logic here compared to above!)
826 	 */
827 	if (((todo & MDMN_SET_NODES) == 0) ||
828 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
829 		return (0); /* success */
830 	}
831 
832 	if ((sp = metasetnosetname(setno, &ep)) == NULL) {
833 		commd_debug(MD_MMV_SYSLOG,
834 		    "metasetnosetname(%d) returned NULL\n", setno);
835 		return (MDMNE_NOT_JOINED);
836 	}
837 
838 	/* flush local copy of rpc.metad data */
839 	metaflushsetname(sp);
840 
841 	mutex_lock(&get_setdesc_mutex);
842 	sd = metaget_setdesc(sp, &ep);
843 	mutex_unlock(&get_setdesc_mutex);
844 
845 	if (sd == NULL) {
846 		commd_debug(MD_MMV_SYSLOG,
847 		    "metaget_setdesc(%d) returned NULL\n", setno);
848 		return (MDMNE_NOT_JOINED);
849 	}
850 
851 	/*
852 	 * if this set is not a multinode set or
853 	 * this node didn't join yet the diskset, better don't do anything
854 	 */
855 	if ((MD_MNSET_DESC(sd) == 0) ||
856 	    (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
857 		commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
858 		return (MDMNE_NOT_JOINED);
859 	}
860 
861 	for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
862 		time_t	tout = 0;
863 		nid = node->nd_nodeid;
864 
865 		commd_debug(MD_MMV_INIT,
866 		    "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
867 		    node->nd_nodename ? node->nd_nodename : "NULL",
868 		    node->nd_priv_ic ? node->nd_priv_ic : "NULL",
869 		    node->nd_flags);
870 
871 		if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
872 			commd_debug(MD_MMV_INIT,
873 			    "init: %s didn't join set %d\n",
874 			    node->nd_nodename ? node->nd_nodename : "NULL",
875 			    setno);
876 			continue;
877 		}
878 
879 		if (client[setno][nid] != (CLIENT *) NULL) {
880 			/* already inited */
881 			commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
882 			    node->nd_nodename ? node->nd_nodename : "NULL");
883 			continue;
884 		}
885 
886 		/*
887 		 * While trying to create a connection to a node,
888 		 * periodically check to see if the node has been marked
889 		 * dead by the SunCluster infrastructure.
890 		 * This periodic check is needed since a non-responsive
891 		 * rpc.mdcommd (while it is attempting to create a connection
892 		 * to a dead node) can lead to large delays and/or failures
893 		 * in the reconfig steps.
894 		 */
895 		while ((client[setno][nid] == (CLIENT *) NULL) &&
896 		    (tout < MD_CLNT_CREATE_TOUT)) {
897 			client[setno][nid] = meta_client_create_retry(
898 			    node->nd_nodename, mdmn_clnt_create,
899 			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
900 			/* Is the node dead? */
901 			if (mdmn_is_node_dead(node) == 1) {
902 				commd_debug(MD_MMV_SYSLOG,
903 				    "rpc.mdcommd: no client for dead node %s\n",
904 				    node->nd_nodename);
905 				break;
906 			} else
907 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
908 		}
909 
910 		if (client[setno][nid] == (CLIENT *) NULL) {
911 			clnt_pcreateerror(node->nd_nodename);
912 			/*
913 			 * If we cannot connect to a single node
914 			 * (maybe because it is down) we mark this node as not
915 			 * owned and continue with the next node in the list.
916 			 * This is better than failing the entire starting up
917 			 * of the commd system.
918 			 */
919 			node->nd_flags &= ~MD_MN_NODE_OWN;
920 			commd_debug(MD_MMV_SYSLOG,
921 			    "WARNING couldn't create client for %s\n"
922 			    "Reconfig cycle required\n",
923 			    node->nd_nodename);
924 			commd_debug(MD_MMV_INIT,
925 			    "WARNING couldn't create client for %s\n"
926 			    "Reconfig cycle required\n",
927 			    node->nd_nodename);
928 			continue;
929 		}
930 		/* this node has the license to send */
931 		commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
932 		add_license(node);
933 
934 		/* set the timeout value */
935 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
936 		    (char *)&FOUR_SECS);
937 
938 		commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
939 		    node->nd_nodename ? node->nd_nodename : "NULL");
940 	}
941 
942 	set_descriptor[setno] = sd;
943 	md_mn_set_inited[setno] |= MDMN_SET_NODES;
944 	return (0); /* success */
945 }
946 
947 void *
948 mdmn_send_to_work(void *arg)
949 {
950 	int			*rpc_err = NULL;
951 	int			success;
952 	int			try_master;
953 	set_t			setno;
954 	mutex_t			*mx;	/* protection for initiator_table */
955 	SVCXPRT			*transp;
956 	md_mn_msg_t		*msg;
957 	md_mn_nodeid_t		set_master;
958 	md_mn_msgclass_t	class;
959 	md_mn_msg_and_transp_t	*matp = (md_mn_msg_and_transp_t *)arg;
960 
961 	msg			= matp->mat_msg;
962 	transp			= matp->mat_transp;
963 
964 	class = mdmn_get_message_class(msg->msg_type);
965 	setno = msg->msg_setno;
966 
967 	/* set the sender, so the master knows who to send the results */
968 	rw_rdlock(&set_desc_rwlock[setno]);
969 	msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
970 	set_master	= set_descriptor[setno]->sd_mn_master_nodeid;
971 
972 	mx = mdmn_get_initiator_table_mx(setno, class);
973 	mutex_lock(mx);
974 
975 	/*
976 	 * Here we check, if the initiator table slot for this set/class
977 	 * combination is free to use.
978 	 * If this is not the case, we return CLASS_BUSY forcing the
979 	 * initiating send_message call to retry
980 	 */
981 	success = mdmn_check_initiator_table(setno, class);
982 	if (success == MDMNE_CLASS_BUSY) {
983 		md_mn_msgid_t		active_mid;
984 
985 		mdmn_get_initiator_table_id(setno, class, &active_mid);
986 
987 		commd_debug(MD_MMV_SEND,
988 		    "send_to_work: received but locally busy "
989 		    "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
990 		    "active msg=(%d, 0x%llx-%d)\n",
991 		    MSGID_ELEMS(msg->msg_msgid), setno, class,
992 		    msg->msg_type, MSGID_ELEMS(active_mid));
993 	} else {
994 		commd_debug(MD_MMV_SEND,
995 		    "send_to_work: received (%d, 0x%llx-%d), "
996 		    "set=%d, class=%d, type=%d\n",
997 		    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
998 	}
999 
1000 	try_master = 2; /* return failure after two retries */
1001 	while ((success == MDMNE_ACK) && (try_master--)) {
1002 		rw_rdlock(&client_rwlock[setno]);
1003 		/* is the rpc client to the master still around ? */
1004 		if (check_client(setno, set_master)) {
1005 			success = MDMNE_RPC_FAIL;
1006 			FLUSH_DEBUGFILE();
1007 			rw_unlock(&client_rwlock[setno]);
1008 			break; /* out of try_master-loop */
1009 		}
1010 
1011 		/*
1012 		 * Send the request to the work function on the master
1013 		 * this call will return immediately
1014 		 */
1015 		rpc_err = mdmn_work_2(msg, client[setno][set_master],
1016 		    set_master);
1017 
1018 		/* Everything's Ok? */
1019 		if (rpc_err == NULL) {
1020 			success = MDMNE_RPC_FAIL;
1021 			/*
1022 			 * Probably something happened to the daemon on the
1023 			 * master. Kill the client, and try again...
1024 			 */
1025 			rw_unlock(&client_rwlock[setno]);
1026 			rw_wrlock(&client_rwlock[setno]);
1027 			mdmn_clnt_destroy(client[setno][set_master]);
1028 			if (client[setno][set_master] != (CLIENT *)NULL) {
1029 				client[setno][set_master] = (CLIENT *)NULL;
1030 			}
1031 			rw_unlock(&client_rwlock[setno]);
1032 			continue;
1033 
1034 		} else  if (*rpc_err != MDMNE_ACK) {
1035 			/* something went wrong, break out */
1036 			success = *rpc_err;
1037 			free(rpc_err);
1038 			rw_unlock(&client_rwlock[setno]);
1039 			break; /* out of try_master-loop */
1040 		}
1041 
1042 		rw_unlock(&client_rwlock[setno]);
1043 		free(rpc_err);
1044 
1045 		/*
1046 		 * If we are here, we sucessfully delivered the message.
1047 		 * We register the initiator_table, so that
1048 		 * wakeup_initiator_2 can do the sendreply with the
1049 		 * results for us.
1050 		 */
1051 		success = MDMNE_ACK;
1052 		mdmn_register_initiator_table(setno, class, msg, transp);
1053 
1054 		/* tell check_timeouts, there's work to do */
1055 		mutex_lock(&check_timeout_mutex);
1056 		messages_on_their_way++;
1057 		cond_signal(&check_timeout_cv);
1058 		mutex_unlock(&check_timeout_mutex);
1059 		break; /* out of try_master-loop */
1060 	}
1061 
1062 	rw_unlock(&set_desc_rwlock[setno]);
1063 
1064 	if (success == MDMNE_ACK) {
1065 		commd_debug(MD_MMV_SEND,
1066 		    "send_to_work: registered (%d, 0x%llx-%d)\n",
1067 		    MSGID_ELEMS(msg->msg_msgid));
1068 	} else {
1069 		/* In case of failure do the sendreply now */
1070 		md_mn_result_t *resultp;
1071 		resultp = Zalloc(sizeof (md_mn_result_t));
1072 		resultp->mmr_comm_state = success;
1073 		/*
1074 		 * copy the MSGID so that we know _which_ message
1075 		 * failed (if the transp has got mangled)
1076 		 */
1077 		MSGID_COPY(&(msg->msg_msgid), &(resultp->mmr_msgid));
1078 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
1079 		commd_debug(MD_MMV_SEND,
1080 		    "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
1081 		    MSGID_ELEMS(msg->msg_msgid), success);
1082 		free_result(resultp);
1083 		/*
1084 		 * We don't have a timeout registered to wake us up, so we're
1085 		 * now done with this handle. Release it back to the pool.
1086 		 */
1087 		svc_done(transp);
1088 
1089 	}
1090 
1091 	free_msg(msg);
1092 	/* the alloc was done in mdmn_send_svc_2 */
1093 	Free(matp);
1094 	mutex_unlock(mx);
1095 	return (NULL);
1096 
1097 }
1098 
1099 /*
1100  * do_message_locally(msg, result)
1101  * Process a message locally on the master
1102  * Lookup the MCT if the message has already been processed.
1103  * If not, call the handler and store the result
1104  * If yes, retrieve the result from the MCT.
1105  * Return:
1106  *	MDMNE_ACK in case of success
1107  *	MDMNE_LOG_FAIL if the MCT could not be checked
1108  */
1109 static int
1110 do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1111 {
1112 	int			completed;
1113 	set_t			setno;
1114 	md_mn_msgtype_t		msgtype = msg->msg_type;
1115 	md_mn_msgclass_t	class;
1116 
1117 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1118 
1119 	handler = mdmn_get_handler(msgtype);
1120 	if (handler == NULL) {
1121 		result->mmr_exitval = 0;
1122 		/* let the sender decide if this is an error or not */
1123 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1124 		return (MDMNE_NO_HANDLER);
1125 	}
1126 
1127 	class = mdmn_get_message_class(msg->msg_type);
1128 	setno = msg->msg_setno;
1129 
1130 	result->mmr_msgtype	= msgtype;
1131 	result->mmr_flags	= msg->msg_flags;
1132 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1133 
1134 	mutex_lock(&mct_mutex[setno][class]);
1135 	completed = mdmn_check_completion(msg, result);
1136 	if (completed == MDMN_MCT_NOT_DONE) {
1137 		/* message not yet processed locally */
1138 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1139 		    "calling handler for (%d,0x%llx-%d) type %d\n",
1140 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1141 
1142 		/*
1143 		 * Mark the message as being currently processed,
1144 		 * so we won't start a second handler for it
1145 		 */
1146 		(void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1147 		mutex_unlock(&mct_mutex[setno][class]);
1148 
1149 		/* here we actually process the message on the master */
1150 		(*handler)(msg, MD_MSGF_ON_MASTER, result);
1151 
1152 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1153 		    "finished handler for (%d,0x%llx-%d) type %d\n",
1154 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1155 
1156 		/* Mark the message as fully processed, store the result */
1157 		mutex_lock(&mct_mutex[setno][class]);
1158 		(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1159 	} else if (completed == MDMN_MCT_DONE) {
1160 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1161 		    "result for (%d, 0x%llx-%d) from MCT\n",
1162 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1163 	} else if (completed == MDMN_MCT_IN_PROGRESS) {
1164 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1165 		    "(%d, 0x%llx-%d) is currently being processed\n",
1166 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1167 	} else {
1168 		/* MCT error occurred (should never happen) */
1169 		mutex_unlock(&mct_mutex[setno][class]);
1170 		result->mmr_comm_state = MDMNE_LOG_FAIL;
1171 		commd_debug(MD_MMV_SYSLOG, "WARNING "
1172 		    "mdmn_check_completion returned %d "
1173 		    "for (%d,0x%llx-%d)\n", completed,
1174 		    MSGID_ELEMS(msg->msg_msgid));
1175 		return (MDMNE_LOG_FAIL);
1176 	}
1177 	mutex_unlock(&mct_mutex[setno][class]);
1178 	return (MDMNE_ACK);
1179 
1180 }
1181 
1182 /*
1183  * do_send_message(msg, node)
1184  *
1185  * Send a message to a given node and wait for a acknowledgment, that the
1186  * message has arrived on the remote node.
1187  * Make sure that the client for the set is setup correctly.
1188  * If no ACK arrives, destroy and recreate the RPC client and retry the
1189  * message one time
1190  * After actually sending wait no longer than the appropriate number of
1191  * before timing out the message.
1192  *
1193  * Note must be called with set_desc_wrlock held in reader mode
1194  */
1195 static int
1196 do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1197 {
1198 	int			err;
1199 	int			rpc_retries;
1200 	int			timeout_retries = 0;
1201 	int			*ret = NULL;
1202 	set_t			setno;
1203 	cond_t			*cv;	/* see mdmn_wakeup_master_svc_2 */
1204 	mutex_t			*mx;	/* protection for class_busy */
1205 	timestruc_t		timeout; /* surveillance for remote daemon */
1206 	md_mn_nodeid_t		nid;
1207 	md_mn_msgtype_t		msgtype;
1208 	md_mn_msgclass_t	class;
1209 
1210 	nid	= node->nd_nodeid;
1211 	msgtype = msg->msg_type;
1212 	setno	= msg->msg_setno;
1213 	class	= mdmn_get_message_class(msgtype);
1214 	mx	= mdmn_get_master_table_mx(setno, class);
1215 	cv	= mdmn_get_master_table_cv(setno, class);
1216 
1217 retry_rpc:
1218 
1219 	/* We try two times to send the message */
1220 	rpc_retries = 2;
1221 
1222 	/*
1223 	 * if sending the message doesn't succeed the first time due to a
1224 	 * RPC problem, we retry one time
1225 	 */
1226 	while ((rpc_retries != 0) && (ret == NULL)) {
1227 		/*  in abort state, we error out immediately */
1228 		if (md_commd_global_state & MD_CGS_ABORTED) {
1229 			return (MDMNE_ABORT);
1230 		}
1231 
1232 		rw_rdlock(&client_rwlock[setno]);
1233 		/* unable to create client? Ignore it */
1234 		if (check_client(setno, nid)) {
1235 			/*
1236 			 * In case we cannot establish an RPC client, we
1237 			 * take this node out of our considerations.
1238 			 * This will be reset by a reconfig
1239 			 * cycle that should come pretty soon.
1240 			 * MNISSUE: Should a reconfig cycle
1241 			 * be forced on SunCluster?
1242 			 */
1243 			node->nd_flags &= ~MD_MN_NODE_OWN;
1244 			commd_debug(MD_MMV_SYSLOG,
1245 			    "WARNING couldn't create client for %s\n"
1246 			    "Reconfig cycle required\n",
1247 			    node->nd_nodename);
1248 			commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1249 			    "WARNING couldn't create client for %s\n",
1250 			    MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1251 			rw_unlock(&client_rwlock[setno]);
1252 			return (MDMNE_IGNORE_NODE);
1253 		}
1254 		/* let's be paranoid and check again before sending */
1255 		if (client[setno][nid] == NULL) {
1256 			/*
1257 			 * if this is true, strange enough, we catch our breath,
1258 			 * and then continue, so that the client is set up
1259 			 * once again.
1260 			 */
1261 			commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1262 			rw_unlock(&client_rwlock[setno]);
1263 			sleep(1);
1264 			continue;
1265 		}
1266 
1267 		/* send it over, it will return immediately */
1268 		ret = mdmn_work_2(msg, client[setno][nid], nid);
1269 
1270 		rw_unlock(&client_rwlock[setno]);
1271 
1272 		if (ret != NULL) {
1273 			commd_debug(MD_MMV_PROC_M,
1274 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1275 			    " 0x%x\n",
1276 			    MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1277 		} else {
1278 			commd_debug(MD_MMV_PROC_M,
1279 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1280 			    " NULL \n",
1281 			    MSGID_ELEMS(msg->msg_msgid), nid);
1282 		}
1283 
1284 		if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1285 		    (*ret == MDMNE_THR_CREATE_FAIL)) {
1286 			/*
1287 			 * Something happened to the daemon on the other side.
1288 			 * Kill the client, and try again.
1289 			 * check_client() will create a new client
1290 			 */
1291 			rw_wrlock(&client_rwlock[setno]);
1292 			mdmn_clnt_destroy(client[setno][nid]);
1293 			if (client[setno][nid] != (CLIENT *)NULL) {
1294 				client[setno][nid] = (CLIENT *)NULL;
1295 			}
1296 			rw_unlock(&client_rwlock[setno]);
1297 
1298 			/* ... but don't try infinitely */
1299 			--rpc_retries;
1300 			continue;
1301 		}
1302 		/*
1303 		 * If the class is locked on the other node, keep trying.
1304 		 * This situation will go away automatically,
1305 		 * if we wait long enough
1306 		 */
1307 		if (*ret == MDMNE_CLASS_LOCKED) {
1308 			sleep(1);
1309 			free(ret);
1310 			ret = NULL;
1311 			continue;
1312 		}
1313 	}
1314 	if (ret == NULL) {
1315 		return (MDMNE_RPC_FAIL);
1316 	}
1317 
1318 
1319 	/* if the slave is in abort state, we just ignore it. */
1320 	if (*ret == MDMNE_ABORT) {
1321 		commd_debug(MD_MMV_PROC_M,
1322 		    "proc_mas: work(%d,0x%llx-%d) returned "
1323 		    "MDMNE_ABORT\n",
1324 		    MSGID_ELEMS(msg->msg_msgid));
1325 		free(ret);
1326 		return (MDMNE_IGNORE_NODE);
1327 	}
1328 
1329 	/* Did the remote processing succeed? */
1330 	if (*ret != MDMNE_ACK) {
1331 		/*
1332 		 * Some commd failure in the middle of sending the msg
1333 		 * to the nodes. We don't continue here.
1334 		 */
1335 		commd_debug(MD_MMV_PROC_M,
1336 		    "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1337 		    MSGID_ELEMS(msg->msg_msgid), *ret);
1338 		free(ret);
1339 		return (MDMNE_RPC_FAIL);
1340 	}
1341 	free(ret);
1342 	ret = NULL;
1343 
1344 	/*
1345 	 * When we are here, we have sent the message to the other node and
1346 	 * we know that node has accepted it.
1347 	 * We go to sleep and have trust to be woken up by wakeup.
1348 	 * If we wakeup due to a timeout, or a signal, no result has been
1349 	 * placed in the appropriate slot.
1350 	 * If we timeout, it is likely that this is because the node has
1351 	 * gone away, so we will destroy the client and try it again in the
1352 	 * expectation that the rpc will fail and we will return
1353 	 * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1354 	 * be being processed on the slave. In this case just timeout for 4
1355 	 * more seconds and then return RPC_FAIL if the message is not complete.
1356 	 */
1357 	timeout.tv_nsec = 0;
1358 	timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1359 	    FOUR_SECS.tv_sec;
1360 	err = cond_reltimedwait(cv, mx, &timeout);
1361 
1362 	if (err == 0) {
1363 		/* everything's fine, return success */
1364 		return (MDMNE_ACK);
1365 	}
1366 
1367 	if (err == ETIME) {
1368 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1369 		    "timeout occured, set=%d, class=%d, "
1370 		    "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1371 		    setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1372 		if (timeout_retries == 0) {
1373 			timeout_retries++;
1374 			/*
1375 			 * Destroy the client and try the rpc call again
1376 			 */
1377 			rw_wrlock(&client_rwlock[setno]);
1378 			mdmn_clnt_destroy(client[setno][nid]);
1379 			client[setno][nid] = (CLIENT *)NULL;
1380 			rw_unlock(&client_rwlock[setno]);
1381 			goto retry_rpc;
1382 		}
1383 	} else if (err == EINTR) {
1384 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1385 		    "commd signalled, set=%d, class=%d, "
1386 		    "msgid=(%d, 0x%llx-%d)\n",
1387 		    setno, class, MSGID_ELEMS(msg->msg_msgid));
1388 	} else {
1389 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1390 		    "cond_reltimedwait err=%d, set=%d, "
1391 		    "class=%d, msgid=(%d, 0x%llx-%d)\n",
1392 		    err, setno, class,
1393 		    MSGID_ELEMS(msg->msg_msgid));
1394 	}
1395 
1396 	/* some failure happened */
1397 	return (MDMNE_RPC_FAIL);
1398 }
1399 
1400 /*
1401  * before we return we have to
1402  * free_msg(msg); because we are working on a copied message
1403  */
1404 void
1405 mdmn_master_process_msg(md_mn_msg_t *msg)
1406 {
1407 	int		*ret;
1408 	int		err;
1409 	int		nmsgs;		/* total number of msgs */
1410 	int		curmsg;		/* index of current msg */
1411 	set_t		setno;
1412 	uint_t		inherit_flags = 0;
1413 	uint_t		secdiff, usecdiff; /* runtime of this message */
1414 	md_error_t	mde = mdnullerror;
1415 	md_mn_msg_t	*msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1416 	md_mn_msg_t	*cmsg;		/* current msg */
1417 	md_mn_msgid_t	dummyid;
1418 	md_mn_result_t	*result;
1419 	md_mn_result_t	*slave_result;
1420 	md_mn_nodeid_t	sender;
1421 	md_mn_nodeid_t	set_master;
1422 	md_mnnode_desc	*node;
1423 	md_mn_msgtype_t	orig_type;	/* type of the original message */
1424 	md_mn_msgtype_t	msgtype;	/* type of the current message */
1425 	md_mn_msgclass_t orig_class;	/* class of the original message */
1426 	md_mn_msgclass_t class;		/* class of the current message */
1427 
1428 	int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1429 
1430 	orig_type = msgtype = msg->msg_type;
1431 	sender	= msg->msg_sender;
1432 	setno	= msg->msg_setno;
1433 
1434 	result = Zalloc(sizeof (md_mn_result_t));
1435 	result->mmr_setno	= setno;
1436 	result->mmr_msgtype	= msgtype;
1437 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1438 
1439 	orig_class = mdmn_get_message_class(msgtype);
1440 
1441 	commd_debug(MD_MMV_PROC_M,
1442 	    "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1443 	    MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1444 
1445 	rw_rdlock(&set_desc_rwlock[setno]);
1446 	set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1447 	result->mmr_sender	= set_master;
1448 	/*
1449 	 * Put message into the change log unless told otherwise
1450 	 * Note that we only log original messages.
1451 	 * If they are generated by some smgen, we don't log them!
1452 	 * Replay messages aren't logged either.
1453 	 * Note, that replay messages are unlogged on completion.
1454 	 */
1455 	if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1456 		commd_debug(MD_MMV_PROC_M,
1457 		    "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1458 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1459 		err = mdmn_log_msg(msg);
1460 		if (err == MDMNE_NULL) {
1461 			/* msg logged successfully */
1462 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1463 			    "done log_msg for (%d,0x%llx-%d) type %d\n",
1464 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1465 			goto proceed;
1466 		}
1467 		if (err == MDMNE_ACK) {
1468 			/* Same msg in the slot, proceed */
1469 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1470 			    "already logged (%d,0x%llx-%d) type %d\n",
1471 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1472 			goto proceed;
1473 		}
1474 		if (err == MDMNE_LOG_FAIL) {
1475 			/* Oh, bad, the log is non functional. */
1476 			result->mmr_comm_state = MDMNE_LOG_FAIL;
1477 			/*
1478 			 * Note that the mark_busy was already done by
1479 			 * mdmn_work_svc_2()
1480 			 */
1481 			mutex_lock(&mdmn_busy_mutex[setno]);
1482 			mdmn_mark_class_unbusy(setno, orig_class);
1483 			mutex_unlock(&mdmn_busy_mutex[setno]);
1484 
1485 		}
1486 		if (err == MDMNE_CLASS_BUSY) {
1487 			/*
1488 			 * The log is occupied with a different message
1489 			 * that needs to be played first.
1490 			 * We reject the current message with MDMNE_CLASS_BUSY
1491 			 * to the initiator and do not unbusy the set/class,
1492 			 * because we will proceed with the logged message,
1493 			 * which has the same set/class combination
1494 			 */
1495 			result->mmr_comm_state = MDMNE_CLASS_BUSY;
1496 		}
1497 		ret = (int *)NULL;
1498 		rw_rdlock(&client_rwlock[setno]);
1499 
1500 		if (check_client(setno, sender)) {
1501 			commd_debug(MD_MMV_SYSLOG,
1502 			    "proc_mas: No client for initiator \n");
1503 		} else {
1504 			ret = mdmn_wakeup_initiator_2(result,
1505 			    client[setno][sender], sender);
1506 		}
1507 		rw_unlock(&client_rwlock[setno]);
1508 
1509 		if (ret == (int *)NULL) {
1510 			commd_debug(MD_MMV_SYSLOG,
1511 			    "proc_mas: couldn't wakeup_initiator \n");
1512 		} else {
1513 			if (*ret != MDMNE_ACK) {
1514 				commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1515 				    "wakeup_initiator returned %d\n", *ret);
1516 			}
1517 			free(ret);
1518 		}
1519 		free_msg(msg);
1520 
1521 		if (err == MDMNE_LOG_FAIL) {
1522 			/* we can't proceed here */
1523 			free_result(result);
1524 			rw_unlock(&set_desc_rwlock[setno]);
1525 			return;
1526 		} else if (err == MDMNE_CLASS_BUSY) {
1527 			mdmn_changelog_record_t *lr;
1528 			lr = mdmn_get_changelogrec(setno, orig_class);
1529 			assert(lr != NULL);
1530 
1531 			/* proceed with the logged message */
1532 			msg = copy_msg(&(lr->lr_msg), NULL);
1533 
1534 			/*
1535 			 * The logged message has to have the same class but
1536 			 * type and sender can be different
1537 			 */
1538 			orig_type = msgtype = msg->msg_type;
1539 			sender	= msg->msg_sender;
1540 
1541 			commd_debug(MD_MMV_PROC_M,
1542 			    "proc_mas: Got new message from change log: "
1543 			    "(%d,0x%llx-%d) type %d\n",
1544 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1545 
1546 			/* continue normal operation with this message */
1547 		}
1548 	}
1549 
1550 proceed:
1551 	smgen = mdmn_get_submessage_generator(msgtype);
1552 	if (smgen == NULL) {
1553 		/* no submessages to create, just use the original message */
1554 		msglist[0] = msg;
1555 		nmsgs = 1;
1556 	} else {
1557 		/* some bits are passed on to submessages */
1558 		inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1559 
1560 		nmsgs = smgen(msg, msglist);
1561 
1562 		/* some settings for the submessages */
1563 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1564 			cmsg    = msglist[curmsg];
1565 
1566 			/* Apply the inherited flags */
1567 			cmsg->msg_flags |= inherit_flags;
1568 
1569 			/*
1570 			 * Make sure the submessage ID is set correctly
1571 			 * Note: first submessage has mid_smid of 1 (not 0)
1572 			 */
1573 			cmsg->msg_msgid.mid_smid = curmsg + 1;
1574 
1575 			/* need the original class set in msgID (for MCT) */
1576 			cmsg->msg_msgid.mid_oclass = orig_class;
1577 		}
1578 
1579 		commd_debug(MD_MMV_PROC_M,
1580 		    "smgen generated %d submsgs, origclass = %d\n",
1581 		    nmsgs, orig_class);
1582 	}
1583 	/*
1584 	 * This big loop does the following.
1585 	 * For all messages:
1586 	 *	process message on the master first (a message completion
1587 	 *		table MCT ensures a message is not processed twice)
1588 	 *	in case of an error break out of message loop
1589 	 *	for all nodes -- unless MD_MSGF_NO_BCAST is set --
1590 	 *		send message to node until that succeeds
1591 	 *		merge result -- not yet implemented
1592 	 *		respect MD_MSGF_STOP_ON_ERROR
1593 	 */
1594 	for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1595 		int	break_msg_loop = 0;
1596 		mutex_t	*mx;		/* protection for class_busy */
1597 		int	master_err;
1598 		int	master_exitval = -1;
1599 
1600 		cmsg	= msglist[curmsg];
1601 		msgtype = cmsg->msg_type;
1602 		class	= mdmn_get_message_class(msgtype);
1603 		node	= NULL;
1604 		mx	= mdmn_get_master_table_mx(setno, class);
1605 
1606 		/* If we are in the abort state, we error out immediately */
1607 		if (md_commd_global_state & MD_CGS_ABORTED) {
1608 			break; /* out of the message loop */
1609 		}
1610 
1611 		commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1612 		    class, orig_class);
1613 		/*
1614 		 * If the current class is different from the original class,
1615 		 * we have to lock it down.
1616 		 * The original class is already marked busy.
1617 		 * At this point we cannot refuse the message because the
1618 		 * class is busy right now, so we wait until the class becomes
1619 		 * available again. As soon as something changes for this set
1620 		 * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1621 		 *
1622 		 * Granularity could be finer (setno/class)
1623 		 */
1624 		if (class != orig_class) {
1625 			mutex_lock(&mdmn_busy_mutex[setno]);
1626 			while (mdmn_mark_class_busy(setno, class) == FALSE) {
1627 				cond_wait(&mdmn_busy_cv[setno],
1628 				    &mdmn_busy_mutex[setno]);
1629 			}
1630 			mutex_unlock(&mdmn_busy_mutex[setno]);
1631 		}
1632 
1633 		master_err = do_message_locally(cmsg, result);
1634 
1635 		if ((master_err != MDMNE_ACK) ||
1636 		    ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1637 			result->mmr_failing_node = set_master;
1638 			if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1639 				/*
1640 				 * if appropriate, unbusy the class and
1641 				 * break out of the message loop
1642 				 */
1643 				if (class != orig_class) {
1644 					mutex_lock(&mdmn_busy_mutex[setno]);
1645 					mdmn_mark_class_unbusy(setno, class);
1646 					mutex_unlock(&mdmn_busy_mutex[setno]);
1647 				}
1648 				break;
1649 			}
1650 		}
1651 
1652 		if (master_err == MDMNE_ACK)
1653 			master_exitval = result->mmr_exitval;
1654 
1655 		/* No broadcast? => next message */
1656 		if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1657 			/* if appropriate, unbusy the class */
1658 			if (class != orig_class) {
1659 				mutex_lock(&mdmn_busy_mutex[setno]);
1660 				mdmn_mark_class_unbusy(setno, class);
1661 				mutex_unlock(&mdmn_busy_mutex[setno]);
1662 			}
1663 			continue;
1664 		}
1665 
1666 
1667 		/* fake sender, so we get notified when the results are avail */
1668 		cmsg->msg_sender = set_master;
1669 		/*
1670 		 * register to the master_table. It's needed by wakeup_master to
1671 		 * wakeup the sleeping thread.
1672 		 * Access is protected by the class lock: mdmn_mark_class_busy()
1673 		 */
1674 		mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1675 
1676 
1677 
1678 		rw_rdlock(&set_desc_rwlock[setno]);
1679 		/* Send the message  to all other nodes */
1680 		for (node = set_descriptor[setno]->sd_nodelist; node;
1681 		    node = node->nd_next) {
1682 			md_mn_nodeid_t nid = node->nd_nodeid;
1683 
1684 			/* We are master and have already processed the msg */
1685 			if (node == set_descriptor[setno]->sd_mn_masternode) {
1686 				continue;
1687 			}
1688 
1689 			/* If this node didn't join the disk set, ignore it */
1690 			if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1691 				continue;
1692 			}
1693 
1694 			/* If a DIRECTED message, skip non-recipient nodes */
1695 			if ((cmsg->msg_flags & MD_MSGF_DIRECTED) &&
1696 			    nid != cmsg->msg_recipient) {
1697 				continue;
1698 			}
1699 
1700 			mutex_lock(mx);
1701 			/*
1702 			 * Register the node that is addressed,
1703 			 * so we can detect unsolicited messages
1704 			 */
1705 			mdmn_set_master_table_addr(setno, class, nid);
1706 			slave_result = (md_mn_result_t *)NULL;
1707 
1708 			/*
1709 			 * Now send it. do_send_message() will return if
1710 			 *	a failure occurs or
1711 			 *	the results are available
1712 			 */
1713 			err = do_send_message(cmsg, node);
1714 
1715 			/*  in abort state, we error out immediately */
1716 			if (md_commd_global_state & MD_CGS_ABORTED) {
1717 				break;
1718 			}
1719 
1720 			if (err == MDMNE_ACK) {
1721 				slave_result =
1722 				    mdmn_get_master_table_res(setno, class);
1723 				commd_debug(MD_MMV_PROC_M,
1724 				    "proc_mas: got result for (%d,0x%llx-%d)\n",
1725 				    MSGID_ELEMS(cmsg->msg_msgid));
1726 			} else if (err == MDMNE_IGNORE_NODE) {
1727 				mutex_unlock(mx);
1728 				continue; /* send to next node */
1729 			}
1730 			mutex_unlock(mx);
1731 
1732 
1733 			/*
1734 			 * If the result is NULL, or err doesn't show success,
1735 			 * something went wrong with this RPC call.
1736 			 */
1737 			if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1738 				/*
1739 				 * If PANIC_WHEN_INCONSISTENT set,
1740 				 * panic if the master succeeded while
1741 				 * this node failed
1742 				 */
1743 				if ((cmsg->msg_flags &
1744 				    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1745 				    (master_err == MDMNE_ACK))
1746 					panic_system(nid, cmsg->msg_type,
1747 					    master_err, master_exitval,
1748 					    slave_result);
1749 
1750 				result->mmr_failing_node = nid;
1751 				/* are we supposed to stop in case of error? */
1752 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1753 					result->mmr_exitval = MDMNE_RPC_FAIL;
1754 					commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1755 					    "result (%d,0x%llx-%d) is NULL\n",
1756 					    MSGID_ELEMS(cmsg->msg_msgid));
1757 					FLUSH_DEBUGFILE();
1758 					break_msg_loop = 1;
1759 					break; /* out of node loop first */
1760 				} else {
1761 					/* send msg to the next node */
1762 					continue;
1763 				}
1764 
1765 			}
1766 
1767 			/*
1768 			 * Message processed on remote node.
1769 			 * If PANIC_WHEN_INCONSISTENT set, panic if the
1770 			 * result is different on this node from the result
1771 			 * on the master
1772 			 */
1773 			if ((cmsg->msg_flags &
1774 			    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1775 			    ((master_err != MDMNE_ACK) ||
1776 			    (slave_result->mmr_exitval != master_exitval)))
1777 				panic_system(nid, cmsg->msg_type, master_err,
1778 				    master_exitval, slave_result);
1779 
1780 			/*
1781 			 * At this point we know we have a message that was
1782 			 * processed on the remote node.
1783 			 * We now check if the exitval is non zero.
1784 			 * In that case we discard the previous result and
1785 			 * rather use the current.
1786 			 * This means: If a message fails on no node,
1787 			 * the result from the master will be returned.
1788 			 * There's currently no such thing as merge of results
1789 			 * If additionally STOP_ON_ERROR is set, we bail out
1790 			 */
1791 			if (slave_result->mmr_exitval != 0) {
1792 				/* throw away the previously allocated result */
1793 				free_result(result);
1794 
1795 				/* copy_result() allocates new memory */
1796 				result = copy_result(slave_result);
1797 				free_result(slave_result);
1798 
1799 				dump_result(MD_MMV_PROC_M, "proc_mas", result);
1800 
1801 				result->mmr_failing_node = nid;
1802 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1803 					break_msg_loop = 1;
1804 					break; /* out of node loop */
1805 				}
1806 				continue; /* try next node */
1807 
1808 			} else {
1809 				/*
1810 				 * MNIssue: may want to merge the results
1811 				 * from all slaves.  Currently only report
1812 				 * the results from the master.
1813 				 */
1814 				free_result(slave_result);
1815 			}
1816 
1817 		} /* End of loop over the nodes */
1818 		rw_unlock(&set_desc_rwlock[setno]);
1819 
1820 
1821 		/* release the current class again */
1822 		if (class != orig_class) {
1823 			mutex_lock(&mdmn_busy_mutex[setno]);
1824 			mdmn_mark_class_unbusy(setno, class);
1825 			mutex_unlock(&mdmn_busy_mutex[setno]);
1826 		}
1827 
1828 		/* are we supposed to quit entirely ? */
1829 		if (break_msg_loop ||
1830 		    (md_commd_global_state & MD_CGS_ABORTED)) {
1831 			break; /* out of msg loop */
1832 		}
1833 
1834 	} /* End of loop over the messages */
1835 	/*
1836 	 * If we are here, there's two possibilities:
1837 	 * 	- we processed all messages on all nodes without an error.
1838 	 *	    In this case we return the result from the master.
1839 	 *	    (to be implemented: return the merged result)
1840 	 *	- we encountered an error in which case result has been
1841 	 *	    set accordingly already.
1842 	 */
1843 
1844 	if (md_commd_global_state & MD_CGS_ABORTED) {
1845 		result->mmr_comm_state = MDMNE_ABORT;
1846 	}
1847 
1848 	/*
1849 	 * This message has been processed completely.
1850 	 * Remove it from the changelog.
1851 	 * Do this for replay messages too.
1852 	 * Note that the message is unlogged before waking up the
1853 	 * initiator.  This is done for two reasons.
1854 	 * 1. Remove a race condition that occurs when back to back
1855 	 *   messages are sent for the same class and the registration
1856 	 *   is lost.
1857 	 * 2. If the initiator died but the action was completed on all
1858 	 *   the nodes, we want that to be marked "done" quickly.
1859 	 */
1860 
1861 	if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1862 		commd_debug(MD_MMV_PROC_M,
1863 		    "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1864 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1865 		mdmn_unlog_msg(msg);
1866 		commd_debug(MD_MMV_PROC_M,
1867 		    "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1868 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1869 	}
1870 
1871 	/*
1872 	 * In case of submessages, we increased the submessage ID in the
1873 	 * result structure. We restore the message ID to the value that
1874 	 * the initiator is waiting for.
1875 	 */
1876 	result->mmr_msgid.mid_smid	= 0;
1877 	result->mmr_msgtype		= orig_type;
1878 	result->mmr_sender		= set_master;
1879 
1880 	/* if we have an inited client, send result */
1881 	ret = (int *)NULL;
1882 
1883 	rw_rdlock(&client_rwlock[setno]);
1884 	if (check_client(setno, sender)) {
1885 		commd_debug(MD_MMV_SYSLOG,
1886 		    "proc_mas: unable to create client for initiator\n");
1887 	} else {
1888 		ret = mdmn_wakeup_initiator_2(result, client[setno][sender],
1889 		    sender);
1890 	}
1891 	rw_unlock(&client_rwlock[setno]);
1892 
1893 	if (ret == (int *)NULL) {
1894 		commd_debug(MD_MMV_PROC_M,
1895 		    "proc_mas: couldn't wakeup initiator\n");
1896 	} else {
1897 		if (*ret != MDMNE_ACK) {
1898 			commd_debug(MD_MMV_PROC_M,
1899 			    "proc_mas: wakeup_initiator returned %d\n",
1900 			    *ret);
1901 		}
1902 		free(ret);
1903 	}
1904 
1905 	rw_unlock(&set_desc_rwlock[setno]);
1906 	/* Free all submessages, if there were any */
1907 	if (nmsgs > 1) {
1908 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1909 			free_msg(msglist[curmsg]);
1910 		}
1911 	}
1912 	/* Free the result */
1913 	free_result(result);
1914 
1915 	mutex_lock(&mdmn_busy_mutex[setno]);
1916 	mdmn_mark_class_unbusy(setno, orig_class);
1917 	mutex_unlock(&mdmn_busy_mutex[setno]);
1918 
1919 
1920 	/*
1921 	 * We use this ioctl just to get the time in the same format as used in
1922 	 * the messageID. If it fails, all we get is a bad runtime output.
1923 	 */
1924 	(void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1925 	secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1926 	usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1927 
1928 	/* catching possible overflow */
1929 	if (usecdiff >= 1000000) {
1930 		usecdiff -= 1000000;
1931 		secdiff++;
1932 	}
1933 
1934 
1935 	commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1936 	    "%5d.%06d secs runtime\n",
1937 	    MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1938 
1939 	/* Free the original message */
1940 	free_msg(msg);
1941 }
1942 
1943 void
1944 mdmn_slave_process_msg(md_mn_msg_t *msg)
1945 {
1946 	int			*ret = NULL;
1947 	int			completed;
1948 	int			retries;
1949 	int			successfully_returned;
1950 	set_t			setno;
1951 	md_mn_result_t		*result;
1952 	md_mn_nodeid_t		sender;
1953 	md_mn_nodeid_t		whoami;
1954 	md_mn_msgtype_t		msgtype;
1955 	md_mn_msgclass_t	class;
1956 
1957 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1958 
1959 	setno	= msg->msg_setno;
1960 	sender	= msg->msg_sender; /* this is always the master of the set */
1961 	msgtype	= msg->msg_type;
1962 
1963 	rw_rdlock(&set_desc_rwlock[setno]);
1964 	whoami		= set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1965 	rw_unlock(&set_desc_rwlock[setno]);
1966 
1967 	result = Zalloc(sizeof (md_mn_result_t));
1968 	result->mmr_flags	= msg->msg_flags;
1969 	result->mmr_setno	= setno;
1970 	result->mmr_msgtype	= msgtype;
1971 	result->mmr_sender	= whoami;
1972 	result->mmr_comm_state	= MDMNE_ACK; /* Ok state */
1973 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1974 	class = mdmn_get_message_class(msgtype);
1975 
1976 	commd_debug(MD_MMV_PROC_S,
1977 	    "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1978 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
1979 
1980 	handler = mdmn_get_handler(msgtype);
1981 
1982 	if (handler == NULL) {
1983 		result->mmr_exitval = 0;
1984 		/* let the sender decide if this is an error or not */
1985 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1986 		commd_debug(MD_MMV_PROC_S,
1987 		    "proc_sla: No handler for (%d, 0x%llx-%d)\n",
1988 		    MSGID_ELEMS(msg->msg_msgid));
1989 	} else {
1990 
1991 		/* Did we already process this message ? */
1992 		mutex_lock(&mct_mutex[setno][class]);
1993 		completed = mdmn_check_completion(msg, result);
1994 
1995 		if (completed == MDMN_MCT_NOT_DONE) {
1996 			/* message not yet processed locally */
1997 			commd_debug(MD_MMV_PROC_S,
1998 			    "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
1999 			    MSGID_ELEMS(msg->msg_msgid));
2000 
2001 			/*
2002 			 * Mark the message as being currently processed,
2003 			 * so we won't start a second handler for it
2004 			 */
2005 			(void) mdmn_mark_completion(msg, NULL,
2006 			    MDMN_MCT_IN_PROGRESS);
2007 
2008 			mutex_unlock(&mct_mutex[setno][class]);
2009 			(*handler)(msg, MD_MSGF_ON_SLAVE, result);
2010 
2011 			commd_debug(MD_MMV_PROC_S,
2012 			    "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
2013 			    MSGID_ELEMS(msg->msg_msgid));
2014 
2015 			mutex_lock(&mct_mutex[setno][class]);
2016 			/* Mark the message as fully done, store the result */
2017 			(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
2018 
2019 		} else if (completed == MDMN_MCT_DONE) {
2020 			/* message processed previously, got result from MCT */
2021 			commd_debug(MD_MMV_PROC_S,
2022 			    "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
2023 			    MSGID_ELEMS(msg->msg_msgid));
2024 		} else if (completed == MDMN_MCT_IN_PROGRESS) {
2025 			/*
2026 			 * If the message is curruntly being processed,
2027 			 * we can return here, without sending a result back.
2028 			 * This will be done by the initial message handling
2029 			 * thread
2030 			 */
2031 			mutex_unlock(&mct_mutex[setno][class]);
2032 			commd_debug(MD_MMV_PROC_M, "proc_sla: "
2033 			    "(%d, 0x%llx-%d) is currently being processed\n",
2034 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
2035 
2036 			free_msg(msg);
2037 			free_result(result);
2038 			return;
2039 		} else {
2040 			/* MCT error occurred (should never happen) */
2041 			result->mmr_comm_state = MDMNE_LOG_FAIL;
2042 			commd_debug(MD_MMV_PROC_S,
2043 			    "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
2044 			    MSGID_ELEMS(msg->msg_msgid));
2045 		}
2046 		mutex_unlock(&mct_mutex[setno][class]);
2047 	}
2048 
2049 	/*
2050 	 * At this point we have a result (even in an error case)
2051 	 * that we return to the master.
2052 	 */
2053 	rw_rdlock(&set_desc_rwlock[setno]);
2054 	retries = 2; /* we will try two times to send the results */
2055 	successfully_returned = 0;
2056 
2057 	while (!successfully_returned && (retries != 0)) {
2058 		ret = (int *)NULL;
2059 		rw_rdlock(&client_rwlock[setno]);
2060 		if (check_client(setno, sender)) {
2061 			/*
2062 			 * If we cannot setup the rpc connection to the master,
2063 			 * we can't do anything besides logging this fact.
2064 			 */
2065 			commd_debug(MD_MMV_SYSLOG,
2066 			    "proc_mas: unable to create client for master\n");
2067 			rw_unlock(&client_rwlock[setno]);
2068 			break;
2069 		} else {
2070 			ret = mdmn_wakeup_master_2(result,
2071 			    client[setno][sender], sender);
2072 			/*
2073 			 * if mdmn_wakeup_master_2 returns NULL, it can be that
2074 			 * the master (or the commd on the master) had died.
2075 			 * In that case, we destroy the client to the master
2076 			 * and retry.
2077 			 * If mdmn_wakeup_master_2 doesn't return MDMNE_ACK,
2078 			 * the commd on the master is alive but
2079 			 * something else is wrong,
2080 			 * in that case a retry doesn't make sense => break out
2081 			 */
2082 			if (ret == (int *)NULL) {
2083 				commd_debug(MD_MMV_PROC_S,
2084 				    "proc_sla: wakeup_master returned NULL\n");
2085 				/* release reader lock, grab writer lock */
2086 				rw_unlock(&client_rwlock[setno]);
2087 				rw_wrlock(&client_rwlock[setno]);
2088 				mdmn_clnt_destroy(client[setno][sender]);
2089 				if (client[setno][sender] != (CLIENT *)NULL) {
2090 					client[setno][sender] = (CLIENT *)NULL;
2091 				}
2092 				rw_unlock(&client_rwlock[setno]);
2093 				retries--;
2094 				commd_debug(MD_MMV_PROC_S,
2095 				    "retries = %d\n", retries);
2096 				continue;
2097 			}
2098 			if (*ret != MDMNE_ACK) {
2099 				commd_debug(MD_MMV_PROC_S, "proc_sla: "
2100 				    "wakeup_master returned %d\n", *ret);
2101 				rw_unlock(&client_rwlock[setno]);
2102 				break;
2103 			} else { /* Good case */
2104 				successfully_returned = 1;
2105 				rw_unlock(&client_rwlock[setno]);
2106 			}
2107 		}
2108 	}
2109 
2110 	rw_unlock(&set_desc_rwlock[setno]);
2111 	commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
2112 	    MSGID_ELEMS(msg->msg_msgid));
2113 
2114 	if (ret != (int *)NULL)
2115 		free(ret);
2116 	free_msg(msg);
2117 	free_result(result);
2118 }
2119 
2120 
2121 /*
2122  * mdmn_send_svc_2:
2123  * ---------------
2124  * Check that the issuing node is a legitimate one (i.e. is licensed to send
2125  * messages to us), that the RPC request can be staged.
2126  *
2127  * Returns:
2128  *	0	=> no RPC request is in-flight, no deferred svc_sendreply()
2129  *	1	=> queued RPC request in-flight. Completion will be made (later)
2130  *		   by a wakeup_initiator_2() [hopefully]
2131  */
2132 int
2133 mdmn_send_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2134 {
2135 	int			err;
2136 	set_t			setno;
2137 	SVCXPRT			*transp = rqstp->rq_xprt;
2138 	md_mn_msg_t		*msg;
2139 	md_mn_result_t		*resultp;
2140 	md_mn_msgclass_t	class;
2141 	md_mn_msg_and_transp_t	*matp;
2142 
2143 	msg = copy_msg(omsg, NULL);
2144 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2145 
2146 	setno = msg->msg_setno;
2147 	class = mdmn_get_message_class(msg->msg_type);
2148 
2149 	/* If we are in the abort state, we error out immediately */
2150 	if (md_commd_global_state & MD_CGS_ABORTED) {
2151 		resultp = Zalloc(sizeof (md_mn_result_t));
2152 		resultp->mmr_comm_state = MDMNE_ABORT;
2153 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2154 		free_result(resultp);
2155 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2156 		return (0);
2157 	}
2158 
2159 	/* check if the global initialization is done */
2160 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2161 		global_init();
2162 	}
2163 
2164 	commd_debug(MD_MMV_SEND,
2165 	    "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2166 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2167 
2168 	/* Check for verbosity related message */
2169 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2170 		md_mn_verbose_t *d;
2171 
2172 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2173 		md_commd_global_verb = d->mmv_what;
2174 		/* everytime the bitmask is set, we reset the timer */
2175 		__savetime = gethrtime();
2176 		/*
2177 		 * If local-only-flag is set, we are done here,
2178 		 * otherwise we pass that message on to the master.
2179 		 */
2180 		if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2181 			resultp = Zalloc(sizeof (md_mn_result_t));
2182 			resultp->mmr_comm_state = MDMNE_ACK;
2183 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2184 			    (char *)resultp);
2185 			free_result(resultp);
2186 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2187 			return (0);
2188 		}
2189 	}
2190 
2191 	/*
2192 	 * Are we entering the abort state?
2193 	 * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2194 	 * this message cannot be distributed anyway.
2195 	 * So, it's safe to return immediately.
2196 	 */
2197 	if (msg->msg_type == MD_MN_MSG_ABORT) {
2198 		md_commd_global_state |= MD_CGS_ABORTED;
2199 		resultp = Zalloc(sizeof (md_mn_result_t));
2200 		resultp->mmr_comm_state = MDMNE_ACK;
2201 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2202 		free_result(resultp);
2203 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2204 		return (0);
2205 	}
2206 
2207 
2208 	/*
2209 	 * Is this message type blocked?
2210 	 * If so we return MDMNE_CLASS_LOCKED, immediately
2211 	 */
2212 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2213 		resultp = Zalloc(sizeof (md_mn_result_t));
2214 		resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2215 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2216 		free_result(resultp);
2217 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2218 		commd_debug(MD_MMV_SEND,
2219 		    "send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2220 		    "type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2221 		    msg->msg_type);
2222 		return (0);
2223 	}
2224 
2225 
2226 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2227 		/* Can only use the appropriate mutexes if they are inited */
2228 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2229 			rw_wrlock(&set_desc_rwlock[setno]);
2230 			rw_wrlock(&client_rwlock[setno]);
2231 			err = mdmn_init_set(setno, MDMN_SET_READY);
2232 			rw_unlock(&client_rwlock[setno]);
2233 			rw_unlock(&set_desc_rwlock[setno]);
2234 		} else {
2235 			err = mdmn_init_set(setno, MDMN_SET_READY);
2236 		}
2237 
2238 		if (err) {
2239 			/* couldn't initialize connections, cannot proceed */
2240 			resultp = Zalloc(sizeof (md_mn_result_t));
2241 			resultp->mmr_comm_state = err;
2242 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2243 			    (char *)resultp);
2244 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2245 			free_result(resultp);
2246 			commd_debug(MD_MMV_SEND,
2247 			    "send: init err = %d\n", err);
2248 			return (0);
2249 		}
2250 	}
2251 
2252 	mutex_lock(&mdmn_busy_mutex[setno]);
2253 	if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2254 	    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2255 		mutex_unlock(&mdmn_busy_mutex[setno]);
2256 		resultp = Zalloc(sizeof (md_mn_result_t));
2257 		resultp->mmr_comm_state = MDMNE_SUSPENDED;
2258 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2259 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2260 		free_result(resultp);
2261 		commd_debug(MD_MMV_SEND,
2262 		    "send: class suspended (%d, 0x%llx-%d), set=%d, "
2263 		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2264 		    setno, class, msg->msg_type);
2265 		return (0);
2266 	}
2267 	mutex_unlock(&mdmn_busy_mutex[setno]);
2268 
2269 	/* is this rpc request coming from the local node? */
2270 	if (check_license(rqstp, 0) == FALSE) {
2271 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2272 		commd_debug(MD_MMV_SEND,
2273 		    "send: check licence fail(%d, 0x%llx-%d), set=%d, "
2274 		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2275 		    setno, class, msg->msg_type);
2276 		return (0);
2277 	}
2278 
2279 
2280 	/*
2281 	 * We allocate a structure that can take two pointers in order to pass
2282 	 * both the message and the transp into thread_create.
2283 	 * The free for this alloc is done in mdmn_send_to_work()
2284 	 */
2285 	matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2286 	matp->mat_msg = msg;
2287 	matp->mat_transp = transp;
2288 
2289 	/*
2290 	 * create a thread here that calls work on the master.
2291 	 * If we are already on the master, this would block if running
2292 	 * in the same context. (our service is single threaded)(
2293 	 * Make it a detached thread because it will not communicate with
2294 	 * anybody thru thr_* mechanisms
2295 	 */
2296 	thr_create(NULL, 0, mdmn_send_to_work, (void *) matp, THR_DETACHED,
2297 	    NULL);
2298 
2299 	commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2300 	    MSGID_ELEMS(msg->msg_msgid));
2301 	/*
2302 	 * We return here without sending results. This will be done by
2303 	 * mdmn_wakeup_initiator_svc_2() as soon as the results are available.
2304 	 * Until then the calling send_message will be blocked, while we
2305 	 * are able to take calls.
2306 	 */
2307 
2308 	return (1);
2309 }
2310 
2311 /* ARGSUSED */
2312 int *
2313 mdmn_work_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2314 {
2315 	int		err;
2316 	set_t		setno;
2317 	thread_t	tid;
2318 	int		*retval;
2319 	md_mn_msg_t	*msg;
2320 	md_mn_msgclass_t class;
2321 
2322 	retval = Malloc(sizeof (int));
2323 
2324 	/* If we are in the abort state, we error out immediately */
2325 	if (md_commd_global_state & MD_CGS_ABORTED) {
2326 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2327 		*retval = MDMNE_ABORT;
2328 		return (retval);
2329 	}
2330 
2331 	msg = copy_msg(omsg, NULL);
2332 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2333 
2334 	/*
2335 	 * Is this message type blocked?
2336 	 * If so we return MDMNE_CLASS_LOCKED, immediately.
2337 	 * This check is performed on master and slave.
2338 	 */
2339 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2340 		*retval = MDMNE_CLASS_LOCKED;
2341 		return (retval);
2342 	}
2343 
2344 	/* check if the global initialization is done */
2345 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2346 		global_init();
2347 	}
2348 
2349 	class = mdmn_get_message_class(msg->msg_type);
2350 	setno = msg->msg_setno;
2351 
2352 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2353 		/* Can only use the appropriate mutexes if they are inited */
2354 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2355 			rw_wrlock(&set_desc_rwlock[setno]);
2356 			rw_wrlock(&client_rwlock[setno]);
2357 			err = mdmn_init_set(setno, MDMN_SET_READY);
2358 			rw_unlock(&client_rwlock[setno]);
2359 			rw_unlock(&set_desc_rwlock[setno]);
2360 		} else {
2361 			err = mdmn_init_set(setno, MDMN_SET_READY);
2362 		}
2363 
2364 		if (err) {
2365 			*retval = MDMNE_CANNOT_CONNECT;
2366 			free_msg(msg);
2367 			return (retval);
2368 		}
2369 	}
2370 
2371 	/* is this rpc request coming from a licensed node? */
2372 	if (check_license(rqstp, msg->msg_sender) == FALSE) {
2373 		free_msg(msg);
2374 		*retval = MDMNE_RPC_FAIL;
2375 		return (retval);
2376 	}
2377 
2378 	commd_debug(MD_MMV_WORK,
2379 	    "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2380 	    "flags=0x%x\n",
2381 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2382 	    msg->msg_flags);
2383 
2384 	/* Check for various CLASS0 message types */
2385 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2386 		md_mn_verbose_t *d;
2387 
2388 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2389 		/* for now we ignore set / class in md_mn_verbose_t */
2390 		md_commd_global_verb = d->mmv_what;
2391 		/* everytime the bitmask is set, we reset the timer */
2392 		__savetime = gethrtime();
2393 	}
2394 
2395 	mutex_lock(&mdmn_busy_mutex[setno]);
2396 
2397 	/* check if class is locked via a call to mdmn_comm_lock_svc_2 */
2398 	if (mdmn_is_class_locked(setno, class) == TRUE) {
2399 		mutex_unlock(&mdmn_busy_mutex[setno]);
2400 		*retval = MDMNE_CLASS_LOCKED;
2401 		free_msg(msg);
2402 		return (retval);
2403 	}
2404 	mutex_unlock(&mdmn_busy_mutex[setno]);
2405 
2406 	/* Check if the class is busy right now. Do it only on the master */
2407 	rw_rdlock(&set_desc_rwlock[setno]);
2408 	if (set_descriptor[setno]->sd_mn_am_i_master) {
2409 		rw_unlock(&set_desc_rwlock[setno]);
2410 		/*
2411 		 * If the class is currently suspended, don't accept new
2412 		 * messages, unless they are flagged with an override bit.
2413 		 */
2414 		mutex_lock(&mdmn_busy_mutex[setno]);
2415 		if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2416 		    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2417 			mutex_unlock(&mdmn_busy_mutex[setno]);
2418 			*retval = MDMNE_SUSPENDED;
2419 			commd_debug(MD_MMV_SEND,
2420 			    "send: set %d is suspended\n", setno);
2421 			free_msg(msg);
2422 			return (retval);
2423 		}
2424 		if (mdmn_mark_class_busy(setno, class) == FALSE) {
2425 			mutex_unlock(&mdmn_busy_mutex[setno]);
2426 			*retval = MDMNE_CLASS_BUSY;
2427 			free_msg(msg);
2428 			return (retval);
2429 		}
2430 		mutex_unlock(&mdmn_busy_mutex[setno]);
2431 		/*
2432 		 * Because the real processing of the message takes time we
2433 		 * create a thread for it. So the master thread can continue
2434 		 * to run and accept further messages.
2435 		 */
2436 		*retval = thr_create(NULL, 0,
2437 		    (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2438 		    THR_DETACHED|THR_SUSPENDED, &tid);
2439 	} else {
2440 		rw_unlock(&set_desc_rwlock[setno]);
2441 		*retval = thr_create(NULL, 0,
2442 		    (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2443 		    THR_DETACHED|THR_SUSPENDED, &tid);
2444 	}
2445 
2446 	if (*retval != 0) {
2447 		*retval = MDMNE_THR_CREATE_FAIL;
2448 		free_msg(msg);
2449 		return (retval);
2450 	}
2451 
2452 	/* Now run the new thread */
2453 	thr_continue(tid);
2454 
2455 	commd_debug(MD_MMV_WORK,
2456 	    "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2457 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2458 
2459 	*retval = MDMNE_ACK; /* this means success */
2460 	return (retval);
2461 }
2462 
2463 /* ARGSUSED */
2464 int *
2465 mdmn_wakeup_initiator_svc_2(md_mn_result_t *res, struct svc_req *rqstp)
2466 {
2467 
2468 	int		*retval;
2469 	int		err;
2470 	set_t		setno;
2471 	mutex_t		*mx;   /* protection of initiator_table */
2472 	SVCXPRT		*transp = NULL;
2473 	md_mn_msgid_t	initiator_table_id;
2474 	md_mn_msgclass_t class;
2475 
2476 	retval = Malloc(sizeof (int));
2477 
2478 	/* check if the global initialization is done */
2479 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2480 		global_init();
2481 	}
2482 
2483 	setno	= res->mmr_setno;
2484 
2485 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2486 		/* set not ready means we just crashed are restarted now */
2487 		/* Can only use the appropriate mutexes if they are inited */
2488 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2489 			rw_wrlock(&set_desc_rwlock[setno]);
2490 			rw_wrlock(&client_rwlock[setno]);
2491 			err = mdmn_init_set(setno, MDMN_SET_READY);
2492 			rw_unlock(&client_rwlock[setno]);
2493 			rw_unlock(&set_desc_rwlock[setno]);
2494 		} else {
2495 			err = mdmn_init_set(setno, MDMN_SET_READY);
2496 		}
2497 
2498 		if (err) {
2499 			*retval = MDMNE_CANNOT_CONNECT;
2500 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2501 			return (retval);
2502 		}
2503 	}
2504 
2505 	/* is this rpc request coming from a licensed node? */
2506 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2507 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2508 		*retval = MDMNE_RPC_FAIL;
2509 		return (retval);
2510 	}
2511 
2512 
2513 	class	= mdmn_get_message_class(res->mmr_msgtype);
2514 	mx	= mdmn_get_initiator_table_mx(setno, class);
2515 
2516 	commd_debug(MD_MMV_WAKE_I,
2517 	    "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2518 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2519 
2520 	mutex_lock(mx);
2521 
2522 	/*
2523 	 * Search the initiator wakeup table.
2524 	 * If we find an entry here (which should always be true)
2525 	 * we are on the initiating node and we wakeup the original
2526 	 * local rpc call.
2527 	 */
2528 	mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2529 
2530 	if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2531 		transp = mdmn_get_initiator_table_transp(setno, class);
2532 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2533 		svc_done(transp);
2534 		mdmn_unregister_initiator_table(setno, class);
2535 		*retval = MDMNE_ACK;
2536 
2537 		commd_debug(MD_MMV_WAKE_I,
2538 		    "wake_ini: replied (%d, 0x%llx-%d)\n",
2539 		    MSGID_ELEMS(res->mmr_msgid));
2540 	} else {
2541 		commd_debug(MD_MMV_WAKE_I,
2542 		    "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2543 		    MSGID_ELEMS(res->mmr_msgid));
2544 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2545 	}
2546 	mutex_unlock(mx);
2547 	/* less work for check_timeouts */
2548 	mutex_lock(&check_timeout_mutex);
2549 	if (messages_on_their_way == 0) {
2550 		commd_debug(MD_MMV_WAKE_I,
2551 		    "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2552 		    MSGID_ELEMS(res->mmr_msgid));
2553 	} else {
2554 		messages_on_their_way--;
2555 	}
2556 	mutex_unlock(&check_timeout_mutex);
2557 	xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2558 
2559 	return (retval);
2560 }
2561 
2562 
2563 /*
2564  * res must be free'd by the thread we wake up
2565  */
2566 /* ARGSUSED */
2567 int *
2568 mdmn_wakeup_master_svc_2(md_mn_result_t *ores, struct svc_req *rqstp)
2569 {
2570 
2571 	int		*retval;
2572 	int		err;
2573 	set_t		setno;
2574 	cond_t		*cv;
2575 	mutex_t		*mx;
2576 	md_mn_msgid_t	master_table_id;
2577 	md_mn_nodeid_t	sender;
2578 	md_mn_result_t	*res;
2579 	md_mn_msgclass_t class;
2580 
2581 	retval = Malloc(sizeof (int));
2582 
2583 	/* check if the global initialization is done */
2584 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2585 		global_init();
2586 	}
2587 
2588 	/* Need to copy the results here, as they are static for RPC */
2589 	res = copy_result(ores);
2590 	xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2591 
2592 	class = mdmn_get_message_class(res->mmr_msgtype);
2593 	setno = res->mmr_setno;
2594 
2595 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2596 		/* set not ready means we just crashed are restarted now */
2597 		/* Can only use the appropriate mutexes if they are inited */
2598 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2599 			rw_wrlock(&set_desc_rwlock[setno]);
2600 			rw_wrlock(&client_rwlock[setno]);
2601 			err = mdmn_init_set(setno, MDMN_SET_READY);
2602 			rw_unlock(&client_rwlock[setno]);
2603 			rw_unlock(&set_desc_rwlock[setno]);
2604 		} else {
2605 			err = mdmn_init_set(setno, MDMN_SET_READY);
2606 		}
2607 
2608 		if (err) {
2609 			*retval = MDMNE_CANNOT_CONNECT;
2610 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2611 			return (retval);
2612 		}
2613 	}
2614 
2615 	/* is this rpc request coming from a licensed node? */
2616 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2617 		*retval = MDMNE_RPC_FAIL;
2618 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2619 		return (retval);
2620 	}
2621 
2622 
2623 	commd_debug(MD_MMV_WAKE_M,
2624 	    "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2625 	    "from %d\n",
2626 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2627 	    res->mmr_sender);
2628 	/*
2629 	 * The mutex and cv are needed for waking up the thread
2630 	 * sleeping in mdmn_master_process_msg()
2631 	 */
2632 	mx = mdmn_get_master_table_mx(setno, class);
2633 	cv = mdmn_get_master_table_cv(setno, class);
2634 
2635 	/*
2636 	 * lookup the master wakeup table
2637 	 * If we find our message, we are on the master and
2638 	 * called by a slave that finished processing a message.
2639 	 * We store the results in the appropriate slot and
2640 	 * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2641 	 */
2642 	mutex_lock(mx);
2643 	mdmn_get_master_table_id(setno, class, &master_table_id);
2644 	sender = mdmn_get_master_table_addr(setno, class);
2645 
2646 	if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2647 		if (sender == res->mmr_sender) {
2648 			mdmn_set_master_table_res(setno, class, res);
2649 			cond_signal(cv);
2650 			*retval = MDMNE_ACK;
2651 		} else {
2652 			/* id is correct but wrong sender (I smell a timeout) */
2653 			commd_debug(MD_MMV_WAKE_M,
2654 			    "wakeup master got unsolicited message: "
2655 			    "(%d, 0x%llx-%d) from %d\n",
2656 			    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2657 			free_result(res);
2658 			*retval = MDMNE_TIMEOUT;
2659 		}
2660 	} else {
2661 		/* id is wrong, smells like a very late timeout */
2662 		commd_debug(MD_MMV_WAKE_M,
2663 		    "wakeup master got unsolicited message: "
2664 		    "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2665 		    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2666 		    MSGID_ELEMS(master_table_id));
2667 		free_result(res);
2668 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2669 	}
2670 
2671 	mutex_unlock(mx);
2672 
2673 	return (retval);
2674 }
2675 
2676 /*
2677  * Lock a set/class combination.
2678  * This is mainly done for debug purpose.
2679  * This set/class combination immediately is blocked,
2680  * even in the middle of sending messages to multiple slaves.
2681  * This remains until the user issues a mdmn_comm_unlock_svc_2 for the same
2682  * set/class combination.
2683  *
2684  * Special messages of class MD_MSG_CLASS0 can never be locked.
2685  * 	e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2686  *
2687  * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2688  * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2689  *
2690  * set must be between 1 and MD_MAXSETS
2691  * class can be:
2692  *	MD_MSG_CLASS0 which means all other classes in this case
2693  *	or one specific class (< MD_MN_NCLASSES)
2694  *
2695  * Returns:
2696  *	MDMNE_ACK on sucess (locking a locked class is Ok)
2697  *	MDMNE_EINVAL if a parameter is out of range
2698  */
2699 
2700 /* ARGSUSED */
2701 int *
2702 mdmn_comm_lock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2703 {
2704 	int			*retval;
2705 	set_t			setno = msc->msc_set;
2706 	md_mn_msgclass_t	class = msc->msc_class;
2707 
2708 	retval = Malloc(sizeof (int));
2709 
2710 	/* check if the global initialization is done */
2711 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2712 		global_init();
2713 	}
2714 
2715 	/* is this rpc request coming from the local node ? */
2716 	if (check_license(rqstp, 0) == FALSE) {
2717 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2718 		*retval = MDMNE_RPC_FAIL;
2719 		return (retval);
2720 	}
2721 
2722 	/* Perform some range checking */
2723 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2724 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2725 		*retval = MDMNE_EINVAL;
2726 		return (retval);
2727 	}
2728 
2729 	commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2730 	mutex_lock(&mdmn_busy_mutex[setno]);
2731 	if (class != MD_MSG_CLASS0) {
2732 		mdmn_mark_class_locked(setno, class);
2733 	} else {
2734 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2735 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2736 			mdmn_mark_class_locked(setno, class);
2737 		}
2738 	}
2739 	mutex_unlock(&mdmn_busy_mutex[setno]);
2740 
2741 	*retval = MDMNE_ACK;
2742 	return (retval);
2743 }
2744 
2745 /*
2746  * Unlock a set/class combination.
2747  * set must be between 1 and MD_MAXSETS
2748  * class can be:
2749  *	MD_MSG_CLASS0 which means all other classes in this case (like above)
2750  *	or one specific class (< MD_MN_NCLASSES)
2751  *
2752  * Returns:
2753  *	MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2754  *	MDMNE_EINVAL if a parameter is out of range
2755  */
2756 /* ARGSUSED */
2757 int *
2758 mdmn_comm_unlock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2759 {
2760 	int			*retval;
2761 	set_t			setno  = msc->msc_set;
2762 	md_mn_msgclass_t	class  = msc->msc_class;
2763 
2764 	retval = Malloc(sizeof (int));
2765 
2766 	/* check if the global initialization is done */
2767 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2768 		global_init();
2769 	}
2770 
2771 	/* is this rpc request coming from the local node ? */
2772 	if (check_license(rqstp, 0) == FALSE) {
2773 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2774 		*retval = MDMNE_RPC_FAIL;
2775 		return (retval);
2776 	}
2777 
2778 	/* Perform some range checking */
2779 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2780 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2781 		*retval = MDMNE_EINVAL;
2782 		return (retval);
2783 	}
2784 	commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2785 
2786 	mutex_lock(&mdmn_busy_mutex[setno]);
2787 	if (class != MD_MSG_CLASS0) {
2788 		mdmn_mark_class_unlocked(setno, class);
2789 	} else {
2790 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2791 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2792 			mdmn_mark_class_unlocked(setno, class);
2793 		}
2794 	}
2795 	mutex_unlock(&mdmn_busy_mutex[setno]);
2796 
2797 	*retval = MDMNE_ACK;
2798 	return (retval);
2799 }
2800 
2801 /*
2802  * mdmn_comm_suspend_svc_2(setno, class)
2803  *
2804  * Drain all outstanding messages for a given set/class combination
2805  * and don't allow new messages to be processed.
2806  *
2807  * Special messages of class MD_MSG_CLASS0 can never be locked.
2808  * 	e.g. MD_MN_MSG_VERBOSITY
2809  *
2810  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2811  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2812  *
2813  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2814  * one class as being suspended.
2815  * If messages for this class are currently on their way,
2816  * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2817  *
2818  * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2819  * Messages must be generated in ascending order.
2820  * This means, a message cannot create submessages with the same or lower class.
2821  * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2822  * generate a hanging situation here.
2823  * We mark class 1 as being suspended.
2824  * if the class is not busy, we proceed with class 2
2825  * and so on
2826  * if a class *is* busy, we cannot continue here, but return
2827  * MDMNE_SET_NOT_DRAINED.
2828  * We expect the caller to hold on for some seconds and try again.
2829  * When that message, that held the class busy is done in
2830  * mdmn_master_process_msg(), mdmn_mark_class_unbusy() called.
2831  * There it is checked if the class is about to drain.
2832  * In that case it tries to drain all higher classes there.
2833  *
2834  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2835  * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2836  * completely drained.
2837  *
2838  * Returns:
2839  *	MDMNE_ACK on sucess (set is drained, no outstanding messages)
2840  *	MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2841  *		still outstanding messages for this set(s)
2842  *	MDMNE_EINVAL if setno is out of range
2843  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2844  */
2845 
2846 /* ARGSUSED */
2847 int *
2848 mdmn_comm_suspend_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2849 {
2850 	int			*retval;
2851 	int			failure = 0;
2852 	set_t			startset, endset;
2853 	set_t			setno  = msc->msc_set;
2854 	md_mn_msgclass_t	oclass = msc->msc_class;
2855 #ifdef NOT_YET_NEEDED
2856 	uint_t			flags  = msc->msc_flags;
2857 #endif /* NOT_YET_NEEDED */
2858 	md_mn_msgclass_t	class;
2859 
2860 	retval = Malloc(sizeof (int));
2861 
2862 	/* check if the global initialization is done */
2863 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2864 		global_init();
2865 	}
2866 
2867 	/* is this rpc request coming from the local node ? */
2868 	if (check_license(rqstp, 0) == FALSE) {
2869 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2870 		*retval = MDMNE_RPC_FAIL;
2871 		return (retval);
2872 	}
2873 
2874 	commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2875 	    setno, oclass);
2876 
2877 	/* Perform some range checking */
2878 	if (setno >= MD_MAXSETS) {
2879 		*retval = MDMNE_EINVAL;
2880 		commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2881 		return (retval);
2882 	}
2883 
2884 	/*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2885 	if (setno == MD_COMM_ALL_SETS) {
2886 		startset = 1;
2887 		endset = MD_MAXSETS - 1;
2888 	} else {
2889 		startset = setno;
2890 		endset = setno;
2891 	}
2892 
2893 	for (setno = startset; setno <= endset; setno++) {
2894 		/* Here we need the mutexes for the set to be setup */
2895 		if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2896 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2897 		}
2898 
2899 		mutex_lock(&mdmn_busy_mutex[setno]);
2900 		/* shall we drain all classes of this set? */
2901 		if (oclass == MD_COMM_ALL_CLASSES) {
2902 			for (class = 1; class < MD_MN_NCLASSES; class ++) {
2903 				commd_debug(MD_MMV_MISC,
2904 				    "suspend: suspending set %d, class %d\n",
2905 				    setno, class);
2906 				*retval = mdmn_mark_class_suspended(setno,
2907 				    class, MDMN_SUSPEND_ALL);
2908 				if (*retval == MDMNE_SET_NOT_DRAINED) {
2909 					failure++;
2910 				}
2911 			}
2912 		} else {
2913 			/* only drain one specific class */
2914 			commd_debug(MD_MMV_MISC,
2915 			    "suspend: suspending set=%d class=%d\n",
2916 			    setno, oclass);
2917 			*retval = mdmn_mark_class_suspended(setno, oclass,
2918 			    MDMN_SUSPEND_1);
2919 			if (*retval == MDMNE_SET_NOT_DRAINED) {
2920 				failure++;
2921 			}
2922 		}
2923 		mutex_unlock(&mdmn_busy_mutex[setno]);
2924 	}
2925 	/* If one or more sets are not entirely drained, failure is non-zero */
2926 	if (failure != 0) {
2927 		*retval = MDMNE_SET_NOT_DRAINED;
2928 		commd_debug(MD_MMV_MISC,
2929 		    "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2930 	} else {
2931 		*retval = MDMNE_ACK;
2932 	}
2933 
2934 	return (retval);
2935 }
2936 
2937 /*
2938  * mdmn_comm_resume_svc_2(setno, class)
2939  *
2940  * Resume processing messages for a given set.
2941  * This incorporates the repeal of a previous suspend operation.
2942  *
2943  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2944  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2945  *
2946  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2947  * one class as being resumed.
2948  *
2949  * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
2950  *
2951  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2952  *
2953  * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
2954  * reset any ABORT flag from the global state.
2955  *
2956  * Returns:
2957  *	MDMNE_ACK on sucess (resuming an unlocked set is Ok)
2958  *	MDMNE_EINVAL if setno is out of range
2959  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2960  */
2961 /* ARGSUSED */
2962 int *
2963 mdmn_comm_resume_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2964 {
2965 	int			*retval;
2966 	set_t			startset, endset;
2967 	set_t			setno  = msc->msc_set;
2968 	md_mn_msgclass_t	oclass = msc->msc_class;
2969 	uint_t			flags  = msc->msc_flags;
2970 	md_mn_msgclass_t	class;
2971 
2972 	retval = Malloc(sizeof (int));
2973 
2974 	/* check if the global initialization is done */
2975 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2976 		global_init();
2977 	}
2978 
2979 	/* is this rpc request coming from the local node ? */
2980 	if (check_license(rqstp, 0) == FALSE) {
2981 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2982 		*retval = MDMNE_RPC_FAIL;
2983 		return (retval);
2984 	}
2985 
2986 	commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
2987 	    setno, oclass);
2988 
2989 	/* Perform some range checking */
2990 	if (setno > MD_MAXSETS) {
2991 		*retval = MDMNE_EINVAL;
2992 		return (retval);
2993 	}
2994 
2995 	if (setno == MD_COMM_ALL_SETS) {
2996 		startset = 1;
2997 		endset = MD_MAXSETS - 1;
2998 		if (oclass == MD_COMM_ALL_CLASSES) {
2999 			/* This is the point where we "unabort" the commd */
3000 			commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
3001 			md_commd_global_state &= ~MD_CGS_ABORTED;
3002 		}
3003 	} else {
3004 		startset = setno;
3005 		endset = setno;
3006 	}
3007 
3008 	for (setno = startset; setno <= endset; setno++) {
3009 
3010 		/* Here we need the mutexes for the set to be setup */
3011 		if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
3012 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
3013 		}
3014 
3015 		mutex_lock(&mdmn_busy_mutex[setno]);
3016 
3017 		if (oclass == MD_COMM_ALL_CLASSES) {
3018 			int end_class = 1;
3019 			/*
3020 			 * When SUSPENDing all classes, we go
3021 			 * from 1 to MD_MN_NCLASSES-1
3022 			 * The correct reverse action is RESUMing
3023 			 * from MD_MN_NCLASSES-1 to 1 (or 2)
3024 			 */
3025 
3026 			if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
3027 				end_class = 2;
3028 			}
3029 
3030 			/*
3031 			 * Then mark all classes of this set as no longer
3032 			 * suspended. This supersedes any previous suspend(1)
3033 			 * calls and resumes the set entirely.
3034 			 */
3035 			for (class = MD_MN_NCLASSES - 1; class >= end_class;
3036 			    class --) {
3037 				commd_debug(MD_MMV_MISC,
3038 				    "resume: resuming set=%d class=%d\n",
3039 				    setno, class);
3040 				mdmn_mark_class_resumed(setno, class,
3041 				    (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
3042 			}
3043 		} else {
3044 			/*
3045 			 * In this case only one class is marked as not
3046 			 * suspended. If a suspend(all) is currently active for
3047 			 * this set, this class will still be suspended.
3048 			 * That state will be cleared by a suspend(all)
3049 			 * (see above)
3050 			 */
3051 			commd_debug(MD_MMV_MISC,
3052 			    "resume: resuming set=%d class=%d\n",
3053 			    setno, oclass);
3054 			mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
3055 		}
3056 
3057 		mutex_unlock(&mdmn_busy_mutex[setno]);
3058 	}
3059 
3060 	*retval = MDMNE_ACK;
3061 	return (retval);
3062 }
3063 /* ARGSUSED */
3064 int *
3065 mdmn_comm_reinit_set_svc_2(set_t *setnop, struct svc_req *rqstp)
3066 {
3067 	int		*retval;
3068 	md_mnnode_desc	*node;
3069 	set_t		 setno = *setnop;
3070 
3071 	retval = Malloc(sizeof (int));
3072 
3073 	/* check if the global initialization is done */
3074 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3075 		global_init();
3076 	}
3077 
3078 	/* is this rpc request coming from the local node ? */
3079 	if (check_license(rqstp, 0) == FALSE) {
3080 		xdr_free(xdr_set_t, (caddr_t)setnop);
3081 		*retval = MDMNE_RPC_FAIL;
3082 		return (retval);
3083 	}
3084 
3085 	commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
3086 
3087 	rw_rdlock(&set_desc_rwlock[setno]);
3088 	/*
3089 	 * We assume, that all messages have been suspended previously.
3090 	 *
3091 	 * As we are modifying lots of clients here we grab the client_rwlock
3092 	 * in writer mode. This ensures, no new messages come in.
3093 	 */
3094 	rw_wrlock(&client_rwlock[setno]);
3095 	/* This set is no longer initialized */
3096 
3097 	if ((set_descriptor[setno] != NULL) &&
3098 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
3099 		/* destroy all rpc clients from this set */
3100 		for (node = set_descriptor[setno]->sd_nodelist; node;
3101 		    node = node->nd_next) {
3102 			mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
3103 			if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
3104 				client[setno][node->nd_nodeid] = (CLIENT *)NULL;
3105 			}
3106 		}
3107 	md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
3108 	}
3109 
3110 	commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
3111 
3112 	rw_unlock(&client_rwlock[setno]);
3113 	rw_unlock(&set_desc_rwlock[setno]);
3114 	*retval = MDMNE_ACK;
3115 	return (retval);
3116 }
3117 
3118 /*
3119  * This is just an interface for testing purpose.
3120  * Here we can disable single message types.
3121  * If we block a message type, this is valid for all MN sets.
3122  * If a message arrives later, and  it's message type is blocked, it will
3123  * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
3124  * resend this message over and over again.
3125  */
3126 
3127 /* ARGSUSED */
3128 int *
3129 mdmn_comm_msglock_svc_2(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3130 {
3131 	int			*retval;
3132 	md_mn_msgtype_t		type = mmtl->mmtl_type;
3133 	uint_t			lock = mmtl->mmtl_lock;
3134 
3135 	retval = Malloc(sizeof (int));
3136 
3137 	/* check if the global initialization is done */
3138 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3139 		global_init();
3140 	}
3141 
3142 	/* is this rpc request coming from the local node ? */
3143 	if (check_license(rqstp, 0) == FALSE) {
3144 		xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3145 		*retval = MDMNE_RPC_FAIL;
3146 		return (retval);
3147 	}
3148 
3149 	/* Perform some range checking */
3150 	if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3151 		*retval = MDMNE_EINVAL;
3152 		return (retval);
3153 	}
3154 
3155 	commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3156 	msgtype_lock_state[type] = lock;
3157 
3158 	*retval = MDMNE_ACK;
3159 	return (retval);
3160 }
3161