xref: /titanic_41/usr/src/cmd/lvm/rpc.mdcommd/mdmn_commd_server.c (revision 55f5292c612446ce6f93ddd248c0019b5974618b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <sys/statvfs.h>
31 #include <sys/uadmin.h>
32 #include <sys/resource.h>
33 #include <fcntl.h>
34 #include <stdio.h>
35 #include <thread.h>
36 #include <meta.h>
37 #include <sdssc.h>
38 #include <mdmn_changelog.h>
39 #include "mdmn_subr.h"
40 
41 /*
42  * This is the communication daemon for SVM Multi Node Disksets.
43  * It runs on every node and provides the following rpc services:
44  *  - mdmn_send_svc_2
45  *  - mdmn_work_svc_2
46  *  - mdmn_wakeup_initiator_svc_2
47  *  - mdmn_wakeup_master_svc_2
48  *  - mdmn_comm_lock_svc_2
49  *  - mdmn_comm_unlock_svc_2
50  *  - mdmn_comm_suspend_svc_2
51  *  - mdmn_comm_resume_svc_2
52  *  - mdmn_comm_reinit_set_svc_2
53  * where send, lock, unlock and reinit are meant for external use,
54  * work and the two wakeups are for internal use only.
55  *
56  * NOTE:
57  * On every node only one of those xxx_2 functions can be active at the
58  * same time because the daemon is single threaded.
59  *
60  * (not quite true, as mdmn_send_svc_2 and mdmn_work_svc_2 do thr_create()s
61  * as part of their handlers, so those aspects are multi-threaded)
62  *
63  * In case an event occurs that has to be propagated to all the nodes...
64  *
65  * One node (the initiator)
66  *	calls the libmeta function mdmn_send_message()
67  *	This function calls the local daemon thru mdmn_send_svc_2.
68  *
69  * On the initiator:
70  *	mdmn_send_svc_2()
71  *	    - starts a thread -> mdmn_send_to_work() and returns.
72  *	mdmn_send_to_work()
73  *	    - sends this message over to the master of the diskset.
74  *	      This is done by calling mdmn_work_svc_2 on the master.
75  *	    - registers to the initiator_table
76  *	    - exits without doing a svc_sendreply() for the call to
77  *	      mdmn_send_svc_2. This means that call is blocked until somebody
78  *	      (see end of this comment) does a svc_sendreply().
79  *	      This means mdmn_send_message() does not yet return.
80  *	    - A timeout surveillance is started at this point.
81  *	      This means in case the master doesn't reply at all in an
82  *	      appropriate time, an error condition is returned
83  *	      to the caller.
84  *
85  * On the master:
86  *	mdmn_work_svc_2()
87  *	    - starts a thread -> mdmn_master_process_msg() and returns
88  *	mdmn_master_process_msg()
89  *	    - logs the message to the change log
90  *	    - executes the message locally
91  *	    - flags the message in the change log
92  *	    - sends the message to mdmn_work_svc_2() on all the
93  *	      other nodes (slaves)
94  *	      after each call to mdmn_work_svc_2 the thread goes to sleep and
95  *	      will be woken up by mdmn_wakeup_master_svc_2() as soon as the
96  *	      slave node is done with this message.
97  *	    - In case the slave doesn't respond in an appropriate time, an
98  *	      error is assumed to ensure the master doesn't wait forever.
99  *
100  * On a slave:
101  *	mdmn_work_svc_2()
102  *	    - starts a thread -> mdmn_slave_process_msg() and returns
103  *	mdmn_slave_process_msg()
104  *	    - processes this message locally by calling the appropriate message
105  *	      handler, that creates some result.
106  *	    - sends that result thru a call to mdmn_wakeup_master_svc_2() to
107  *	      the master.
108  *
109  * Back on the master:
110  *	mdmn_wakeup_master_svc_2()
111  *	    - stores the result into the master_table.
112  *	    - signals the mdmn_master_process_msg-thread.
113  *	    - returns
114  *	mdmn_master_process_msg()
115  *	    - after getting the results from all nodes
116  *	    - sends them back to the initiating node thru a call to
117  *	      mdmn_wakeup_initiator_svc_2.
118  *
119  * Back on the initiator:
120  *	mdmn_wakeup_initiator_svc_2()
121  *	    - calls svc_sendreply() which makes the call to mdmn_send_svc_2()
122  *	      return.
123  *	      which allows the initial mdmn_send_message() call to return.
124  */
125 
/*
 * Debug log stream and its file name.  setup_debug() opens the stream; it
 * stays NULL when no verbosity is configured or the file cannot be opened,
 * and flush_fcout() skips its work for as long as it is NULL.
 */
FILE *commdout;		/* debug output for the commd */
char *commdoutfile;	/* file name for the above output */
/*
 * want at least 10 MB free space when logging into a file; below that
 * flush_fcout() warns once and sets the verbosity to MD_MMV_NULL
 */
#define	MIN_FS_SPACE	(10LL * 1024 * 1024)

/*
 * Number of outstanding messages that were initiated by this node.
 * If zero, check_timeouts goes to sleep (it waits on check_timeout_cv
 * while holding check_timeout_mutex).
 */
uint_t	messages_on_their_way;
mutex_t	check_timeout_mutex;	/* need mutex to protect above */
cond_t	check_timeout_cv;	/* trigger for check_timeouts */

/* for printing out time stamps; set from gethrtime() in global_init() */
hrtime_t __savetime;

/*
 * RPC clients for every set and every node and their protecting locks.
 * A NULL entry means no client handle has been created (yet) for that
 * set/node pair; check_client() and mdmn_init_set() fill them in.
 */
CLIENT	*client[MD_MAXSETS][NNODES];
rwlock_t client_rwlock[MD_MAXSETS];

/*
 * the descriptors of all possible sets and their protectors;
 * entries are assigned from metaget_setdesc() results
 */
struct md_set_desc *set_descriptor[MD_MAXSETS];
rwlock_t set_desc_rwlock[MD_MAXSETS];

/*
 * the daemon to daemon communication has to timeout quickly;
 * installed on every new client handle via clnt_control(CLSET_TIMEOUT)
 */
static struct timeval FOUR_SECS = { 4, 0 };

/*
 * These indicate if a set has already been setup.
 * Each entry is a bitmask of the MDMN_SET_* steps that mdmn_init_set() has
 * completed; check_timeouts() only looks at sets that equal MDMN_SET_READY.
 */
int md_mn_set_inited[MD_MAXSETS];

/*
 * For every set we have a message completion table and protecting mutexes.
 * mct[setno] is the mmap()ed table file set up by mdmn_init_set().
 */
md_mn_mct_t *mct[MD_MAXSETS];
mutex_t	mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];

/* Stuff to describe the global status of the commd on one node */
#define	MD_CGS_INITED		0x0001	/* global_init() has completed */
#define	MD_CGS_ABORTED		0x0002	/* return everything with MDMNE_ABORT */
uint_t md_commd_global_state = 0;	/* No state when starting up */

/*
 * Global verbosity level for the daemon.
 * Loaded by setup_debug() from commd_get_verbosity(); zero means that no
 * debug output file is opened at all.
 */
uint_t md_commd_global_verb;

/*
 * libmeta doesn't like multiple threads in metaget_setdesc().
 * So we must protect access to it with a global lock
 */
mutex_t get_setdesc_mutex;

/*
 * Need a way to block single message types,
 * hence an array with a status for every message type
 */
uint_t msgtype_lock_state[MD_MN_NMESSAGES];

/* for reading in the config file */
#define	MAX_LINE_SIZE 1024

/* config file accessors, defined outside of this file */
extern char *commd_get_outfile(void);
extern uint_t commd_get_verbosity(void);
187 
188 /*
189  * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
190  * merely needs to call clnt_create_timed, and meta_client_create_retry
191  * will take care of the rest.
192  */
193 /* ARGSUSED */
194 static CLIENT *
195 mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
196 {
197 	md_mnnode_desc	*node = (md_mnnode_desc *)data;
198 
199 	return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, TWO, "tcp",
200 	    time_out));
201 }
202 
/*
 * Push any buffered debug output to the debug file and sync it to disk.
 * Does nothing when no debug file is open (commdout is NULL).
 *
 * The body is wrapped in do { } while (0) so that "FLUSH_DEBUGFILE();"
 * is exactly one statement and can be used safely as the unbraced body of
 * an if/else.
 */
#define	FLUSH_DEBUGFILE() \
	do { \
		if (commdout != (FILE *)NULL) { \
			(void) fflush(commdout); \
			(void) fsync(fileno(commdout)); \
		} \
	} while (/* CONSTCOND */ 0)
208 
209 static void
210 panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
211     md_mn_result_t *slave_result)
212 {
213 	md_mn_commd_err_t	commd_err;
214 	md_error_t		mne = mdnullerror;
215 	char			*msg_buf;
216 
217 	msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
218 
219 	FLUSH_DEBUGFILE();
220 
221 	if (master_err != MDMNE_ACK) {
222 		(void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC "
223 		    "fail on master when processing message type %d\n", type);
224 	} else if (slave_result == NULL) {
225 		(void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail "
226 		    "on node %d when processing message type %d\n", nid, type);
227 	} else {
228 		(void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: "
229 		    "Inconsistent return value from node %d when processing "
230 		    "message type %d. Master exitval = %d, "
231 		    "Slave exitval = %d\n", nid, type, master_exitval,
232 		    slave_result->mmr_exitval);
233 	}
234 	commd_err.size = strlen(msg_buf);
235 	commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
236 
237 	(void) metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
238 	(void) uadmin(A_DUMP, AD_BOOT, NULL);
239 }
240 
241 static void
242 flush_fcout()
243 {
244 	struct statvfs64 vfsbuf;
245 	long long avail_bytes;
246 	int warned = 0;
247 
248 	for (; ; ) {
249 		(void) sleep(10);
250 		/* No output file, nothing to do */
251 		if (commdout == (FILE *)NULL)
252 			continue;
253 
254 		/*
255 		 * stat the appropriate filesystem to check for available space.
256 		 */
257 		if (statvfs64(commdoutfile, &vfsbuf)) {
258 			continue;
259 		}
260 
261 		avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
262 		/*
263 		 * If we don't have enough space, we print out a warning.
264 		 * And we drop the verbosity level to NULL
265 		 * In case the condtion doesn't go away, we don't repeat
266 		 * the warning.
267 		 */
268 		if (avail_bytes < MIN_FS_SPACE) {
269 			if (warned) {
270 				continue;
271 			}
272 			commd_debug(MD_MMV_SYSLOG,
273 			    "NOT enough space available for logging\n");
274 			commd_debug(MD_MMV_SYSLOG,
275 			    "Have %lld bytes, need %lld bytes\n",
276 			    avail_bytes, MIN_FS_SPACE);
277 			warned = 1;
278 			md_commd_global_verb = MD_MMV_NULL;
279 		} else {
280 			warned = 0;
281 		}
282 
283 		(void) fflush(commdout);
284 	}
285 }
286 
/*
 * Safer version of clnt_destroy. If clnt is NULL don't do anything.
 *
 * Wrapped in do { } while (0) so that "mdmn_clnt_destroy(x);" is exactly
 * one statement, also as the unbraced body of an if/else.
 * The argument is evaluated twice, so it must not have side effects.
 */
#define	mdmn_clnt_destroy(clnt) \
	do { \
		if ((clnt) != NULL) \
			clnt_destroy(clnt); \
	} while (/* CONSTCOND */ 0)
