xref: /titanic_41/usr/src/cmd/lvm/rpc.mdcommd/mdmn_commd_server.c (revision 6e0cbcaa0c6f2bc34634a4cc17b099f9ecef03d1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
24  */
25 
26 #include <unistd.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <sys/statvfs.h>
30 #include <sys/uadmin.h>
31 #include <sys/resource.h>
32 #include <fcntl.h>
33 #include <stdio.h>
34 #include <thread.h>
35 #include <meta.h>
36 #include <sdssc.h>
37 #include <mdmn_changelog.h>
38 #include "mdmn_subr.h"
39 
40 /*
41  * This is the communication daemon for SVM Multi Node Disksets.
42  * It runs on every node and provides the following rpc services:
43  *  - mdmn_send_svc_2
44  *  - mdmn_work_svc_2
45  *  - mdmn_wakeup_initiator_svc_2
46  *  - mdmn_wakeup_master_svc_2
47  *  - mdmn_comm_lock_svc_2
48  *  - mdmn_comm_unlock_svc_2
49  *  - mdmn_comm_suspend_svc_2
50  *  - mdmn_comm_resume_svc_2
51  *  - mdmn_comm_reinit_set_svc_2
52  * where send, lock, unlock and reinit are meant for external use,
53  * work and the two wakeups are for internal use only.
54  *
55  * NOTE:
56  * On every node only one of those xxx_2 functions can be active at the
57  * same time because the daemon is single threaded.
58  *
59  * (not quite true, as mdmn_send_svc_2 and mdmn_work_svc_2 do thr_create()s
60  * as part of their handlers, so those aspects are multi-threaded)
61  *
62  * In case an event occurs that has to be propagated to all the nodes...
63  *
64  * One node (the initiator)
65  *	calls the libmeta function mdmn_send_message()
66  *	This function calls the local daemon thru mdmn_send_svc_2.
67  *
68  * On the initiator:
69  *	mdmn_send_svc_2()
70  *	    - starts a thread -> mdmn_send_to_work() and returns.
71  *	mdmn_send_to_work()
72  *	    - sends this message over to the master of the diskset.
73  *	      This is done by calling mdmn_work_svc_2 on the master.
74  *	    - registers to the initiator_table
75  *	    - exits without doing a svc_sendreply() for the call to
76  *	      mdmn_send_svc_2. This means that call is blocked until somebody
77  *	      (see end of this comment) does a svc_sendreply().
78  *	      This means mdmn_send_message() does not yet return.
79  *	    - A timeout surveillance is started at this point.
80  *	      This means in case the master doesn't reply at all in an
81  *	      aproppriate time, an error condition is returned
82  *	      to the caller.
83  *
84  * On the master:
85  *	mdmn_work_svc_2()
86  *	    - starts a thread -> mdmn_master_process_msg() and returns
87  *	mdmn_master_process_msg()
88  *	    - logs the message to the change log
89  *	    - executes the message locally
90  *	    - flags the message in the change log
91  *	    - sends the message to mdmn_work_svc_2() on all the
92  *	      other nodes (slaves)
93  *	      after each call to mdmn_work_svc_2 the thread goes to sleep and
94  *	      will be woken up by mdmn_wakeup_master_svc_2() as soon as the
95  *	      slave node is done with this message.
96  *	    - In case the slave doesn't respond in a apropriate time, an error
97  *	      is assumed to ensure the master doesn't wait forever.
98  *
99  * On a slave:
100  *	mdmn_work_svc_2()
101  *	    - starts a thread -> mdmn_slave_process_msg() and returns
102  *	mdmn_slave_process_msg()
103  *	    - processes this message locally by calling the appropriate message
104  *	      handler, that creates some result.
105  *	    - sends that result thru a call to mdmn_wakeup_master_svc_2() to
106  *	      the master.
107  *
108  * Back on the master:
109  *	mdmn_wakeup_master_svc_2()
110  *	    - stores the result into the master_table.
111  *	    - signals the mdmn_master_process_msg-thread.
112  *	    - returns
113  *	mdmn_master_process_msg()
114  *	    - after getting the results from all nodes
115  *	    - sends them back to the initiating node thru a call to
116  *	      mdmn_wakeup_initiator_svc_2.
117  *
118  * Back on the initiator:
119  *	mdmn_wakeup_initiator_svc_2()
120  *	    - calls svc_sendreply() which makes the call to mdmn_send_svc_2()
121  *	      return.
122  *	      which allows the initial mdmn_send_message() call to return.
123  */
124 
125 FILE *commdout;		/* debug output for the commd */
126 char *commdoutfile;	/* file name for the above output */
127 /* want at least 10 MB free space when logging into a file */
128 #define	MIN_FS_SPACE	(10LL * 1024 * 1024)
129 
130 /*
131  * Number of outstanding messages that were initiated by this node.
132  * If zero, check_timeouts goes to sleep
133  */
134 uint_t	messages_on_their_way;
135 mutex_t	check_timeout_mutex;	/* need mutex to protect above */
136 cond_t	check_timeout_cv;	/* trigger for check_timeouts */
137 
138 /* for printing out time stamps */
139 hrtime_t __savetime;
140 
141 /* RPC clients for every set and every node and their protecting locks */
142 CLIENT	*client[MD_MAXSETS][NNODES];
143 rwlock_t client_rwlock[MD_MAXSETS];
144 
145 /* the descriptors of all possible sets and their protectors */
146 struct md_set_desc *set_descriptor[MD_MAXSETS];
147 rwlock_t set_desc_rwlock[MD_MAXSETS];
148 
149 /* the daemon to daemon communication has to timeout quickly */
150 static struct timeval FOUR_SECS = { 4, 0 };
151 
152 /* These indicate if a set has already been setup */
153 int md_mn_set_inited[MD_MAXSETS];
154 
155 /* For every set we have a message completion table and protecting mutexes */
156 md_mn_mct_t *mct[MD_MAXSETS];
157 mutex_t	mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];
158 
159 /* Stuff to describe the global status of the commd on one node */
160 #define	MD_CGS_INITED		0x0001
161 #define	MD_CGS_ABORTED		0x0002	/* return everything with MDMNE_ABORT */
162 uint_t md_commd_global_state = 0;	/* No state when starting up */
163 
164 /*
165  * Global verbosity level for the daemon
166  */
167 uint_t md_commd_global_verb;
168 
169 /*
170  * libmeta doesn't like multiple threads in metaget_setdesc().
171  * So we must protect access to it with a global lock
172  */
173 mutex_t get_setdesc_mutex;
174 
175 /*
176  * Need a way to block single message types,
177  * hence an array with a status for every message type
178  */
179 uint_t msgtype_lock_state[MD_MN_NMESSAGES];
180 
181 /* for reading in the config file */
182 #define	MAX_LINE_SIZE 1024
183 
184 extern char *commd_get_outfile(void);
185 extern uint_t commd_get_verbosity(void);
186 
187 /*
188  * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
189  * merely needs to call clnt_create_timed, and meta_client_create_retry
190  * will take care of the rest.
191  */
192 /* ARGSUSED */
193 static CLIENT *
194 mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
195 {
196 	md_mnnode_desc	*node = (md_mnnode_desc *)data;
197 
198 	return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, TWO, "tcp",
199 	    time_out));
200 }
201 
202 #define	FLUSH_DEBUGFILE() \
203 	if (commdout != (FILE *)NULL) { \
204 		(void) fflush(commdout); \
205 		(void) fsync(fileno(commdout)); \
206 	}
207 
208 static void
209 panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
210     md_mn_result_t *slave_result)
211 {
212 	md_mn_commd_err_t	commd_err;
213 	md_error_t		mne = mdnullerror;
214 	char			*msg_buf;
215 
216 	msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
217 
218 	FLUSH_DEBUGFILE();
219 
220 	if (master_err != MDMNE_ACK) {
221 		(void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC "
222 		    "fail on master when processing message type %d\n", type);
223 	} else if (slave_result == NULL) {
224 		(void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail "
225 		    "on node %d when processing message type %d\n", nid, type);
226 	} else {
227 		(void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: "
228 		    "Inconsistent return value from node %d when processing "
229 		    "message type %d. Master exitval = %d, "
230 		    "Slave exitval = %d\n", nid, type, master_exitval,
231 		    slave_result->mmr_exitval);
232 	}
233 	commd_err.size = strlen(msg_buf);
234 	commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
235 
236 	(void) metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
237 	(void) uadmin(A_DUMP, AD_BOOT, NULL);
238 }
239 
240 static void
241 flush_fcout()
242 {
243 	struct statvfs64 vfsbuf;
244 	long long avail_bytes;
245 	int warned = 0;
246 
247 	for (; ; ) {
248 		(void) sleep(10);
249 		/* No output file, nothing to do */
250 		if (commdout == (FILE *)NULL)
251 			continue;
252 
253 		/*
254 		 * stat the appropriate filesystem to check for available space.
255 		 */
256 		if (statvfs64(commdoutfile, &vfsbuf)) {
257 			continue;
258 		}
259 
260 		avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
261 		/*
262 		 * If we don't have enough space, we print out a warning.
263 		 * And we drop the verbosity level to NULL
264 		 * In case the condtion doesn't go away, we don't repeat
265 		 * the warning.
266 		 */
267 		if (avail_bytes < MIN_FS_SPACE) {
268 			if (warned) {
269 				continue;
270 			}
271 			commd_debug(MD_MMV_SYSLOG,
272 			    "NOT enough space available for logging\n");
273 			commd_debug(MD_MMV_SYSLOG,
274 			    "Have %lld bytes, need %lld bytes\n",
275 			    avail_bytes, MIN_FS_SPACE);
276 			warned = 1;
277 			md_commd_global_verb = MD_MMV_NULL;
278 		} else {
279 			warned = 0;
280 		}
281 
282 		(void) fflush(commdout);
283 	}
284 }
285 
286 /* safer version of clnt_destroy. If clnt is NULL don't do anything */
287 #define	mdmn_clnt_destroy(clnt) {	\
288 	if (clnt)			\
289 		clnt_destroy(clnt);	\
290 }
291 
292 /*
293  * Own version of svc_sendreply that checks the integrity of the transport
294  * handle and so prevents us from core dumps in the real svc_sendreply()
295  */
296 void
297 mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
298 {
299 	if (SVC_STAT(transp) == XPRT_DIED) {
300 		commd_debug(MD_MMV_MISC,
301 		    "mdmn_svc_sendreply: XPRT_DIED\n");
302 		return;
303 	}
304 	(void) svc_sendreply(transp, xdr, data);
305 }
306 
307 /*
308  * timeout_initiator(set, class)
309  *
310  * Alas, I sent a message and didn't get a response back in aproppriate time.
311  *
312  * timeout_initiator() takes care for doing the needed svc_sendreply() to the
313  * calling mdmn_send_message, so that guy doesn't wait forever
314  * What is done here is pretty much the same as what is done in
315  * wakeup initiator. The difference is that we cannot provide for any results,
316  * of course and we set the comm_state to MDMNE_TIMEOUT.
317  *
318  * By doing so, mdmn_send_message can decide if a retry would make sense or not.
319  * It's not our's to decide that here.
320  */
321 void
322 timeout_initiator(set_t setno, md_mn_msgclass_t class)
323 {
324 	SVCXPRT		*transp;
325 	md_mn_msgid_t	mid;
326 	md_mn_result_t *resultp;
327 
328 	resultp = Zalloc(sizeof (md_mn_result_t));
329 	resultp->mmr_comm_state	= MDMNE_TIMEOUT;
330 
331 	commd_debug(MD_MMV_MISC,
332 	    "timeout_initiator set = %d, class = %d\n", setno, class);
333 
334 	transp = mdmn_get_initiator_table_transp(setno, class);
335 	mdmn_get_initiator_table_id(setno, class, &mid);
336 
337 	commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
338 	    MSGID_ELEMS(mid));
339 	/*
340 	 * Give the result the corresponding msgid from the failed message.
341 	 */
342 	MSGID_COPY(&mid, &(resultp->mmr_msgid));
343 
344 	/* return to mdmn_send_message() and let it deal with the situation */
345 	mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
346 
347 	free(resultp);
348 	commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
349 	svc_done(transp);
350 	mdmn_unregister_initiator_table(setno, class);
351 }
352 
353 
354 /*
355  * check_timeouts - thread
356  *
357  * This implements a timeout surveillance for messages sent from the
358  * initiator to the master.
359  *
360  * If a message is started, this thread is triggered thru
361  * cond_signal(&check_timeout_cv) and we keep track of the numbers of
362  * messages that are outstanding (messages_on_their_way).
363  *
364  * As long as there are messages on their way, this thread never goes to sleep.
365  * It'll keep checking all class/set combinations for outstanding messages.
366  * If one is found, it's checked if this message is overdue. In that case,
367  * timeout_initiator() is called to wakeup the calling mdmn_send_message and
368  * to clean up the mess.
369  *
370  * If the result from the master arrives later, this message is considered
371  * to be unsolicited. And will be ignored.
372  */
373 
374 void
375 check_timeouts()
376 {
377 	set_t			setno;
378 	time_t			now, then;
379 	mutex_t			*mx;
380 	md_mn_msgclass_t	class;
381 
382 	for (; ; ) {
383 		now = time((time_t *)NULL);
384 		for (setno = 1; setno < MD_MAXSETS; setno++) {
385 			if (md_mn_set_inited[setno] != MDMN_SET_READY) {
386 				continue;
387 			}
388 			for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
389 			    class++) {
390 				mx = mdmn_get_initiator_table_mx(setno, class);
391 				(void) mutex_lock(mx);
392 
393 				/* then is the registered time */
394 				then =
395 				    mdmn_get_initiator_table_time(setno, class);
396 				if ((then != 0) && (now > then)) {
397 					timeout_initiator(setno, class);
398 				}
399 				(void) mutex_unlock(mx);
400 			}
401 		}
402 		/* it's ok to check only once per second */
403 		(void) sleep(1);
404 
405 		/* is there work to do? */
406 		(void) mutex_lock(&check_timeout_mutex);
407 		if (messages_on_their_way == 0) {
408 			(void) cond_wait(&check_timeout_cv,
409 			    &check_timeout_mutex);
410 		}
411 		(void) mutex_unlock(&check_timeout_mutex);
412 	}
413 }
414 
415 void
416 setup_debug(void)
417 {
418 	char	*tmp_dir;
419 
420 	/* Read in the debug-controlling tokens from runtime.cf */
421 	md_commd_global_verb = commd_get_verbosity();
422 	/*
423 	 * If the user didn't specify a verbosity level in runtime.cf
424 	 * we can safely return here. As we don't intend to printout
425 	 * debug messages, we don't need to check for the output file.
426 	 */
427 	if (md_commd_global_verb == 0) {
428 		return;
429 	}
430 
431 	/* if commdout is non-NULL it is an open FILE, we'd better close it */
432 	if (commdout != (FILE *)NULL) {
433 		(void) fclose(commdout);
434 	}
435 
436 	commdoutfile = commd_get_outfile();
437 
438 	/* setup the debug output */
439 	if (commdoutfile == (char *)NULL) {
440 		/* if no valid file was specified, use the default */
441 		commdoutfile = "/var/run/commd.out";
442 		commdout = fopen(commdoutfile, "a");
443 	} else {
444 		/* check if the directory exists and is writable */
445 		tmp_dir = strdup(commdoutfile);
446 		if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
447 		    ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
448 			syslog(LOG_ERR,
449 			    "Can't write to specified output file %s,\n"
450 			    "using /var/run/commd.out instead\n", commdoutfile);
451 			free(commdoutfile);
452 			commdoutfile = "/var/run/commd.out";
453 			commdout = fopen(commdoutfile, "a");
454 		}
455 		free(tmp_dir);
456 	}
457 
458 	if (commdout == (FILE *)NULL) {
459 		syslog(LOG_ERR, "Can't write to debug output file %s\n",
460 		    commdoutfile);
461 	}
462 }
463 
464 /*
465  * mdmn_is_node_dead checks to see if a node is dead using
466  * the SunCluster infrastructure which is a stable interface.
467  * If unable to contact SunCuster the node is assumed to be alive.
468  * Return values:
469  *	1 - node is dead
470  *	0 - node is alive
471  */
472 int
473 mdmn_is_node_dead(md_mnnode_desc *node)
474 {
475 	char	*fmt = "/usr/cluster/bin/scha_cluster_get -O NODESTATE_NODE ";
476 	char	*cmd;
477 	size_t	size;
478 	char	buf[10];
479 	FILE	*ptr;
480 	int	retval = 0;
481 
482 	/* I know that I'm alive */
483 	if (strcmp(node->nd_nodename, mynode()) == 0)
484 		return (retval);
485 
486 	size = strlen(fmt) + strlen(node->nd_nodename) + 1;
487 	cmd = Zalloc(size);
488 	(void) strlcat(cmd, fmt, size);
489 	(void) strlcat(cmd, node->nd_nodename, size);
490 
491 	if ((ptr = popen(cmd, "r")) != NULL) {
492 		if (fgets(buf, sizeof (buf), ptr) != NULL) {
493 			/* If scha_cluster_get returned DOWN - return dead */
494 			if (strncmp(buf, "DOWN", 4) == 0)
495 				retval = 1;
496 		}
497 		(void) pclose(ptr);
498 	}
499 	Free(cmd);
500 	return (retval);
501 }
502 
503 /*
504  * global_init()
505  *
506  * Perform some global initializations.
507  *
508  * the following routines have to call this before operation can start:
509  *  - mdmn_send_svc_2
510  *  - mdmn_work_svc_2
511  *  - mdmn_comm_lock_svc_2
512  *  - mdmn_comm_unlock_svc_2
513  *  - mdmn_comm_suspend_svc_2
514  *  - mdmn_comm_resume_svc_2
515  *  - mdmn_comm_reinit_set_svc_2
516  *
517  * This is a single threaded daemon, so it can only be in one of the above
518  * routines at the same time.
519  * This means, global_init() cannot be called more than once at the same time.
520  * Hence, no lock is needed.
521  */
522 void
523 global_init(void)
524 {
525 	set_t			set;
526 	md_mn_msgclass_t	class;
527 	struct sigaction	sighandler;
528 	time_t			clock_val;
529 	struct rlimit		commd_limit;
530 
531 
532 
533 	/* Do these global initializations only once */
534 	if (md_commd_global_state & MD_CGS_INITED) {
535 		return;
536 	}
537 	(void) sdssc_bind_library();
538 
539 	/* setup the debug options from the config file */
540 	setup_debug();
541 
542 	/* make sure that we don't run out of file descriptors */
543 	commd_limit.rlim_cur = commd_limit.rlim_max = RLIM_INFINITY;
544 	if (setrlimit(RLIMIT_NOFILE, &commd_limit) != 0) {
545 		syslog(LOG_WARNING, gettext("setrlimit failed."
546 		    "Could not increase the max file descriptors"));
547 	}
548 
549 	/* Make setup_debug() be the action in case of SIGHUP */
550 	sighandler.sa_flags = 0;
551 	(void) sigfillset(&sighandler.sa_mask);
552 	sighandler.sa_handler = (void (*)(int)) setup_debug;
553 	(void) sigaction(SIGHUP, &sighandler, NULL);
554 
555 	__savetime = gethrtime();
556 	(void) time(&clock_val);
557 	commd_debug(MD_MMV_MISC, "global init called %s\n", ctime(&clock_val));
558 
559 	/* start a thread that flushes out the debug on a regular basis */
560 	(void) thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
561 	    (void *) NULL, THR_DETACHED, NULL);
562 
563 	/* global rwlock's / mutex's / cond_t's go here */
564 	(void) mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
565 	(void) cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
566 	(void) mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
567 
568 	/* Make sure the initiator table is initialized correctly */
569 	for (set = 0; set < MD_MAXSETS; set++) {
570 		for (class = 0; class < MD_MN_NCLASSES; class++) {
571 			mdmn_unregister_initiator_table(set, class);
572 		}
573 	}
574 
575 
576 	/* setup the check for timeouts */
577 	(void) thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
578 	    (void *) NULL, THR_DETACHED, NULL);
579 
580 	md_commd_global_state |= MD_CGS_INITED;
581 }
582 
583 
584 /*
585  * mdmn_init_client(setno, nodeid)
586  * called if client[setno][nodeid] is NULL
587  *
588  * NOTE: Must be called with set_desc_rwlock held as a reader
589  * NOTE: Must be called with client_rwlock held as a writer
590  *
591  * If the rpc client for this node has not been setup for any set, we do it now.
592  *
593  * Returns	0 on success (node found in set, rpc client setup)
594  *		-1 if metaget_setdesc failed,
595  *		-2 if node not part of set
596  *		-3 if clnt_create fails
597  */
598 static int
599 mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
600 {
601 	md_error_t	ep = mdnullerror;
602 	md_mnnode_desc	*node;
603 	md_set_desc	*sd;	/* just an abbr for set_descriptor[setno] */
604 
605 	sd = set_descriptor[setno];
606 
607 	/*
608 	 * Is the appropriate set_descriptor already initialized ?
609 	 * Can't think of a scenario where this is not the case, but we'd better
610 	 * check for it anyway.
611 	 */
612 	if (sd == NULL) {
613 		mdsetname_t	*sp;
614 
615 		/* readlock -> writelock */
616 		(void) rw_unlock(&set_desc_rwlock[setno]);
617 		(void) rw_wrlock(&set_desc_rwlock[setno]);
618 		sp = metasetnosetname(setno, &ep);
619 		/* Only one thread is supposed to be in metaget_setdesc() */
620 		(void) mutex_lock(&get_setdesc_mutex);
621 		sd = metaget_setdesc(sp, &ep);
622 		(void) mutex_unlock(&get_setdesc_mutex);
623 		if (sd == NULL) {
624 			/* back to ... */
625 			(void) rw_unlock(&set_desc_rwlock[setno]);
626 			/* ... readlock */
627 			(void) rw_rdlock(&set_desc_rwlock[setno]);
628 			return (-1);
629 		}
630 		set_descriptor[setno] = sd;
631 		/* back to readlock */
632 		(void) rw_unlock(&set_desc_rwlock[setno]);
633 		(void) rw_rdlock(&set_desc_rwlock[setno]);
634 	}
635 
636 	/* first we have to find the node name for this node id */
637 	for (node = sd->sd_nodelist; node; node = node->nd_next) {
638 		if (node->nd_nodeid == nid)
639 			break; /* we found our node in this set */
640 	}
641 
642 
643 	if (node == (md_mnnode_desc *)NULL) {
644 		commd_debug(MD_MMV_SYSLOG,
645 		    "FATAL: node %d not found in set %d\n", nid, setno);
646 		(void) rw_unlock(&set_desc_rwlock[setno]);
647 		return (-2);
648 	}
649 
650 	commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
651 	    node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
652 
653 	/* Did this node join the diskset?  */
654 	if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
655 		commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
656 		    node->nd_nodename ? node->nd_nodename : "NULL", setno);
657 		(void) rw_unlock(&set_desc_rwlock[setno]);
658 		return (-2);
659 	}
660 
661 	/* if clnt_create has not been done for that node, do it now */
662 	if (client[setno][nid] == (CLIENT *) NULL) {
663 		time_t	tout = 0;
664 
665 		/*
666 		 * While trying to create a connection to a node,
667 		 * periodically check to see if the node has been marked
668 		 * dead by the SunCluster infrastructure.
669 		 * This periodic check is needed since a non-responsive
670 		 * rpc.mdcommd (while it is attempting to create a connection
671 		 * to a dead node) can lead to large delays and/or failures
672 		 * in the reconfig steps.
673 		 */
674 		while ((client[setno][nid] == (CLIENT *) NULL) &&
675 		    (tout < MD_CLNT_CREATE_TOUT)) {
676 			client[setno][nid] = meta_client_create_retry(
677 			    node->nd_nodename, mdmn_clnt_create,
678 			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
679 			/* Is the node dead? */
680 			if (mdmn_is_node_dead(node) == 1) {
681 				commd_debug(MD_MMV_SYSLOG,
682 				    "rpc.mdcommd: no client for dead node %s\n",
683 				    node->nd_nodename);
684 				break;
685 			} else
686 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
687 		}
688 
689 		if (client[setno][nid] == (CLIENT *) NULL) {
690 			clnt_pcreateerror(node->nd_nodename);
691 			(void) rw_unlock(&set_desc_rwlock[setno]);
692 			return (-3);
693 		}
694 		/* this node has the license to send */
695 		commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
696 		add_license(node);
697 
698 		/* set the timeout value */
699 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
700 		    (char *)&FOUR_SECS);
701 
702 	}
703 	(void) rw_unlock(&set_desc_rwlock[setno]);
704 	return (0);
705 }
706 
707 /*
708  * check_client(setno, nodeid)
709  *
710  * must be called with reader lock held for set_desc_rwlock[setno]
711  * and must be called with reader lock held for client_rwlock[setno]
712  * Checks if the client for this set/node combination is already setup
713  * if not it upgrades the lock to a writer lock
714  * and tries to initialize the client.
715  * Finally it's checked if the client nulled out again due to some race
716  *
717  * returns 0 if there is a usable client
718  * returns MDMNE_RPC_FAIL otherwise
719  */
720 static int
721 check_client(set_t setno, md_mn_nodeid_t nodeid)
722 {
723 	int ret = 0;
724 
725 	while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
726 		/* upgrade reader ... */
727 		(void) rw_unlock(&client_rwlock[setno]);
728 		/* ... to writer lock. */
729 		(void) rw_wrlock(&client_rwlock[setno]);
730 		if (mdmn_init_client(setno, nodeid) != 0) {
731 			ret = MDMNE_RPC_FAIL;
732 		}
733 		/* downgrade writer ... */
734 		(void) rw_unlock(&client_rwlock[setno]);
735 		/* ... back to reader lock. */
736 		(void) rw_rdlock(&client_rwlock[setno]);
737 	}
738 	return (ret);
739 }
740 
741 /*
742  * mdmn_init_set(setno, todo)
743  * setno is the number of the set to be initialized.
744  * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
745  * If called with MDMN_SET_READY everything is initialized.
746  *
747  * If the set mutexes are already initialized, the caller has to hold
748  * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
749  * calling mdmn_init_set()
750  */
751 int
752 mdmn_init_set(set_t setno, int todo)
753 {
754 	int class;
755 	md_mnnode_desc	*node;
756 	md_set_desc	*sd; /* just an abbr for set_descriptor[setno] */
757 	mdsetname_t	*sp;
758 	md_error_t	ep = mdnullerror;
759 	md_mn_nodeid_t	nid;
760 
761 	/*
762 	 * Check if we are told to setup the mutexes and
763 	 * if these are not yet setup
764 	 */
765 	if ((todo & MDMN_SET_MUTEXES) &&
766 	    ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
767 		(void) mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
768 		(void) cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
769 		(void) rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
770 		(void) rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
771 
772 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
773 			(void) mutex_init(mdmn_get_master_table_mx(setno,
774 			    class), USYNC_THREAD, NULL);
775 			(void) cond_init(mdmn_get_master_table_cv(setno, class),
776 			    USYNC_THREAD, NULL);
777 			(void) mutex_init(mdmn_get_initiator_table_mx(setno,
778 			    class), USYNC_THREAD, NULL);
779 		}
780 		md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
781 	}
782 	if ((todo & MDMN_SET_MCT) &&
783 	    ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
784 		int	fd;
785 		size_t	filesize;
786 		caddr_t	addr;
787 		char table_name[32];
788 		struct flock	fl;
789 
790 		filesize = (sizeof (md_mn_mct_t));
791 		(void) snprintf(table_name, sizeof (table_name), "%s%d",
792 		    MD_MN_MSG_COMP_TABLE, setno);
793 		/*
794 		 * If the mct file exists we map it into memory.
795 		 * Otherwise we create an empty file of appropriate
796 		 * size and map that into memory.
797 		 * The mapped areas are stored in mct[setno].
798 		 */
799 		fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
800 		if (fd < 0) {
801 			commd_debug(MD_MMV_MISC,
802 			    "init_set: Can't open MCT\n");
803 			return (-1);
804 		}
805 		/*
806 		 * Ensure that we are the only process that has this file
807 		 * mapped. If another instance of rpc.mdcommd has beaten us
808 		 * then we display the failing process and attempt to terminate
809 		 * it. The next call of this routine should establish us as
810 		 * the only rpc.mdcommd on the system.
811 		 */
812 		(void) memset(&fl, 0, sizeof (fl));
813 		fl.l_type = F_WRLCK;
814 		fl.l_whence = SEEK_SET;
815 		fl.l_start = 0;
816 		fl.l_len = filesize + 1;
817 
818 		if (fcntl(fd, F_SETLK, &fl) == -1) {
819 			commd_debug(MD_MMV_SYSLOG,
820 			    "init_set: Cannot lock MCT '%s'\n", table_name);
821 			if (fcntl(fd, F_GETLK, &fl) != -1) {
822 				commd_debug(MD_MMV_SYSLOG, "rpc.mdcommd:"
823 				    "Process %d holds lock\n", fl.l_pid);
824 				(void) close(fd);
825 			} else {
826 				commd_debug(MD_MMV_SYSLOG, "rpc.mdcommd:"
827 				    "F_GETLK failed\n");
828 				(void) close(fd);
829 				return (-1);
830 			}
831 
832 			/*
833 			 * Try to terminate other mdcommd process so that we
834 			 * can establish ourselves.
835 			 */
836 			if (sigsend(P_PID, fl.l_pid, 0) == 0) {
837 				if (sigsend(P_PID, fl.l_pid, SIGKILL) < 0) {
838 					commd_debug(MD_MMV_SYSLOG,
839 					    "rpc.mdcommd:"
840 					    "SIGKILL of %d failed\n", fl.l_pid);
841 				} else {
842 					commd_debug(MD_MMV_SYSLOG,
843 					    "rpc.mdcommd:"
844 					    "Process %d killed\n", fl.l_pid);
845 				}
846 			} else {
847 				commd_debug(MD_MMV_SYSLOG, "rpc.mdcommd:"
848 				    "Process %d not killable\n", fl.l_pid);
849 			}
850 			return (-1);
851 		}
852 		/*
853 		 * To ensure that the file has the appropriate size,
854 		 * we write a byte at the end of the file.
855 		 */
856 		(void) lseek(fd, filesize + 1, SEEK_SET);
857 		(void) write(fd, "\0", 1);
858 
859 		/* at this point we have a file in place that we can mmap */
860 		addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
861 		    MAP_SHARED, fd, (off_t)0);
862 		if (addr == MAP_FAILED) {
863 			commd_debug(MD_MMV_INIT,
864 			    "init_set: mmap mct error %d\n",
865 			    errno);
866 			return (-1);
867 		}
868 		/* LINTED pointer alignment */
869 		mct[setno] = (md_mn_mct_t *)addr;
870 
871 		/* finally we initialize the mutexes that protect the mct */
872 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
873 			(void) mutex_init(&(mct_mutex[setno][class]),
874 			    USYNC_THREAD, NULL);
875 		}
876 
877 		md_mn_set_inited[setno] |= MDMN_SET_MCT;
878 	}
879 	/*
880 	 * Check if we are told to setup the nodes and
881 	 * if these are not yet setup
882 	 * (Attention: negative logic here compared to above!)
883 	 */
884 	if (((todo & MDMN_SET_NODES) == 0) ||
885 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
886 		return (0); /* success */
887 	}
888 
889 	if ((sp = metasetnosetname(setno, &ep)) == NULL) {
890 		commd_debug(MD_MMV_SYSLOG,
891 		    "metasetnosetname(%d) returned NULL\n", setno);
892 		return (MDMNE_NOT_JOINED);
893 	}
894 
895 	/* flush local copy of rpc.metad data */
896 	metaflushsetname(sp);
897 
898 	(void) mutex_lock(&get_setdesc_mutex);
899 	sd = metaget_setdesc(sp, &ep);
900 	(void) mutex_unlock(&get_setdesc_mutex);
901 
902 	if (sd == NULL) {
903 		commd_debug(MD_MMV_SYSLOG,
904 		    "metaget_setdesc(%d) returned NULL\n", setno);
905 		return (MDMNE_NOT_JOINED);
906 	}
907 
908 	/*
909 	 * if this set is not a multinode set or
910 	 * this node didn't join yet the diskset, better don't do anything
911 	 */
912 	if ((MD_MNSET_DESC(sd) == 0) ||
913 	    (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
914 		commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
915 		return (MDMNE_NOT_JOINED);
916 	}
917 
918 	for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
919 		time_t	tout = 0;
920 		nid = node->nd_nodeid;
921 
922 		commd_debug(MD_MMV_INIT,
923 		    "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
924 		    node->nd_nodename ? node->nd_nodename : "NULL",
925 		    node->nd_priv_ic ? node->nd_priv_ic : "NULL",
926 		    node->nd_flags);
927 
928 		if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
929 			commd_debug(MD_MMV_INIT,
930 			    "init: %s didn't join set %d\n",
931 			    node->nd_nodename ? node->nd_nodename : "NULL",
932 			    setno);
933 			continue;
934 		}
935 
936 		if (client[setno][nid] != (CLIENT *) NULL) {
937 			/* already inited */
938 			commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
939 			    node->nd_nodename ? node->nd_nodename : "NULL");
940 			continue;
941 		}
942 
943 		/*
944 		 * While trying to create a connection to a node,
945 		 * periodically check to see if the node has been marked
946 		 * dead by the SunCluster infrastructure.
947 		 * This periodic check is needed since a non-responsive
948 		 * rpc.mdcommd (while it is attempting to create a connection
949 		 * to a dead node) can lead to large delays and/or failures
950 		 * in the reconfig steps.
951 		 */
952 		while ((client[setno][nid] == (CLIENT *) NULL) &&
953 		    (tout < MD_CLNT_CREATE_TOUT)) {
954 			client[setno][nid] = meta_client_create_retry(
955 			    node->nd_nodename, mdmn_clnt_create,
956 			    (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
957 			/* Is the node dead? */
958 			if (mdmn_is_node_dead(node) == 1) {
959 				commd_debug(MD_MMV_SYSLOG,
960 				    "rpc.mdcommd: no client for dead node %s\n",
961 				    node->nd_nodename);
962 				break;
963 			} else
964 				tout += MD_CLNT_CREATE_SUBTIMEOUT;
965 		}
966 
967 		if (client[setno][nid] == (CLIENT *) NULL) {
968 			clnt_pcreateerror(node->nd_nodename);
969 			/*
970 			 * If we cannot connect to a single node
971 			 * (maybe because it is down) we mark this node as not
972 			 * owned and continue with the next node in the list.
973 			 * This is better than failing the entire starting up
974 			 * of the commd system.
975 			 */
976 			node->nd_flags &= ~MD_MN_NODE_OWN;
977 			commd_debug(MD_MMV_SYSLOG,
978 			    "WARNING couldn't create client for %s\n"
979 			    "Reconfig cycle required\n",
980 			    node->nd_nodename);
981 			commd_debug(MD_MMV_INIT,
982 			    "WARNING couldn't create client for %s\n"
983 			    "Reconfig cycle required\n",
984 			    node->nd_nodename);
985 			continue;
986 		}
987 		/* this node has the license to send */
988 		commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
989 		add_license(node);
990 
991 		/* set the timeout value */
992 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
993 		    (char *)&FOUR_SECS);
994 
995 		commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
996 		    node->nd_nodename ? node->nd_nodename : "NULL");
997 	}
998 
999 	set_descriptor[setno] = sd;
1000 	md_mn_set_inited[setno] |= MDMN_SET_NODES;
1001 	return (0); /* success */
1002 }
1003 
1004 void *
1005 mdmn_send_to_work(void *arg)
1006 {
1007 	int			*rpc_err = NULL;
1008 	int			success;
1009 	int			try_master;
1010 	set_t			setno;
1011 	mutex_t			*mx;	/* protection for initiator_table */
1012 	SVCXPRT			*transp;
1013 	md_mn_msg_t		*msg;
1014 	md_mn_nodeid_t		set_master;
1015 	md_mn_msgclass_t	class;
1016 	md_mn_msg_and_transp_t	*matp = (md_mn_msg_and_transp_t *)arg;
1017 
1018 	msg			= matp->mat_msg;
1019 	transp			= matp->mat_transp;
1020 
1021 	class = mdmn_get_message_class(msg->msg_type);
1022 	setno = msg->msg_setno;
1023 
1024 	/* set the sender, so the master knows who to send the results */
1025 	(void) rw_rdlock(&set_desc_rwlock[setno]);
1026 	msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1027 	set_master	= set_descriptor[setno]->sd_mn_master_nodeid;
1028 
1029 	mx = mdmn_get_initiator_table_mx(setno, class);
1030 	(void) mutex_lock(mx);
1031 
1032 	/*
1033 	 * Here we check, if the initiator table slot for this set/class
1034 	 * combination is free to use.
1035 	 * If this is not the case, we return CLASS_BUSY forcing the
1036 	 * initiating send_message call to retry
1037 	 */
1038 	success = mdmn_check_initiator_table(setno, class);
1039 	if (success == MDMNE_CLASS_BUSY) {
1040 		md_mn_msgid_t		active_mid;
1041 
1042 		mdmn_get_initiator_table_id(setno, class, &active_mid);
1043 
1044 		commd_debug(MD_MMV_SEND,
1045 		    "send_to_work: received but locally busy "
1046 		    "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
1047 		    "active msg=(%d, 0x%llx-%d)\n",
1048 		    MSGID_ELEMS(msg->msg_msgid), setno, class,
1049 		    msg->msg_type, MSGID_ELEMS(active_mid));
1050 	} else {
1051 		commd_debug(MD_MMV_SEND,
1052 		    "send_to_work: received (%d, 0x%llx-%d), "
1053 		    "set=%d, class=%d, type=%d\n",
1054 		    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
1055 	}
1056 
1057 	try_master = 2; /* return failure after two retries */
1058 	while ((success == MDMNE_ACK) && (try_master--)) {
1059 		(void) rw_rdlock(&client_rwlock[setno]);
1060 		/* is the rpc client to the master still around ? */
1061 		if (check_client(setno, set_master)) {
1062 			success = MDMNE_RPC_FAIL;
1063 			FLUSH_DEBUGFILE();
1064 			(void) rw_unlock(&client_rwlock[setno]);
1065 			break; /* out of try_master-loop */
1066 		}
1067 
1068 		/*
1069 		 * Send the request to the work function on the master
1070 		 * this call will return immediately
1071 		 */
1072 		rpc_err = mdmn_work_2(msg, client[setno][set_master],
1073 		    set_master);
1074 
1075 		/* Everything's Ok? */
1076 		if (rpc_err == NULL) {
1077 			success = MDMNE_RPC_FAIL;
1078 			/*
1079 			 * Probably something happened to the daemon on the
1080 			 * master. Kill the client, and try again...
1081 			 */
1082 			(void) rw_unlock(&client_rwlock[setno]);
1083 			(void) rw_wrlock(&client_rwlock[setno]);
1084 			mdmn_clnt_destroy(client[setno][set_master]);
1085 			if (client[setno][set_master] != (CLIENT *)NULL) {
1086 				client[setno][set_master] = (CLIENT *)NULL;
1087 			}
1088 			(void) rw_unlock(&client_rwlock[setno]);
1089 			continue;
1090 
1091 		} else  if (*rpc_err != MDMNE_ACK) {
1092 			/* something went wrong, break out */
1093 			success = *rpc_err;
1094 			free(rpc_err);
1095 			(void) rw_unlock(&client_rwlock[setno]);
1096 			break; /* out of try_master-loop */
1097 		}
1098 
1099 		(void) rw_unlock(&client_rwlock[setno]);
1100 		free(rpc_err);
1101 
1102 		/*
1103 		 * If we are here, we sucessfully delivered the message.
1104 		 * We register the initiator_table, so that
1105 		 * wakeup_initiator_2 can do the sendreply with the
1106 		 * results for us.
1107 		 */
1108 		success = MDMNE_ACK;
1109 		mdmn_register_initiator_table(setno, class, msg, transp);
1110 
1111 		/* tell check_timeouts, there's work to do */
1112 		(void) mutex_lock(&check_timeout_mutex);
1113 		messages_on_their_way++;
1114 		(void) cond_signal(&check_timeout_cv);
1115 		(void) mutex_unlock(&check_timeout_mutex);
1116 		break; /* out of try_master-loop */
1117 	}
1118 
1119 	(void) rw_unlock(&set_desc_rwlock[setno]);
1120 
1121 	if (success == MDMNE_ACK) {
1122 		commd_debug(MD_MMV_SEND,
1123 		    "send_to_work: registered (%d, 0x%llx-%d)\n",
1124 		    MSGID_ELEMS(msg->msg_msgid));
1125 	} else {
1126 		/* In case of failure do the sendreply now */
1127 		md_mn_result_t *resultp;
1128 		resultp = Zalloc(sizeof (md_mn_result_t));
1129 		resultp->mmr_comm_state = success;
1130 		/*
1131 		 * copy the MSGID so that we know _which_ message
1132 		 * failed (if the transp has got mangled)
1133 		 */
1134 		MSGID_COPY(&(msg->msg_msgid), &(resultp->mmr_msgid));
1135 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
1136 		commd_debug(MD_MMV_SEND,
1137 		    "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
1138 		    MSGID_ELEMS(msg->msg_msgid), success);
1139 		free_result(resultp);
1140 		/*
1141 		 * We don't have a timeout registered to wake us up, so we're
1142 		 * now done with this handle. Release it back to the pool.
1143 		 */
1144 		svc_done(transp);
1145 
1146 	}
1147 
1148 	free_msg(msg);
1149 	/* the alloc was done in mdmn_send_svc_2 */
1150 	Free(matp);
1151 	(void) mutex_unlock(mx);
1152 	return (NULL);
1153 
1154 }
1155 
1156 /*
1157  * do_message_locally(msg, result)
1158  * Process a message locally on the master
1159  * Lookup the MCT if the message has already been processed.
1160  * If not, call the handler and store the result
1161  * If yes, retrieve the result from the MCT.
1162  * Return:
1163  *	MDMNE_ACK in case of success
1164  *	MDMNE_LOG_FAIL if the MCT could not be checked
1165  */
1166 static int
1167 do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1168 {
1169 	int			completed;
1170 	set_t			setno;
1171 	md_mn_msgtype_t		msgtype = msg->msg_type;
1172 	md_mn_msgclass_t	class;
1173 
1174 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1175 
1176 	handler = mdmn_get_handler(msgtype);
1177 	if (handler == NULL) {
1178 		result->mmr_exitval = 0;
1179 		/* let the sender decide if this is an error or not */
1180 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1181 		return (MDMNE_NO_HANDLER);
1182 	}
1183 
1184 	class = mdmn_get_message_class(msg->msg_type);
1185 	setno = msg->msg_setno;
1186 
1187 	result->mmr_msgtype	= msgtype;
1188 	result->mmr_flags	= msg->msg_flags;
1189 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1190 
1191 	(void) mutex_lock(&mct_mutex[setno][class]);
1192 	completed = mdmn_check_completion(msg, result);
1193 	if (completed == MDMN_MCT_NOT_DONE) {
1194 		/* message not yet processed locally */
1195 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1196 		    "calling handler for (%d,0x%llx-%d) type %d\n",
1197 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1198 
1199 		/*
1200 		 * Mark the message as being currently processed,
1201 		 * so we won't start a second handler for it
1202 		 */
1203 		(void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1204 		(void) mutex_unlock(&mct_mutex[setno][class]);
1205 
1206 		/* here we actually process the message on the master */
1207 		(*handler)(msg, MD_MSGF_ON_MASTER, result);
1208 
1209 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1210 		    "finished handler for (%d,0x%llx-%d) type %d\n",
1211 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1212 
1213 		/* Mark the message as fully processed, store the result */
1214 		(void) mutex_lock(&mct_mutex[setno][class]);
1215 		(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1216 	} else if (completed == MDMN_MCT_DONE) {
1217 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1218 		    "result for (%d, 0x%llx-%d) from MCT\n",
1219 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1220 	} else if (completed == MDMN_MCT_IN_PROGRESS) {
1221 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1222 		    "(%d, 0x%llx-%d) is currently being processed\n",
1223 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1224 	} else {
1225 		/* MCT error occurred (should never happen) */
1226 		(void) mutex_unlock(&mct_mutex[setno][class]);
1227 		result->mmr_comm_state = MDMNE_LOG_FAIL;
1228 		commd_debug(MD_MMV_SYSLOG, "WARNING "
1229 		    "mdmn_check_completion returned %d "
1230 		    "for (%d,0x%llx-%d)\n", completed,
1231 		    MSGID_ELEMS(msg->msg_msgid));
1232 		return (MDMNE_LOG_FAIL);
1233 	}
1234 	(void) mutex_unlock(&mct_mutex[setno][class]);
1235 	return (MDMNE_ACK);
1236 
1237 }
1238 
1239 /*
1240  * do_send_message(msg, node)
1241  *
1242  * Send a message to a given node and wait for a acknowledgment, that the
1243  * message has arrived on the remote node.
1244  * Make sure that the client for the set is setup correctly.
1245  * If no ACK arrives, destroy and recreate the RPC client and retry the
1246  * message one time
1247  * After actually sending wait no longer than the appropriate number of
1248  * before timing out the message.
1249  *
1250  * Note must be called with set_desc_wrlock held in reader mode
1251  */
1252 static int
1253 do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1254 {
1255 	int			err;
1256 	int			rpc_retries;
1257 	int			timeout_retries = 0;
1258 	int			*ret = NULL;
1259 	set_t			setno;
1260 	cond_t			*cv;	/* see mdmn_wakeup_master_svc_2 */
1261 	mutex_t			*mx;	/* protection for class_busy */
1262 	timestruc_t		timeout; /* surveillance for remote daemon */
1263 	md_mn_nodeid_t		nid;
1264 	md_mn_msgtype_t		msgtype;
1265 	md_mn_msgclass_t	class;
1266 
1267 	nid	= node->nd_nodeid;
1268 	msgtype = msg->msg_type;
1269 	setno	= msg->msg_setno;
1270 	class	= mdmn_get_message_class(msgtype);
1271 	mx	= mdmn_get_master_table_mx(setno, class);
1272 	cv	= mdmn_get_master_table_cv(setno, class);
1273 
1274 retry_rpc:
1275 
1276 	/* We try two times to send the message */
1277 	rpc_retries = 2;
1278 
1279 	/*
1280 	 * if sending the message doesn't succeed the first time due to a
1281 	 * RPC problem, we retry one time
1282 	 */
1283 	while ((rpc_retries != 0) && (ret == NULL)) {
1284 		/*  in abort state, we error out immediately */
1285 		if (md_commd_global_state & MD_CGS_ABORTED) {
1286 			return (MDMNE_ABORT);
1287 		}
1288 
1289 		(void) rw_rdlock(&client_rwlock[setno]);
1290 		/* unable to create client? Ignore it */
1291 		if (check_client(setno, nid)) {
1292 			/*
1293 			 * In case we cannot establish an RPC client, we
1294 			 * take this node out of our considerations.
1295 			 * This will be reset by a reconfig
1296 			 * cycle that should come pretty soon.
1297 			 * MNISSUE: Should a reconfig cycle
1298 			 * be forced on SunCluster?
1299 			 */
1300 			node->nd_flags &= ~MD_MN_NODE_OWN;
1301 			commd_debug(MD_MMV_SYSLOG,
1302 			    "WARNING couldn't create client for %s\n"
1303 			    "Reconfig cycle required\n",
1304 			    node->nd_nodename);
1305 			commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1306 			    "WARNING couldn't create client for %s\n",
1307 			    MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1308 			(void) rw_unlock(&client_rwlock[setno]);
1309 			return (MDMNE_IGNORE_NODE);
1310 		}
1311 		/* let's be paranoid and check again before sending */
1312 		if (client[setno][nid] == NULL) {
1313 			/*
1314 			 * if this is true, strange enough, we catch our breath,
1315 			 * and then continue, so that the client is set up
1316 			 * once again.
1317 			 */
1318 			commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1319 			(void) rw_unlock(&client_rwlock[setno]);
1320 			(void) sleep(1);
1321 			continue;
1322 		}
1323 
1324 		/* send it over, it will return immediately */
1325 		ret = mdmn_work_2(msg, client[setno][nid], nid);
1326 
1327 		(void) rw_unlock(&client_rwlock[setno]);
1328 
1329 		if (ret != NULL) {
1330 			commd_debug(MD_MMV_PROC_M,
1331 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1332 			    " 0x%x\n",
1333 			    MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1334 		} else {
1335 			commd_debug(MD_MMV_PROC_M,
1336 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1337 			    " NULL \n",
1338 			    MSGID_ELEMS(msg->msg_msgid), nid);
1339 		}
1340 
1341 		if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1342 		    (*ret == MDMNE_THR_CREATE_FAIL)) {
1343 			/*
1344 			 * Something happened to the daemon on the other side.
1345 			 * Kill the client, and try again.
1346 			 * check_client() will create a new client
1347 			 */
1348 			(void) rw_wrlock(&client_rwlock[setno]);
1349 			mdmn_clnt_destroy(client[setno][nid]);
1350 			if (client[setno][nid] != (CLIENT *)NULL) {
1351 				client[setno][nid] = (CLIENT *)NULL;
1352 			}
1353 			(void) rw_unlock(&client_rwlock[setno]);
1354 
1355 			/* ... but don't try infinitely */
1356 			--rpc_retries;
1357 			continue;
1358 		}
1359 		/*
1360 		 * If the class is locked on the other node, keep trying.
1361 		 * This situation will go away automatically,
1362 		 * if we wait long enough
1363 		 */
1364 		if (*ret == MDMNE_CLASS_LOCKED) {
1365 			(void) sleep(1);
1366 			free(ret);
1367 			ret = NULL;
1368 			continue;
1369 		}
1370 	}
1371 	if (ret == NULL) {
1372 		return (MDMNE_RPC_FAIL);
1373 	}
1374 
1375 
1376 	/* if the slave is in abort state, we just ignore it. */
1377 	if (*ret == MDMNE_ABORT) {
1378 		commd_debug(MD_MMV_PROC_M,
1379 		    "proc_mas: work(%d,0x%llx-%d) returned "
1380 		    "MDMNE_ABORT\n",
1381 		    MSGID_ELEMS(msg->msg_msgid));
1382 		free(ret);
1383 		return (MDMNE_IGNORE_NODE);
1384 	}
1385 
1386 	/* Did the remote processing succeed? */
1387 	if (*ret != MDMNE_ACK) {
1388 		/*
1389 		 * Some commd failure in the middle of sending the msg
1390 		 * to the nodes. We don't continue here.
1391 		 */
1392 		commd_debug(MD_MMV_PROC_M,
1393 		    "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1394 		    MSGID_ELEMS(msg->msg_msgid), *ret);
1395 		free(ret);
1396 		return (MDMNE_RPC_FAIL);
1397 	}
1398 	free(ret);
1399 	ret = NULL;
1400 
1401 	/*
1402 	 * When we are here, we have sent the message to the other node and
1403 	 * we know that node has accepted it.
1404 	 * We go to sleep and have trust to be woken up by wakeup.
1405 	 * If we wakeup due to a timeout, or a signal, no result has been
1406 	 * placed in the appropriate slot.
1407 	 * If we timeout, it is likely that this is because the node has
1408 	 * gone away, so we will destroy the client and try it again in the
1409 	 * expectation that the rpc will fail and we will return
1410 	 * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1411 	 * be being processed on the slave. In this case just timeout for 4
1412 	 * more seconds and then return RPC_FAIL if the message is not complete.
1413 	 */
1414 	timeout.tv_nsec = 0;
1415 	timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1416 	    FOUR_SECS.tv_sec;
1417 	err = cond_reltimedwait(cv, mx, &timeout);
1418 
1419 	if (err == 0) {
1420 		/* everything's fine, return success */
1421 		return (MDMNE_ACK);
1422 	}
1423 
1424 	if (err == ETIME) {
1425 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1426 		    "timeout occured, set=%d, class=%d, "
1427 		    "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1428 		    setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1429 		if (timeout_retries == 0) {
1430 			timeout_retries++;
1431 			/*
1432 			 * Destroy the client and try the rpc call again
1433 			 */
1434 			(void) rw_wrlock(&client_rwlock[setno]);
1435 			mdmn_clnt_destroy(client[setno][nid]);
1436 			client[setno][nid] = (CLIENT *)NULL;
1437 			(void) rw_unlock(&client_rwlock[setno]);
1438 			goto retry_rpc;
1439 		}
1440 	} else if (err == EINTR) {
1441 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1442 		    "commd signalled, set=%d, class=%d, "
1443 		    "msgid=(%d, 0x%llx-%d)\n",
1444 		    setno, class, MSGID_ELEMS(msg->msg_msgid));
1445 	} else {
1446 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1447 		    "cond_reltimedwait err=%d, set=%d, "
1448 		    "class=%d, msgid=(%d, 0x%llx-%d)\n",
1449 		    err, setno, class,
1450 		    MSGID_ELEMS(msg->msg_msgid));
1451 	}
1452 
1453 	/* some failure happened */
1454 	return (MDMNE_RPC_FAIL);
1455 }
1456 
1457 /*
1458  * before we return we have to
1459  * free_msg(msg); because we are working on a copied message
1460  */
1461 void
1462 mdmn_master_process_msg(md_mn_msg_t *msg)
1463 {
1464 	int		*ret;
1465 	int		err;
1466 	int		nmsgs;		/* total number of msgs */
1467 	int		curmsg;		/* index of current msg */
1468 	set_t		setno;
1469 	uint_t		inherit_flags = 0;
1470 	uint_t		secdiff, usecdiff; /* runtime of this message */
1471 	md_error_t	mde = mdnullerror;
1472 	md_mn_msg_t	*msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1473 	md_mn_msg_t	*cmsg;		/* current msg */
1474 	md_mn_msgid_t	dummyid;
1475 	md_mn_result_t	*result;
1476 	md_mn_result_t	*slave_result;
1477 	md_mn_nodeid_t	sender;
1478 	md_mn_nodeid_t	set_master;
1479 	md_mnnode_desc	*node;
1480 	md_mn_msgtype_t	orig_type;	/* type of the original message */
1481 	md_mn_msgtype_t	msgtype;	/* type of the current message */
1482 	md_mn_msgclass_t orig_class;	/* class of the original message */
1483 	md_mn_msgclass_t class;		/* class of the current message */
1484 
1485 	int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1486 
1487 	orig_type = msgtype = msg->msg_type;
1488 	sender	= msg->msg_sender;
1489 	setno	= msg->msg_setno;
1490 
1491 	result = Zalloc(sizeof (md_mn_result_t));
1492 	result->mmr_setno	= setno;
1493 	result->mmr_msgtype	= msgtype;
1494 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1495 
1496 	orig_class = mdmn_get_message_class(msgtype);
1497 
1498 	commd_debug(MD_MMV_PROC_M,
1499 	    "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1500 	    MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1501 
1502 	(void) rw_rdlock(&set_desc_rwlock[setno]);
1503 	set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1504 	result->mmr_sender	= set_master;
1505 	/*
1506 	 * Put message into the change log unless told otherwise
1507 	 * Note that we only log original messages.
1508 	 * If they are generated by some smgen, we don't log them!
1509 	 * Replay messages aren't logged either.
1510 	 * Note, that replay messages are unlogged on completion.
1511 	 */
1512 	if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1513 		commd_debug(MD_MMV_PROC_M,
1514 		    "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1515 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1516 		err = mdmn_log_msg(msg);
1517 		if (err == MDMNE_NULL) {
1518 			/* msg logged successfully */
1519 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1520 			    "done log_msg for (%d,0x%llx-%d) type %d\n",
1521 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1522 			goto proceed;
1523 		}
1524 		if (err == MDMNE_ACK) {
1525 			/* Same msg in the slot, proceed */
1526 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1527 			    "already logged (%d,0x%llx-%d) type %d\n",
1528 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1529 			goto proceed;
1530 		}
1531 		if (err == MDMNE_LOG_FAIL) {
1532 			/* Oh, bad, the log is non functional. */
1533 			result->mmr_comm_state = MDMNE_LOG_FAIL;
1534 			/*
1535 			 * Note that the mark_busy was already done by
1536 			 * mdmn_work_svc_2()
1537 			 */
1538 			(void) mutex_lock(&mdmn_busy_mutex[setno]);
1539 			mdmn_mark_class_unbusy(setno, orig_class);
1540 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1541 
1542 		}
1543 		if (err == MDMNE_CLASS_BUSY) {
1544 			/*
1545 			 * The log is occupied with a different message
1546 			 * that needs to be played first.
1547 			 * We reject the current message with MDMNE_CLASS_BUSY
1548 			 * to the initiator and do not unbusy the set/class,
1549 			 * because we will proceed with the logged message,
1550 			 * which has the same set/class combination
1551 			 */
1552 			result->mmr_comm_state = MDMNE_CLASS_BUSY;
1553 		}
1554 		ret = (int *)NULL;
1555 		(void) rw_rdlock(&client_rwlock[setno]);
1556 
1557 		if (check_client(setno, sender)) {
1558 			commd_debug(MD_MMV_SYSLOG,
1559 			    "proc_mas: No client for initiator \n");
1560 		} else {
1561 			ret = mdmn_wakeup_initiator_2(result,
1562 			    client[setno][sender], sender);
1563 		}
1564 		(void) rw_unlock(&client_rwlock[setno]);
1565 
1566 		if (ret == (int *)NULL) {
1567 			commd_debug(MD_MMV_SYSLOG,
1568 			    "proc_mas: couldn't wakeup_initiator \n");
1569 		} else {
1570 			if (*ret != MDMNE_ACK) {
1571 				commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1572 				    "wakeup_initiator returned %d\n", *ret);
1573 			}
1574 			free(ret);
1575 		}
1576 		free_msg(msg);
1577 
1578 		if (err == MDMNE_LOG_FAIL) {
1579 			/* we can't proceed here */
1580 			free_result(result);
1581 			(void) rw_unlock(&set_desc_rwlock[setno]);
1582 			return;
1583 		} else if (err == MDMNE_CLASS_BUSY) {
1584 			mdmn_changelog_record_t *lr;
1585 			lr = mdmn_get_changelogrec(setno, orig_class);
1586 			assert(lr != NULL);
1587 
1588 			/* proceed with the logged message */
1589 			msg = copy_msg(&(lr->lr_msg), NULL);
1590 
1591 			/*
1592 			 * The logged message has to have the same class but
1593 			 * type and sender can be different
1594 			 */
1595 			orig_type = msgtype = msg->msg_type;
1596 			sender	= msg->msg_sender;
1597 
1598 			commd_debug(MD_MMV_PROC_M,
1599 			    "proc_mas: Got new message from change log: "
1600 			    "(%d,0x%llx-%d) type %d\n",
1601 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1602 
1603 			/* continue normal operation with this message */
1604 		}
1605 	}
1606 
1607 proceed:
1608 	smgen = mdmn_get_submessage_generator(msgtype);
1609 	if (smgen == NULL) {
1610 		/* no submessages to create, just use the original message */
1611 		msglist[0] = msg;
1612 		nmsgs = 1;
1613 	} else {
1614 		/* some bits are passed on to submessages */
1615 		inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1616 
1617 		nmsgs = smgen(msg, msglist);
1618 
1619 		/* some settings for the submessages */
1620 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1621 			cmsg    = msglist[curmsg];
1622 
1623 			/* Apply the inherited flags */
1624 			cmsg->msg_flags |= inherit_flags;
1625 
1626 			/*
1627 			 * Make sure the submessage ID is set correctly
1628 			 * Note: first submessage has mid_smid of 1 (not 0)
1629 			 */
1630 			cmsg->msg_msgid.mid_smid = curmsg + 1;
1631 
1632 			/* need the original class set in msgID (for MCT) */
1633 			cmsg->msg_msgid.mid_oclass = orig_class;
1634 		}
1635 
1636 		commd_debug(MD_MMV_PROC_M,
1637 		    "smgen generated %d submsgs, origclass = %d\n",
1638 		    nmsgs, orig_class);
1639 	}
1640 	/*
1641 	 * This big loop does the following.
1642 	 * For all messages:
1643 	 *	process message on the master first (a message completion
1644 	 *		table MCT ensures a message is not processed twice)
1645 	 *	in case of an error break out of message loop
1646 	 *	for all nodes -- unless MD_MSGF_NO_BCAST is set --
1647 	 *		send message to node until that succeeds
1648 	 *		merge result -- not yet implemented
1649 	 *		respect MD_MSGF_STOP_ON_ERROR
1650 	 */
1651 	for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1652 		int	break_msg_loop = 0;
1653 		mutex_t	*mx;		/* protection for class_busy */
1654 		int	master_err;
1655 		int	master_exitval = -1;
1656 
1657 		cmsg	= msglist[curmsg];
1658 		msgtype = cmsg->msg_type;
1659 		class	= mdmn_get_message_class(msgtype);
1660 		node	= NULL;
1661 		mx	= mdmn_get_master_table_mx(setno, class);
1662 
1663 		/* If we are in the abort state, we error out immediately */
1664 		if (md_commd_global_state & MD_CGS_ABORTED) {
1665 			break; /* out of the message loop */
1666 		}
1667 
1668 		commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1669 		    class, orig_class);
1670 		/*
1671 		 * If the current class is different from the original class,
1672 		 * we have to lock it down.
1673 		 * The original class is already marked busy.
1674 		 * At this point we cannot refuse the message because the
1675 		 * class is busy right now, so we wait until the class becomes
1676 		 * available again. As soon as something changes for this set
1677 		 * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1678 		 *
1679 		 * Granularity could be finer (setno/class)
1680 		 */
1681 		if (class != orig_class) {
1682 			(void) mutex_lock(&mdmn_busy_mutex[setno]);
1683 			while (mdmn_mark_class_busy(setno, class) == FALSE) {
1684 				(void) cond_wait(&mdmn_busy_cv[setno],
1685 				    &mdmn_busy_mutex[setno]);
1686 			}
1687 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1688 		}
1689 
1690 		master_err = do_message_locally(cmsg, result);
1691 
1692 		if ((master_err != MDMNE_ACK) ||
1693 		    ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1694 			result->mmr_failing_node = set_master;
1695 			if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1696 				/*
1697 				 * if appropriate, unbusy the class and
1698 				 * break out of the message loop
1699 				 */
1700 				if (class != orig_class) {
1701 					(void) mutex_lock(
1702 					    &mdmn_busy_mutex[setno]);
1703 					mdmn_mark_class_unbusy(setno, class);
1704 					(void) mutex_unlock(
1705 					    &mdmn_busy_mutex[setno]);
1706 				}
1707 				break;
1708 			}
1709 		}
1710 
1711 		if (master_err == MDMNE_ACK)
1712 			master_exitval = result->mmr_exitval;
1713 
1714 		/* No broadcast? => next message */
1715 		if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1716 			/* if appropriate, unbusy the class */
1717 			if (class != orig_class) {
1718 				(void) mutex_lock(&mdmn_busy_mutex[setno]);
1719 				mdmn_mark_class_unbusy(setno, class);
1720 				(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1721 			}
1722 			continue;
1723 		}
1724 
1725 
1726 		/* fake sender, so we get notified when the results are avail */
1727 		cmsg->msg_sender = set_master;
1728 		/*
1729 		 * register to the master_table. It's needed by wakeup_master to
1730 		 * wakeup the sleeping thread.
1731 		 * Access is protected by the class lock: mdmn_mark_class_busy()
1732 		 */
1733 		mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1734 
1735 
1736 
1737 		(void) rw_rdlock(&set_desc_rwlock[setno]);
1738 		/* Send the message  to all other nodes */
1739 		for (node = set_descriptor[setno]->sd_nodelist; node;
1740 		    node = node->nd_next) {
1741 			md_mn_nodeid_t nid = node->nd_nodeid;
1742 
1743 			/* We are master and have already processed the msg */
1744 			if (node == set_descriptor[setno]->sd_mn_masternode) {
1745 				continue;
1746 			}
1747 
1748 			/* If this node didn't join the disk set, ignore it */
1749 			if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1750 				continue;
1751 			}
1752 
1753 			/* If a DIRECTED message, skip non-recipient nodes */
1754 			if ((cmsg->msg_flags & MD_MSGF_DIRECTED) &&
1755 			    nid != cmsg->msg_recipient) {
1756 				continue;
1757 			}
1758 
1759 			(void) mutex_lock(mx);
1760 			/*
1761 			 * Register the node that is addressed,
1762 			 * so we can detect unsolicited messages
1763 			 */
1764 			mdmn_set_master_table_addr(setno, class, nid);
1765 			slave_result = (md_mn_result_t *)NULL;
1766 
1767 			/*
1768 			 * Now send it. do_send_message() will return if
1769 			 *	a failure occurs or
1770 			 *	the results are available
1771 			 */
1772 			err = do_send_message(cmsg, node);
1773 
1774 			/*  in abort state, we error out immediately */
1775 			if (md_commd_global_state & MD_CGS_ABORTED) {
1776 				break;
1777 			}
1778 
1779 			if (err == MDMNE_ACK) {
1780 				slave_result =
1781 				    mdmn_get_master_table_res(setno, class);
1782 				commd_debug(MD_MMV_PROC_M,
1783 				    "proc_mas: got result for (%d,0x%llx-%d)\n",
1784 				    MSGID_ELEMS(cmsg->msg_msgid));
1785 			} else if (err == MDMNE_IGNORE_NODE) {
1786 				(void) mutex_unlock(mx);
1787 				continue; /* send to next node */
1788 			}
1789 			(void) mutex_unlock(mx);
1790 
1791 
1792 			/*
1793 			 * If the result is NULL, or err doesn't show success,
1794 			 * something went wrong with this RPC call.
1795 			 */
1796 			if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1797 				/*
1798 				 * If PANIC_WHEN_INCONSISTENT set,
1799 				 * panic if the master succeeded while
1800 				 * this node failed
1801 				 */
1802 				if ((cmsg->msg_flags &
1803 				    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1804 				    (master_err == MDMNE_ACK))
1805 					panic_system(nid, cmsg->msg_type,
1806 					    master_err, master_exitval,
1807 					    slave_result);
1808 
1809 				result->mmr_failing_node = nid;
1810 				/* are we supposed to stop in case of error? */
1811 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1812 					result->mmr_exitval = MDMNE_RPC_FAIL;
1813 					commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1814 					    "result (%d,0x%llx-%d) is NULL\n",
1815 					    MSGID_ELEMS(cmsg->msg_msgid));
1816 					FLUSH_DEBUGFILE();
1817 					break_msg_loop = 1;
1818 					break; /* out of node loop first */
1819 				} else {
1820 					/* send msg to the next node */
1821 					continue;
1822 				}
1823 
1824 			}
1825 
1826 			/*
1827 			 * Message processed on remote node.
1828 			 * If PANIC_WHEN_INCONSISTENT set, panic if the
1829 			 * result is different on this node from the result
1830 			 * on the master
1831 			 */
1832 			if ((cmsg->msg_flags &
1833 			    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1834 			    ((master_err != MDMNE_ACK) ||
1835 			    (slave_result->mmr_exitval != master_exitval)))
1836 				panic_system(nid, cmsg->msg_type, master_err,
1837 				    master_exitval, slave_result);
1838 
1839 			/*
1840 			 * At this point we know we have a message that was
1841 			 * processed on the remote node.
1842 			 * We now check if the exitval is non zero.
1843 			 * In that case we discard the previous result and
1844 			 * rather use the current.
1845 			 * This means: If a message fails on no node,
1846 			 * the result from the master will be returned.
1847 			 * There's currently no such thing as merge of results
1848 			 * If additionally STOP_ON_ERROR is set, we bail out
1849 			 */
1850 			if (slave_result->mmr_exitval != 0) {
1851 				/* throw away the previously allocated result */
1852 				free_result(result);
1853 
1854 				/* copy_result() allocates new memory */
1855 				result = copy_result(slave_result);
1856 				free_result(slave_result);
1857 
1858 				dump_result(MD_MMV_PROC_M, "proc_mas", result);
1859 
1860 				result->mmr_failing_node = nid;
1861 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1862 					break_msg_loop = 1;
1863 					break; /* out of node loop */
1864 				}
1865 				continue; /* try next node */
1866 
1867 			} else {
1868 				/*
1869 				 * MNIssue: may want to merge the results
1870 				 * from all slaves.  Currently only report
1871 				 * the results from the master.
1872 				 */
1873 				free_result(slave_result);
1874 			}
1875 
1876 		} /* End of loop over the nodes */
1877 		(void) rw_unlock(&set_desc_rwlock[setno]);
1878 
1879 
1880 		/* release the current class again */
1881 		if (class != orig_class) {
1882 			(void) mutex_lock(&mdmn_busy_mutex[setno]);
1883 			mdmn_mark_class_unbusy(setno, class);
1884 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1885 		}
1886 
1887 		/* are we supposed to quit entirely ? */
1888 		if (break_msg_loop ||
1889 		    (md_commd_global_state & MD_CGS_ABORTED)) {
1890 			break; /* out of msg loop */
1891 		}
1892 
1893 	} /* End of loop over the messages */
1894 	/*
1895 	 * If we are here, there's two possibilities:
1896 	 * 	- we processed all messages on all nodes without an error.
1897 	 *	    In this case we return the result from the master.
1898 	 *	    (to be implemented: return the merged result)
1899 	 *	- we encountered an error in which case result has been
1900 	 *	    set accordingly already.
1901 	 */
1902 
1903 	if (md_commd_global_state & MD_CGS_ABORTED) {
1904 		result->mmr_comm_state = MDMNE_ABORT;
1905 	}
1906 
1907 	/*
1908 	 * This message has been processed completely.
1909 	 * Remove it from the changelog.
1910 	 * Do this for replay messages too.
1911 	 * Note that the message is unlogged before waking up the
1912 	 * initiator.  This is done for two reasons.
1913 	 * 1. Remove a race condition that occurs when back to back
1914 	 *   messages are sent for the same class, the registeration is
1915 	 *   is lost.
1916 	 * 2. If the initiator died but the action was completed on all the
1917 	 *   the nodes, we want that to be marked "done" quickly.
1918 	 */
1919 
1920 	if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1921 		commd_debug(MD_MMV_PROC_M,
1922 		    "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1923 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1924 		(void) mdmn_unlog_msg(msg);
1925 		commd_debug(MD_MMV_PROC_M,
1926 		    "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1927 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1928 	}
1929 
1930 	/*
1931 	 * In case of submessages, we increased the submessage ID in the
1932 	 * result structure. We restore the message ID to the value that
1933 	 * the initiator is waiting for.
1934 	 */
1935 	result->mmr_msgid.mid_smid	= 0;
1936 	result->mmr_msgtype		= orig_type;
1937 	result->mmr_sender		= set_master;
1938 
1939 	/* if we have an inited client, send result */
1940 	ret = (int *)NULL;
1941 
1942 	(void) rw_rdlock(&client_rwlock[setno]);
1943 	if (check_client(setno, sender)) {
1944 		commd_debug(MD_MMV_SYSLOG,
1945 		    "proc_mas: unable to create client for initiator\n");
1946 	} else {
1947 		ret = mdmn_wakeup_initiator_2(result, client[setno][sender],
1948 		    sender);
1949 	}
1950 	(void) rw_unlock(&client_rwlock[setno]);
1951 
1952 	if (ret == (int *)NULL) {
1953 		commd_debug(MD_MMV_PROC_M,
1954 		    "proc_mas: couldn't wakeup initiator\n");
1955 	} else {
1956 		if (*ret != MDMNE_ACK) {
1957 			commd_debug(MD_MMV_PROC_M,
1958 			    "proc_mas: wakeup_initiator returned %d\n",
1959 			    *ret);
1960 		}
1961 		free(ret);
1962 	}
1963 
1964 	(void) rw_unlock(&set_desc_rwlock[setno]);
1965 	/* Free all submessages, if there were any */
1966 	if (nmsgs > 1) {
1967 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1968 			free_msg(msglist[curmsg]);
1969 		}
1970 	}
1971 	/* Free the result */
1972 	free_result(result);
1973 
1974 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
1975 	mdmn_mark_class_unbusy(setno, orig_class);
1976 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
1977 
1978 
1979 	/*
1980 	 * We use this ioctl just to get the time in the same format as used in
1981 	 * the messageID. If it fails, all we get is a bad runtime output.
1982 	 */
1983 	(void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1984 	secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1985 	usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1986 
1987 	/* catching possible overflow */
1988 	if (usecdiff >= 1000000) {
1989 		usecdiff -= 1000000;
1990 		secdiff++;
1991 	}
1992 
1993 
1994 	commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1995 	    "%5d.%06d secs runtime\n",
1996 	    MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1997 
1998 	/* Free the original message */
1999 	free_msg(msg);
2000 }
2001 
2002 void
2003 mdmn_slave_process_msg(md_mn_msg_t *msg)
2004 {
2005 	int			*ret = NULL;
2006 	int			completed;
2007 	int			retries;
2008 	int			successfully_returned;
2009 	set_t			setno;
2010 	md_mn_result_t		*result;
2011 	md_mn_nodeid_t		sender;
2012 	md_mn_nodeid_t		whoami;
2013 	md_mn_msgtype_t		msgtype;
2014 	md_mn_msgclass_t	class;
2015 
2016 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
2017 
2018 	setno	= msg->msg_setno;
2019 	sender	= msg->msg_sender; /* this is always the master of the set */
2020 	msgtype	= msg->msg_type;
2021 
2022 	(void) rw_rdlock(&set_desc_rwlock[setno]);
2023 	whoami		= set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
2024 	(void) rw_unlock(&set_desc_rwlock[setno]);
2025 
2026 	result = Zalloc(sizeof (md_mn_result_t));
2027 	result->mmr_flags	= msg->msg_flags;
2028 	result->mmr_setno	= setno;
2029 	result->mmr_msgtype	= msgtype;
2030 	result->mmr_sender	= whoami;
2031 	result->mmr_comm_state	= MDMNE_ACK; /* Ok state */
2032 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
2033 	class = mdmn_get_message_class(msgtype);
2034 
2035 	commd_debug(MD_MMV_PROC_S,
2036 	    "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2037 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
2038 
2039 	handler = mdmn_get_handler(msgtype);
2040 
2041 	if (handler == NULL) {
2042 		result->mmr_exitval = 0;
2043 		/* let the sender decide if this is an error or not */
2044 		result->mmr_comm_state = MDMNE_NO_HANDLER;
2045 		commd_debug(MD_MMV_PROC_S,
2046 		    "proc_sla: No handler for (%d, 0x%llx-%d)\n",
2047 		    MSGID_ELEMS(msg->msg_msgid));
2048 	} else {
2049 
2050 		/* Did we already process this message ? */
2051 		(void) mutex_lock(&mct_mutex[setno][class]);
2052 		completed = mdmn_check_completion(msg, result);
2053 
2054 		if (completed == MDMN_MCT_NOT_DONE) {
2055 			/* message not yet processed locally */
2056 			commd_debug(MD_MMV_PROC_S,
2057 			    "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
2058 			    MSGID_ELEMS(msg->msg_msgid));
2059 
2060 			/*
2061 			 * Mark the message as being currently processed,
2062 			 * so we won't start a second handler for it
2063 			 */
2064 			(void) mdmn_mark_completion(msg, NULL,
2065 			    MDMN_MCT_IN_PROGRESS);
2066 
2067 			(void) mutex_unlock(&mct_mutex[setno][class]);
2068 			(*handler)(msg, MD_MSGF_ON_SLAVE, result);
2069 
2070 			commd_debug(MD_MMV_PROC_S,
2071 			    "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
2072 			    MSGID_ELEMS(msg->msg_msgid));
2073 
2074 			(void) mutex_lock(&mct_mutex[setno][class]);
2075 			/* Mark the message as fully done, store the result */
2076 			(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
2077 
2078 		} else if (completed == MDMN_MCT_DONE) {
2079 			/* message processed previously, got result from MCT */
2080 			commd_debug(MD_MMV_PROC_S,
2081 			    "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
2082 			    MSGID_ELEMS(msg->msg_msgid));
2083 		} else if (completed == MDMN_MCT_IN_PROGRESS) {
2084 			/*
2085 			 * If the message is curruntly being processed,
2086 			 * we can return here, without sending a result back.
2087 			 * This will be done by the initial message handling
2088 			 * thread
2089 			 */
2090 			(void) mutex_unlock(&mct_mutex[setno][class]);
2091 			commd_debug(MD_MMV_PROC_M, "proc_sla: "
2092 			    "(%d, 0x%llx-%d) is currently being processed\n",
2093 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
2094 
2095 			free_msg(msg);
2096 			free_result(result);
2097 			return;
2098 		} else {
2099 			/* MCT error occurred (should never happen) */
2100 			result->mmr_comm_state = MDMNE_LOG_FAIL;
2101 			commd_debug(MD_MMV_PROC_S,
2102 			    "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
2103 			    MSGID_ELEMS(msg->msg_msgid));
2104 		}
2105 		(void) mutex_unlock(&mct_mutex[setno][class]);
2106 	}
2107 
2108 	/*
2109 	 * At this point we have a result (even in an error case)
2110 	 * that we return to the master.
2111 	 */
2112 	(void) rw_rdlock(&set_desc_rwlock[setno]);
2113 	retries = 2; /* we will try two times to send the results */
2114 	successfully_returned = 0;
2115 
2116 	while (!successfully_returned && (retries != 0)) {
2117 		ret = (int *)NULL;
2118 		(void) rw_rdlock(&client_rwlock[setno]);
2119 		if (check_client(setno, sender)) {
2120 			/*
2121 			 * If we cannot setup the rpc connection to the master,
2122 			 * we can't do anything besides logging this fact.
2123 			 */
2124 			commd_debug(MD_MMV_SYSLOG,
2125 			    "proc_mas: unable to create client for master\n");
2126 			(void) rw_unlock(&client_rwlock[setno]);
2127 			break;
2128 		} else {
2129 			ret = mdmn_wakeup_master_2(result,
2130 			    client[setno][sender], sender);
2131 			/*
2132 			 * if mdmn_wakeup_master_2 returns NULL, it can be that
2133 			 * the master (or the commd on the master) had died.
2134 			 * In that case, we destroy the client to the master
2135 			 * and retry.
2136 			 * If mdmn_wakeup_master_2 doesn't return MDMNE_ACK,
2137 			 * the commd on the master is alive but
2138 			 * something else is wrong,
2139 			 * in that case a retry doesn't make sense => break out
2140 			 */
2141 			if (ret == (int *)NULL) {
2142 				commd_debug(MD_MMV_PROC_S,
2143 				    "proc_sla: wakeup_master returned NULL\n");
2144 				/* release reader lock, grab writer lock */
2145 				(void) rw_unlock(&client_rwlock[setno]);
2146 				(void) rw_wrlock(&client_rwlock[setno]);
2147 				mdmn_clnt_destroy(client[setno][sender]);
2148 				if (client[setno][sender] != (CLIENT *)NULL) {
2149 					client[setno][sender] = (CLIENT *)NULL;
2150 				}
2151 				(void) rw_unlock(&client_rwlock[setno]);
2152 				retries--;
2153 				commd_debug(MD_MMV_PROC_S,
2154 				    "retries = %d\n", retries);
2155 				continue;
2156 			}
2157 			if (*ret != MDMNE_ACK) {
2158 				commd_debug(MD_MMV_PROC_S, "proc_sla: "
2159 				    "wakeup_master returned %d\n", *ret);
2160 				(void) rw_unlock(&client_rwlock[setno]);
2161 				break;
2162 			} else { /* Good case */
2163 				successfully_returned = 1;
2164 				(void) rw_unlock(&client_rwlock[setno]);
2165 			}
2166 		}
2167 	}
2168 
2169 	(void) rw_unlock(&set_desc_rwlock[setno]);
2170 	commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
2171 	    MSGID_ELEMS(msg->msg_msgid));
2172 
2173 	if (ret != (int *)NULL)
2174 		free(ret);
2175 	free_msg(msg);
2176 	free_result(result);
2177 }
2178 
2179 
2180 /*
2181  * mdmn_send_svc_2:
2182  * ---------------
2183  * Check that the issuing node is a legitimate one (i.e. is licensed to send
2184  * messages to us), that the RPC request can be staged.
2185  *
2186  * Returns:
2187  *	0	=> no RPC request is in-flight, no deferred svc_sendreply()
2188  *	1	=> queued RPC request in-flight. Completion will be made (later)
2189  *		   by a wakeup_initiator_2() [hopefully]
2190  */
2191 int
2192 mdmn_send_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2193 {
2194 	int			err;
2195 	set_t			setno;
2196 	SVCXPRT			*transp = rqstp->rq_xprt;
2197 	md_mn_msg_t		*msg;
2198 	md_mn_result_t		*resultp;
2199 	md_mn_msgclass_t	class;
2200 	md_mn_msg_and_transp_t	*matp;
2201 
2202 	msg = copy_msg(omsg, NULL);
2203 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2204 
2205 	setno = msg->msg_setno;
2206 	class = mdmn_get_message_class(msg->msg_type);
2207 
2208 	/* If we are in the abort state, we error out immediately */
2209 	if (md_commd_global_state & MD_CGS_ABORTED) {
2210 		resultp = Zalloc(sizeof (md_mn_result_t));
2211 		resultp->mmr_comm_state = MDMNE_ABORT;
2212 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2213 		free_result(resultp);
2214 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2215 		return (0);
2216 	}
2217 
2218 	/* check if the global initialization is done */
2219 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2220 		global_init();
2221 	}
2222 
2223 	commd_debug(MD_MMV_SEND,
2224 	    "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2225 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2226 
2227 	/* Check for verbosity related message */
2228 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2229 		md_mn_verbose_t *d;
2230 
2231 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2232 		md_commd_global_verb = d->mmv_what;
2233 		/* everytime the bitmask is set, we reset the timer */
2234 		__savetime = gethrtime();
2235 		/*
2236 		 * If local-only-flag is set, we are done here,
2237 		 * otherwise we pass that message on to the master.
2238 		 */
2239 		if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2240 			resultp = Zalloc(sizeof (md_mn_result_t));
2241 			resultp->mmr_comm_state = MDMNE_ACK;
2242 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2243 			    (char *)resultp);
2244 			free_result(resultp);
2245 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2246 			return (0);
2247 		}
2248 	}
2249 
2250 	/*
2251 	 * Are we entering the abort state?
2252 	 * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2253 	 * this message cannot be distributed anyway.
2254 	 * So, it's safe to return immediately.
2255 	 */
2256 	if (msg->msg_type == MD_MN_MSG_ABORT) {
2257 		md_commd_global_state |= MD_CGS_ABORTED;
2258 		resultp = Zalloc(sizeof (md_mn_result_t));
2259 		resultp->mmr_comm_state = MDMNE_ACK;
2260 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2261 		free_result(resultp);
2262 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2263 		return (0);
2264 	}
2265 
2266 
2267 	/*
2268 	 * Is this message type blocked?
2269 	 * If so we return MDMNE_CLASS_LOCKED, immediately
2270 	 */
2271 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2272 		resultp = Zalloc(sizeof (md_mn_result_t));
2273 		resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2274 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2275 		free_result(resultp);
2276 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2277 		commd_debug(MD_MMV_SEND,
2278 		    "send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2279 		    "type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2280 		    msg->msg_type);
2281 		return (0);
2282 	}
2283 
2284 
2285 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2286 		/* Can only use the appropriate mutexes if they are inited */
2287 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2288 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2289 			(void) rw_wrlock(&client_rwlock[setno]);
2290 			err = mdmn_init_set(setno, MDMN_SET_READY);
2291 			(void) rw_unlock(&client_rwlock[setno]);
2292 			(void) rw_unlock(&set_desc_rwlock[setno]);
2293 		} else {
2294 			err = mdmn_init_set(setno, MDMN_SET_READY);
2295 		}
2296 
2297 		if (err) {
2298 			/* couldn't initialize connections, cannot proceed */
2299 			resultp = Zalloc(sizeof (md_mn_result_t));
2300 			resultp->mmr_comm_state = err;
2301 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2302 			    (char *)resultp);
2303 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2304 			free_result(resultp);
2305 			commd_debug(MD_MMV_SEND,
2306 			    "send: init err = %d\n", err);
2307 			return (0);
2308 		}
2309 	}
2310 
2311 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2312 	if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2313 	    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2314 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2315 		resultp = Zalloc(sizeof (md_mn_result_t));
2316 		resultp->mmr_comm_state = MDMNE_SUSPENDED;
2317 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2318 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2319 		free_result(resultp);
2320 		commd_debug(MD_MMV_SEND,
2321 		    "send: class suspended (%d, 0x%llx-%d), set=%d, "
2322 		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2323 		    setno, class, msg->msg_type);
2324 		return (0);
2325 	}
2326 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2327 
2328 	/* is this rpc request coming from the local node? */
2329 	if (check_license(rqstp, 0) == FALSE) {
2330 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2331 		commd_debug(MD_MMV_SEND,
2332 		    "send: check licence fail(%d, 0x%llx-%d), set=%d, "
2333 		    "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2334 		    setno, class, msg->msg_type);
2335 		return (0);
2336 	}
2337 
2338 
2339 	/*
2340 	 * We allocate a structure that can take two pointers in order to pass
2341 	 * both the message and the transp into thread_create.
2342 	 * The free for this alloc is done in mdmn_send_to_work()
2343 	 */
2344 	matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2345 	matp->mat_msg = msg;
2346 	matp->mat_transp = transp;
2347 
2348 	/*
2349 	 * create a thread here that calls work on the master.
2350 	 * If we are already on the master, this would block if running
2351 	 * in the same context. (our service is single threaded)(
2352 	 * Make it a detached thread because it will not communicate with
2353 	 * anybody thru thr_* mechanisms
2354 	 */
2355 	(void) thr_create(NULL, 0, mdmn_send_to_work, (void *) matp,
2356 	    THR_DETACHED, NULL);
2357 
2358 	commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2359 	    MSGID_ELEMS(msg->msg_msgid));
2360 	/*
2361 	 * We return here without sending results. This will be done by
2362 	 * mdmn_wakeup_initiator_svc_2() as soon as the results are available.
2363 	 * Until then the calling send_message will be blocked, while we
2364 	 * are able to take calls.
2365 	 */
2366 
2367 	return (1);
2368 }
2369 
2370 /* ARGSUSED */
2371 int *
2372 mdmn_work_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2373 {
2374 	int		err;
2375 	set_t		setno;
2376 	thread_t	tid;
2377 	int		*retval;
2378 	md_mn_msg_t	*msg;
2379 	md_mn_msgclass_t class;
2380 
2381 	retval = Malloc(sizeof (int));
2382 
2383 	/* If we are in the abort state, we error out immediately */
2384 	if (md_commd_global_state & MD_CGS_ABORTED) {
2385 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2386 		*retval = MDMNE_ABORT;
2387 		return (retval);
2388 	}
2389 
2390 	msg = copy_msg(omsg, NULL);
2391 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2392 
2393 	/*
2394 	 * Is this message type blocked?
2395 	 * If so we return MDMNE_CLASS_LOCKED, immediately.
2396 	 * This check is performed on master and slave.
2397 	 */
2398 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2399 		*retval = MDMNE_CLASS_LOCKED;
2400 		return (retval);
2401 	}
2402 
2403 	/* check if the global initialization is done */
2404 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2405 		global_init();
2406 	}
2407 
2408 	class = mdmn_get_message_class(msg->msg_type);
2409 	setno = msg->msg_setno;
2410 
2411 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2412 		/* Can only use the appropriate mutexes if they are inited */
2413 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2414 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2415 			(void) rw_wrlock(&client_rwlock[setno]);
2416 			err = mdmn_init_set(setno, MDMN_SET_READY);
2417 			(void) rw_unlock(&client_rwlock[setno]);
2418 			(void) rw_unlock(&set_desc_rwlock[setno]);
2419 		} else {
2420 			err = mdmn_init_set(setno, MDMN_SET_READY);
2421 		}
2422 
2423 		if (err) {
2424 			*retval = MDMNE_CANNOT_CONNECT;
2425 			free_msg(msg);
2426 			return (retval);
2427 		}
2428 	}
2429 
2430 	/* is this rpc request coming from a licensed node? */
2431 	if (check_license(rqstp, msg->msg_sender) == FALSE) {
2432 		free_msg(msg);
2433 		*retval = MDMNE_RPC_FAIL;
2434 		return (retval);
2435 	}
2436 
2437 	commd_debug(MD_MMV_WORK,
2438 	    "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2439 	    "flags=0x%x\n",
2440 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2441 	    msg->msg_flags);
2442 
2443 	/* Check for various CLASS0 message types */
2444 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2445 		md_mn_verbose_t *d;
2446 
2447 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2448 		/* for now we ignore set / class in md_mn_verbose_t */
2449 		md_commd_global_verb = d->mmv_what;
2450 		/* everytime the bitmask is set, we reset the timer */
2451 		__savetime = gethrtime();
2452 	}
2453 
2454 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2455 
2456 	/* check if class is locked via a call to mdmn_comm_lock_svc_2 */
2457 	if (mdmn_is_class_locked(setno, class) == TRUE) {
2458 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2459 		*retval = MDMNE_CLASS_LOCKED;
2460 		free_msg(msg);
2461 		return (retval);
2462 	}
2463 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2464 
2465 	/* Check if the class is busy right now. Do it only on the master */
2466 	(void) rw_rdlock(&set_desc_rwlock[setno]);
2467 	if (set_descriptor[setno]->sd_mn_am_i_master) {
2468 		(void) rw_unlock(&set_desc_rwlock[setno]);
2469 		/*
2470 		 * If the class is currently suspended, don't accept new
2471 		 * messages, unless they are flagged with an override bit.
2472 		 */
2473 		(void) mutex_lock(&mdmn_busy_mutex[setno]);
2474 		if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2475 		    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2476 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2477 			*retval = MDMNE_SUSPENDED;
2478 			commd_debug(MD_MMV_SEND,
2479 			    "send: set %d is suspended\n", setno);
2480 			free_msg(msg);
2481 			return (retval);
2482 		}
2483 		if (mdmn_mark_class_busy(setno, class) == FALSE) {
2484 			(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2485 			*retval = MDMNE_CLASS_BUSY;
2486 			free_msg(msg);
2487 			return (retval);
2488 		}
2489 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2490 		/*
2491 		 * Because the real processing of the message takes time we
2492 		 * create a thread for it. So the master thread can continue
2493 		 * to run and accept further messages.
2494 		 */
2495 		*retval = thr_create(NULL, 0,
2496 		    (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2497 		    THR_DETACHED|THR_SUSPENDED, &tid);
2498 	} else {
2499 		(void) rw_unlock(&set_desc_rwlock[setno]);
2500 		*retval = thr_create(NULL, 0,
2501 		    (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2502 		    THR_DETACHED|THR_SUSPENDED, &tid);
2503 	}
2504 
2505 	if (*retval != 0) {
2506 		*retval = MDMNE_THR_CREATE_FAIL;
2507 		free_msg(msg);
2508 		return (retval);
2509 	}
2510 
2511 	/* Now run the new thread */
2512 	(void) thr_continue(tid);
2513 
2514 	commd_debug(MD_MMV_WORK,
2515 	    "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2516 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2517 
2518 	*retval = MDMNE_ACK; /* this means success */
2519 	return (retval);
2520 }
2521 
2522 /* ARGSUSED */
2523 int *
2524 mdmn_wakeup_initiator_svc_2(md_mn_result_t *res, struct svc_req *rqstp)
2525 {
2526 
2527 	int		*retval;
2528 	int		err;
2529 	set_t		setno;
2530 	mutex_t		*mx;   /* protection of initiator_table */
2531 	SVCXPRT		*transp = NULL;
2532 	md_mn_msgid_t	initiator_table_id;
2533 	md_mn_msgclass_t class;
2534 
2535 	retval = Malloc(sizeof (int));
2536 
2537 	/* check if the global initialization is done */
2538 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2539 		global_init();
2540 	}
2541 
2542 	setno	= res->mmr_setno;
2543 
2544 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2545 		/* set not ready means we just crashed are restarted now */
2546 		/* Can only use the appropriate mutexes if they are inited */
2547 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2548 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2549 			(void) rw_wrlock(&client_rwlock[setno]);
2550 			err = mdmn_init_set(setno, MDMN_SET_READY);
2551 			(void) rw_unlock(&client_rwlock[setno]);
2552 			(void) rw_unlock(&set_desc_rwlock[setno]);
2553 		} else {
2554 			err = mdmn_init_set(setno, MDMN_SET_READY);
2555 		}
2556 
2557 		if (err) {
2558 			*retval = MDMNE_CANNOT_CONNECT;
2559 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2560 			return (retval);
2561 		}
2562 	}
2563 
2564 	/* is this rpc request coming from a licensed node? */
2565 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2566 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2567 		*retval = MDMNE_RPC_FAIL;
2568 		return (retval);
2569 	}
2570 
2571 
2572 	class	= mdmn_get_message_class(res->mmr_msgtype);
2573 	mx	= mdmn_get_initiator_table_mx(setno, class);
2574 
2575 	commd_debug(MD_MMV_WAKE_I,
2576 	    "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2577 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2578 
2579 	(void) mutex_lock(mx);
2580 
2581 	/*
2582 	 * Search the initiator wakeup table.
2583 	 * If we find an entry here (which should always be true)
2584 	 * we are on the initiating node and we wakeup the original
2585 	 * local rpc call.
2586 	 */
2587 	mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2588 
2589 	if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2590 		transp = mdmn_get_initiator_table_transp(setno, class);
2591 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2592 		svc_done(transp);
2593 		mdmn_unregister_initiator_table(setno, class);
2594 		*retval = MDMNE_ACK;
2595 
2596 		commd_debug(MD_MMV_WAKE_I,
2597 		    "wake_ini: replied (%d, 0x%llx-%d)\n",
2598 		    MSGID_ELEMS(res->mmr_msgid));
2599 	} else {
2600 		commd_debug(MD_MMV_WAKE_I,
2601 		    "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2602 		    MSGID_ELEMS(res->mmr_msgid));
2603 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2604 	}
2605 	(void) mutex_unlock(mx);
2606 	/* less work for check_timeouts */
2607 	(void) mutex_lock(&check_timeout_mutex);
2608 	if (messages_on_their_way == 0) {
2609 		commd_debug(MD_MMV_WAKE_I,
2610 		    "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2611 		    MSGID_ELEMS(res->mmr_msgid));
2612 	} else {
2613 		messages_on_their_way--;
2614 	}
2615 	(void) mutex_unlock(&check_timeout_mutex);
2616 	xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2617 
2618 	return (retval);
2619 }
2620 
2621 
2622 /*
2623  * res must be free'd by the thread we wake up
2624  */
2625 /* ARGSUSED */
2626 int *
2627 mdmn_wakeup_master_svc_2(md_mn_result_t *ores, struct svc_req *rqstp)
2628 {
2629 
2630 	int		*retval;
2631 	int		err;
2632 	set_t		setno;
2633 	cond_t		*cv;
2634 	mutex_t		*mx;
2635 	md_mn_msgid_t	master_table_id;
2636 	md_mn_nodeid_t	sender;
2637 	md_mn_result_t	*res;
2638 	md_mn_msgclass_t class;
2639 
2640 	retval = Malloc(sizeof (int));
2641 
2642 	/* check if the global initialization is done */
2643 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2644 		global_init();
2645 	}
2646 
2647 	/* Need to copy the results here, as they are static for RPC */
2648 	res = copy_result(ores);
2649 	xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2650 
2651 	class = mdmn_get_message_class(res->mmr_msgtype);
2652 	setno = res->mmr_setno;
2653 
2654 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2655 		/* set not ready means we just crashed are restarted now */
2656 		/* Can only use the appropriate mutexes if they are inited */
2657 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2658 			(void) rw_wrlock(&set_desc_rwlock[setno]);
2659 			(void) rw_wrlock(&client_rwlock[setno]);
2660 			err = mdmn_init_set(setno, MDMN_SET_READY);
2661 			(void) rw_unlock(&client_rwlock[setno]);
2662 			(void) rw_unlock(&set_desc_rwlock[setno]);
2663 		} else {
2664 			err = mdmn_init_set(setno, MDMN_SET_READY);
2665 		}
2666 
2667 		if (err) {
2668 			*retval = MDMNE_CANNOT_CONNECT;
2669 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2670 			return (retval);
2671 		}
2672 	}
2673 
2674 	/* is this rpc request coming from a licensed node? */
2675 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2676 		*retval = MDMNE_RPC_FAIL;
2677 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2678 		return (retval);
2679 	}
2680 
2681 
2682 	commd_debug(MD_MMV_WAKE_M,
2683 	    "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2684 	    "from %d\n",
2685 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2686 	    res->mmr_sender);
2687 	/*
2688 	 * The mutex and cv are needed for waking up the thread
2689 	 * sleeping in mdmn_master_process_msg()
2690 	 */
2691 	mx = mdmn_get_master_table_mx(setno, class);
2692 	cv = mdmn_get_master_table_cv(setno, class);
2693 
2694 	/*
2695 	 * lookup the master wakeup table
2696 	 * If we find our message, we are on the master and
2697 	 * called by a slave that finished processing a message.
2698 	 * We store the results in the appropriate slot and
2699 	 * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2700 	 */
2701 	(void) mutex_lock(mx);
2702 	mdmn_get_master_table_id(setno, class, &master_table_id);
2703 	sender = mdmn_get_master_table_addr(setno, class);
2704 
2705 	if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2706 		if (sender == res->mmr_sender) {
2707 			mdmn_set_master_table_res(setno, class, res);
2708 			(void) cond_signal(cv);
2709 			*retval = MDMNE_ACK;
2710 		} else {
2711 			/* id is correct but wrong sender (I smell a timeout) */
2712 			commd_debug(MD_MMV_WAKE_M,
2713 			    "wakeup master got unsolicited message: "
2714 			    "(%d, 0x%llx-%d) from %d\n",
2715 			    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2716 			free_result(res);
2717 			*retval = MDMNE_TIMEOUT;
2718 		}
2719 	} else {
2720 		/* id is wrong, smells like a very late timeout */
2721 		commd_debug(MD_MMV_WAKE_M,
2722 		    "wakeup master got unsolicited message: "
2723 		    "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2724 		    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2725 		    MSGID_ELEMS(master_table_id));
2726 		free_result(res);
2727 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2728 	}
2729 
2730 	(void) mutex_unlock(mx);
2731 
2732 	return (retval);
2733 }
2734 
2735 /*
2736  * Lock a set/class combination.
2737  * This is mainly done for debug purpose.
2738  * This set/class combination immediately is blocked,
2739  * even in the middle of sending messages to multiple slaves.
2740  * This remains until the user issues a mdmn_comm_unlock_svc_2 for the same
2741  * set/class combination.
2742  *
2743  * Special messages of class MD_MSG_CLASS0 can never be locked.
2744  * 	e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2745  *
2746  * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2747  * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2748  *
2749  * set must be between 1 and MD_MAXSETS
2750  * class can be:
2751  *	MD_MSG_CLASS0 which means all other classes in this case
2752  *	or one specific class (< MD_MN_NCLASSES)
2753  *
2754  * Returns:
2755  *	MDMNE_ACK on sucess (locking a locked class is Ok)
2756  *	MDMNE_EINVAL if a parameter is out of range
2757  */
2758 
2759 /* ARGSUSED */
2760 int *
2761 mdmn_comm_lock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2762 {
2763 	int			*retval;
2764 	set_t			setno = msc->msc_set;
2765 	md_mn_msgclass_t	class = msc->msc_class;
2766 
2767 	retval = Malloc(sizeof (int));
2768 
2769 	/* check if the global initialization is done */
2770 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2771 		global_init();
2772 	}
2773 
2774 	/* is this rpc request coming from the local node ? */
2775 	if (check_license(rqstp, 0) == FALSE) {
2776 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2777 		*retval = MDMNE_RPC_FAIL;
2778 		return (retval);
2779 	}
2780 
2781 	/* Perform some range checking */
2782 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2783 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2784 		*retval = MDMNE_EINVAL;
2785 		return (retval);
2786 	}
2787 
2788 	commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2789 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2790 	if (class != MD_MSG_CLASS0) {
2791 		mdmn_mark_class_locked(setno, class);
2792 	} else {
2793 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2794 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2795 			mdmn_mark_class_locked(setno, class);
2796 		}
2797 	}
2798 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2799 
2800 	*retval = MDMNE_ACK;
2801 	return (retval);
2802 }
2803 
2804 /*
2805  * Unlock a set/class combination.
2806  * set must be between 1 and MD_MAXSETS
2807  * class can be:
2808  *	MD_MSG_CLASS0 which means all other classes in this case (like above)
2809  *	or one specific class (< MD_MN_NCLASSES)
2810  *
2811  * Returns:
2812  *	MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2813  *	MDMNE_EINVAL if a parameter is out of range
2814  */
2815 /* ARGSUSED */
2816 int *
2817 mdmn_comm_unlock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2818 {
2819 	int			*retval;
2820 	set_t			setno  = msc->msc_set;
2821 	md_mn_msgclass_t	class  = msc->msc_class;
2822 
2823 	retval = Malloc(sizeof (int));
2824 
2825 	/* check if the global initialization is done */
2826 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2827 		global_init();
2828 	}
2829 
2830 	/* is this rpc request coming from the local node ? */
2831 	if (check_license(rqstp, 0) == FALSE) {
2832 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2833 		*retval = MDMNE_RPC_FAIL;
2834 		return (retval);
2835 	}
2836 
2837 	/* Perform some range checking */
2838 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2839 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2840 		*retval = MDMNE_EINVAL;
2841 		return (retval);
2842 	}
2843 	commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2844 
2845 	(void) mutex_lock(&mdmn_busy_mutex[setno]);
2846 	if (class != MD_MSG_CLASS0) {
2847 		mdmn_mark_class_unlocked(setno, class);
2848 	} else {
2849 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2850 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2851 			mdmn_mark_class_unlocked(setno, class);
2852 		}
2853 	}
2854 	(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2855 
2856 	*retval = MDMNE_ACK;
2857 	return (retval);
2858 }
2859 
2860 /*
2861  * mdmn_comm_suspend_svc_2(setno, class)
2862  *
2863  * Drain all outstanding messages for a given set/class combination
2864  * and don't allow new messages to be processed.
2865  *
2866  * Special messages of class MD_MSG_CLASS0 can never be locked.
2867  * 	e.g. MD_MN_MSG_VERBOSITY
2868  *
2869  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2870  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2871  *
2872  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2873  * one class as being suspended.
2874  * If messages for this class are currently on their way,
2875  * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2876  *
2877  * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2878  * Messages must be generated in ascending order.
2879  * This means, a message cannot create submessages with the same or lower class.
2880  * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2881  * generate a hanging situation here.
2882  * We mark class 1 as being suspended.
2883  * if the class is not busy, we proceed with class 2
2884  * and so on
2885  * if a class *is* busy, we cannot continue here, but return
2886  * MDMNE_SET_NOT_DRAINED.
2887  * We expect the caller to hold on for some seconds and try again.
2888  * When that message, that held the class busy is done in
2889  * mdmn_master_process_msg(), mdmn_mark_class_unbusy() called.
2890  * There it is checked if the class is about to drain.
2891  * In that case it tries to drain all higher classes there.
2892  *
2893  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2894  * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2895  * completely drained.
2896  *
2897  * Returns:
2898  *	MDMNE_ACK on sucess (set is drained, no outstanding messages)
2899  *	MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2900  *		still outstanding messages for this set(s)
2901  *	MDMNE_EINVAL if setno is out of range
2902  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2903  */
2904 
2905 /* ARGSUSED */
2906 int *
2907 mdmn_comm_suspend_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2908 {
2909 	int			*retval;
2910 	int			failure = 0;
2911 	set_t			startset, endset;
2912 	set_t			setno  = msc->msc_set;
2913 	md_mn_msgclass_t	oclass = msc->msc_class;
2914 #ifdef NOT_YET_NEEDED
2915 	uint_t			flags  = msc->msc_flags;
2916 #endif /* NOT_YET_NEEDED */
2917 	md_mn_msgclass_t	class;
2918 
2919 	retval = Malloc(sizeof (int));
2920 
2921 	/* check if the global initialization is done */
2922 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2923 		global_init();
2924 	}
2925 
2926 	/* is this rpc request coming from the local node ? */
2927 	if (check_license(rqstp, 0) == FALSE) {
2928 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2929 		*retval = MDMNE_RPC_FAIL;
2930 		return (retval);
2931 	}
2932 
2933 	commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2934 	    setno, oclass);
2935 
2936 	/* Perform some range checking */
2937 	if (setno >= MD_MAXSETS) {
2938 		*retval = MDMNE_EINVAL;
2939 		commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2940 		return (retval);
2941 	}
2942 
2943 	/*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2944 	if (setno == MD_COMM_ALL_SETS) {
2945 		startset = 1;
2946 		endset = MD_MAXSETS - 1;
2947 	} else {
2948 		startset = setno;
2949 		endset = setno;
2950 	}
2951 
2952 	for (setno = startset; setno <= endset; setno++) {
2953 		/* Here we need the mutexes for the set to be setup */
2954 		if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2955 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2956 		}
2957 
2958 		(void) mutex_lock(&mdmn_busy_mutex[setno]);
2959 		/* shall we drain all classes of this set? */
2960 		if (oclass == MD_COMM_ALL_CLASSES) {
2961 			for (class = 1; class < MD_MN_NCLASSES; class ++) {
2962 				commd_debug(MD_MMV_MISC,
2963 				    "suspend: suspending set %d, class %d\n",
2964 				    setno, class);
2965 				*retval = mdmn_mark_class_suspended(setno,
2966 				    class, MDMN_SUSPEND_ALL);
2967 				if (*retval == MDMNE_SET_NOT_DRAINED) {
2968 					failure++;
2969 				}
2970 			}
2971 		} else {
2972 			/* only drain one specific class */
2973 			commd_debug(MD_MMV_MISC,
2974 			    "suspend: suspending set=%d class=%d\n",
2975 			    setno, oclass);
2976 			*retval = mdmn_mark_class_suspended(setno, oclass,
2977 			    MDMN_SUSPEND_1);
2978 			if (*retval == MDMNE_SET_NOT_DRAINED) {
2979 				failure++;
2980 			}
2981 		}
2982 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
2983 	}
2984 	/* If one or more sets are not entirely drained, failure is non-zero */
2985 	if (failure != 0) {
2986 		*retval = MDMNE_SET_NOT_DRAINED;
2987 		commd_debug(MD_MMV_MISC,
2988 		    "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2989 	} else {
2990 		*retval = MDMNE_ACK;
2991 	}
2992 
2993 	return (retval);
2994 }
2995 
2996 /*
2997  * mdmn_comm_resume_svc_2(setno, class)
2998  *
2999  * Resume processing messages for a given set.
3000  * This incorporates the repeal of a previous suspend operation.
3001  *
3002  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
3003  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
3004  *
3005  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
3006  * one class as being resumed.
3007  *
3008  * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
3009  *
3010  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
3011  *
3012  * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
3013  * reset any ABORT flag from the global state.
3014  *
3015  * Returns:
3016  *	MDMNE_ACK on sucess (resuming an unlocked set is Ok)
3017  *	MDMNE_EINVAL if setno is out of range
3018  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
3019  */
3020 /* ARGSUSED */
3021 int *
3022 mdmn_comm_resume_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
3023 {
3024 	int			*retval;
3025 	set_t			startset, endset;
3026 	set_t			setno  = msc->msc_set;
3027 	md_mn_msgclass_t	oclass = msc->msc_class;
3028 	uint_t			flags  = msc->msc_flags;
3029 	md_mn_msgclass_t	class;
3030 
3031 	retval = Malloc(sizeof (int));
3032 
3033 	/* check if the global initialization is done */
3034 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3035 		global_init();
3036 	}
3037 
3038 	/* is this rpc request coming from the local node ? */
3039 	if (check_license(rqstp, 0) == FALSE) {
3040 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
3041 		*retval = MDMNE_RPC_FAIL;
3042 		return (retval);
3043 	}
3044 
3045 	commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
3046 	    setno, oclass);
3047 
3048 	/* Perform some range checking */
3049 	if (setno > MD_MAXSETS) {
3050 		*retval = MDMNE_EINVAL;
3051 		return (retval);
3052 	}
3053 
3054 	if (setno == MD_COMM_ALL_SETS) {
3055 		startset = 1;
3056 		endset = MD_MAXSETS - 1;
3057 		if (oclass == MD_COMM_ALL_CLASSES) {
3058 			/* This is the point where we "unabort" the commd */
3059 			commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
3060 			md_commd_global_state &= ~MD_CGS_ABORTED;
3061 		}
3062 	} else {
3063 		startset = setno;
3064 		endset = setno;
3065 	}
3066 
3067 	for (setno = startset; setno <= endset; setno++) {
3068 
3069 		/* Here we need the mutexes for the set to be setup */
3070 		if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
3071 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
3072 		}
3073 
3074 		(void) mutex_lock(&mdmn_busy_mutex[setno]);
3075 
3076 		if (oclass == MD_COMM_ALL_CLASSES) {
3077 			int end_class = 1;
3078 			/*
3079 			 * When SUSPENDing all classes, we go
3080 			 * from 1 to MD_MN_NCLASSES-1
3081 			 * The correct reverse action is RESUMing
3082 			 * from MD_MN_NCLASSES-1 to 1 (or 2)
3083 			 */
3084 
3085 			if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
3086 				end_class = 2;
3087 			}
3088 
3089 			/*
3090 			 * Then mark all classes of this set as no longer
3091 			 * suspended. This supersedes any previous suspend(1)
3092 			 * calls and resumes the set entirely.
3093 			 */
3094 			for (class = MD_MN_NCLASSES - 1; class >= end_class;
3095 			    class --) {
3096 				commd_debug(MD_MMV_MISC,
3097 				    "resume: resuming set=%d class=%d\n",
3098 				    setno, class);
3099 				mdmn_mark_class_resumed(setno, class,
3100 				    (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
3101 			}
3102 		} else {
3103 			/*
3104 			 * In this case only one class is marked as not
3105 			 * suspended. If a suspend(all) is currently active for
3106 			 * this set, this class will still be suspended.
3107 			 * That state will be cleared by a suspend(all)
3108 			 * (see above)
3109 			 */
3110 			commd_debug(MD_MMV_MISC,
3111 			    "resume: resuming set=%d class=%d\n",
3112 			    setno, oclass);
3113 			mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
3114 		}
3115 
3116 		(void) mutex_unlock(&mdmn_busy_mutex[setno]);
3117 	}
3118 
3119 	*retval = MDMNE_ACK;
3120 	return (retval);
3121 }
3122 /* ARGSUSED */
3123 int *
3124 mdmn_comm_reinit_set_svc_2(set_t *setnop, struct svc_req *rqstp)
3125 {
3126 	int		*retval;
3127 	md_mnnode_desc	*node;
3128 	set_t		 setno = *setnop;
3129 
3130 	retval = Malloc(sizeof (int));
3131 
3132 	/* check if the global initialization is done */
3133 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3134 		global_init();
3135 	}
3136 
3137 	/* is this rpc request coming from the local node ? */
3138 	if (check_license(rqstp, 0) == FALSE) {
3139 		xdr_free(xdr_set_t, (caddr_t)setnop);
3140 		*retval = MDMNE_RPC_FAIL;
3141 		return (retval);
3142 	}
3143 
3144 	commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
3145 
3146 	(void) rw_rdlock(&set_desc_rwlock[setno]);
3147 	/*
3148 	 * We assume, that all messages have been suspended previously.
3149 	 *
3150 	 * As we are modifying lots of clients here we grab the client_rwlock
3151 	 * in writer mode. This ensures, no new messages come in.
3152 	 */
3153 	(void) rw_wrlock(&client_rwlock[setno]);
3154 	/* This set is no longer initialized */
3155 
3156 	if ((set_descriptor[setno] != NULL) &&
3157 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
3158 		/* destroy all rpc clients from this set */
3159 		for (node = set_descriptor[setno]->sd_nodelist; node;
3160 		    node = node->nd_next) {
3161 			/*
3162 			 * Since the CLIENT for ourself will be recreated
3163 			 * shortly, and this node is guaranteed to be
3164 			 * there after a reconfig, there's no reason to go
3165 			 * through destroying it.  It also avoids an issue
3166 			 * with calling clnt_create() later from within the
3167 			 * server thread, which can effectively deadlock
3168 			 * itself due to RPC design limitations.
3169 			 */
3170 			if (node == set_descriptor[setno]->sd_mn_mynode)
3171 				continue;
3172 			mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
3173 			if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
3174 				client[setno][node->nd_nodeid] = (CLIENT *)NULL;
3175 			}
3176 		}
3177 		md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
3178 	}
3179 
3180 	commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
3181 
3182 	(void) rw_unlock(&client_rwlock[setno]);
3183 	(void) rw_unlock(&set_desc_rwlock[setno]);
3184 	*retval = MDMNE_ACK;
3185 	return (retval);
3186 }
3187 
3188 /*
3189  * This is just an interface for testing purpose.
3190  * Here we can disable single message types.
3191  * If we block a message type, this is valid for all MN sets.
3192  * If a message arrives later, and  it's message type is blocked, it will
3193  * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
3194  * resend this message over and over again.
3195  */
3196 
3197 /* ARGSUSED */
3198 int *
3199 mdmn_comm_msglock_svc_2(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3200 {
3201 	int			*retval;
3202 	md_mn_msgtype_t		type = mmtl->mmtl_type;
3203 	uint_t			lock = mmtl->mmtl_lock;
3204 
3205 	retval = Malloc(sizeof (int));
3206 
3207 	/* check if the global initialization is done */
3208 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3209 		global_init();
3210 	}
3211 
3212 	/* is this rpc request coming from the local node ? */
3213 	if (check_license(rqstp, 0) == FALSE) {
3214 		xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3215 		*retval = MDMNE_RPC_FAIL;
3216 		return (retval);
3217 	}
3218 
3219 	/* Perform some range checking */
3220 	if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3221 		*retval = MDMNE_EINVAL;
3222 		return (retval);
3223 	}
3224 
3225 	commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3226 	msgtype_lock_state[type] = lock;
3227 
3228 	*retval = MDMNE_ACK;
3229 	return (retval);
3230 }
3231