292 
293 /*
294  * Own version of svc_sendreply that checks the integrity of the transport
295  * handle and so prevents us from core dumps in the real svc_sendreply()
296  */
297 void
298 mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
299 {
300 	if (SVC_STAT(transp) == XPRT_DIED) {
301 		commd_debug(MD_MMV_MISC,
302 		    "mdmn_svc_sendreply: XPRT_DIED\n");
303 		return;
304 	}
305 	(void) svc_sendreply(transp, xdr, data);
306 }
307 
308 /*
309  * timeout_initiator(set, class)
310  *
311  * Alas, I sent a message and didn't get a response back in aproppriate time.
312  *
313  * timeout_initiator() takes care for doing the needed svc_sendreply() to the
314  * calling mdmn_send_message, so that guy doesn't wait forever
315  * What is done here is pretty much the same as what is done in
316  * wakeup initiator. The difference is that we cannot provide for any results,
317  * of course and we set the comm_state to MDMNE_TIMEOUT.
318  *
319  * By doing so, mdmn_send_message can decide if a retry would make sense or not.
320  * It's not our's to decide that here.
321  */
322 void
323 timeout_initiator(set_t setno, md_mn_msgclass_t class)
324 {
325 	SVCXPRT		*transp;
326 	md_mn_msgid_t	mid;
327 	md_mn_result_t *resultp;
328 
329 	resultp = Zalloc(sizeof (md_mn_result_t));
330 	resultp->mmr_comm_state	= MDMNE_TIMEOUT;
331 
332 	commd_debug(MD_MMV_MISC,
333 	    "timeout_initiator set = %d, class = %d\n", setno, class);
334 
335 	transp = mdmn_get_initiator_table_transp(setno, class);
336 	mdmn_get_initiator_table_id(setno, class, &mid);
337 
338 	commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
339 	    MSGID_ELEMS(mid));
340 	/*
341 	 * Give the result the corresponding msgid from the failed message.
342 	 */
343 	MSGID_COPY(&mid, &(resultp->mmr_msgid));
344 
345 	/* return to mdmn_send_message() and let it deal with the situation */
346 	mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
347 
348 	free(resultp);
349 	commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
350 	svc_done(transp);
351 	mdmn_unregister_initiator_table(setno, class);
352 }
353 
354 
355 /*
356  * check_timeouts - thread
357  *
358  * This implements a timeout surveillance for messages sent from the
359  * initiator to the master.
360  *
361  * If a message is started, this thread is triggered thru
362  * cond_signal(&check_timeout_cv) and we keep track of the numbers of
363  * messages that are outstanding (messages_on_their_way).
364  *
365  * As long as there are messages on their way, this thread never goes to sleep.
366  * It'll keep checking all class/set combinations for outstanding messages.
367  * If one is found, it's checked if this message is overdue. In that case,
368  * timeout_initiator() is called to wakeup the calling mdmn_send_message and
369  * to clean up the mess.
370  *
371  * If the result from the master arrives later, this message is considered
372  * to be unsolicited. And will be ignored.
373  */
374 
375 void
376 check_timeouts()
377 {
378 	set_t			setno;
379 	time_t			now, then;
380 	mutex_t			*mx;
381 	md_mn_msgclass_t	class;
382 
383 	for (; ; ) {
384 		now = time((time_t *)NULL);
385 		for (setno = 1; setno < MD_MAXSETS; setno++) {
386 			if (md_mn_set_inited[setno] != MDMN_SET_READY) {
387 				continue;
388 			}
389 			for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
390 			    class++) {
391 				mx = mdmn_get_initiator_table_mx(setno, class);
392 				(void) mutex_lock(mx);
393 
394 				/* then is the registered time */
395 				then =
396 				    mdmn_get_initiator_table_time(setno, class);
397 				if ((then != 0) && (now > then)) {
398 					timeout_initiator(setno, class);
399 				}
400 				(void) mutex_unlock(mx);
401 			}
402 		}
403 		/* it's ok to check only once per second */
404 		(void) sleep(1);
405 
406 		/* is there work to do? */
407 		(void) mutex_lock(&check_timeout_mutex);
408 		if (messages_on_their_way == 0) {
409 			(void) cond_wait(&check_timeout_cv,
410 			    &check_timeout_mutex);
411 		}
412 		(void) mutex_unlock(&check_timeout_mutex);
413 	}
414 }
415 
416 void
417 setup_debug(void)
418 {
419 	char	*tmp_dir;
420 
421 	/* Read in the debug-controlling tokens from runtime.cf */
422 	md_commd_global_verb = commd_get_verbosity();
423 	/*
424 	 * If the user didn't specify a verbosity level in runtime.cf
425 	 * we can safely return here. As we don't intend to printout
426 	 * debug messages, we don't need to check for the output file.
427 	 */
428 	if (md_commd_global_verb == 0) {
429 		return;
430 	}
431 
432 	/* if commdout is non-NULL it is an open FILE, we'd better close it */
433 	if (commdout != (FILE *)NULL) {
434 		(void) fclose(commdout);
435 	}
436 
437 	commdoutfile = commd_get_outfile();
438 
439 	/* setup the debug output */
440 	if (commdoutfile == (char *)NULL) {
441 		/* if no valid file was specified, use the default */
442 		commdoutfile = "/var/run/commd.out";
443 		commdout = fopen(commdoutfile, "a");
444 	} else {
445 		/* check if the directory exists and is writable */
446 		tmp_dir = strdup(commdoutfile);
447 		if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
448 		    ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
449 			syslog(LOG_ERR,
450 			    "Can't write to specified output file %s,\n"
451 			    "using /var/run/commd.out instead\n", commdoutfile);
452 			free(commdoutfile);
453 			commdoutfile = "/var/run/commd.out";
454 			commdout = fopen(commdoutfile, "a");
455 		}
456 		free(tmp_dir);
457 	}
458 
459 	if (commdout == (FILE *)NULL) {
460 		syslog(LOG_ERR, "Can't write to debug output file %s\n",
461 		    commdoutfile);
462 	}
463 }
464 
465 /*
466  * mdmn_is_node_dead checks to see if a node is dead using
467  * the SunCluster infrastructure which is a stable interface.
468  * If unable to contact SunCuster the node is assumed to be alive.
469  * Return values:
470  *	1 - node is dead
471  *	0 - node is alive
472  */
473 int
474 mdmn_is_node_dead(md_mnnode_desc *node)
475 {
476 	char	*fmt = "/usr/cluster/bin/scha_cluster_get -O NODESTATE_NODE ";
477 	char	*cmd;
478 	size_t	size;
479 	char	buf[10];
480 	FILE	*ptr;
481 	int	retval = 0;
482 
483 	/* I know that I'm alive */
484 	if (strcmp(node->nd_nodename, mynode()) == 0)
485 		return (retval);
486 
487 	size = strlen(fmt) + strlen(node->nd_nodename) + 1;
488 	cmd = Zalloc(size);
489 	(void) strlcat(cmd, fmt, size);
490 	(void) strlcat(cmd, node->nd_nodename, size);
491 
492 	if ((ptr = popen(cmd, "r")) != NULL) {
493 		if (fgets(buf, sizeof (buf), ptr) != NULL) {
494 			/* If scha_cluster_get returned DOWN - return dead */
495 			if (strncmp(buf, "DOWN", 4) == 0)
496 				retval = 1;
497 		}
498 		(void) pclose(ptr);
499 	}
500 	Free(cmd);
501 	return (retval);
502 }
503 
504 /*
505  * global_init()
506  *
507  * Perform some global initializations.
508  *
509  * the following routines have to call this before operation can start:
510  *  - mdmn_send_svc_2
511  *  - mdmn_work_svc_2
512  *  - mdmn_comm_lock_svc_2
513  *  - mdmn_comm_unlock_svc_2
514  *  - mdmn_comm_suspend_svc_2
515  *  - mdmn_comm_resume_svc_2
516  *  - mdmn_comm_reinit_set_svc_2
517  *
518  * This is a single threaded daemon, so it can only be in one of the above
519  * routines at the same time.
520  * This means, global_init() cannot be called more than once at the same time.
521  * Hence, no lock is needed.
522  */
523 void
524 global_init(void)
525 {
526 	set_t			set;
527 	md_mn_msgclass_t	class;
528 	struct sigaction	sighandler;
529 	time_t			clock_val;
530 	struct rlimit		commd_limit;
531 
532 
533 
534 	/* Do these global initializations only once */
535 	if (md_commd_global_state & MD_CGS_INITED) {
536 		return;
537 	}
538 	(void) sdssc_bind_library();
539 
540 	/* setup the debug options from the config file */
541 	setup_debug();
542 
543 	/* make sure that we don't run out of file descriptors */
544 	commd_limit.rlim_cur = commd_limit.rlim_max = RLIM_INFINITY;
545 	if (setrlimit(RLIMIT_NOFILE, &commd_limit) != 0) {
546 		syslog(LOG_WARNING, gettext("setrlimit failed."
547 		    "Could not increase the max file descriptors"));
548 	}
549 
550 	/* Make setup_debug() be the action in case of SIGHUP */
551 	sighandler.sa_flags = 0;
552 	(void) sigfillset(&sighandler.sa_mask);
553 	sighandler.sa_handler = (void (*)(int)) setup_debug;
554 	sigaction(SIGHUP, &sighandler, NULL);
555 
556 	__savetime = gethrtime();
557 	(void) time(&clock_val);
558 	commd_debug(MD_MMV_MISC, "global init called %s\n", ctime(&clock_val));
559 
560 	/* start a thread that flushes out the debug on a regular basis */
561 	(void) thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
562 	    (void *) NULL, THR_DETACHED, NULL);
563 
564 	/* global rwlock's / mutex's / cond_t's go here */
565 	(void) mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
566 	(void) cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
567 	(void) mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
568 
569 	/* Make sure the initiator table is initialized correctly */
570 	for (set = 0; set < MD_MAXSETS; set++) {
571 		for (class = 0; class < MD_MN_NCLASSES; class++) {
572 			mdmn_unregister_initiator_table(set, class);
573 		}
574 	}
575 
576 
577 	/* setup the check for timeouts */
578 	(void) thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
579 	    (void *) NULL, THR_DETACHED, NULL);
580 
581 	md_commd_global_state |= MD_CGS_INITED;
582 }
583 
584 
585 /*
586  * mdmn_init_client(setno, nodeid)
587  * called if client[setno][nodeid] is NULL
588  *
589  * NOTE: Must be called with set_desc_rwlock held as a reader
590  * NOTE: Must be called with client_rwlock held as a writer
591  *
592  * If the rpc client for this node has not been setup for any set, we do it now.
593  *
594  * Returns	0 on success (node found in set, rpc client setup)
595  *		-1 if metaget_setdesc failed,
596  *		-2 if node not part of set
597  *		-3 if clnt_create fails
598  */
599 static int
600 mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
601 {
602 	md_error_t	ep = mdnullerror;
603 	md_mnnode_desc	*node;
604 	md_set_desc	*sd;	/* just an abbr for set_descriptor[setno] */
605 
606 	sd = set_descriptor[setno];
607 
608 	/*
609 	 * Is the appropriate set_descriptor already initialized ?
610 	 * Can't think of a scenario where this is not the case, but we'd better
611 	 * check for it anyway.
612 	 */
613 	if (sd == NULL) {
614 		mdsetname_t	*sp;
615 
616 		/* readlock -> writelock */
617 		(void) rw_unlock(&set_desc_rwlock[setno]);
618 		(void) rw_wrlock(&set_desc_rwlock[setno]);
619 		sp = metasetnosetname(setno, &ep);
620 		/* Only one thread is supposed to be in metaget_setdesc() */
621 		(void) mutex_lock(&get_setdesc_mutex);
622 		sd = metaget_setdesc(sp, &ep);
623 		(void) mutex_unlock(&get_setdesc_mutex);
624 		if (sd == NULL) {
625 			/* back to ... */
626 			(void) rw_unlock(&set_desc_rwlock[setno]);
627 			/* ... readlock */
628 			(void) rw_rdlock(&set_desc_rwlock[setno]);
629 			return (-1);
630 		}
631 		set_descriptor[setno] = sd;
632 		/* back to readlock */
633 		(void) rw_unlock(&set_desc_rwlock[setno]);
634 		(void) rw_rdlock(&set_desc_rwlock[setno]);
635 	}
636 
637 	/* first we have to find the node name for this node id */
638 	for (node = sd->sd_nodelist; node; node = node->nd_next) {
639 		if (node->nd_nodeid == nid)
640 			break; /* we found our node in this set */
641 	}
642 
643 
644 	if (node == (md_mnnode_desc *)NULL) {
645 		commd_debug(MD_MMV_SYSLOG,
646 		    "FATAL: node %d not found in set %d\n", nid, setno);
647 		(void) rw_unlock(&set_desc_rwlock[setno]);
648 		return (-2);
649 	}
650 
651 	commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
652 	    node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
653 
654 	/* Did this node join the diskset?  */
655 	if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
656 		commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
657 		    node->nd_nodename ? node->nd_nodename : "NULL", setno);
658 		(void) rw_unlock(&set_desc_rwlock[setno]);
659 		return (-2);
660 	}
661 
662 	/* if clnt_create has not been done for that node, do it now */
663 	if (client[setno][nid] == (CLIENT *) NULL) {
664 		time_t	tout = 0;
665 
666 		/*
667 		 * While trying to create a connection to a node,
668 		 * periodically check to see if the node has been marked
669 		 * dead by the SunCluster infrastructure.
670 		 * This periodic check is needed since a non-responsive
671 		 * rpc.mdcommd (while it is attempting to create a connection
672 		 * to a dead node) can lead to large delays and/or failures
673 		 * in the reconfig steps.
674 		 */
675 		while ((client[setno][nid] == (CLIENT *) NULL) &&
676 		    (tout < MD_CLNT_CREATE_TOUT)) {
677 			client[setno][nid] = meta_client_create_retry(
678 			    node->nd_nodename, mdmn_clnt_create,
679 			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
680 			/* Is the node dead? */
681 			if (mdmn_is_node_dead(node) == 1) {
682 				commd_debug(MD_MMV_SYSLOG,
683 				    "rpc.mdcommd: no client for dead node %s\n",
684 				    node->nd_nodename);
685 				break;
686 			} else
687 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
688 		}
689 
690 		if (client[setno][nid] == (CLIENT *) NULL) {
691 			clnt_pcreateerror(node->nd_nodename);
692 			(void) rw_unlock(&set_desc_rwlock[setno]);
693 			return (-3);
694 		}
695 		/* this node has the license to send */
696 		commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
697 		add_license(node);
698 
699 		/* set the timeout value */
700 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
701 		    (char *)&FOUR_SECS);
702 
703 	}
704 	(void) rw_unlock(&set_desc_rwlock[setno]);
705 	return (0);
706 }
707 
708 /*
709  * check_client(setno, nodeid)
710  *
711  * must be called with reader lock held for set_desc_rwlock[setno]
712  * and must be called with reader lock held for client_rwlock[setno]
713  * Checks if the client for this set/node combination is already setup
714  * if not it upgrades the lock to a writer lock
715  * and tries to initialize the client.
716  * Finally it's checked if the client nulled out again due to some race
717  *
718  * returns 0 if there is a usable client
719  * returns MDMNE_RPC_FAIL otherwise
720  */
721 static int
722 check_client(set_t setno, md_mn_nodeid_t nodeid)
723 {
724 	int ret = 0;
725 
726 	while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
727 		/* upgrade reader ... */
728 		(void) rw_unlock(&client_rwlock[setno]);
729 		/* ... to writer lock. */
730 		(void) rw_wrlock(&client_rwlock[setno]);
731 		if (mdmn_init_client(setno, nodeid) != 0) {
732 			ret = MDMNE_RPC_FAIL;
733 		}
734 		/* downgrade writer ... */
735 		(void) rw_unlock(&client_rwlock[setno]);
736 		/* ... back to reader lock. */
737 		(void) rw_rdlock(&client_rwlock[setno]);
738 	}
739 	return (ret);
740 }
741 
742 /*
743  * mdmn_init_set(setno, todo)
744  * setno is the number of the set to be initialized.
745  * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
746  * If called with MDMN_SET_READY everything is initialized.
747  *
748  * If the set mutexes are already initialized, the caller has to hold
749  * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
750  * calling mdmn_init_set()
751  */
752 int
753 mdmn_init_set(set_t setno, int todo)
754 {
755 	int class;
756 	md_mnnode_desc	*node;
757 	md_set_desc	*sd; /* just an abbr for set_descriptor[setno] */
758 	mdsetname_t	*sp;
759 	md_error_t	ep = mdnullerror;
760 	md_mn_nodeid_t	nid;
761 
762 	/*
763 	 * Check if we are told to setup the mutexes and
764 	 * if these are not yet setup
765 	 */
766 	if ((todo & MDMN_SET_MUTEXES) &&
767 	    ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
768 		(void) mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
769 		(void) cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
770 		(void) rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
771 		(void) rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
772 
773 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
774 			(void) mutex_init(mdmn_get_master_table_mx(setno,
775 			    class), USYNC_THREAD, NULL);
776 			(void) cond_init(mdmn_get_master_table_cv(setno, class),
777 			    USYNC_THREAD, NULL);
778 			(void) mutex_init(mdmn_get_initiator_table_mx(setno,
779 			    class), USYNC_THREAD, NULL);
780 		}
781 		md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
782 	}
783 	if ((todo & MDMN_SET_MCT) &&
784 	    ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
785 		int	fd;
786 		size_t	filesize;
787 		caddr_t	addr;
788 		char table_name[32];
789 
790 		filesize = (sizeof (md_mn_mct_t));
791 		(void) snprintf(table_name, sizeof (table_name), "%s%d",
792 		    MD_MN_MSG_COMP_TABLE, setno);
793 		/*
794 		 * If the mct file exists we map it into memory.
795 		 * Otherwise we create an empty file of appropriate
796 		 * size and map that into memory.
797 		 * The mapped areas are stored in mct[setno].
798 		 */
799 		fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
800 		if (fd < 0) {
801 			commd_debug(MD_MMV_MISC,
802 			    "init_set: Can't open MCT\n");
803 			return (-1);
804 		}
805 		/*
806 		 * To ensure that the file has the appropriate size,
807 		 * we write a byte at the end of the file.
808 		 */
809 		(void) lseek(fd, filesize + 1, SEEK_SET);
810 		(void) write(fd, "\0", 1);
811 
812 		/* at this point we have a file in place that we can mmap */
813 		addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
814 		    MAP_SHARED, fd, (off_t)0);
815 		if (addr == MAP_FAILED) {
816 			commd_debug(MD_MMV_INIT,
817 			    "init_set: mmap mct error %d\n",
818 			    errno);
819 			return (-1);
820 		}
821 		/* LINTED pointer alignment */
822 		mct[setno] = (md_mn_mct_t *)addr;
823 
824 		/* finally we initialize the mutexes that protect the mct */
825 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
826 			(void) mutex_init(&(mct_mutex[setno][class]),
827 			    USYNC_THREAD, NULL);
828 		}
829 
830 		md_mn_set_inited[setno] |= MDMN_SET_MCT;
831 	}
832 	/*
833 	 * Check if we are told to setup the nodes and
834 	 * if these are not yet setup
835 	 * (Attention: negative logic here compared to above!)
836 	 */
837 	if (((todo & MDMN_SET_NODES) == 0) ||
838 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
839 		return (0); /* success */
840 	}
841 
842 	if ((sp = metasetnosetname(setno, &ep)) == NULL) {
843 		commd_debug(MD_MMV_SYSLOG,
844 		    "metasetnosetname(%d) returned NULL\n", setno);
845 		return (MDMNE_NOT_JOINED);
846 	}
847 
848 	/* flush local copy of rpc.metad data */
849 	metaflushsetname(sp);
850 
851 	(void) mutex_lock(&get_setdesc_mutex);
852 	sd = metaget_setdesc(sp, &ep);
853 	(void) mutex_unlock(&get_setdesc_mutex);
854 
855 	if (sd == NULL) {
856 		commd_debug(MD_MMV_SYSLOG,
857 		    "metaget_setdesc(%d) returned NULL\n", setno);
858 		return (MDMNE_NOT_JOINED);
859 	}
860 
861 	/*
862 	 * if this set is not a multinode set or
863 	 * this node didn't join yet the diskset, better don't do anything
864 	 */
865 	if ((MD_MNSET_DESC(sd) == 0) ||
866 	    (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
867 		commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
868 		return (MDMNE_NOT_JOINED);
869 	}
870 
871 	for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
872 		time_t	tout = 0;
873 		nid = node->nd_nodeid;
874 
875 		commd_debug(MD_MMV_INIT,
876 		    "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
877 		    node->nd_nodename ? node->nd_nodename : "NULL",
878 		    node->nd_priv_ic ? node->nd_priv_ic : "NULL",
879 		    node->nd_flags);
880 
881 		if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
882 			commd_debug(MD_MMV_INIT,
883 			    "init: %s didn't join set %d\n",
884 			    node->nd_nodename ? node->nd_nodename : "NULL",
885 			    setno);
886 			continue;
887 		}
888 
889 		if (client[setno][nid] != (CLIENT *) NULL) {
890 			/* already inited */
891 			commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
892 			    node->nd_nodename ? node->nd_nodename : "NULL");
893 			continue;
894 		}
895 
896 		/*
897 		 * While trying to create a connection to a node,
898 		 * periodically check to see if the node has been marked
899 		 * dead by the SunCluster infrastructure.
900 		 * This periodic check is needed since a non-responsive
901 		 * rpc.mdcommd (while it is attempting to create a connection
902 		 * to a dead node) can lead to large delays and/or failures
903 		 * in the reconfig steps.
904 		 */
905 		while ((client[setno][nid] == (CLIENT *) NULL) &&
906 		    (tout < MD_CLNT_CREATE_TOUT)) {
907 			client[setno][nid] = meta_client_create_retry(
908 			    node->nd_nodename, mdmn_clnt_create,
909 			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
910 			/* Is the node dead? */
911 			if (mdmn_is_node_dead(node) == 1) {
912 				commd_debug(MD_MMV_SYSLOG,
913 				    "rpc.mdcommd: no client for dead node %s\n",
914 				    node->nd_nodename);
915 				break;
916 			} else
917 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
918 		}
919 
920 		if (client[setno][nid] == (CLIENT *) NULL) {
921 			clnt_pcreateerror(node->nd_nodename);
922 			/*
923 			 * If we cannot connect to a single node
924 			 * (maybe because it is down) we mark this node as not
925 			 * owned and continue with the next node in the list.
926 			 * This is better than failing the entire starting up
927 			 * of the commd system.
928 			 */
929 			node->nd_flags &= ~MD_MN_NODE_OWN;
930 			commd_debug(MD_MMV_SYSLOG,
931 			    "WARNING couldn't create client for %s\n"
932 			    "Reconfig cycle required\n",
933 			    node->nd_nodename);
934 			commd_debug(MD_MMV_INIT,
935 			    "WARNING couldn't create client for %s\n"
936 			    "Reconfig cycle required\n",
937 			    node->nd_nodename);
938 			continue;
939 		}
940 		/* this node has the license to send */
941 		commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
942 		add_license(node);
943 
944 		/* set the timeout value */
945 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
946 		    (char *)&FOUR_SECS);
947 
948 		commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
949 		    node->nd_nodename ? node->nd_nodename : "NULL");
950 	}
951 
952 	set_descriptor[setno] = sd;
953 	md_mn_set_inited[setno] |= MDMN_SET_NODES;
954 	return (0); /* success */
955 }
956 
957 void *
958 mdmn_send_to_work(void *arg)
959 {
960 	int			*rpc_err = NULL;
961 	int			success;
962 	int			try_master;
963 	set_t			setno;
964 	mutex_t			*mx;	/* protection for initiator_table */
965 	SVCXPRT			*transp;
966 	md_mn_msg_t		*msg;
967 	md_mn_nodeid_t		set_master;
968 	md_mn_msgclass_t	class;
969 	md_mn_msg_and_transp_t	*matp = (md_mn_msg_and_transp_t *)arg;
970 
971 	msg			= matp->mat_msg;
972 	transp			= matp->mat_transp;
973 
974 	class = mdmn_get_message_class(msg->msg_type);
975 	setno = msg->msg_setno;
976 
977 	/* set the sender, so the master knows who to send the results */
978 	(void) rw_rdlock(&set_desc_rwlock[setno]);
979 	msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
980 	set_master	= set_descriptor[setno]->sd_mn_master_nodeid;
981 
982 	mx = mdmn_get_initiator_table_mx(setno, class);
983 	(void) mutex_lock(mx);
984 
985 	/*
986 	 * Here we check, if the initiator table slot for this set/class
987 	 * combination is free to use.
988 	 * If this is not the case, we return CLASS_BUSY forcing the
989 	 * initiating send_message call to retry
990 	 */
991 	success = mdmn_check_initiator_table(setno, class);
992 	if (success == MDMNE_CLASS_BUSY) {
993 		md_mn_msgid_t		active_mid;
994 
995 		mdmn_get_initiator_table_id(setno, class, &active_mid);
996 
997 		commd_debug(MD_MMV_SEND,
998 		    "send_to_work: received but locally busy "
999 		    "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
1000 		    "active msg=(%d, 0x%llx-%d)\n",
1001 		    MSGID_ELEMS(msg->msg_msgid), setno, class,
1002 		    msg->msg_type, MSGID_ELEMS(active_mid));
1003 	} else {
1004 		commd_debug(MD_MMV_SEND,
1005 		    "send_to_work: received (%d, 0x%llx-%d), "
1006 		    "set=%d, class=%d, type=%d\n",
1007 		    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
1008 	}
1009 
1010 	try_master = 2; /* return failure after two retries */
1011 	while ((success == MDMNE_ACK) && (try_master--)) {
1012 		(void) rw_rdlock(&client_rwlock[setno]);
1013 		/* is the rpc client to the master still around ? */
1014 		if (check_client(setno, set_master)) {
1015 			success = MDMNE_RPC_FAIL;
1016 			FLUSH_DEBUGFILE();
1017 			(void) rw_unlock(&client_rwlock[setno]);
1018 			break; /* out of try_master-loop */
1019 		}
1020 
1021 		/*
1022 		 * Send the request to the work function on the master
1023 		 * this call will return immediately
1024 		 */
1025 		rpc_err = mdmn_work_2(msg, client[setno][set_master],
1026 		    set_master);
1027 
1028 		/* Everything's Ok? */
1029 		if (rpc_err == NULL) {
1030 			success = MDMNE_RPC_FAIL;
1031 			/*
1032 			 * Probably something happened to the daemon on the
1033 			 * master. Kill the client, and try again...
1034 			 */
1035 			(void) rw_unlock(&client_rwlock[setno]);
1036 			(void) rw_wrlock(&client_rwlock[setno]);
1037 			mdmn_clnt_destroy(client[setno][set_master]);
1038 			if (client[setno][set_master] != (CLIENT *)NULL) {
1039 				client[setno][set_master] = (CLIENT *)NULL;
1040 			}
1041 			(void) rw_unlock(&client_rwlock[setno]);
1042 			continue;
1043 
1044 		} else  if (*rpc_err != MDMNE_ACK) {
1045 			/* something went wrong, break out */
1046 			success = *rpc_err;
1047 			free(rpc_err);
1048 			(void) rw_unlock(&client_rwlock[setno]);
1049 			break; /* out of try_master-loop */
1050 		}
1051 
1052 		(void) rw_unlock(&client_rwlock[setno]);
1053 		free(rpc_err);
1054 
1055 		/*
1056 		 * If we are here, we sucessfully delivered the message.
1057 		 * We register the initiator_table, so that
1058 		 * wakeup_initiator_2 can do the sendreply with the
1059 		 * results for us.
1060 		 */
1061 		success = MDMNE_ACK;
1062 		mdmn_register_initiator_table(setno, class, msg, transp);
1063 
1064 		/* tell check_timeouts, there's work to do */
1065 		(void) mutex_lock(&check_timeout_mutex);
1066 		messages_on_their_way++;
1067 		(void) cond_signal(&check_timeout_cv);
1068 		(void) mutex_unlock(&check_timeout_mutex);
1069 		break; /* out of try_master-loop */
1070 	}
1071 
1072 	(void) rw_unlock(&set_desc_rwlock[setno]);
1073 
1074 	if (success == MDMNE_ACK) {
1075 		commd_debug(MD_MMV_SEND,
1076 		    "send_to_work: registered (%d, 0x%llx-%d)\n",
1077 		    MSGID_ELEMS(msg->msg_msgid));
1078 	} else {
1079 		/* In case of failure do the sendreply now */
1080 		md_mn_result_t *resultp;
1081 		resultp = Zalloc(sizeof (md_mn_result_t));
1082 		resultp->mmr_comm_state = success;
1083 		/*
1084 		 * copy the MSGID so that we know _which_ message
1085 		 * failed (if the transp has got mangled)
1086 		 */
1087 		MSGID_COPY(&(msg->msg_msgid), &(resultp->mmr_msgid));
1088 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
1089 		commd_debug(MD_MMV_SEND,
1090 		    "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
1091 		    MSGID_ELEMS(msg->msg_msgid), success);
1092 		free_result(resultp);
1093 		/*
1094 		 * We don't have a timeout registered to wake us up, so we're
1095 		 * now done with this handle. Release it back to the pool.
1096 		 */
1097 		svc_done(transp);
1098 
1099 	}
1100 
1101 	free_msg(msg);
1102 	/* the alloc was done in mdmn_send_svc_2 */
1103 	Free(matp);
1104 	(void) mutex_unlock(mx);
1105 	return (NULL);
1106 
1107 }
1108 
1109 /*
1110  * do_message_locally(msg, result)
1111  * Process a message locally on the master
1112  * Lookup the MCT if the message has already been processed.
1113  * If not, call the handler and store the result
1114  * If yes, retrieve the result from the MCT.
1115  * Return:
1116  *	MDMNE_ACK in case of success
1117  *	MDMNE_LOG_FAIL if the MCT could not be checked
1118  */
1119 static int
1120 do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1121 {
1122 	int			completed;
1123 	set_t			setno;
1124 	md_mn_msgtype_t		msgtype = msg->msg_type;
1125 	md_mn_msgclass_t	class;
1126 
1127 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1128 
1129 	handler = mdmn_get_handler(msgtype);
1130 	if (handler == NULL) {
1131 		result->mmr_exitval = 0;
1132 		/* let the sender decide if this is an error or not */
1133 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1134 		return (MDMNE_NO_HANDLER);
1135 	}
1136 
1137 	class = mdmn_get_message_class(msg->msg_type);
1138 	setno = msg->msg_setno;
1139 
1140 	result->mmr_msgtype	= msgtype;
1141 	result->mmr_flags	= msg->msg_flags;
1142 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1143 
1144 	(void) mutex_lock(&mct_mutex[setno][class]);
1145 	completed = mdmn_check_completion(msg, result);
1146 	if (completed == MDMN_MCT_NOT_DONE) {
1147 		/* message not yet processed locally */
1148 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1149 		    "calling handler for (%d,0x%llx-%d) type %d\n",
1150 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1151 
1152 		/*
1153 		 * Mark the message as being currently processed,
1154 		 * so we won't start a second handler for it
1155 		 */
1156 		(void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1157 		(void) mutex_unlock(&mct_mutex[setno][class]);
1158 
1159 		/* here we actually process the message on the master */
1160 		(*handler)(msg, MD_MSGF_ON_MASTER, result);
1161 
1162 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1163 		    "finished handler for (%d,0x%llx-%d) type %d\n",
1164 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1165 
1166 		/* Mark the message as fully processed, store the result */
1167 		(void) mutex_lock(&mct_mutex[setno][class]);
1168 		(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1169 	} else if (completed == MDMN_MCT_DONE) {
1170 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1171 		    "result for (%d, 0x%llx-%d) from MCT\n",
1172 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1173 	} else if (completed == MDMN_MCT_IN_PROGRESS) {
1174 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1175 		    "(%d, 0x%llx-%d) is currently being processed\n",
1176 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1177 	} else {
1178 		/* MCT error occurred (should never happen) */
1179 		(void) mutex_unlock(&mct_mutex[setno][class]);
1180 		result->mmr_comm_state = MDMNE_LOG_FAIL;
1181 		commd_debug(MD_MMV_SYSLOG, "WARNING "
1182 		    "mdmn_check_completion returned %d "
1183 		    "for (%d,0x%llx-%d)\n", completed,
1184 		    MSGID_ELEMS(msg->msg_msgid));
1185 		return (MDMNE_LOG_FAIL);
1186 	}
1187 	(void) mutex_unlock(&mct_mutex[setno][class]);
1188 	return (MDMNE_ACK);
1189 
1190 }
1191 
1192 /*
1193  * do_send_message(msg, node)
1194  *
1195  * Send a message to a given node and wait for a acknowledgment, that the
1196  * message has arrived on the remote node.
1197  * Make sure that the client for the set is setup correctly.
1198  * If no ACK arrives, destroy and recreate the RPC client and retry the
1199  * message one time
1200  * After actually sending wait no longer than the appropriate number of
1201  * before timing out the message.
1202  *
1203  * Note must be called with set_desc_wrlock held in reader mode
1204  */
1205 static int
1206 do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1207 {
1208 	int			err;
1209 	int			rpc_retries;
1210 	int			timeout_retries = 0;
1211 	int			*ret = NULL;
1212 	set_t			setno;
1213 	cond_t			*cv;	/* see mdmn_wakeup_master_svc_2 */
1214 	mutex_t			*mx;	/* protection for class_busy */
1215 	timestruc_t		timeout; /* surveillance for remote daemon */
1216 	md_mn_nodeid_t		nid;
1217 	md_mn_msgtype_t		msgtype;
1218 	md_mn_msgclass_t	class;
1219 
1220 	nid	= node->nd_nodeid;
1221 	msgtype = msg->msg_type;
1222 	setno	= msg->msg_setno;
1223 	class	= mdmn_get_message_class(msgtype);
1224 	mx	= mdmn_get_master_table_mx(setno, class);
1225 	cv	= mdmn_get_master_table_cv(setno, class);
1226 
1227 retry_rpc:
1228 
1229 	/* We try two times to send the message */
1230 	rpc_retries = 2;
1231 
1232 	/*
1233 	 * if sending the message doesn't succeed the first time due to a
1234 	 * RPC problem, we retry one time
1235 	 */
1236 	while ((rpc_retries != 0) && (ret == NULL)) {
1237 		/*  in abort state, we error out immediately */
1238 		if (md_commd_global_state & MD_CGS_ABORTED) {
1239 			return (MDMNE_ABORT);
1240 		}
1241 
1242 		(void) rw_rdlock(&client_rwlock[setno]);
1243 		/* unable to create client? Ignore it */
1244 		if (check_client(setno, nid)) {
1245 			/*
1246 			 * In case we cannot establish an RPC client, we
1247 			 * take this node out of our considerations.
1248 			 * This will be reset by a reconfig
1249 			 * cycle that should come pretty soon.
1250 			 * MNISSUE: Should a reconfig cycle
1251 			 * be forced on SunCluster?
1252 			 */
1253 			node->nd_flags &= ~MD_MN_NODE_OWN;
1254 			commd_debug(MD_MMV_SYSLOG,
1255 			    "WARNING couldn't create client for %s\n"
1256 			    "Reconfig cycle required\n",
1257 			    node->nd_nodename);
1258 			commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1259 			    "WARNING couldn't create client for %s\n",
1260 			    MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1261 			(void) rw_unlock(&client_rwlock[setno]);
1262 			return (MDMNE_IGNORE_NODE);
1263 		}
1264 		/* let's be paranoid and check again before sending */
1265 		if (client[setno][nid] == NULL) {
1266 			/*
1267 			 * if this is true, strange enough, we catch our breath,
1268 			 * and then continue, so that the client is set up
1269 			 * once again.
1270 			 */
1271 			commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1272 			(void) rw_unlock(&client_rwlock[setno]);
1273 			(void) sleep(1);
1274 			continue;
1275 		}
1276 
1277 		/* send it over, it will return immediately */
1278 		ret = mdmn_work_2(msg, client[setno][nid], nid);
1279 
1280 		(void) rw_unlock(&client_rwlock[setno]);
1281 
1282 		if (ret != NULL) {
1283 			commd_debug(MD_MMV_PROC_M,
1284 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1285 			    " 0x%x\n",
1286 			    MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1287 		} else {
1288 			commd_debug(MD_MMV_PROC_M,
1289 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1290 			    " NULL \n",
1291 			    MSGID_ELEMS(msg->msg_msgid), nid);
1292 		}
1293 
1294 		if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1295 		    (*ret == MDMNE_THR_CREATE_FAIL)) {
1296 			/*
1297 			 * Something happened to the daemon on the other side.
1298 			 * Kill the client, and try again.
1299 			 * check_client() will create a new client
1300 			 */
1301 			(void) rw_wrlock(&client_rwlock[setno]);
1302 			mdmn_clnt_destroy(client[setno][nid]);
1303 			if (client[setno][nid] != (CLIENT *)NULL) {
1304 				client[setno][nid] = (CLIENT *)NULL;
1305 			}
1306 			(void) rw_unlock(&client_rwlock[setno]);
1307 
1308 			/* ... but don't try infinitely */
1309 			--rpc_retries;
1310 			continue;
1311 		}
1312 		/*
1313 		 * If the class is locked on the other node, keep trying.
1314 		 * This situation will go away automatically,
1315 		 * if we wait long enough
1316 		 */
1317 		if (*ret == MDMNE_CLASS_LOCKED) {
1318 			(void) sleep(1);
1319 			free(ret);
1320 			ret = NULL;
1321 			continue;
1322 		}
1323 	}
1324 	if (ret == NULL) {
1325 		return (MDMNE_RPC_FAIL);
1326 	}
1327 
1328 
1329 	/* if the slave is in abort state, we just ignore it. */
1330 	if (*ret == MDMNE_ABORT) {
1331 		commd_debug(MD_MMV_PROC_M,
1332 		    "proc_mas: work(%d,0x%llx-%d) returned "
1333 		    "MDMNE_ABORT\n",
1334 		    MSGID_ELEMS(msg->msg_msgid));
1335 		free(ret);
1336 		return (MDMNE_IGNORE_NODE);
1337 	}
1338 
1339 	/* Did the remote processing succeed? */
1340 	if (*ret != MDMNE_ACK) {
1341 		/*
1342 		 * Some commd failure in the middle of sending the msg
1343 		 * to the nodes. We don't continue here.
1344 		 */
1345 		commd_debug(MD_MMV_PROC_M,
1346 		    "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1347 		    MSGID_ELEMS(msg->msg_msgid), *ret);
1348 		free(ret);
1349 		return (MDMNE_RPC_FAIL);
1350 	}
1351 	free(ret);
1352 	ret = NULL;
1353 
1354 	/*
1355 	 * When we are here, we have sent the message to the other node and
1356 	 * we know that node has accepted it.
1357 	 * We go to sleep and have trust to be woken up by wakeup.
1358 	 * If we wakeup due to a timeout, or a signal, no result has been
1359 	 * placed in the appropriate slot.
1360 	 * If we timeout, it is likely that this is because the node has
1361 	 * gone away, so we will destroy the client and try it again in the
1362 	 * expectation that the rpc will fail and we will return
1363 	 * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1364 	 * be being processed on the slave. In this case just timeout for 4
1365 	 * more seconds and then return RPC_FAIL if the message is not complete.
1366 	 */
1367 	timeout.tv_nsec = 0;
1368 	timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1369 	    FOUR_SECS.tv_sec;
1370 	err = cond_reltimedwait(cv, mx, &timeout);
1371 
1372 	if (err == 0) {
1373 		/* everything's fine, return success */
1374 		return (MDMNE_ACK);
1375 	}
1376 
1377 	if (err == ETIME) {
1378 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1379 		    "timeout occured, set=%d, class=%d, "
1380 		    "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1381 		    setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1382 		if (timeout_retries == 0) {
1383 			timeout_retries++;
1384 			/*
1385 			 * Destroy the client and try the rpc call again
1386 			 */
1387 			(void) rw_wrlock(&client_rwlock[setno]);
1388 			mdmn_clnt_destroy(client[setno][nid]);
1389 			client[setno][nid] = (CLIENT *)NULL;
1390 			(void) rw_unlock(&client_rwlock[setno]);
1391 			goto retry_rpc;
1392 		}
1393 	} else if (err == EINTR) {
1394 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1395 		    "commd signalled, set=%d, class=%d, "
1396 		    "msgid=(%d, 0x%llx-%d)\n",
1397 		    setno, class, MSGID_ELEMS(msg->msg_msgid));
1398 	} else {
1399 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1400 		    "cond_reltimedwait err=%d, set=%d, "
1401 		    "class=%d, msgid=(%d, 0x%llx-%d)\n",
1402 		    err, setno, class,
1403 		    MSGID_ELEMS(msg->msg_msgid));
1404 	}
1405 
1406 	/* some failure happened */
1407 	return (MDMNE_RPC_FAIL);
1408 }
1409 
1410 /*
1411  * before we return we have to
1412  * free_msg(msg); because we are working on a copied message
1413  */
1414 void
1415 mdmn_master_process_msg(md_mn_msg_t *msg)
1416 {
1417 	int		*ret;
1418 	int		err;
1419 	int		nmsgs;		/* total number of msgs */
1420 	int		curmsg;		/* index of current msg */
1421 	set_t		setno;
1422 	uint_t		inherit_flags = 0;
1423 	uint_t		secdiff, usecdiff; /* runtime of this message */
1424 	md_error_t	mde = mdnullerror;
1425 	md_mn_msg_t	*msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1426 	md_mn_msg_t	*cmsg;		/* current msg */
1427 	md_mn_msgid_t	dummyid;
1428 	md_mn_result_t	*result;
1429 	md_mn_result_t	*slave_result;
1430 	md_mn_nodeid_t	sender;
1431 	md_mn_nodeid_t	set_master;
1432 	md_mnnode_desc	*node;
1433 	md_mn_msgtype_t	orig_type;	/* type of the original message */
1434 	md_mn_msgtype_t	msgtype;	/* type of the current message */
1435 	md_mn_msgclass_t orig_class;	/* class of the original message */
1436 	md_mn_msgclass_t class;		/* class of the current message */
1437 
1438 	int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1439 
1440 	orig_type = msgtype = msg->msg_type;
1441 	sender	= msg->msg_sender;
1442 	setno	= msg->msg_setno;
1443 
1444 	result = Zalloc(sizeof (md_mn_result_t));
1445 	result->mmr_setno	= setno;
1446 	result->mmr_msgtype	= msgtype;
1447 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1448 
1449 	orig_class = mdmn_get_message_class(msgtype);
1450 
1451 	commd_debug(MD_MMV_PROC_M,
1452 	    "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1453 	    MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1454 
1455 	(void) rw_rdlock(&set_desc_rwlock[setno]);
1456 	set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1457 	result->mmr_sender	= set_master;
1458 	/*
1459 	 * Put message into the change log unless told otherwise
1460 	 * Note that we only log original messages.
1461 	 * If they are generated by some smgen, we don't log them!
1462 	 * Replay messages aren't logged either.
1463 	 * Note, that replay messages are unlogged on completion.
1464 	 */
1465 	if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1466 		commd_debug(MD_MMV_PROC_M,
1467 		    "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1468 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1469 		err = mdmn_log_msg(msg);
1470 		if (err == MDMNE_NULL) {
1471 			/* msg logged successfully */
1472 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1473 			    "done log_msg for (%d,0x%llx-%d) type %d\n",
1474 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1475 			goto proceed;
1476 		}
1477 		if (err == MDMNE_ACK) {
1478 			/* Same msg in the slot, proceed */
1479 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1480 			    "already logged (%d,0x%llx-%d) type %d\n",
1481 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1482 			goto proceed;
1483 		}
1484 		if (err == MDMNE_LOG_FAIL) {
1485 			/* Oh, bad, the log is non functional. */
1486 			result->mmr_comm_state = MDMNE_LOG_FAIL;
1487 			/*
1488 			 * Note that the mark_busy was already done by
1489 			 * mdmn_work_svc_2()
1490 			 */
1491 			(void) mutex_lock(&mdmn_busy_mutex[setno]);
1492 			mdmn_mark_class_unbusy(setno, orig_class);
1493 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1494 
1495 		}
1496 		if (err == MDMNE_CLASS_BUSY) {
1497 			/*
1498 			 * The log is occupied with a different message
1499 			 * that needs to be played first.
1500 			 * We reject the current message with MDMNE_CLASS_BUSY
1501 			 * to the initiator and do not unbusy the set/class,
1502 			 * because we will proceed with the logged message,
1503 			 * which has the same set/class combination
1504 			 */
1505 			result->mmr_comm_state = MDMNE_CLASS_BUSY;
1506 		}
1507 		ret = (int *)NULL;
1508 		(void) rw_rdlock(&client_rwlock[setno]);
1509 
1510 		if (check_client(setno, sender)) {
1511 			commd_debug(MD_MMV_SYSLOG,
1512 			    "proc_mas: No client for initiator \n");
1513 		} else {
1514 			ret = mdmn_wakeup_initiator_2(result,
1515 			    client[setno][sender], sender);
1516 		}
1517 		(void) rw_unlock(&client_rwlock[setno]);
1518 
1519 		if (ret == (int *)NULL) {
1520 			commd_debug(MD_MMV_SYSLOG,
1521 			    "proc_mas: couldn't wakeup_initiator \n");
1522 		} else {
1523 			if (*ret != MDMNE_ACK) {
1524 				commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1525 				    "wakeup_initiator returned %d\n", *ret);
1526 			}
1527 			free(ret);
1528 		}
1529 		free_msg(msg);
1530 
1531 		if (err == MDMNE_LOG_FAIL) {
1532 			/* we can't proceed here */
1533 			free_result(result);
1534 			(void) rw_unlock(&set_desc_rwlock[setno]);
1535 			return;
1536 		} else if (err == MDMNE_CLASS_BUSY) {
1537 			mdmn_changelog_record_t *lr;
1538 			lr = mdmn_get_changelogrec(setno, orig_class);
1539 			assert(lr != NULL);
1540 
1541 			/* proceed with the logged message */
1542 			msg = copy_msg(&(lr->lr_msg), NULL);
1543 
1544 			/*
1545 			 * The logged message has to have the same class but
1546 			 * type and sender can be different
1547 			 */
1548 			orig_type = msgtype = msg->msg_type;
1549 			sender	= msg->msg_sender;
1550 
1551 			commd_debug(MD_MMV_PROC_M,
1552 			    "proc_mas: Got new message from change log: "
1553 			    "(%d,0x%llx-%d) type %d\n",
1554 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1555 
1556 			/* continue normal operation with this message */
1557 		}
1558 	}
1559 
1560 proceed:
1561 	smgen = mdmn_get_submessage_generator(msgtype);
1562 	if (smgen == NULL) {
1563 		/* no submessages to create, just use the original message */
1564 		msglist[0] = msg;
1565 		nmsgs = 1;
1566 	} else {
1567 		/* some bits are passed on to submessages */
1568 		inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1569 
1570 		nmsgs = smgen(msg, msglist);
1571 
1572 		/* some settings for the submessages */
1573 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1574 			cmsg    = msglist[curmsg];
1575 
1576 			/* Apply the inherited flags */
1577 			cmsg->msg_flags |= inherit_flags;
1578 
1579 			/*
1580 			 * Make sure the submessage ID is set correctly
1581 			 * Note: first submessage has mid_smid of 1 (not 0)
1582 			 */
1583 			cmsg->msg_msgid.mid_smid = curmsg + 1;
1584 
1585 			/* need the original class set in msgID (for MCT) */
1586 			cmsg->msg_msgid.mid_oclass = orig_class;
1587 		}
1588 
1589 		commd_debug(MD_MMV_PROC_M,
1590 		    "smgen generated %d submsgs, origclass = %d\n",
1591 		    nmsgs, orig_class);
1592 	}
1593 	/*
1594 	 * This big loop does the following.
1595 	 * For all messages:
1596 	 *	process message on the master first (a message completion
1597 	 *		table MCT ensures a message is not processed twice)
1598 	 *	in case of an error break out of message loop
1599 	 *	for all nodes -- unless MD_MSGF_NO_BCAST is set --
1600 	 *		send message to node until that succeeds
1601 	 *		merge result -- not yet implemented
1602 	 *		respect MD_MSGF_STOP_ON_ERROR
1603 	 */
1604 	for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1605 		int	break_msg_loop = 0;
1606 		mutex_t	*mx;		/* protection for class_busy */
1607 		int	master_err;
1608 		int	master_exitval = -1;
1609 
1610 		cmsg	= msglist[curmsg];
1611 		msgtype = cmsg->msg_type;
1612 		class	= mdmn_get_message_class(msgtype);
1613 		node	= NULL;
1614 		mx	= mdmn_get_master_table_mx(setno, class);
1615 
1616 		/* If we are in the abort state, we error out immediately */
1617 		if (md_commd_global_state & MD_CGS_ABORTED) {
1618 			break; /* out of the message loop */
1619 		}
1620 
1621 		commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1622 		    class, orig_class);
1623 		/*
1624 		 * If the current class is different from the original class,
1625 		 * we have to lock it down.
1626 		 * The original class is already marked busy.
1627 		 * At this point we cannot refuse the message because the
1628 		 * class is busy right now, so we wait until the class becomes
1629 		 * available again. As soon as something changes for this set
1630 		 * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1631 		 *
1632 		 * Granularity could be finer (setno/class)
1633 		 */
1634 		if (class != orig_class) {
1635 			(void) mutex_lock(&mdmn_busy_mutex[setno]);
1636 			while (mdmn_mark_class_busy(setno, class) == FALSE) {
1637 				(void) cond_wait(&mdmn_busy_cv[setno],
1638 				    &mdmn_busy_mutex[setno]);
1639 			}
1640 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1641 		}
1642 
1643 		master_err = do_message_locally(cmsg, result);
1644 
1645 		if ((master_err != MDMNE_ACK) ||
1646 		    ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1647 			result->mmr_failing_node = set_master;
1648 			if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1649 				/*
1650 				 * if appropriate, unbusy the class and
1651 				 * break out of the message loop
1652 				 */
1653 				if (class != orig_class) {
1654 					(void) mutex_lock(
1655 					    &mdmn_busy_mutex[setno]);
1656 					mdmn_mark_class_unbusy(setno, class);
1657 					(void) mutex_unlock(
1658 					    &mdmn_busy_mutex[setno]);
1659 				}
1660 				break;
1661 			}
1662 		}
1663 
1664 		if (master_err == MDMNE_ACK)
1665 			master_exitval = result->mmr_exitval;
1666 
1667 		/* No broadcast? => next message */
1668 		if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1669 			/* if appropriate, unbusy the class */
1670 			if (class != orig_class) {
1671 				(void) mutex_lock(&mdmn_busy_mutex[setno]);
1672 				mdmn_mark_class_unbusy(setno, class);
1673 				(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1674 			}
1675 			continue;
1676 		}
1677 
1678 
1679 		/* fake sender, so we get notified when the results are avail */
1680 		cmsg->msg_sender = set_master;
1681 		/*
1682 		 * register to the master_table. It's needed by wakeup_master to
1683 		 * wakeup the sleeping thread.
1684 		 * Access is protected by the class lock: mdmn_mark_class_busy()
1685 		 */
1686 		mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1687 
1688 
1689 
1690 		(void) rw_rdlock(&set_desc_rwlock[setno]);
1691 		/* Send the message  to all other nodes */
1692 		for (node = set_descriptor[setno]->sd_nodelist; node;
1693 		    node = node->nd_next) {
1694 			md_mn_nodeid_t nid = node->nd_nodeid;
1695 
1696 			/* We are master and have already processed the msg */
1697 			if (node == set_descriptor[setno]->sd_mn_masternode) {
1698 				continue;
1699 			}
1700 
1701 			/* If this node didn't join the disk set, ignore it */
1702 			if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1703 				continue;
1704 			}
1705 
1706 			/* If a DIRECTED message, skip non-recipient nodes */
1707 			if ((cmsg->msg_flags & MD_MSGF_DIRECTED) &&
1708 			    nid != cmsg->msg_recipient) {
1709 				continue;
1710 			}
1711 
1712 			(void) mutex_lock(mx);
1713 			/*
1714 			 * Register the node that is addressed,
1715 			 * so we can detect unsolicited messages
1716 			 */
1717 			mdmn_set_master_table_addr(setno, class, nid);
1718 			slave_result = (md_mn_result_t *)NULL;
1719 
1720 			/*
1721 			 * Now send it. do_send_message() will return if
1722 			 *	a failure occurs or
1723 			 *	the results are available
1724 			 */
1725 			err = do_send_message(cmsg, node);
1726 
1727 			/*  in abort state, we error out immediately */
1728 			if (md_commd_global_state & MD_CGS_ABORTED) {
1729 				break;
1730 			}
1731 
1732 			if (err == MDMNE_ACK) {
1733 				slave_result =
1734 				    mdmn_get_master_table_res(setno, class);
1735 				commd_debug(MD_MMV_PROC_M,
1736 				    "proc_mas: got result for (%d,0x%llx-%d)\n",
1737 				    MSGID_ELEMS(cmsg->msg_msgid));
1738 			} else if (err == MDMNE_IGNORE_NODE) {
1739 				(void) mutex_unlock(mx);
1740 				continue; /* send to next node */
1741 			}
1742 			(void) mutex_unlock(mx);
1743 
1744 
1745 			/*
1746 			 * If the result is NULL, or err doesn't show success,
1747 			 * something went wrong with this RPC call.
1748 			 */
1749 			if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1750 				/*
1751 				 * If PANIC_WHEN_INCONSISTENT set,
1752 				 * panic if the master succeeded while
1753 				 * this node failed
1754 				 */
1755 				if ((cmsg->msg_flags &
1756 				    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1757 				    (master_err == MDMNE_ACK))
1758 					panic_system(nid, cmsg->msg_type,
1759 					    master_err, master_exitval,
1760 					    slave_result);
1761 
1762 				result->mmr_failing_node = nid;
1763 				/* are we supposed to stop in case of error? */
1764 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1765 					result->mmr_exitval = MDMNE_RPC_FAIL;
1766 					commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1767 					    "result (%d,0x%llx-%d) is NULL\n",
1768 					    MSGID_ELEMS(cmsg->msg_msgid));
1769 					FLUSH_DEBUGFILE();
1770 					break_msg_loop = 1;
1771 					break; /* out of node loop first */
1772 				} else {
1773 					/* send msg to the next node */
1774 					continue;
1775 				}
1776 
1777 			}
1778 
1779 			/*
1780 			 * Message processed on remote node.
1781 			 * If PANIC_WHEN_INCONSISTENT set, panic if the
1782 			 * result is different on this node from the result
1783 			 * on the master
1784 			 */
1785 			if ((cmsg->msg_flags &
1786 			    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1787 			    ((master_err != MDMNE_ACK) ||
1788 			    (slave_result->mmr_exitval != master_exitval)))
1789 				panic_system(nid, cmsg->msg_type, master_err,
1790 				    master_exitval, slave_result);
1791 
1792 			/*
1793 			 * At this point we know we have a message that was
1794 			 * processed on the remote node.
1795 			 * We now check if the exitval is non zero.
1796 			 * In that case we discard the previous result and
1797 			 * rather use the current.
1798 			 * This means: If a message fails on no node,
1799 			 * the result from the master will be returned.
1800 			 * There's currently no such thing as merge of results
1801 			 * If additionally STOP_ON_ERROR is set, we bail out
1802 			 */
1803 			if (slave_result->mmr_exitval != 0) {
1804 				/* throw away the previously allocated result */
1805 				free_result(result);
1806 
1807 				/* copy_result() allocates new memory */
1808 				result = copy_result(slave_result);
1809 				free_result(slave_result);
1810 
1811 				dump_result(MD_MMV_PROC_M, "proc_mas", result);
1812 
1813 				result->mmr_failing_node = nid;
1814 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1815 					break_msg_loop = 1;
1816 					break; /* out of node loop */
1817 				}
1818 				continue; /* try next node */
1819 
1820 			} else {
1821 				/*
1822 				 * MNIssue: may want to merge the results
1823 				 * from all slaves.  Currently only report
1824 				 * the results from the master.
1825 				 */
1826 				free_result(slave_result);
1827 			}
1828 
1829 		} /* End of loop over the nodes */
1830 		(void) rw_unlock(&set_desc_rwlock[setno]);
1831 
1832 
1833 		/* release the current class again */
1834 		if (class != orig_class) {
1835 			(void) mutex_lock(&mdmn_busy_mutex[setno]);
1836 			mdmn_mark_class_unbusy(setno, class);
1837 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1838 		}
1839 
1840 		/* are we supposed to quit entirely ? */
1841 		if (break_msg_loop ||
1842 		    (md_commd_global_state & MD_CGS_ABORTED)) {
1843 			break; /* out of msg loop */
1844 		}
1845 
1846 	} /* End of loop over the messages */
1847 	/*
1848 	 * If we are here, there's two possibilities:
1849 	 * 	- we processed all messages on all nodes without an error.
1850 	 *	    In this case we return the result from the master.
1851 	 *	    (to be implemented: return the merged result)
1852 	 *	- we encountered an error in which case result has been
1853 	 *	    set accordingly already.
1854 	 */
1855 
1856 	if (md_commd_global_state & MD_CGS_ABORTED) {
1857 		result->mmr_comm_state = MDMNE_ABORT;
1858 	}
1859 
1860 	/*
1861 	 * This message has been processed completely.
1862 	 * Remove it from the changelog.
1863 	 * Do this for replay messages too.
1864 	 * Note that the message is unlogged before waking up the
1865 	 * initiator.  This is done for two reasons.
1866 	 * 1. Remove a race condition that occurs when back to back
1867 	 *   messages are sent for the same class and the registration
1868 	 *   is lost.
1869 	 * 2. If the initiator died but the action was completed on all
1870 	 *   the nodes, we want that to be marked "done" quickly.
1871 	 */
1872 
1873 	if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1874 		commd_debug(MD_MMV_PROC_M,
1875 		    "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1876 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1877 		(void) mdmn_unlog_msg(msg);
1878 		commd_debug(MD_MMV_PROC_M,
1879 		    "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1880 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1881 	}
1882 
1883 	/*
1884 	 * In case of submessages, we increased the submessage ID in the
1885 	 * result structure. We restore the message ID to the value that
1886 	 * the initiator is waiting for.
1887 	 */
1888 	result->mmr_msgid.mid_smid	= 0;
1889 	result->mmr_msgtype		= orig_type;
1890 	result->mmr_sender		= set_master;
1891 
1892 	/* if we have an inited client, send result */
1893 	ret = (int *)NULL;
1894 
1895 	(void) rw_rdlock(&client_rwlock[setno]);
1896 	if (check_client(setno, sender)) {
1897 		commd_debug(MD_MMV_SYSLOG,
1898 		    "proc_mas: unable to create client for initiator\n");
1899 	} else {
1900 		ret = mdmn_wakeup_initiator_2(result, client[setno][sender],
1901 		    sender);
1902 	}
1903 	(void) rw_unlock(&client_rwlock[setno]);
1904 
1905 	if (ret == (int *)NULL) {
1906 		commd_debug(MD_MMV_PROC_M,
1907 		    "proc_mas: couldn't wakeup initiator\n");
1908 	} else {
1909 		if (*ret != MDMNE_ACK) {
1910 			commd_debug(MD_MMV_PROC_M,
1911 			    "proc_mas: wakeup_initiator returned %d\n",
1912 			    *ret);
1913 		}
1914 		free(ret);
1915 	}
1916 
1917 	(void) rw_unlock(&set_desc_rwlock[setno]);
1918 	/* Free all submessages, if there were any */
1919 	if (nmsgs > 1) {
1920 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1921 			free_msg(msglist[curmsg]);
1922 		}
1923 	}
1924 	/* Free the result */
1925 	free_result(result);
1926 
1927 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
1928 	mdmn_mark_class_unbusy(setno, orig_class);
1929 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1930 
1931 
1932 	/*
1933 	 * We use this ioctl just to get the time in the same format as used in
1934 	 * the messageID. If it fails, all we get is a bad runtime output.
1935 	 */
1936 	(void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1937 	secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1938 	usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1939 
1940 	/* catching possible overflow */
1941 	if (usecdiff >= 1000000) {
1942 		usecdiff -= 1000000;
1943 		secdiff++;
1944 	}
1945 
1946 
1947 	commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1948 	    "%5d.%06d secs runtime\n",
1949 	    MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1950 
1951 	/* Free the original message */
1952 	free_msg(msg);
1953 }
1954 
1955 void
1956 mdmn_slave_process_msg(md_mn_msg_t *msg)
1957 {
1958 	int			*ret = NULL;
1959 	int			completed;
1960 	int			retries;
1961 	int			successfully_returned;
1962 	set_t			setno;
1963 	md_mn_result_t		*result;
1964 	md_mn_nodeid_t		sender;
1965 	md_mn_nodeid_t		whoami;
1966 	md_mn_msgtype_t		msgtype;
1967 	md_mn_msgclass_t	class;
1968 
1969 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1970 
1971 	setno	= msg->msg_setno;
1972 	sender	= msg->msg_sender; /* this is always the master of the set */
1973 	msgtype	= msg->msg_type;
1974 
1975 	(void) rw_rdlock(&set_desc_rwlock[setno]);
1976 	whoami		= set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1977 	(void) rw_unlock(&set_desc_rwlock[setno]);
1978 
1979 	result = Zalloc(sizeof (md_mn_result_t));
1980 	result->mmr_flags	= msg->msg_flags;
1981 	result->mmr_setno	= setno;
1982 	result->mmr_msgtype	= msgtype;
1983 	result->mmr_sender	= whoami;
1984 	result->mmr_comm_state	= MDMNE_ACK; /* Ok state */
1985 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1986 	class = mdmn_get_message_class(msgtype);
1987 
1988 	commd_debug(MD_MMV_PROC_S,
1989 	    "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1990 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
1991 
1992 	handler = mdmn_get_handler(msgtype);
1993 
1994 	if (handler == NULL) {
1995 		result->mmr_exitval = 0;
1996 		/* let the sender decide if this is an error or not */
1997 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1998 		commd_debug(MD_MMV_PROC_S,
1999 		    "proc_sla: No handler for (%d, 0x%llx-%d)\n",
2000 		    MSGID_ELEMS(msg->msg_msgid));
2001 	} else {
2002 
2003 		/* Did we already process this message ? */
2004 		(void) mutex_lock(&mct_mutex[setno][class]);
2005 		completed = mdmn_check_completion(msg, result);
2006 
2007 		if (completed == MDMN_MCT_NOT_DONE) {
2008 			/* message not yet processed locally */
2009 			commd_debug(MD_MMV_PROC_S,
2010 			    "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
2011 			    MSGID_ELEMS(msg->msg_msgid));
2012 
2013 			/*
2014 			 * Mark the message as being currently processed,
2015 			 * so we won't start a second handler for it
2016 			 */
2017 			(void) mdmn_mark_completion(msg, NULL,
2018 			    MDMN_MCT_IN_PROGRESS);
2019 
2020 			(void) mutex_unlock(&mct_mutex[setno][class]);
2021 			(*handler)(msg, MD_MSGF_ON_SLAVE, result);
2022 
2023 			commd_debug(MD_MMV_PROC_S,
2024 			    "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
2025 			    MSGID_ELEMS(msg->msg_msgid));
2026 
2027 			(void) mutex_lock(&mct_mutex[setno][class]);
2028 			/* Mark the message as fully done, store the result */
2029 			(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
2030 
2031 		} else if (completed == MDMN_MCT_DONE) {
2032 			/* message processed previously, got result from MCT */
2033 			commd_debug(MD_MMV_PROC_S,
2034 			    "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
2035 			    MSGID_ELEMS(msg->msg_msgid));
2036 		} else if (completed == MDMN_MCT_IN_PROGRESS) {
2037 			/*
2038 			 * If the message is curruntly being processed,
2039 			 * we can return here, without sending a result back.
2040 			 * This will be done by the initial message handling
2041 			 * thread
2042 			 */
2043 			(void) mutex_unlock(&mct_mutex[setno][class]);
2044 			commd_debug(MD_MMV_PROC_M, "proc_sla: "
2045 			    "(%d, 0x%llx-%d) is currently being processed\n",
2046 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
2047 
2048 			free_msg(msg);
2049 			free_result(result);
2050 			return;
2051 		} else {
2052 			/* MCT error occurred (should never happen) */
2053 			result->mmr_comm_state = MDMNE_LOG_FAIL;
2054 			commd_debug(MD_MMV_PROC_S,
2055 			    "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
2056 			    MSGID_ELEMS(msg->msg_msgid));
2057 		}
2058 		(void) mutex_unlock(&mct_mutex[setno][class]);
2059 	}
2060 
2061 	/*
2062 	 * At this point we have a result (even in an error case)
2063 	 * that we return to the master.
2064 	 */
2065 	(void) rw_rdlock(&set_desc_rwlock[setno]);
2066 	retries = 2; /* we will try two times to send the results */
2067 	successfully_returned = 0;
2068 
2069 	while (!successfully_returned && (retries != 0)) {
2070 		ret = (int *)NULL;
2071 		(void) rw_rdlock(&client_rwlock[setno]);
2072 		if (check_client(setno, sender)) {
2073 			/*
2074 			 * If we cannot setup the rpc connection to the master,
2075 			 * we can't do anything besides logging this fact.
2076 			 */
2077 			commd_debug(MD_MMV_SYSLOG,
2078 			    "proc_mas: unable to create client for master\n");
2079 			(void) rw_unlock(&client_rwlock[setno]);
2080 			break;
2081 		} else {
2082 			ret = mdmn_wakeup_master_2(result,
2083 			    client[setno][sender], sender);
2084 			/*
2085 			 * if mdmn_wakeup_master_2 returns NULL, it can be that
2086 			 * the master (or the commd on the master) had died.
2087 			 * In that case, we destroy the client to the master
2088 			 * and retry.
2089 			 * If mdmn_wakeup_master_2 doesn't return MDMNE_ACK,
2090 			 * the commd on the master is alive but
2091 			 * something else is wrong,
2092 			 * in that case a retry doesn't make sense => break out
2093 			 */
2094 			if (ret == (int *)NULL) {
2095 				commd_debug(MD_MMV_PROC_S,
2096 				    "proc_sla: wakeup_master returned NULL\n");
2097 				/* release reader lock, grab writer lock */
2098 				(void) rw_unlock(&client_rwlock[setno]);
2099 				(void) rw_wrlock(&client_rwlock[setno]);
2100 				mdmn_clnt_destroy(client[setno][sender]);
2101 				if (client[setno][sender] != (CLIENT *)NULL) {
2102 					client[setno][sender] = (CLIENT *)NULL;
2103 				}
2104 				(void) rw_unlock(&client_rwlock[setno]);
2105 				retries--;
2106 				commd_debug(MD_MMV_PROC_S,
2107 				    "retries = %d\n", retries);
2108 				continue;
2109 			}
2110 			if (*ret != MDMNE_ACK) {
2111 				commd_debug(MD_MMV_PROC_S, "proc_sla: "
2112 				    "wakeup_master returned %d\n", *ret);
2113 				(void) rw_unlock(&client_rwlock[setno]);
2114 				break;
2115 			} else { /* Good case */
2116 				successfully_returned = 1;
2117 				(void) rw_unlock(&client_rwlock[setno]);
2118 			}
2119 		}
2120 	}
2121 
2122 	(void) rw_unlock(&set_desc_rwlock[setno]);
2123 	commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
2124 	    MSGID_ELEMS(msg->msg_msgid));
2125 
2126 	if (ret != (int *)NULL)
2127 		free(ret);
2128 	free_msg(msg);
2129 	free_result(result);
2130 }
2131 
2132 
2133 /*
2134  * mdmn_send_svc_2:
2135  * ---------------
2136  * Check that the issuing node is a legitimate one (i.e. is licensed to send
2137  * messages to us), that the RPC request can be staged.
2138  *
2139  * Returns:
2140  *	0	=> no RPC request is in-flight, no deferred svc_sendreply()
2141  *	1	=> queued RPC request in-flight. Completion will be made (later)
2142  *		   by a wakeup_initiator_2() [hopefully]
2143  */
2144 int
2145 mdmn_send_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2146 {
2147 	int			err;
2148 	set_t			setno;
2149 	SVCXPRT			*transp = rqstp->rq_xprt;
2150 	md_mn_msg_t		*msg;
2151 	md_mn_result_t		*resultp;
2152 	md_mn_msgclass_t	class;
2153 	md_mn_msg_and_transp_t	*matp;
2154 
2155 	msg = copy_msg(omsg, NULL);
2156 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2157 
2158 	setno = msg->msg_setno;
2159 	class = mdmn_get_message_class(msg->msg_type);
2160 
2161 	/* If we are in the abort state, we error out immediately */
2162 	if (md_commd_global_state & MD_CGS_ABORTED) {
2163 		resultp = Zalloc(sizeof (md_mn_result_t));
2164 		resultp->mmr_comm_state = MDMNE_ABORT;
2165 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2166 		free_result(resultp);
2167 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2168 		return (0);
2169 	}
2170 
2171 	/* check if the global initialization is done */
2172 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2173 		global_init();
2174 	}
2175 
2176 	commd_debug(MD_MMV_SEND,
2177 	    "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2178 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2179 
2180 	/* Check for verbosity related message */
2181 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2182 		md_mn_verbose_t *d;
2183 
2184 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2185 		md_commd_global_verb = d->mmv_what;
2186 		/* everytime the bitmask is set, we reset the timer */
2187 		__savetime = gethrtime();
2188 		/*
2189 		 * If local-only-flag is set, we are done here,
2190 		 * otherwise we pass that message on to the master.
2191 		 */
2192 		if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2193 			resultp = Zalloc(sizeof (md_mn_result_t));
2194 			resultp->mmr_comm_state = MDMNE_ACK;
2195 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2196 			    (char *)resultp);
2197 			free_result(resultp);
2198 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2199 			return (0);
2200 		}
2201 	}
2202 
2203 	/*
2204 	 * Are we entering the abort state?
2205 	 * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2206 	 * this message cannot be distributed anyway.
2207 	 * So, it's safe to return immediately.
2208 	 */
2209 	if (msg->msg_type == MD_MN_MSG_ABORT) {
2210 		md_commd_global_state |= MD_CGS_ABORTED;
2211 		resultp = Zalloc(sizeof (md_mn_result_t));
2212 		resultp->mmr_comm_state = MDMNE_ACK;
2213 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2214 		free_result(resultp);
2215 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2216 		return (0);
2217 	}
2218 
2219 
2220 	/*
2221 	 * Is this message type blocked?
2222 	 * If so we return MDMNE_CLASS_LOCKED, immediately
2223 	 */
2224 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2225 		resultp = Zalloc(sizeof (md_mn_result_t));
2226 		resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2227 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2228 		free_result(resultp);
2229 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2230 		commd_debug(MD_MMV_SEND,
2231 		    "send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2232 		    "type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2233 		    msg->msg_type);
2234 		return (0);
2235 	}
2236 
2237 
2238 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2239 		/* Can only use the appropriate mutexes if they are inited */
2240 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2241 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2242 			(void) rw_wrlock(&client_rwlock[setno]);
2243 			err = mdmn_init_set(setno, MDMN_SET_READY);
2244 			(void) rw_unlock(&client_rwlock[setno]);
2245 			(void) rw_unlock(&set_desc_rwlock[setno]);
2246 		} else {
2247 			err = mdmn_init_set(setno, MDMN_SET_READY);
2248 		}
2249 
2250 		if (err) {
2251 			/* couldn't initialize connections, cannot proceed */
2252 			resultp = Zalloc(sizeof (md_mn_result_t));
2253 			resultp->mmr_comm_state = err;
2254 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2255 			    (char *)resultp);
2256 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2257 			free_result(resultp);
2258 			commd_debug(MD_MMV_SEND,
2259 			    "send: init err = %d\n", err);
2260 			return (0);
2261 		}
2262 	}
2263 
2264 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2265 	if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2266 	    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2267 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2268 		resultp = Zalloc(sizeof (md_mn_result_t));
2269 		resultp->mmr_comm_state = MDMNE_SUSPENDED;
2270 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2271 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2272 		free_result(resultp);
2273 		commd_debug(MD_MMV_SEND,
2274 		    "send: class suspended (%d, 0x%llx-%d), set=%d, "
2275 		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2276 		    setno, class, msg->msg_type);
2277 		return (0);
2278 	}
2279 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2280 
2281 	/* is this rpc request coming from the local node? */
2282 	if (check_license(rqstp, 0) == FALSE) {
2283 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2284 		commd_debug(MD_MMV_SEND,
2285 		    "send: check licence fail(%d, 0x%llx-%d), set=%d, "
2286 		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2287 		    setno, class, msg->msg_type);
2288 		return (0);
2289 	}
2290 
2291 
2292 	/*
2293 	 * We allocate a structure that can take two pointers in order to pass
2294 	 * both the message and the transp into thread_create.
2295 	 * The free for this alloc is done in mdmn_send_to_work()
2296 	 */
2297 	matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2298 	matp->mat_msg = msg;
2299 	matp->mat_transp = transp;
2300 
2301 	/*
2302 	 * create a thread here that calls work on the master.
2303 	 * If we are already on the master, this would block if running
2304 	 * in the same context. (our service is single threaded)(
2305 	 * Make it a detached thread because it will not communicate with
2306 	 * anybody thru thr_* mechanisms
2307 	 */
2308 	(void) thr_create(NULL, 0, mdmn_send_to_work, (void *) matp,
2309 	    THR_DETACHED, NULL);
2310 
2311 	commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2312 	    MSGID_ELEMS(msg->msg_msgid));
2313 	/*
2314 	 * We return here without sending results. This will be done by
2315 	 * mdmn_wakeup_initiator_svc_2() as soon as the results are available.
2316 	 * Until then the calling send_message will be blocked, while we
2317 	 * are able to take calls.
2318 	 */
2319 
2320 	return (1);
2321 }
2322 
2323 /* ARGSUSED */
2324 int *
2325 mdmn_work_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2326 {
2327 	int		err;
2328 	set_t		setno;
2329 	thread_t	tid;
2330 	int		*retval;
2331 	md_mn_msg_t	*msg;
2332 	md_mn_msgclass_t class;
2333 
2334 	retval = Malloc(sizeof (int));
2335 
2336 	/* If we are in the abort state, we error out immediately */
2337 	if (md_commd_global_state & MD_CGS_ABORTED) {
2338 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2339 		*retval = MDMNE_ABORT;
2340 		return (retval);
2341 	}
2342 
2343 	msg = copy_msg(omsg, NULL);
2344 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2345 
2346 	/*
2347 	 * Is this message type blocked?
2348 	 * If so we return MDMNE_CLASS_LOCKED, immediately.
2349 	 * This check is performed on master and slave.
2350 	 */
2351 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2352 		*retval = MDMNE_CLASS_LOCKED;
2353 		return (retval);
2354 	}
2355 
2356 	/* check if the global initialization is done */
2357 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2358 		global_init();
2359 	}
2360 
2361 	class = mdmn_get_message_class(msg->msg_type);
2362 	setno = msg->msg_setno;
2363 
2364 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2365 		/* Can only use the appropriate mutexes if they are inited */
2366 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2367 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2368 			(void) rw_wrlock(&client_rwlock[setno]);
2369 			err = mdmn_init_set(setno, MDMN_SET_READY);
2370 			(void) rw_unlock(&client_rwlock[setno]);
2371 			(void) rw_unlock(&set_desc_rwlock[setno]);
2372 		} else {
2373 			err = mdmn_init_set(setno, MDMN_SET_READY);
2374 		}
2375 
2376 		if (err) {
2377 			*retval = MDMNE_CANNOT_CONNECT;
2378 			free_msg(msg);
2379 			return (retval);
2380 		}
2381 	}
2382 
2383 	/* is this rpc request coming from a licensed node? */
2384 	if (check_license(rqstp, msg->msg_sender) == FALSE) {
2385 		free_msg(msg);
2386 		*retval = MDMNE_RPC_FAIL;
2387 		return (retval);
2388 	}
2389 
2390 	commd_debug(MD_MMV_WORK,
2391 	    "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2392 	    "flags=0x%x\n",
2393 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2394 	    msg->msg_flags);
2395 
2396 	/* Check for various CLASS0 message types */
2397 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2398 		md_mn_verbose_t *d;
2399 
2400 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2401 		/* for now we ignore set / class in md_mn_verbose_t */
2402 		md_commd_global_verb = d->mmv_what;
2403 		/* everytime the bitmask is set, we reset the timer */
2404 		__savetime = gethrtime();
2405 	}
2406 
2407 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2408 
2409 	/* check if class is locked via a call to mdmn_comm_lock_svc_2 */
2410 	if (mdmn_is_class_locked(setno, class) == TRUE) {
2411 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2412 		*retval = MDMNE_CLASS_LOCKED;
2413 		free_msg(msg);
2414 		return (retval);
2415 	}
2416 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2417 
2418 	/* Check if the class is busy right now. Do it only on the master */
2419 	(void) rw_rdlock(&set_desc_rwlock[setno]);
2420 	if (set_descriptor[setno]->sd_mn_am_i_master) {
2421 		(void) rw_unlock(&set_desc_rwlock[setno]);
2422 		/*
2423 		 * If the class is currently suspended, don't accept new
2424 		 * messages, unless they are flagged with an override bit.
2425 		 */
2426 		(void) mutex_lock(&mdmn_busy_mutex[setno]);
2427 		if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2428 		    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2429 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2430 			*retval = MDMNE_SUSPENDED;
2431 			commd_debug(MD_MMV_SEND,
2432 			    "send: set %d is suspended\n", setno);
2433 			free_msg(msg);
2434 			return (retval);
2435 		}
2436 		if (mdmn_mark_class_busy(setno, class) == FALSE) {
2437 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2438 			*retval = MDMNE_CLASS_BUSY;
2439 			free_msg(msg);
2440 			return (retval);
2441 		}
2442 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2443 		/*
2444 		 * Because the real processing of the message takes time we
2445 		 * create a thread for it. So the master thread can continue
2446 		 * to run and accept further messages.
2447 		 */
2448 		*retval = thr_create(NULL, 0,
2449 		    (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2450 		    THR_DETACHED|THR_SUSPENDED, &tid);
2451 	} else {
2452 		(void) rw_unlock(&set_desc_rwlock[setno]);
2453 		*retval = thr_create(NULL, 0,
2454 		    (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2455 		    THR_DETACHED|THR_SUSPENDED, &tid);
2456 	}
2457 
2458 	if (*retval != 0) {
2459 		*retval = MDMNE_THR_CREATE_FAIL;
2460 		free_msg(msg);
2461 		return (retval);
2462 	}
2463 
2464 	/* Now run the new thread */
2465 	(void) thr_continue(tid);
2466 
2467 	commd_debug(MD_MMV_WORK,
2468 	    "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2469 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2470 
2471 	*retval = MDMNE_ACK; /* this means success */
2472 	return (retval);
2473 }
2474 
2475 /* ARGSUSED */
2476 int *
2477 mdmn_wakeup_initiator_svc_2(md_mn_result_t *res, struct svc_req *rqstp)
2478 {
2479 
2480 	int		*retval;
2481 	int		err;
2482 	set_t		setno;
2483 	mutex_t		*mx;   /* protection of initiator_table */
2484 	SVCXPRT		*transp = NULL;
2485 	md_mn_msgid_t	initiator_table_id;
2486 	md_mn_msgclass_t class;
2487 
2488 	retval = Malloc(sizeof (int));
2489 
2490 	/* check if the global initialization is done */
2491 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2492 		global_init();
2493 	}
2494 
2495 	setno	= res->mmr_setno;
2496 
2497 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2498 		/* set not ready means we just crashed are restarted now */
2499 		/* Can only use the appropriate mutexes if they are inited */
2500 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2501 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2502 			(void) rw_wrlock(&client_rwlock[setno]);
2503 			err = mdmn_init_set(setno, MDMN_SET_READY);
2504 			(void) rw_unlock(&client_rwlock[setno]);
2505 			(void) rw_unlock(&set_desc_rwlock[setno]);
2506 		} else {
2507 			err = mdmn_init_set(setno, MDMN_SET_READY);
2508 		}
2509 
2510 		if (err) {
2511 			*retval = MDMNE_CANNOT_CONNECT;
2512 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2513 			return (retval);
2514 		}
2515 	}
2516 
2517 	/* is this rpc request coming from a licensed node? */
2518 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2519 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2520 		*retval = MDMNE_RPC_FAIL;
2521 		return (retval);
2522 	}
2523 
2524 
2525 	class	= mdmn_get_message_class(res->mmr_msgtype);
2526 	mx	= mdmn_get_initiator_table_mx(setno, class);
2527 
2528 	commd_debug(MD_MMV_WAKE_I,
2529 	    "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2530 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2531 
2532 	(void) mutex_lock(mx);
2533 
2534 	/*
2535 	 * Search the initiator wakeup table.
2536 	 * If we find an entry here (which should always be true)
2537 	 * we are on the initiating node and we wakeup the original
2538 	 * local rpc call.
2539 	 */
2540 	mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2541 
2542 	if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2543 		transp = mdmn_get_initiator_table_transp(setno, class);
2544 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2545 		svc_done(transp);
2546 		mdmn_unregister_initiator_table(setno, class);
2547 		*retval = MDMNE_ACK;
2548 
2549 		commd_debug(MD_MMV_WAKE_I,
2550 		    "wake_ini: replied (%d, 0x%llx-%d)\n",
2551 		    MSGID_ELEMS(res->mmr_msgid));
2552 	} else {
2553 		commd_debug(MD_MMV_WAKE_I,
2554 		    "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2555 		    MSGID_ELEMS(res->mmr_msgid));
2556 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2557 	}
2558 	(void) mutex_unlock(mx);
2559 	/* less work for check_timeouts */
2560 	(void) mutex_lock(&check_timeout_mutex);
2561 	if (messages_on_their_way == 0) {
2562 		commd_debug(MD_MMV_WAKE_I,
2563 		    "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2564 		    MSGID_ELEMS(res->mmr_msgid));
2565 	} else {
2566 		messages_on_their_way--;
2567 	}
2568 	(void) mutex_unlock(&check_timeout_mutex);
2569 	xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2570 
2571 	return (retval);
2572 }
2573 
2574 
2575 /*
2576  * res must be free'd by the thread we wake up
2577  */
2578 /* ARGSUSED */
2579 int *
2580 mdmn_wakeup_master_svc_2(md_mn_result_t *ores, struct svc_req *rqstp)
2581 {
2582 
2583 	int		*retval;
2584 	int		err;
2585 	set_t		setno;
2586 	cond_t		*cv;
2587 	mutex_t		*mx;
2588 	md_mn_msgid_t	master_table_id;
2589 	md_mn_nodeid_t	sender;
2590 	md_mn_result_t	*res;
2591 	md_mn_msgclass_t class;
2592 
2593 	retval = Malloc(sizeof (int));
2594 
2595 	/* check if the global initialization is done */
2596 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2597 		global_init();
2598 	}
2599 
2600 	/* Need to copy the results here, as they are static for RPC */
2601 	res = copy_result(ores);
2602 	xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2603 
2604 	class = mdmn_get_message_class(res->mmr_msgtype);
2605 	setno = res->mmr_setno;
2606 
2607 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2608 		/* set not ready means we just crashed are restarted now */
2609 		/* Can only use the appropriate mutexes if they are inited */
2610 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2611 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2612 			(void) rw_wrlock(&client_rwlock[setno]);
2613 			err = mdmn_init_set(setno, MDMN_SET_READY);
2614 			(void) rw_unlock(&client_rwlock[setno]);
2615 			(void) rw_unlock(&set_desc_rwlock[setno]);
2616 		} else {
2617 			err = mdmn_init_set(setno, MDMN_SET_READY);
2618 		}
2619 
2620 		if (err) {
2621 			*retval = MDMNE_CANNOT_CONNECT;
2622 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2623 			return (retval);
2624 		}
2625 	}
2626 
2627 	/* is this rpc request coming from a licensed node? */
2628 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2629 		*retval = MDMNE_RPC_FAIL;
2630 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2631 		return (retval);
2632 	}
2633 
2634 
2635 	commd_debug(MD_MMV_WAKE_M,
2636 	    "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2637 	    "from %d\n",
2638 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2639 	    res->mmr_sender);
2640 	/*
2641 	 * The mutex and cv are needed for waking up the thread
2642 	 * sleeping in mdmn_master_process_msg()
2643 	 */
2644 	mx = mdmn_get_master_table_mx(setno, class);
2645 	cv = mdmn_get_master_table_cv(setno, class);
2646 
2647 	/*
2648 	 * lookup the master wakeup table
2649 	 * If we find our message, we are on the master and
2650 	 * called by a slave that finished processing a message.
2651 	 * We store the results in the appropriate slot and
2652 	 * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2653 	 */
2654 	(void) mutex_lock(mx);
2655 	mdmn_get_master_table_id(setno, class, &master_table_id);
2656 	sender = mdmn_get_master_table_addr(setno, class);
2657 
2658 	if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2659 		if (sender == res->mmr_sender) {
2660 			mdmn_set_master_table_res(setno, class, res);
2661 			(void) cond_signal(cv);
2662 			*retval = MDMNE_ACK;
2663 		} else {
2664 			/* id is correct but wrong sender (I smell a timeout) */
2665 			commd_debug(MD_MMV_WAKE_M,
2666 			    "wakeup master got unsolicited message: "
2667 			    "(%d, 0x%llx-%d) from %d\n",
2668 			    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2669 			free_result(res);
2670 			*retval = MDMNE_TIMEOUT;
2671 		}
2672 	} else {
2673 		/* id is wrong, smells like a very late timeout */
2674 		commd_debug(MD_MMV_WAKE_M,
2675 		    "wakeup master got unsolicited message: "
2676 		    "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2677 		    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2678 		    MSGID_ELEMS(master_table_id));
2679 		free_result(res);
2680 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2681 	}
2682 
2683 	(void) mutex_unlock(mx);
2684 
2685 	return (retval);
2686 }
2687 
2688 /*
2689  * Lock a set/class combination.
2690  * This is mainly done for debug purpose.
2691  * This set/class combination immediately is blocked,
2692  * even in the middle of sending messages to multiple slaves.
2693  * This remains until the user issues a mdmn_comm_unlock_svc_2 for the same
2694  * set/class combination.
2695  *
2696  * Special messages of class MD_MSG_CLASS0 can never be locked.
2697  * 	e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2698  *
2699  * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2700  * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2701  *
2702  * set must be between 1 and MD_MAXSETS
2703  * class can be:
2704  *	MD_MSG_CLASS0 which means all other classes in this case
2705  *	or one specific class (< MD_MN_NCLASSES)
2706  *
2707  * Returns:
2708  *	MDMNE_ACK on sucess (locking a locked class is Ok)
2709  *	MDMNE_EINVAL if a parameter is out of range
2710  */
2711 
2712 /* ARGSUSED */
2713 int *
2714 mdmn_comm_lock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2715 {
2716 	int			*retval;
2717 	set_t			setno = msc->msc_set;
2718 	md_mn_msgclass_t	class = msc->msc_class;
2719 
2720 	retval = Malloc(sizeof (int));
2721 
2722 	/* check if the global initialization is done */
2723 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2724 		global_init();
2725 	}
2726 
2727 	/* is this rpc request coming from the local node ? */
2728 	if (check_license(rqstp, 0) == FALSE) {
2729 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2730 		*retval = MDMNE_RPC_FAIL;
2731 		return (retval);
2732 	}
2733 
2734 	/* Perform some range checking */
2735 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2736 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2737 		*retval = MDMNE_EINVAL;
2738 		return (retval);
2739 	}
2740 
2741 	commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2742 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2743 	if (class != MD_MSG_CLASS0) {
2744 		mdmn_mark_class_locked(setno, class);
2745 	} else {
2746 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2747 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2748 			mdmn_mark_class_locked(setno, class);
2749 		}
2750 	}
2751 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2752 
2753 	*retval = MDMNE_ACK;
2754 	return (retval);
2755 }
2756 
2757 /*
2758  * Unlock a set/class combination.
2759  * set must be between 1 and MD_MAXSETS
2760  * class can be:
2761  *	MD_MSG_CLASS0 which means all other classes in this case (like above)
2762  *	or one specific class (< MD_MN_NCLASSES)
2763  *
2764  * Returns:
2765  *	MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2766  *	MDMNE_EINVAL if a parameter is out of range
2767  */
2768 /* ARGSUSED */
2769 int *
2770 mdmn_comm_unlock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2771 {
2772 	int			*retval;
2773 	set_t			setno  = msc->msc_set;
2774 	md_mn_msgclass_t	class  = msc->msc_class;
2775 
2776 	retval = Malloc(sizeof (int));
2777 
2778 	/* check if the global initialization is done */
2779 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2780 		global_init();
2781 	}
2782 
2783 	/* is this rpc request coming from the local node ? */
2784 	if (check_license(rqstp, 0) == FALSE) {
2785 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2786 		*retval = MDMNE_RPC_FAIL;
2787 		return (retval);
2788 	}
2789 
2790 	/* Perform some range checking */
2791 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2792 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2793 		*retval = MDMNE_EINVAL;
2794 		return (retval);
2795 	}
2796 	commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2797 
2798 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2799 	if (class != MD_MSG_CLASS0) {
2800 		mdmn_mark_class_unlocked(setno, class);
2801 	} else {
2802 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2803 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2804 			mdmn_mark_class_unlocked(setno, class);
2805 		}
2806 	}
2807 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2808 
2809 	*retval = MDMNE_ACK;
2810 	return (retval);
2811 }
2812 
2813 /*
2814  * mdmn_comm_suspend_svc_2(setno, class)
2815  *
2816  * Drain all outstanding messages for a given set/class combination
2817  * and don't allow new messages to be processed.
2818  *
2819  * Special messages of class MD_MSG_CLASS0 can never be locked.
2820  * 	e.g. MD_MN_MSG_VERBOSITY
2821  *
2822  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2823  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2824  *
2825  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2826  * one class as being suspended.
2827  * If messages for this class are currently on their way,
2828  * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2829  *
2830  * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2831  * Messages must be generated in ascending order.
2832  * This means, a message cannot create submessages with the same or lower class.
2833  * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2834  * generate a hanging situation here.
2835  * We mark class 1 as being suspended.
2836  * if the class is not busy, we proceed with class 2
2837  * and so on
2838  * if a class *is* busy, we cannot continue here, but return
2839  * MDMNE_SET_NOT_DRAINED.
2840  * We expect the caller to hold on for some seconds and try again.
2841  * When that message, that held the class busy is done in
2842  * mdmn_master_process_msg(), mdmn_mark_class_unbusy() called.
2843  * There it is checked if the class is about to drain.
2844  * In that case it tries to drain all higher classes there.
2845  *
2846  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2847  * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2848  * completely drained.
2849  *
2850  * Returns:
2851  *	MDMNE_ACK on sucess (set is drained, no outstanding messages)
2852  *	MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2853  *		still outstanding messages for this set(s)
2854  *	MDMNE_EINVAL if setno is out of range
2855  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2856  */
2857 
2858 /* ARGSUSED */
2859 int *
2860 mdmn_comm_suspend_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2861 {
2862 	int			*retval;
2863 	int			failure = 0;
2864 	set_t			startset, endset;
2865 	set_t			setno  = msc->msc_set;
2866 	md_mn_msgclass_t	oclass = msc->msc_class;
2867 #ifdef NOT_YET_NEEDED
2868 	uint_t			flags  = msc->msc_flags;
2869 #endif /* NOT_YET_NEEDED */
2870 	md_mn_msgclass_t	class;
2871 
2872 	retval = Malloc(sizeof (int));
2873 
2874 	/* check if the global initialization is done */
2875 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2876 		global_init();
2877 	}
2878 
2879 	/* is this rpc request coming from the local node ? */
2880 	if (check_license(rqstp, 0) == FALSE) {
2881 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2882 		*retval = MDMNE_RPC_FAIL;
2883 		return (retval);
2884 	}
2885 
2886 	commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2887 	    setno, oclass);
2888 
2889 	/* Perform some range checking */
2890 	if (setno >= MD_MAXSETS) {
2891 		*retval = MDMNE_EINVAL;
2892 		commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2893 		return (retval);
2894 	}
2895 
2896 	/*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2897 	if (setno == MD_COMM_ALL_SETS) {
2898 		startset = 1;
2899 		endset = MD_MAXSETS - 1;
2900 	} else {
2901 		startset = setno;
2902 		endset = setno;
2903 	}
2904 
2905 	for (setno = startset; setno <= endset; setno++) {
2906 		/* Here we need the mutexes for the set to be setup */
2907 		if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2908 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2909 		}
2910 
2911 		(void) mutex_lock(&mdmn_busy_mutex[setno]);
2912 		/* shall we drain all classes of this set? */
2913 		if (oclass == MD_COMM_ALL_CLASSES) {
2914 			for (class = 1; class < MD_MN_NCLASSES; class ++) {
2915 				commd_debug(MD_MMV_MISC,
2916 				    "suspend: suspending set %d, class %d\n",
2917 				    setno, class);
2918 				*retval = mdmn_mark_class_suspended(setno,
2919 				    class, MDMN_SUSPEND_ALL);
2920 				if (*retval == MDMNE_SET_NOT_DRAINED) {
2921 					failure++;
2922 				}
2923 			}
2924 		} else {
2925 			/* only drain one specific class */
2926 			commd_debug(MD_MMV_MISC,
2927 			    "suspend: suspending set=%d class=%d\n",
2928 			    setno, oclass);
2929 			*retval = mdmn_mark_class_suspended(setno, oclass,
2930 			    MDMN_SUSPEND_1);
2931 			if (*retval == MDMNE_SET_NOT_DRAINED) {
2932 				failure++;
2933 			}
2934 		}
2935 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2936 	}
2937 	/* If one or more sets are not entirely drained, failure is non-zero */
2938 	if (failure != 0) {
2939 		*retval = MDMNE_SET_NOT_DRAINED;
2940 		commd_debug(MD_MMV_MISC,
2941 		    "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2942 	} else {
2943 		*retval = MDMNE_ACK;
2944 	}
2945 
2946 	return (retval);
2947 }
2948 
2949 /*
2950  * mdmn_comm_resume_svc_2(setno, class)
2951  *
2952  * Resume processing messages for a given set.
2953  * This incorporates the repeal of a previous suspend operation.
2954  *
2955  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2956  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2957  *
2958  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2959  * one class as being resumed.
2960  *
2961  * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
2962  *
2963  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2964  *
2965  * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
2966  * reset any ABORT flag from the global state.
2967  *
2968  * Returns:
2969  *	MDMNE_ACK on sucess (resuming an unlocked set is Ok)
2970  *	MDMNE_EINVAL if setno is out of range
2971  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2972  */
2973 /* ARGSUSED */
2974 int *
2975 mdmn_comm_resume_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2976 {
2977 	int			*retval;
2978 	set_t			startset, endset;
2979 	set_t			setno  = msc->msc_set;
2980 	md_mn_msgclass_t	oclass = msc->msc_class;
2981 	uint_t			flags  = msc->msc_flags;
2982 	md_mn_msgclass_t	class;
2983 
2984 	retval = Malloc(sizeof (int));
2985 
2986 	/* check if the global initialization is done */
2987 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2988 		global_init();
2989 	}
2990 
2991 	/* is this rpc request coming from the local node ? */
2992 	if (check_license(rqstp, 0) == FALSE) {
2993 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2994 		*retval = MDMNE_RPC_FAIL;
2995 		return (retval);
2996 	}
2997 
2998 	commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
2999 	    setno, oclass);
3000 
3001 	/* Perform some range checking */
3002 	if (setno > MD_MAXSETS) {
3003 		*retval = MDMNE_EINVAL;
3004 		return (retval);
3005 	}
3006 
3007 	if (setno == MD_COMM_ALL_SETS) {
3008 		startset = 1;
3009 		endset = MD_MAXSETS - 1;
3010 		if (oclass == MD_COMM_ALL_CLASSES) {
3011 			/* This is the point where we "unabort" the commd */
3012 			commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
3013 			md_commd_global_state &= ~MD_CGS_ABORTED;
3014 		}
3015 	} else {
3016 		startset = setno;
3017 		endset = setno;
3018 	}
3019 
3020 	for (setno = startset; setno <= endset; setno++) {
3021 
3022 		/* Here we need the mutexes for the set to be setup */
3023 		if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
3024 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
3025 		}
3026 
3027 		(void) mutex_lock(&mdmn_busy_mutex[setno]);
3028 
3029 		if (oclass == MD_COMM_ALL_CLASSES) {
3030 			int end_class = 1;
3031 			/*
3032 			 * When SUSPENDing all classes, we go
3033 			 * from 1 to MD_MN_NCLASSES-1
3034 			 * The correct reverse action is RESUMing
3035 			 * from MD_MN_NCLASSES-1 to 1 (or 2)
3036 			 */
3037 
3038 			if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
3039 				end_class = 2;
3040 			}
3041 
3042 			/*
3043 			 * Then mark all classes of this set as no longer
3044 			 * suspended. This supersedes any previous suspend(1)
3045 			 * calls and resumes the set entirely.
3046 			 */
3047 			for (class = MD_MN_NCLASSES - 1; class >= end_class;
3048 			    class --) {
3049 				commd_debug(MD_MMV_MISC,
3050 				    "resume: resuming set=%d class=%d\n",
3051 				    setno, class);
3052 				mdmn_mark_class_resumed(setno, class,
3053 				    (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
3054 			}
3055 		} else {
3056 			/*
3057 			 * In this case only one class is marked as not
3058 			 * suspended. If a suspend(all) is currently active for
3059 			 * this set, this class will still be suspended.
3060 			 * That state will be cleared by a suspend(all)
3061 			 * (see above)
3062 			 */
3063 			commd_debug(MD_MMV_MISC,
3064 			    "resume: resuming set=%d class=%d\n",
3065 			    setno, oclass);
3066 			mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
3067 		}
3068 
3069 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
3070 	}
3071 
3072 	*retval = MDMNE_ACK;
3073 	return (retval);
3074 }
3075 /* ARGSUSED */
3076 int *
3077 mdmn_comm_reinit_set_svc_2(set_t *setnop, struct svc_req *rqstp)
3078 {
3079 	int		*retval;
3080 	md_mnnode_desc	*node;
3081 	set_t		 setno = *setnop;
3082 
3083 	retval = Malloc(sizeof (int));
3084 
3085 	/* check if the global initialization is done */
3086 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3087 		global_init();
3088 	}
3089 
3090 	/* is this rpc request coming from the local node ? */
3091 	if (check_license(rqstp, 0) == FALSE) {
3092 		xdr_free(xdr_set_t, (caddr_t)setnop);
3093 		*retval = MDMNE_RPC_FAIL;
3094 		return (retval);
3095 	}
3096 
3097 	commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
3098 
3099 	(void) rw_rdlock(&set_desc_rwlock[setno]);
3100 	/*
3101 	 * We assume, that all messages have been suspended previously.
3102 	 *
3103 	 * As we are modifying lots of clients here we grab the client_rwlock
3104 	 * in writer mode. This ensures, no new messages come in.
3105 	 */
3106 	(void) rw_wrlock(&client_rwlock[setno]);
3107 	/* This set is no longer initialized */
3108 
3109 	if ((set_descriptor[setno] != NULL) &&
3110 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
3111 		/* destroy all rpc clients from this set */
3112 		for (node = set_descriptor[setno]->sd_nodelist; node;
3113 		    node = node->nd_next) {
3114 			/*
3115 			 * Since the CLIENT for ourself will be recreated
3116 			 * shortly, and this node is guaranteed to be
3117 			 * there after a reconfig, there's no reason to go
3118 			 * through destroying it.  It also avoids an issue
3119 			 * with calling clnt_create() later from within the
3120 			 * server thread, which can effectively deadlock
3121 			 * itself due to RPC design limitations.
3122 			 */
3123 			if (node == set_descriptor[setno]->sd_mn_mynode)
3124 				continue;
3125 			mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
3126 			if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
3127 				client[setno][node->nd_nodeid] = (CLIENT *)NULL;
3128 			}
3129 		}
3130 		md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
3131 	}
3132 
3133 	commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
3134 
3135 	(void) rw_unlock(&client_rwlock[setno]);
3136 	(void) rw_unlock(&set_desc_rwlock[setno]);
3137 	*retval = MDMNE_ACK;
3138 	return (retval);
3139 }
3140 
3141 /*
3142  * This is just an interface for testing purpose.
3143  * Here we can disable single message types.
3144  * If we block a message type, this is valid for all MN sets.
3145  * If a message arrives later, and  it's message type is blocked, it will
3146  * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
3147  * resend this message over and over again.
3148  */
3149 
3150 /* ARGSUSED */
3151 int *
3152 mdmn_comm_msglock_svc_2(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3153 {
3154 	int			*retval;
3155 	md_mn_msgtype_t		type = mmtl->mmtl_type;
3156 	uint_t			lock = mmtl->mmtl_lock;
3157 
3158 	retval = Malloc(sizeof (int));
3159 
3160 	/* check if the global initialization is done */
3161 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3162 		global_init();
3163 	}
3164 
3165 	/* is this rpc request coming from the local node ? */
3166 	if (check_license(rqstp, 0) == FALSE) {
3167 		xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3168 		*retval = MDMNE_RPC_FAIL;
3169 		return (retval);
3170 	}
3171 
3172 	/* Perform some range checking */
3173 	if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3174 		*retval = MDMNE_EINVAL;
3175 		return (retval);
3176 	}
3177 
3178 	commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3179 	msgtype_lock_state[type] = lock;
3180 
3181 	*retval = MDMNE_ACK;
3182 	return (retval);
3183 }
3184