xref: /titanic_41/usr/src/cmd/lvm/rpc.mdcommd/mdmn_commd_server.c (revision 5db09ef5a5ba16a4728800a9055f47b3f404a2b3)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <unistd.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <sys/statvfs.h>
33 #include <sys/uadmin.h>
34 #include <fcntl.h>
35 #include <stdio.h>
36 #include <thread.h>
37 #include <meta.h>
38 #include <sdssc.h>
39 #include <mdmn_changelog.h>
40 #include "mdmn_subr.h"
41 
42 /*
43  * This is the communication daemon for SVM Multi Node Disksets.
44  * It runs on every node and provides the following rpc services:
45  *  - mdmn_send_svc_1
46  *  - mdmn_work_svc_1
47  *  - mdmn_wakeup_initiator_svc_1
48  *  - mdmn_wakeup_master_svc_1
49  *  - mdmn_comm_lock_svc_1
50  *  - mdmn_comm_unlock_svc_1
51  *  - mdmn_comm_suspend_svc_1
52  *  - mdmn_comm_resume_svc_1
53  *  - mdmn_comm_reinit_set_svc_1
54  * where send, lock, unlock and reinit are meant for external use,
55  * work and the two wakeups are for internal use only.
56  *
57  * NOTE:
58  * On every node only one of those xxx_1 functions can be active at the
59  * same time because the daemon is single threaded.
60  *
61  *
62  * In case an event occurs that has to be propagated to all the nodes...
63  *
64  * One node (the initiator)
65  *	calls the libmeta function mdmn_send_message()
66  *	This function calls the local daemon thru mdmn_send_svc_1.
67  *
68  * On the initiator:
69  *	mdmn_send_svc_1()
70  *	    - starts a thread -> mdmn_send_to_work() and returns.
71  *	mdmn_send_to_work()
72  *	    - sends this message over to the master of the diskset.
73  *	      This is done by calling mdmn_work_svc_1 on the master.
74  *	    - registers to the initiator_table
75  *	    - exits without doing a svc_sendreply() for the call to
76  *	      mdmn_send_svc_1. This means that call is blocked until somebody
77  *	      (see end of this comment) does a svc_sendreply().
78  *	      This means mdmn_send_message() does not yet return.
79  *	    - A timeout surveillance is started at this point.
80  *	      This means in case the master doesn't reply at all in an
81  *	      appropriate time, an error condition is returned
82  *	      to the caller.
83  *
84  * On the master:
85  *	mdmn_work_svc_1()
86  *	    - starts a thread -> mdmn_master_process_msg() and returns
87  *	mdmn_master_process_msg()
88  *	    - logs the message to the change log
89  *	    - executes the message locally
90  *	    - flags the message in the change log
91  *	    - sends the message to mdmn_work_svc_1() on all the
92  *	      other nodes (slaves)
93  *	      after each call to mdmn_work_svc_1 the thread goes to sleep and
94  *	      will be woken up by mdmn_wakeup_master_svc_1() as soon as the
95  *	      slave node is done with this message.
96  *	    - In case the slave doesn't respond in an appropriate time, an error
97  *	      is assumed to ensure the master doesn't wait forever.
98  *
99  * On a slave:
100  *	mdmn_work_svc_1()
101  *	    - starts a thread -> mdmn_slave_process_msg() and returns
102  *	mdmn_slave_process_msg()
103  *	    - processes this message locally by calling the appropriate message
104  *	      handler, that creates some result.
105  *	    - sends that result thru a call to mdmn_wakeup_master_svc_1() to
106  *	      the master.
107  *
108  * Back on the master:
109  *	mdmn_wakeup_master_svc_1()
110  *	    - stores the result into the master_table.
111  *	    - signals the mdmn_master_process_msg-thread.
112  *	    - returns
113  *	mdmn_master_process_msg()
114  *	    - after getting the results from all nodes
115  *	    - sends them back to the initiating node thru a call to
116  *	      mdmn_wakeup_initiator_svc_1.
117  *
118  * Back on the initiator:
119  *	mdmn_wakeup_initiator_svc_1()
120  *	    - calls svc_sendreply() which makes the call to mdmn_send_svc_1()
121  *	      return.
122  *	      which allows the initial mdmn_send_message() call to return.
123  */
124 
/*
 * Debug output stream of the commd and the name of the file behind it.
 * Both are (re)assigned by setup_debug(); commdout is tested against NULL
 * before use by FLUSH_DEBUGFILE() and flush_fcout().
 */
FILE *commdout;		/* debug output for the commd */
char *commdoutfile;	/* file name for the above output */
/*
 * Want at least 10 MB free space when logging into a file.
 * flush_fcout() compares the free space of the file system that holds
 * commdoutfile against this value.
 */
#define	MIN_FS_SPACE	(10LL * 1024 * 1024)

/*
 * Number of outstanding messages that were initiated by this node.
 * Incremented by mdmn_send_to_work() once a message has been registered.
 * If zero, check_timeouts goes to sleep on check_timeout_cv.
 */
uint_t	messages_on_their_way;
mutex_t	check_timeout_mutex;	/* need mutex to protect above */
cond_t	check_timeout_cv;	/* trigger for check_timeouts */

/* for printing out time stamps; set to gethrtime() in global_init() */
hrtime_t __savetime;

/*
 * RPC clients for every set and every node and their protecting locks.
 * A NULL entry means that no client handle exists (yet, or any more) for
 * that set/node combination; see check_client().
 */
CLIENT	*client[MD_MAXSETS][NNODES];
rwlock_t client_rwlock[MD_MAXSETS];

/* the descriptors of all possible sets and their protectors */
struct md_set_desc *set_descriptor[MD_MAXSETS];
rwlock_t set_desc_rwlock[MD_MAXSETS];

/*
 * The daemon to daemon communication has to timeout quickly.
 * Installed on every client handle via clnt_control(CLSET_TIMEOUT).
 */
static struct timeval FOUR_SECS = { 4, 0 };

/*
 * These indicate if a set has already been setup.
 * Each entry is a mask of MDMN_SET_* bits maintained by mdmn_init_set();
 * check_timeouts() only looks at sets whose entry equals MDMN_SET_READY.
 */
int md_mn_set_inited[MD_MAXSETS];

/*
 * For every set we have a message completion table and protecting mutexes.
 * mct[setno] points to the memory mapped table file, see mdmn_init_set().
 */
md_mn_mct_t *mct[MD_MAXSETS];
mutex_t	mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];

/*
 * Stuff to describe the global status of the commd on one node.
 * MD_CGS_INITED is set at the end of global_init().
 */
#define	MD_CGS_INITED		0x0001
#define	MD_CGS_ABORTED		0x0002	/* return everything with MDMNE_ABORT */
uint_t md_commd_global_state = 0;	/* No state when starting up */

/*
 * Global verbosity level for the daemon.
 * Read from the configuration by setup_debug(); dropped to MD_MMV_NULL by
 * flush_fcout() when the log file system runs short of space.
 */
uint_t md_commd_global_verb;

/*
 * libmeta doesn't like multiple threads in metaget_setdesc().
 * So we must protect access to it with a global lock
 */
mutex_t get_setdesc_mutex;

/*
 * Need a way to block single message types,
 * hence an array with a status for every message type
 */
uint_t msgtype_lock_state[MD_MN_NMESSAGES];

/* for reading in the config file */
#define	MAX_LINE_SIZE 1024

/* accessors for the debug settings; defined outside of this file */
extern char *commd_get_outfile(void);
extern uint_t commd_get_verbosity(void);
186 
187 /*
188  * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
189  * merely needs to call clnt_create_timed, and meta_client_create_retry
190  * will take care of the rest.
191  */
192 /* ARGSUSED */
193 static CLIENT *
194 mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
195 {
196 	md_mnnode_desc	*node = (md_mnnode_desc *)data;
197 
198 	return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, ONE, "tcp",
199 		time_out));
200 }
201 
/*
 * FLUSH_DEBUGFILE()
 *
 * Push pending debug output out of the process: fflush() the stdio buffer
 * of commdout and fsync() the file descriptor behind it.  Nothing is done
 * when no debug output file is open (commdout == NULL).
 *
 * The body is wrapped in do { } while (0) so that "FLUSH_DEBUGFILE();" is
 * exactly one statement and may be used as the unbraced body of an if/else.
 */
#define	FLUSH_DEBUGFILE() \
	do { \
		if (commdout != (FILE *)NULL) { \
			(void) fflush(commdout); \
			(void) fsync(fileno(commdout)); \
		} \
	} while (0)
207 
208 static void
209 panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
210     md_mn_result_t *slave_result)
211 {
212 	md_mn_commd_err_t	commd_err;
213 	md_error_t		mne = mdnullerror;
214 	char			*msg_buf;
215 
216 	msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
217 
218 	FLUSH_DEBUGFILE();
219 
220 	if (master_err != MDMNE_ACK) {
221 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on master "
222 			"when processing message type %d\n", type);
223 	} else if (slave_result == NULL) {
224 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail on node "
225 			"%d when processing message type %d\n", nid, type);
226 	} else {
227 		snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: Inconsistent "
228 			"return value from node %d when processing message "
229 			"type %d. Master exitval = %d, Slave exitval = %d\n",
230 			nid, type, master_exitval, slave_result->mmr_exitval);
231 	}
232 	commd_err.size = strlen(msg_buf);
233 	commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
234 
235 	metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
236 	(void) uadmin(A_DUMP, AD_BOOT, NULL);
237 }
238 
239 static void
240 flush_fcout()
241 {
242 	struct statvfs64 vfsbuf;
243 	long long avail_bytes;
244 	int warned = 0;
245 
246 	for (; ; ) {
247 		sleep(10);
248 		/* No output file, nothing to do */
249 		if (commdout == (FILE *)NULL)
250 			continue;
251 
252 		/*
253 		 * stat the appropriate filesystem to check for available space.
254 		 */
255 		if (statvfs64(commdoutfile, &vfsbuf)) {
256 			continue;
257 		}
258 
259 		avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
260 		/*
261 		 * If we don't have enough space, we print out a warning.
262 		 * And we drop the verbosity level to NULL
263 		 * In case the condtion doesn't go away, we don't repeat
264 		 * the warning.
265 		 */
266 		if (avail_bytes < MIN_FS_SPACE) {
267 			if (warned) {
268 				continue;
269 			}
270 			commd_debug(MD_MMV_SYSLOG,
271 			    "NOT enough space available for logging\n");
272 			commd_debug(MD_MMV_SYSLOG,
273 			    "Have %lld bytes, need %lld bytes\n",
274 			    avail_bytes, MIN_FS_SPACE);
275 			warned = 1;
276 			md_commd_global_verb = MD_MMV_NULL;
277 		} else {
278 			warned = 0;
279 		}
280 
281 		fflush(commdout);
282 	}
283 }
284 
/*
 * mdmn_clnt_destroy(clnt)
 *
 * Safer version of clnt_destroy(): if clnt is NULL nothing is done.
 *
 * The argument is expanded more than once, so it must not have side
 * effects.  The body is wrapped in do { } while (0) so that
 * "mdmn_clnt_destroy(x);" is exactly one statement and may be used as the
 * unbraced body of an if/else.
 */
#define	mdmn_clnt_destroy(clnt) \
	do { \
		if ((clnt) != NULL) \
			clnt_destroy(clnt); \
	} while (0)
290 
291 /*
292  * Own version of svc_sendreply that checks the integrity of the transport
293  * handle and so prevents us from core dumps in the real svc_sendreply()
294  */
295 void
296 mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
297 {
298 	if (SVC_STAT(transp) == XPRT_DIED) {
299 		commd_debug(MD_MMV_MISC,
300 		    "mdmn_svc_sendreply: XPRT_DIED\n");
301 		return;
302 	}
303 	(void) svc_sendreply(transp, xdr, data);
304 }
305 
306 /*
307  * timeout_initiator(set, class)
308  *
309  * Alas, I sent a message and didn't get a response back in aproppriate time.
310  *
311  * timeout_initiator() takes care for doing the needed svc_sendreply() to the
312  * calling mdmn_send_message, so that guy doesn't wait forever
313  * What is done here is pretty much the same as what is done in
314  * wakeup initiator. The difference is that we cannot provide for any results,
315  * of course and we set the comm_state to MDMNE_TIMEOUT.
316  *
317  * By doing so, mdmn_send_message can decide if a retry would make sense or not.
318  * It's not our's to decide that here.
319  */
320 void
321 timeout_initiator(set_t setno, md_mn_msgclass_t class)
322 {
323 	SVCXPRT		*transp;
324 	md_mn_msgid_t	mid;
325 	md_mn_result_t *resultp;
326 
327 	resultp = Zalloc(sizeof (md_mn_result_t));
328 	resultp->mmr_comm_state	= MDMNE_TIMEOUT;
329 
330 	commd_debug(MD_MMV_MISC,
331 	    "timeout_initiator set = %d, class = %d\n", setno, class);
332 
333 	transp = mdmn_get_initiator_table_transp(setno, class);
334 	mdmn_get_initiator_table_id(setno, class, &mid);
335 
336 	commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
337 	    MSGID_ELEMS(mid));
338 
339 	/* return to mdmn_send_message() and let it deal with the situation */
340 	mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
341 
342 	free(resultp);
343 	commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
344 	mdmn_unregister_initiator_table(setno, class);
345 }
346 
347 
348 /*
349  * check_timeouts - thread
350  *
351  * This implements a timeout surveillance for messages sent from the
352  * initiator to the master.
353  *
354  * If a message is started, this thread is triggered thru
355  * cond_signal(&check_timeout_cv) and we keep track of the numbers of
356  * messages that are outstanding (messages_on_their_way).
357  *
358  * As long as there are messages on their way, this thread never goes to sleep.
359  * It'll keep checking all class/set combinations for outstanding messages.
360  * If one is found, it's checked if this message is overdue. In that case,
361  * timeout_initiator() is called to wakeup the calling mdmn_send_message and
362  * to clean up the mess.
363  *
364  * If the result from the master arrives later, this message is considered
365  * to be unsolicited. And will be ignored.
366  */
367 
368 void
369 check_timeouts()
370 {
371 	set_t			setno;
372 	time_t			now, then;
373 	mutex_t			*mx;
374 	md_mn_msgclass_t	class;
375 
376 	for (; ; ) {
377 		now = time((time_t *)NULL);
378 		for (setno = 1; setno < MD_MAXSETS; setno++) {
379 			if (md_mn_set_inited[setno] != MDMN_SET_READY) {
380 				continue;
381 			}
382 			for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
383 			    class++) {
384 				mx = mdmn_get_initiator_table_mx(setno, class);
385 				mutex_lock(mx);
386 
387 				/* then is the registered time */
388 				then =
389 				    mdmn_get_initiator_table_time(setno, class);
390 				if ((then != 0) && (now > then)) {
391 					timeout_initiator(setno, class);
392 				}
393 				mutex_unlock(mx);
394 			}
395 		}
396 		/* it's ok to check only once per second */
397 		sleep(1);
398 
399 		/* is there work to do? */
400 		mutex_lock(&check_timeout_mutex);
401 		if (messages_on_their_way == 0) {
402 			cond_wait(&check_timeout_cv, &check_timeout_mutex);
403 		}
404 		mutex_unlock(&check_timeout_mutex);
405 	}
406 }
407 
408 void
409 setup_debug(void)
410 {
411 	char	*tmp_dir;
412 
413 	/* Read in the debug-controlling tokens from runtime.cf */
414 	md_commd_global_verb = commd_get_verbosity();
415 	/*
416 	 * If the user didn't specify a verbosity level in runtime.cf
417 	 * we can safely return here. As we don't intend to printout
418 	 * debug messages, we don't need to check for the output file.
419 	 */
420 	if (md_commd_global_verb == 0) {
421 		return;
422 	}
423 
424 	/* if commdout is non-NULL it is an open FILE, we'd better close it */
425 	if (commdout != (FILE *)NULL) {
426 		fclose(commdout);
427 	}
428 
429 	commdoutfile = commd_get_outfile();
430 
431 	/* setup the debug output */
432 	if (commdoutfile == (char *)NULL) {
433 		/* if no valid file was specified, use the default */
434 		commdoutfile = "/var/run/commd.out";
435 		commdout = fopen(commdoutfile, "a");
436 	} else {
437 		/* check if the directory exists and is writable */
438 		tmp_dir = strdup(commdoutfile);
439 		if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
440 		    ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
441 			syslog(LOG_ERR,
442 			    "Can't write to specified output file %s,\n"
443 			    "using /var/run/commd.out instead\n", commdoutfile);
444 			free(commdoutfile);
445 			commdoutfile = "/var/run/commd.out";
446 			commdout = fopen(commdoutfile, "a");
447 		}
448 		free(tmp_dir);
449 	}
450 
451 	if (commdout == (FILE *)NULL) {
452 		syslog(LOG_ERR, "Can't write to debug output file %s\n",
453 		    commdoutfile);
454 	}
455 }
456 /*
457  * global_init()
458  *
459  * Perform some global initializations.
460  *
461  * the following routines have to call this before operation can start:
462  *  - mdmn_send_svc_1
463  *  - mdmn_work_svc_1
464  *  - mdmn_comm_lock_svc_1
465  *  - mdmn_comm_unlock_svc_1
466  *  - mdmn_comm_suspend_svc_1
467  *  - mdmn_comm_resume_svc_1
468  *  - mdmn_comm_reinit_set_svc_1
469  *
470  * This is a single threaded daemon, so it can only be in one of the above
471  * routines at the same time.
472  * This means, global_init() cannot be called more than once at the same time.
473  * Hence, no lock is needed.
474  */
475 void
476 global_init(void)
477 {
478 	set_t			set;
479 	md_mn_msgclass_t	class;
480 	struct sigaction	sighandler;
481 	time_t			clock_val;
482 
483 	/* Do these global initializations only once */
484 	if (md_commd_global_state & MD_CGS_INITED) {
485 		return;
486 	}
487 	(void) sdssc_bind_library();
488 
489 	/* setup the debug options from the config file */
490 	setup_debug();
491 
492 	/* Make setup_debug() be the action in case of SIGHUP */
493 	sighandler.sa_flags = 0;
494 	sigfillset(&sighandler.sa_mask);
495 	sighandler.sa_handler = (void (*)(int)) setup_debug;
496 	sigaction(SIGHUP, &sighandler, NULL);
497 
498 	__savetime = gethrtime();
499 	(void) time(&clock_val);
500 	commd_debug(MD_MMV_MISC, "global init called %s\n",
501 			ctime(&clock_val));
502 
503 	/* start a thread that flushes out the debug on a regular basis */
504 	thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
505 	    (void *) NULL, THR_DETACHED, NULL);
506 
507 	/* global rwlock's / mutex's / cond_t's go here */
508 	mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
509 	cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
510 	mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
511 
512 	/* Make sure the initiator table is initialized correctly */
513 	for (set = 0; set < MD_MAXSETS; set++) {
514 		for (class = 0; class < MD_MN_NCLASSES; class++) {
515 			mdmn_unregister_initiator_table(set, class);
516 		}
517 	}
518 
519 
520 	/* setup the check for timeouts */
521 	thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
522 	    (void *) NULL, THR_DETACHED, NULL);
523 
524 	md_commd_global_state |= MD_CGS_INITED;
525 }
526 
527 
528 /*
529  * mdmn_init_client(setno, nodeid)
530  * called if client[setno][nodeid] is NULL
531  *
532  * NOTE: Must be called with set_desc_rwlock held as a reader
533  * NOTE: Must be called with client_rwlock held as a writer
534  *
535  * If the rpc client for this node has not been setup for any set, we do it now.
536  *
537  * Returns	0 on success (node found in set, rpc client setup)
538  *		-1 if metaget_setdesc failed,
539  *		-2 if node not part of set
540  *		-3 if clnt_create fails
541  */
542 static int
543 mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
544 {
545 	md_error_t	ep = mdnullerror;
546 	md_mnnode_desc	*node;
547 	md_set_desc	*sd;	/* just an abbr for set_descriptor[setno] */
548 
549 	sd = set_descriptor[setno];
550 
551 	/*
552 	 * Is the appropriate set_descriptor already initialized ?
553 	 * Can't think of a scenario where this is not the case, but we'd better
554 	 * check for it anyway.
555 	 */
556 	if (sd == NULL) {
557 		mdsetname_t	*sp;
558 
559 		rw_unlock(&set_desc_rwlock[setno]); /* readlock -> writelock */
560 		rw_wrlock(&set_desc_rwlock[setno]);
561 		sp = metasetnosetname(setno, &ep);
562 		/* Only one thread is supposed to be in metaget_setdesc() */
563 		mutex_lock(&get_setdesc_mutex);
564 		sd = metaget_setdesc(sp, &ep);
565 		mutex_unlock(&get_setdesc_mutex);
566 		if (sd == NULL) {
567 			rw_unlock(&set_desc_rwlock[setno]); /* back to ... */
568 			rw_rdlock(&set_desc_rwlock[setno]); /* ... readlock */
569 			return (-1);
570 		}
571 		set_descriptor[setno] = sd;
572 		rw_unlock(&set_desc_rwlock[setno]); /* back to readlock */
573 		rw_rdlock(&set_desc_rwlock[setno]);
574 	}
575 
576 	/* first we have to find the node name for this node id */
577 	for (node = sd->sd_nodelist; node; node = node->nd_next) {
578 		if (node->nd_nodeid == nid)
579 			break; /* we found our node in this set */
580 	}
581 
582 
583 	if (node == (md_mnnode_desc *)NULL) {
584 		commd_debug(MD_MMV_SYSLOG,
585 		    "FATAL: node %d not found in set %d\n", nid, setno);
586 		rw_unlock(&set_desc_rwlock[setno]);
587 		return (-2);
588 	}
589 
590 	commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
591 	    node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
592 
593 	/* Did this node join the diskset?  */
594 	if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
595 		commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
596 		    node->nd_nodename ? node->nd_nodename : "NULL", setno);
597 		rw_unlock(&set_desc_rwlock[setno]);
598 		return (-2);
599 	}
600 
601 	/* if clnt_create has not been done for that node, do it now */
602 	if (client[setno][nid] == (CLIENT *) NULL) {
603 		client[setno][nid] = meta_client_create_retry(node->nd_nodename,
604 			mdmn_clnt_create, (void *) node, MD_CLNT_CREATE_TOUT,
605 			&ep);
606 		if (client[setno][nid] == (CLIENT *) NULL) {
607 			clnt_pcreateerror(node->nd_nodename);
608 			rw_unlock(&set_desc_rwlock[setno]);
609 			return (-3);
610 		}
611 		/* this node has the license to send */
612 		commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
613 		add_license(node);
614 
615 		/* set the timeout value */
616 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
617 		    (char *)&FOUR_SECS);
618 
619 	}
620 	rw_unlock(&set_desc_rwlock[setno]);
621 	return (0);
622 }
623 
624 /*
625  * check_client(setno, nodeid)
626  *
627  * must be called with reader lock held for set_desc_rwlock[setno]
628  * and must be called with reader lock held for client_rwlock[setno]
629  * Checks if the client for this set/node combination is already setup
630  * if not it upgrades the lock to a writer lock
631  * and tries to initialize the client.
632  * Finally it's checked if the client nulled out again due to some race
633  *
634  * returns 0 if there is a usable client
635  * returns MDMNE_RPC_FAIL otherwise
636  */
637 static int
638 check_client(set_t setno, md_mn_nodeid_t nodeid)
639 {
640 	int ret = 0;
641 
642 	while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
643 		rw_unlock(&client_rwlock[setno]); /* upgrade reader ... */
644 		rw_wrlock(&client_rwlock[setno]); /* ... to writer lock. */
645 		if (mdmn_init_client(setno, nodeid) != 0) {
646 			ret = MDMNE_RPC_FAIL;
647 		}
648 		rw_unlock(&client_rwlock[setno]); /* downgrade writer ... */
649 		rw_rdlock(&client_rwlock[setno]); /* ... back to reader lock. */
650 	}
651 	return (ret);
652 }
653 
654 /*
655  * mdmn_init_set(setno, todo)
656  * setno is the number of the set to be initialized.
657  * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
658  * If called with MDMN_SET_READY everything is initialized.
659  *
660  * If the set mutexes are already initialized, the caller has to hold
661  * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
662  * calling mdmn_init_set()
663  */
664 int
665 mdmn_init_set(set_t setno, int todo)
666 {
667 	int class;
668 	md_mnnode_desc	*node;
669 	md_set_desc	*sd; /* just an abbr for set_descriptor[setno] */
670 	mdsetname_t	*sp;
671 	md_error_t	ep = mdnullerror;
672 	md_mn_nodeid_t	nid;
673 
674 	/*
675 	 * Check if we are told to setup the mutexes and
676 	 * if these are not yet setup
677 	 */
678 	if ((todo & MDMN_SET_MUTEXES) &&
679 	    ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
680 		mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
681 		cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
682 		rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
683 		rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
684 
685 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
686 			mutex_init(mdmn_get_master_table_mx(setno, class),
687 			    USYNC_THREAD, NULL);
688 			cond_init(mdmn_get_master_table_cv(setno, class),
689 			    USYNC_THREAD, NULL);
690 			mutex_init(mdmn_get_initiator_table_mx(setno, class),
691 			    USYNC_THREAD, NULL);
692 		}
693 		md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
694 	}
695 	if ((todo & MDMN_SET_MCT) &&
696 	    ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
697 		int	fd;
698 		size_t	filesize;
699 		caddr_t	addr;
700 		char table_name[32];
701 
702 		filesize = (sizeof (md_mn_mct_t));
703 		(void) snprintf(table_name, sizeof (table_name), "%s%d",
704 		    MD_MN_MSG_COMP_TABLE, setno);
705 		/*
706 		 * If the mct file exists we map it into memory.
707 		 * Otherwise we create an empty file of appropriate
708 		 * size and map that into memory.
709 		 * The mapped areas are stored in mct[setno].
710 		 */
711 		fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
712 		if (fd < 0) {
713 			commd_debug(MD_MMV_MISC,
714 			    "init_set: Can't open MCT\n");
715 			return (-1);
716 		}
717 		/*
718 		 * To ensure that the file has the appropriate size,
719 		 * we write a byte at the end of the file.
720 		 */
721 		lseek(fd, filesize + 1, SEEK_SET);
722 		write(fd, "\0", 1);
723 
724 		/* at this point we have a file in place that we can mmap */
725 		addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
726 		    MAP_SHARED, fd, (off_t)0);
727 		if (addr == MAP_FAILED) {
728 			commd_debug(MD_MMV_INIT,
729 			    "init_set: mmap mct error %d\n",
730 			    errno);
731 			return (-1);
732 		}
733 		/* LINTED pointer alignment */
734 		mct[setno] = (md_mn_mct_t *)addr;
735 
736 		/* finally we initialize the mutexes that protect the mct */
737 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
738 			mutex_init(&(mct_mutex[setno][class]),
739 			    USYNC_THREAD, NULL);
740 		}
741 
742 		md_mn_set_inited[setno] |= MDMN_SET_MCT;
743 	}
744 	/*
745 	 * Check if we are told to setup the nodes and
746 	 * if these are not yet setup
747 	 * (Attention: negative logic here compared to above!)
748 	 */
749 	if (((todo & MDMN_SET_NODES) == 0) ||
750 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
751 		return (0); /* success */
752 	}
753 
754 	if ((sp = metasetnosetname(setno, &ep)) == NULL) {
755 		commd_debug(MD_MMV_SYSLOG,
756 		    "metasetnosetname(%d) returned NULL\n", setno);
757 		return (MDMNE_NOT_JOINED);
758 	}
759 
760 	/* flush local copy of rpc.metad data */
761 	metaflushsetname(sp);
762 
763 	mutex_lock(&get_setdesc_mutex);
764 	sd = metaget_setdesc(sp, &ep);
765 	mutex_unlock(&get_setdesc_mutex);
766 
767 	if (sd == NULL) {
768 		commd_debug(MD_MMV_SYSLOG,
769 		    "metaget_setdesc(%d) returned NULL\n", setno);
770 		return (MDMNE_NOT_JOINED);
771 	}
772 
773 	/*
774 	 * if this set is not a multinode set or
775 	 * this node didn't join yet the diskset, better don't do anything
776 	 */
777 	if ((MD_MNSET_DESC(sd) == 0) ||
778 	    (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
779 		commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
780 		return (MDMNE_NOT_JOINED);
781 	}
782 
783 	for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
784 		nid = node->nd_nodeid;
785 
786 		commd_debug(MD_MMV_INIT,
787 		    "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
788 		    node->nd_nodename ? node->nd_nodename : "NULL",
789 		    node->nd_priv_ic ? node->nd_priv_ic : "NULL",
790 		    node->nd_flags);
791 
792 		if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
793 			commd_debug(MD_MMV_INIT,
794 			    "init: %s didn't join set %d\n",
795 			    node->nd_nodename ? node->nd_nodename : "NULL",
796 			    setno);
797 			continue;
798 		}
799 
800 		if (client[setno][nid] != (CLIENT *) NULL) {
801 			/* already inited */
802 			commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
803 			    node->nd_nodename ? node->nd_nodename : "NULL");
804 			continue;
805 		}
806 		client[setno][nid] = meta_client_create_retry(node->nd_nodename,
807 			mdmn_clnt_create, (void *)node, MD_CLNT_CREATE_TOUT,
808 			&ep);
809 
810 		if (client[setno][nid] == (CLIENT *) NULL) {
811 			clnt_pcreateerror(node->nd_nodename);
812 			/*
813 			 * If we cannot connect to a single node
814 			 * (maybe because it is down) we mark this node as not
815 			 * owned and continue with the next node in the list.
816 			 * This is better than failing the entire starting up
817 			 * of the commd system.
818 			 */
819 			node->nd_flags &= ~MD_MN_NODE_OWN;
820 			commd_debug(MD_MMV_SYSLOG,
821 			    "WARNING couldn't create client for %s\n"
822 			    "Reconfig cycle required\n",
823 			    node->nd_nodename);
824 			commd_debug(MD_MMV_INIT,
825 			    "WARNING couldn't create client for %s\n"
826 			    "Reconfig cycle required\n",
827 			    node->nd_nodename);
828 			continue;
829 		}
830 		/* this node has the license to send */
831 		commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
832 		add_license(node);
833 
834 		/* set the timeout value */
835 		clnt_control(client[setno][nid], CLSET_TIMEOUT,
836 		    (char *)&FOUR_SECS);
837 
838 		commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
839 		    node->nd_nodename ? node->nd_nodename : "NULL");
840 	}
841 
842 	set_descriptor[setno] = sd;
843 	md_mn_set_inited[setno] |= MDMN_SET_NODES;
844 	return (0); /* success */
845 }
846 
847 void *
848 mdmn_send_to_work(void *arg)
849 {
850 	int			*rpc_err;
851 	int			success;
852 	int			try_master;
853 	set_t			setno;
854 	mutex_t			*mx;	/* protection for initiator_table */
855 	SVCXPRT			*transp;
856 	md_mn_msg_t		*msg;
857 	md_mn_nodeid_t		set_master;
858 	md_mn_msgclass_t	class;
859 	md_mn_msg_and_transp_t	*matp = (md_mn_msg_and_transp_t *)arg;
860 
861 	msg			= matp->mat_msg;
862 	transp			= matp->mat_transp;
863 
864 	/* the alloc was done in mdmn_send_svc_1 */
865 	free(matp);
866 
867 	class = mdmn_get_message_class(msg->msg_type);
868 	setno = msg->msg_setno;
869 
870 	/* set the sender, so the master knows who to send the results */
871 	rw_rdlock(&set_desc_rwlock[setno]);
872 	msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
873 	set_master	= set_descriptor[setno]->sd_mn_master_nodeid;
874 
875 	mx = mdmn_get_initiator_table_mx(setno, class);
876 	mutex_lock(mx);
877 
878 	/*
879 	 * Here we check, if the initiator table slot for this set/class
880 	 * combination is free to use.
881 	 * If this is not the case, we return CLASS_BUSY forcing the
882 	 * initiating send_message call to retry
883 	 */
884 	success = mdmn_check_initiator_table(setno, class);
885 	if (success == MDMNE_CLASS_BUSY) {
886 		md_mn_msgid_t		active_mid;
887 
888 		mdmn_get_initiator_table_id(setno, class,
889 		&active_mid);
890 
891 		commd_debug(MD_MMV_SEND,
892 		    "send_to_work: received but locally busy "
893 		    "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
894 		    "active msg=(%d, 0x%llx-%d)\n",
895 		    MSGID_ELEMS(msg->msg_msgid), setno, class,
896 		    msg->msg_type, MSGID_ELEMS(active_mid));
897 	} else {
898 		commd_debug(MD_MMV_SEND,
899 		    "send_to_work: received (%d, 0x%llx-%d), "
900 		    "set=%d, class=%d, type=%d\n",
901 		    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
902 	}
903 
904 	try_master = 2; /* return failure after two retries */
905 	while ((success == MDMNE_ACK) && (try_master--)) {
906 		rw_rdlock(&client_rwlock[setno]);
907 		/* is the rpc client to the master still around ? */
908 		if (check_client(setno, set_master)) {
909 			success = MDMNE_RPC_FAIL;
910 			FLUSH_DEBUGFILE();
911 			rw_unlock(&client_rwlock[setno]);
912 			break; /* out of try_master-loop */
913 		}
914 
915 		/*
916 		 * Send the request to the work function on the master
917 		 * this call will return immediately
918 		 */
919 		rpc_err = mdmn_work_1(msg, client[setno][set_master]);
920 
921 		/* Everything's Ok? */
922 		if (rpc_err == NULL) {
923 			success = MDMNE_RPC_FAIL;
924 			/*
925 			 * Probably something happened to the daemon on the
926 			 * master. Kill the client, and try again...
927 			 */
928 			rw_unlock(&client_rwlock[setno]);
929 			rw_wrlock(&client_rwlock[setno]);
930 			mdmn_clnt_destroy(client[setno][set_master]);
931 			if (client[setno][set_master] != (CLIENT *)NULL) {
932 				client[setno][set_master] = (CLIENT *)NULL;
933 			}
934 			rw_unlock(&client_rwlock[setno]);
935 			continue;
936 
937 		} else  if (*rpc_err != MDMNE_ACK) {
938 			/* something went wrong, break out */
939 			success = *rpc_err;
940 			free(rpc_err);
941 			rw_unlock(&client_rwlock[setno]);
942 			break; /* out of try_master-loop */
943 		}
944 
945 		rw_unlock(&client_rwlock[setno]);
946 		free(rpc_err);
947 
948 		/*
949 		 * If we are here, we sucessfully delivered the message.
950 		 * We register the initiator_table, so that
951 		 * wakeup_initiator_1  can do the sendreply with the
952 		 * results for us.
953 		 */
954 		success = MDMNE_ACK;
955 		mdmn_register_initiator_table(setno, class, msg, transp);
956 
957 		/* tell check_timeouts, there's work to do */
958 		mutex_lock(&check_timeout_mutex);
959 		messages_on_their_way++;
960 		cond_signal(&check_timeout_cv);
961 		mutex_unlock(&check_timeout_mutex);
962 		break; /* out of try_master-loop */
963 	}
964 
965 	rw_unlock(&set_desc_rwlock[setno]);
966 
967 	if (success == MDMNE_ACK) {
968 		commd_debug(MD_MMV_SEND,
969 		    "send_to_work: registered (%d, 0x%llx-%d)\n",
970 		    MSGID_ELEMS(msg->msg_msgid));
971 	} else {
972 		/* In case of failure do the sendreply now */
973 		md_mn_result_t *resultp;
974 		resultp = Zalloc(sizeof (md_mn_result_t));
975 		resultp->mmr_comm_state = success;
976 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
977 		commd_debug(MD_MMV_SEND,
978 		    "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
979 		    MSGID_ELEMS(msg->msg_msgid), success);
980 		free_result(resultp);
981 
982 	}
983 
984 	free_msg(msg);
985 	mutex_unlock(mx);
986 	return (NULL);
987 
988 }
989 
990 /*
991  * do_message_locally(msg, result)
992  * Process a message locally on the master
993  * Lookup the MCT if the message has already been processed.
994  * If not, call the handler and store the result
995  * If yes, retrieve the result from the MCT.
996  * Return:
997  *	MDMNE_ACK in case of success
998  *	MDMNE_LOG_FAIL if the MCT could not be checked
999  */
1000 static int
1001 do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1002 {
1003 	int			completed;
1004 	set_t			setno;
1005 	md_mn_msgtype_t		msgtype = msg->msg_type;
1006 	md_mn_msgclass_t	class;
1007 
1008 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1009 
1010 	handler = mdmn_get_handler(msgtype);
1011 	if (handler == NULL) {
1012 		result->mmr_exitval = 0;
1013 		/* let the sender decide if this is an error or not */
1014 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1015 		return (MDMNE_NO_HANDLER);
1016 	}
1017 
1018 	class = mdmn_get_message_class(msg->msg_type);
1019 	setno = msg->msg_setno;
1020 
1021 	result->mmr_msgtype	= msgtype;
1022 	result->mmr_flags	= msg->msg_flags;
1023 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1024 
1025 	mutex_lock(&mct_mutex[setno][class]);
1026 	completed = mdmn_check_completion(msg, result);
1027 	if (completed == MDMN_MCT_NOT_DONE) {
1028 		/* message not yet processed locally */
1029 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1030 		    "calling handler for (%d,0x%llx-%d) type %d\n",
1031 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1032 
1033 		/*
1034 		 * Mark the message as being currently processed,
1035 		 * so we won't start a second handler for it
1036 		 */
1037 		(void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1038 		mutex_unlock(&mct_mutex[setno][class]);
1039 
1040 		/* here we actually process the message on the master */
1041 		(*handler)(msg, MD_MSGF_ON_MASTER, result);
1042 
1043 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1044 		    "finished handler for (%d,0x%llx-%d) type %d\n",
1045 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1046 
1047 		/* Mark the message as fully processed, store the result */
1048 		mutex_lock(&mct_mutex[setno][class]);
1049 		(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1050 	} else if (completed == MDMN_MCT_DONE) {
1051 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1052 		    "result for (%d, 0x%llx-%d) from MCT\n",
1053 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1054 	} else if (completed == MDMN_MCT_IN_PROGRESS) {
1055 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1056 		    "(%d, 0x%llx-%d) is currently being processed\n",
1057 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1058 	} else {
1059 		/* MCT error occurred (should never happen) */
1060 		mutex_unlock(&mct_mutex[setno][class]);
1061 		result->mmr_comm_state = MDMNE_LOG_FAIL;
1062 		commd_debug(MD_MMV_SYSLOG, "WARNING "
1063 		    "mdmn_check_completion returned %d "
1064 		    "for (%d,0x%llx-%d)\n", completed,
1065 		    MSGID_ELEMS(msg->msg_msgid));
1066 		return (MDMNE_LOG_FAIL);
1067 	}
1068 	mutex_unlock(&mct_mutex[setno][class]);
1069 	return (MDMNE_ACK);
1070 
1071 }
1072 
1073 /*
1074  * do_send_message(msg, node)
1075  *
1076  * Send a message to a given node and wait for a acknowledgment, that the
1077  * message has arrived on the remote node.
1078  * Make sure that the client for the set is setup correctly.
1079  * If no ACK arrives, destroy and recreate the RPC client and retry the
1080  * message one time
1081  * After actually sending wait no longer than the appropriate number of
1082  * before timing out the message.
1083  *
1084  * Note must be called with set_desc_wrlock held in reader mode
1085  */
1086 static int
1087 do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1088 {
1089 	int			err;
1090 	int			rpc_retries;
1091 	int			timeout_retries = 0;
1092 	int			*ret = NULL;
1093 	set_t			setno;
1094 	cond_t			*cv;	/* see mdmn_wakeup_master_svc_1 */
1095 	mutex_t			*mx;	/* protection for class_busy */
1096 	timestruc_t		timeout; /* surveillance for remote daemon */
1097 	md_mn_nodeid_t		nid;
1098 	md_mn_msgtype_t		msgtype;
1099 	md_mn_msgclass_t	class;
1100 
1101 	nid	= node->nd_nodeid;
1102 	msgtype = msg->msg_type;
1103 	setno	= msg->msg_setno;
1104 	class	= mdmn_get_message_class(msgtype);
1105 	mx	= mdmn_get_master_table_mx(setno, class);
1106 	cv	= mdmn_get_master_table_cv(setno, class);
1107 
1108 retry_rpc:
1109 
1110 	/* We try two times to send the message */
1111 	rpc_retries = 2;
1112 
1113 	/*
1114 	 * if sending the message doesn't succeed the first time due to a
1115 	 * RPC problem, we retry one time
1116 	 */
1117 	while ((rpc_retries != 0) && (ret == NULL)) {
1118 		/*  in abort state, we error out immediately */
1119 		if (md_commd_global_state & MD_CGS_ABORTED) {
1120 			return (MDMNE_ABORT);
1121 		}
1122 
1123 		rw_rdlock(&client_rwlock[setno]);
1124 		/* unable to create client? Ignore it */
1125 		if (check_client(setno, nid)) {
1126 			/*
1127 			 * In case we cannot establish an RPC client, we
1128 			 * take this node out of our considerations.
1129 			 * This will be reset by a reconfig
1130 			 * cycle that should come pretty soon.
1131 			 * MNISSUE: Should a reconfig cycle
1132 			 * be forced on SunCluster?
1133 			 */
1134 			node->nd_flags &= ~MD_MN_NODE_OWN;
1135 			commd_debug(MD_MMV_SYSLOG,
1136 			    "WARNING couldn't create client for %s\n"
1137 			    "Reconfig cycle required\n",
1138 			    node->nd_nodename);
1139 			commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1140 			    "WARNING couldn't create client for %s\n",
1141 			    MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1142 			rw_unlock(&client_rwlock[setno]);
1143 			return (MDMNE_IGNORE_NODE);
1144 		}
1145 		/* let's be paranoid and check again before sending */
1146 		if (client[setno][nid] == NULL) {
1147 			/*
1148 			 * if this is true, strange enough, we catch our breath,
1149 			 * and then continue, so that the client is set up
1150 			 * once again.
1151 			 */
1152 			commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1153 			rw_unlock(&client_rwlock[setno]);
1154 			sleep(1);
1155 			continue;
1156 		}
1157 
1158 		/* send it over, it will return immediately */
1159 		ret = mdmn_work_1(msg, client[setno][nid]);
1160 
1161 		rw_unlock(&client_rwlock[setno]);
1162 
1163 		if (ret != NULL) {
1164 			commd_debug(MD_MMV_PROC_M,
1165 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1166 			    " 0x%x\n",
1167 			    MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1168 		} else {
1169 			commd_debug(MD_MMV_PROC_M,
1170 			    "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1171 			    " NULL \n",
1172 			    MSGID_ELEMS(msg->msg_msgid), nid);
1173 		}
1174 
1175 		if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1176 		    (*ret == MDMNE_THR_CREATE_FAIL)) {
1177 			/*
1178 			 * Something happened to the daemon on the other side.
1179 			 * Kill the client, and try again.
1180 			 * check_client() will create a new client
1181 			 */
1182 			rw_wrlock(&client_rwlock[setno]);
1183 			mdmn_clnt_destroy(client[setno][nid]);
1184 			if (client[setno][nid] != (CLIENT *)NULL) {
1185 				client[setno][nid] = (CLIENT *)NULL;
1186 			}
1187 			rw_unlock(&client_rwlock[setno]);
1188 
1189 			/* ... but don't try infinitely */
1190 			--rpc_retries;
1191 			continue;
1192 		}
1193 		/*
1194 		 * If the class is locked on the other node, keep trying.
1195 		 * This situation will go away automatically,
1196 		 * if we wait long enough
1197 		 */
1198 		if (*ret == MDMNE_CLASS_LOCKED) {
1199 			sleep(1);
1200 			free(ret);
1201 			ret = NULL;
1202 			continue;
1203 		}
1204 	}
1205 	if (ret == NULL) {
1206 		return (MDMNE_RPC_FAIL);
1207 	}
1208 
1209 
1210 	/* if the slave is in abort state, we just ignore it. */
1211 	if (*ret == MDMNE_ABORT) {
1212 		commd_debug(MD_MMV_PROC_M,
1213 		    "proc_mas: work(%d,0x%llx-%d) returned "
1214 		    "MDMNE_ABORT\n",
1215 		    MSGID_ELEMS(msg->msg_msgid));
1216 		free(ret);
1217 		return (MDMNE_IGNORE_NODE);
1218 	}
1219 
1220 	/* Did the remote processing succeed? */
1221 	if (*ret != MDMNE_ACK) {
1222 		/*
1223 		 * Some commd failure in the middle of sending the msg
1224 		 * to the nodes. We don't continue here.
1225 		 */
1226 		commd_debug(MD_MMV_PROC_M,
1227 		    "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1228 		    MSGID_ELEMS(msg->msg_msgid), *ret);
1229 		free(ret);
1230 		return (MDMNE_RPC_FAIL);
1231 	}
1232 	free(ret);
1233 	ret = NULL;
1234 
1235 	/*
1236 	 * When we are here, we have sent the message to the other node and
1237 	 * we know that node has accepted it.
1238 	 * We go to sleep and have trust to be woken up by wakeup.
1239 	 * If we wakeup due to a timeout, or a signal, no result has been
1240 	 * placed in the appropriate slot.
1241 	 * If we timeout, it is likely that this is because the node has
1242 	 * gone away, so we will destroy the client and try it again in the
1243 	 * expectation that the rpc will fail and we will return
1244 	 * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1245 	 * be being processed on the slave. In this case just timeout for 4
1246 	 * more seconds and then return RPC_FAIL if the message is not complete.
1247 	 */
1248 	timeout.tv_nsec = 0;
1249 	timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1250 	    FOUR_SECS.tv_sec;
1251 	err = cond_reltimedwait(cv, mx, &timeout);
1252 
1253 	if (err == 0) {
1254 		/* everything's fine, return success */
1255 		return (MDMNE_ACK);
1256 	}
1257 
1258 	if (err == ETIME) {
1259 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1260 		    "timeout occured, set=%d, class=%d, "
1261 		    "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1262 		    setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1263 		if (timeout_retries == 0) {
1264 			timeout_retries++;
1265 			/*
1266 			 * Destroy the client and try the rpc call again
1267 			 */
1268 			rw_wrlock(&client_rwlock[setno]);
1269 			mdmn_clnt_destroy(client[setno][nid]);
1270 			client[setno][nid] = (CLIENT *)NULL;
1271 			rw_unlock(&client_rwlock[setno]);
1272 			goto retry_rpc;
1273 		}
1274 	} else if (err == EINTR) {
1275 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1276 		    "commd signalled, set=%d, class=%d, "
1277 		    "msgid=(%d, 0x%llx-%d)\n",
1278 		    setno, class, MSGID_ELEMS(msg->msg_msgid));
1279 	} else {
1280 		commd_debug(MD_MMV_PROC_M, "proc_mas: "
1281 		    "cond_reltimedwait err=%d, set=%d, "
1282 		    "class=%d, msgid=(%d, 0x%llx-%d)\n",
1283 		    err, setno, class,
1284 		    MSGID_ELEMS(msg->msg_msgid));
1285 	}
1286 
1287 	/* some failure happened */
1288 	return (MDMNE_RPC_FAIL);
1289 }
1290 
1291 /*
1292  * before we return we have to
1293  * free_msg(msg); because we are working on a copied message
1294  */
1295 void
1296 mdmn_master_process_msg(md_mn_msg_t *msg)
1297 {
1298 	int		*ret;
1299 	int		err;
1300 	int		nmsgs;		/* total number of msgs */
1301 	int		curmsg;		/* index of current msg */
1302 	set_t		setno;
1303 	uint_t		inherit_flags = 0;
1304 	uint_t		secdiff, usecdiff; /* runtime of this message */
1305 	md_error_t	mde = mdnullerror;
1306 	md_mn_msg_t	*msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1307 	md_mn_msg_t	*cmsg;		/* current msg */
1308 	md_mn_msgid_t	dummyid;
1309 	md_mn_result_t	*result;
1310 	md_mn_result_t	*slave_result;
1311 	md_mn_nodeid_t	sender;
1312 	md_mn_nodeid_t	set_master;
1313 	md_mnnode_desc	*node;
1314 	md_mn_msgtype_t	orig_type;	/* type of the original message */
1315 	md_mn_msgtype_t	msgtype;	/* type of the current message */
1316 	md_mn_msgclass_t orig_class;	/* class of the original message */
1317 	md_mn_msgclass_t class;		/* class of the current message */
1318 
1319 	int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1320 
1321 	orig_type = msgtype = msg->msg_type;
1322 	sender	= msg->msg_sender;
1323 	setno	= msg->msg_setno;
1324 
1325 	result = Zalloc(sizeof (md_mn_result_t));
1326 	result->mmr_setno	= setno;
1327 	result->mmr_msgtype	= msgtype;
1328 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1329 
1330 	orig_class = mdmn_get_message_class(msgtype);
1331 
1332 	commd_debug(MD_MMV_PROC_M,
1333 	    "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1334 	    MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1335 
1336 	rw_rdlock(&set_desc_rwlock[setno]);
1337 	set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1338 	result->mmr_sender	= set_master;
1339 	/*
1340 	 * Put message into the change log unless told otherwise
1341 	 * Note that we only log original messages.
1342 	 * If they are generated by some smgen, we don't log them!
1343 	 * Replay messages aren't logged either.
1344 	 * Note, that replay messages are unlogged on completion.
1345 	 */
1346 	if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1347 		commd_debug(MD_MMV_PROC_M,
1348 		    "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1349 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1350 		err = mdmn_log_msg(msg);
1351 		if (err == MDMNE_NULL) {
1352 			/* msg logged successfully */
1353 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1354 			    "done log_msg for (%d,0x%llx-%d) type %d\n",
1355 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1356 			goto proceed;
1357 		}
1358 		if (err == MDMNE_ACK) {
1359 			/* Same msg in the slot, proceed */
1360 			commd_debug(MD_MMV_PROC_M, "proc_mas: "
1361 			    "already logged (%d,0x%llx-%d) type %d\n",
1362 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1363 			goto proceed;
1364 		}
1365 		if (err == MDMNE_LOG_FAIL) {
1366 			/* Oh, bad, the log is non functional. */
1367 			result->mmr_comm_state = MDMNE_LOG_FAIL;
1368 			/*
1369 			 * Note that the mark_busy was already done by
1370 			 * mdmn_work_svc_1()
1371 			 */
1372 			mutex_lock(&mdmn_busy_mutex[setno]);
1373 			mdmn_mark_class_unbusy(setno, orig_class);
1374 			mutex_unlock(&mdmn_busy_mutex[setno]);
1375 
1376 		}
1377 		if (err == MDMNE_CLASS_BUSY) {
1378 			/*
1379 			 * The log is occupied with a different message
1380 			 * that needs to be played first.
1381 			 * We reject the current message with MDMNE_CLASS_BUSY
1382 			 * to the initiator and do not unbusy the set/class,
1383 			 * because we will proceed with the logged message,
1384 			 * which has the same set/class combination
1385 			 */
1386 			result->mmr_comm_state = MDMNE_CLASS_BUSY;
1387 		}
1388 		ret = (int *)NULL;
1389 		rw_rdlock(&client_rwlock[setno]);
1390 
1391 		if (check_client(setno, sender)) {
1392 			commd_debug(MD_MMV_SYSLOG,
1393 			    "proc_mas: No client for initiator \n");
1394 		} else {
1395 			ret = mdmn_wakeup_initiator_1(result,
1396 			    client[setno][sender]);
1397 		}
1398 		rw_unlock(&client_rwlock[setno]);
1399 
1400 		if (ret == (int *)NULL) {
1401 			commd_debug(MD_MMV_SYSLOG,
1402 			    "proc_mas: couldn't wakeup_initiator \n");
1403 		} else {
1404 			if (*ret != MDMNE_ACK) {
1405 				commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1406 				    "wakeup_initiator returned %d\n", *ret);
1407 			}
1408 			free(ret);
1409 		}
1410 		free_msg(msg);
1411 
1412 		if (err == MDMNE_LOG_FAIL) {
1413 			/* we can't proceed here */
1414 			free_result(result);
1415 			rw_unlock(&set_desc_rwlock[setno]);
1416 			return;
1417 		} else if (err == MDMNE_CLASS_BUSY) {
1418 			mdmn_changelog_record_t *lr;
1419 			lr = mdmn_get_changelogrec(setno, orig_class);
1420 			assert(lr != NULL);
1421 
1422 			/* proceed with the logged message */
1423 			msg = copy_msg(&(lr->lr_msg), NULL);
1424 
1425 			/*
1426 			 * The logged message has to have the same class but
1427 			 * type and sender can be different
1428 			 */
1429 			orig_type = msgtype = msg->msg_type;
1430 			sender	= msg->msg_sender;
1431 
1432 			commd_debug(MD_MMV_PROC_M,
1433 			    "proc_mas: Got new message from change log: "
1434 			    "(%d,0x%llx-%d) type %d\n",
1435 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1436 
1437 			/* continue normal operation with this message */
1438 		}
1439 	}
1440 
1441 proceed:
1442 	smgen = mdmn_get_submessage_generator(msgtype);
1443 	if (smgen == NULL) {
1444 		/* no submessages to create, just use the original message */
1445 		msglist[0] = msg;
1446 		nmsgs = 1;
1447 	} else {
1448 		/* some bits are passed on to submessages */
1449 		inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1450 
1451 		nmsgs = smgen(msg, msglist);
1452 
1453 		/* some settings for the submessages */
1454 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1455 			cmsg    = msglist[curmsg];
1456 
1457 			/* Apply the inherited flags */
1458 			cmsg->msg_flags |= inherit_flags;
1459 
1460 			/*
1461 			 * Make sure the submessage ID is set correctly
1462 			 * Note: first submessage has mid_smid of 1 (not 0)
1463 			 */
1464 			cmsg->msg_msgid.mid_smid = curmsg + 1;
1465 
1466 			/* need the original class set in msgID (for MCT) */
1467 			cmsg->msg_msgid.mid_oclass = orig_class;
1468 		}
1469 
1470 		commd_debug(MD_MMV_PROC_M,
1471 		    "smgen generated %d submsgs, origclass = %d\n",
1472 		    nmsgs, orig_class);
1473 	}
1474 	/*
1475 	 * This big loop does the following.
1476 	 * For all messages:
1477 	 *	process message on the master first (a message completion
1478 	 *		table MCT ensures a message is not processed twice)
1479 	 *	in case of an error break out of message loop
1480 	 *	for all nodes -- unless MD_MSGF_NO_BCAST is set --
1481 	 *		send message to node until that succeeds
1482 	 *		merge result -- not yet implemented
1483 	 *		respect MD_MSGF_STOP_ON_ERROR
1484 	 */
1485 	for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1486 		int	break_msg_loop = 0;
1487 		mutex_t	*mx;		/* protection for class_busy */
1488 		int	master_err;
1489 		int	master_exitval = -1;
1490 
1491 		cmsg	= msglist[curmsg];
1492 		msgtype = cmsg->msg_type;
1493 		class	= mdmn_get_message_class(msgtype);
1494 		node	= NULL;
1495 		mx	= mdmn_get_master_table_mx(setno, class);
1496 
1497 		/* If we are in the abort state, we error out immediately */
1498 		if (md_commd_global_state & MD_CGS_ABORTED) {
1499 			break; /* out of the message loop */
1500 		}
1501 
1502 		commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1503 		    class, orig_class);
1504 		/*
1505 		 * If the current class is different from the original class,
1506 		 * we have to lock it down.
1507 		 * The original class is already marked busy.
1508 		 * At this point we cannot refuse the message because the
1509 		 * class is busy right now, so we wait until the class becomes
1510 		 * available again. As soon as something changes for this set
1511 		 * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1512 		 *
1513 		 * Granularity could be finer (setno/class)
1514 		 */
1515 		if (class != orig_class) {
1516 			mutex_lock(&mdmn_busy_mutex[setno]);
1517 			while (mdmn_mark_class_busy(setno, class) == FALSE) {
1518 				cond_wait(&mdmn_busy_cv[setno],
1519 				    &mdmn_busy_mutex[setno]);
1520 			}
1521 			mutex_unlock(&mdmn_busy_mutex[setno]);
1522 		}
1523 
1524 		master_err = do_message_locally(cmsg, result);
1525 
1526 		if ((master_err != MDMNE_ACK) ||
1527 		    ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1528 			result->mmr_failing_node = set_master;
1529 			if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1530 				/*
1531 				 * if appropriate, unbusy the class and
1532 				 * break out of the message loop
1533 				 */
1534 				if (class != orig_class) {
1535 					mutex_lock(&mdmn_busy_mutex[setno]);
1536 					mdmn_mark_class_unbusy(setno, class);
1537 					mutex_unlock(&mdmn_busy_mutex[setno]);
1538 				}
1539 				break;
1540 			}
1541 		}
1542 
1543 		if (master_err == MDMNE_ACK)
1544 			master_exitval = result->mmr_exitval;
1545 
1546 		/* No broadcast? => next message */
1547 		if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1548 			/* if appropriate, unbusy the class */
1549 			if (class != orig_class) {
1550 				mutex_lock(&mdmn_busy_mutex[setno]);
1551 				mdmn_mark_class_unbusy(setno, class);
1552 				mutex_unlock(&mdmn_busy_mutex[setno]);
1553 			}
1554 			continue;
1555 		}
1556 
1557 
1558 		/* fake sender, so we get notified when the results are avail */
1559 		cmsg->msg_sender = set_master;
1560 		/*
1561 		 * register to the master_table. It's needed by wakeup_master to
1562 		 * wakeup the sleeping thread.
1563 		 * Access is protected by the class lock: mdmn_mark_class_busy()
1564 		 */
1565 		mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1566 
1567 
1568 
1569 		rw_rdlock(&set_desc_rwlock[setno]);
1570 		/* Send the message  to all other nodes */
1571 		for (node = set_descriptor[setno]->sd_nodelist; node;
1572 		    node = node->nd_next) {
1573 			md_mn_nodeid_t nid = node->nd_nodeid;
1574 
1575 			/* We are master and have already processed the msg */
1576 			if (node == set_descriptor[setno]->sd_mn_masternode) {
1577 				continue;
1578 			}
1579 
1580 			/* If this node didn't join the disk set, ignore it */
1581 			if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1582 				continue;
1583 			}
1584 
1585 			mutex_lock(mx);
1586 			/*
1587 			 * Register the node that is addressed,
1588 			 * so we can detect unsolicited messages
1589 			 */
1590 			mdmn_set_master_table_addr(setno, class, nid);
1591 			slave_result = (md_mn_result_t *)NULL;
1592 
1593 			/*
1594 			 * Now send it. do_send_message() will return if
1595 			 *	a failure occurs or
1596 			 *	the results are available
1597 			 */
1598 			err = do_send_message(cmsg, node);
1599 
1600 			/*  in abort state, we error out immediately */
1601 			if (md_commd_global_state & MD_CGS_ABORTED) {
1602 				break;
1603 			}
1604 
1605 			if (err == MDMNE_ACK) {
1606 				slave_result =
1607 				    mdmn_get_master_table_res(setno, class);
1608 				commd_debug(MD_MMV_PROC_M,
1609 				    "proc_mas: got result for (%d,0x%llx-%d)\n",
1610 				    MSGID_ELEMS(cmsg->msg_msgid));
1611 			} else if (err == MDMNE_IGNORE_NODE) {
1612 				mutex_unlock(mx);
1613 				continue; /* send to next node */
1614 			}
1615 			mutex_unlock(mx);
1616 
1617 
1618 			/*
1619 			 * If the result is NULL, or err doesn't show success,
1620 			 * something went wrong with this RPC call.
1621 			 */
1622 			if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1623 				/*
1624 				 * If PANIC_WHEN_INCONSISTENT set,
1625 				 * panic if the master succeeded while
1626 				 * this node failed
1627 				 */
1628 				if ((cmsg->msg_flags &
1629 				    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1630 				    (master_err == MDMNE_ACK))
1631 					panic_system(nid, cmsg->msg_type,
1632 					    master_err, master_exitval,
1633 					    slave_result);
1634 
1635 				result->mmr_failing_node = nid;
1636 				/* are we supposed to stop in case of error? */
1637 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1638 					result->mmr_exitval = MDMNE_RPC_FAIL;
1639 					commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1640 					    "result (%d,0x%llx-%d) is NULL\n",
1641 					    MSGID_ELEMS(cmsg->msg_msgid));
1642 					FLUSH_DEBUGFILE();
1643 					break_msg_loop = 1;
1644 					break; /* out of node loop first */
1645 				} else {
1646 					/* send msg to the next node */
1647 					continue;
1648 				}
1649 
1650 			}
1651 
1652 			/*
1653 			 * Message processed on remote node.
1654 			 * If PANIC_WHEN_INCONSISTENT set, panic if the
1655 			 * result is different on this node from the result
1656 			 * on the master
1657 			 */
1658 			if ((cmsg->msg_flags &
1659 			    MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1660 			    ((master_err != MDMNE_ACK) ||
1661 			    (slave_result->mmr_exitval != master_exitval)))
1662 				panic_system(nid, cmsg->msg_type, master_err,
1663 				    master_exitval, slave_result);
1664 
1665 			/*
1666 			 * At this point we know we have a message that was
1667 			 * processed on the remote node.
1668 			 * We now check if the exitval is non zero.
1669 			 * In that case we discard the previous result and
1670 			 * rather use the current.
1671 			 * This means: If a message fails on no node,
1672 			 * the result from the master will be returned.
1673 			 * There's currently no such thing as merge of results
1674 			 * If additionally STOP_ON_ERROR is set, we bail out
1675 			 */
1676 			if (slave_result->mmr_exitval != 0) {
1677 				/* throw away the previously allocated result */
1678 				free_result(result);
1679 
1680 				/* copy_result() allocates new memory */
1681 				result = copy_result(slave_result);
1682 				free_result(slave_result);
1683 
1684 				dump_result(MD_MMV_PROC_M, "proc_mas", result);
1685 
1686 				result->mmr_failing_node = nid;
1687 				if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1688 					break_msg_loop = 1;
1689 					break; /* out of node loop */
1690 				}
1691 				continue; /* try next node */
1692 
1693 			} else {
1694 				/*
1695 				 * MNIssue: may want to merge the results
1696 				 * from all slaves.  Currently only report
1697 				 * the results from the master.
1698 				 */
1699 				free_result(slave_result);
1700 			}
1701 
1702 		} /* End of loop over the nodes */
1703 		rw_unlock(&set_desc_rwlock[setno]);
1704 
1705 
1706 		/* release the current class again */
1707 		if (class != orig_class) {
1708 			mutex_lock(&mdmn_busy_mutex[setno]);
1709 			mdmn_mark_class_unbusy(setno, class);
1710 			mutex_unlock(&mdmn_busy_mutex[setno]);
1711 		}
1712 
1713 		/* are we supposed to quit entirely ? */
1714 		if (break_msg_loop ||
1715 		    (md_commd_global_state & MD_CGS_ABORTED)) {
1716 			break; /* out of msg loop */
1717 		}
1718 
1719 	} /* End of loop over the messages */
1720 	/*
1721 	 * If we are here, there's two possibilities:
1722 	 * 	- we processed all messages on all nodes without an error.
1723 	 *	    In this case we return the result from the master.
1724 	 *	    (to be implemented: return the merged result)
1725 	 *	- we encountered an error in which case result has been
1726 	 *	    set accordingly already.
1727 	 */
1728 
1729 	if (md_commd_global_state & MD_CGS_ABORTED) {
1730 		result->mmr_comm_state = MDMNE_ABORT;
1731 	}
1732 
1733 	/*
1734 	 * This message has been processed completely.
1735 	 * Remove it from the changelog.
1736 	 * Do this for replay messages too.
1737 	 * Note that the message is unlogged before waking up the
1738 	 * initiator.  This is done for two reasons.
1739 	 * 1. Remove a race condition that occurs when back to back
1740 	 *   messages are sent for the same class, the registeration is
1741 	 *   is lost.
1742 	 * 2. If the initiator died but the action was completed on all the
1743 	 *   the nodes, we want that to be marked "done" quickly.
1744 	 */
1745 
1746 	if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1747 		commd_debug(MD_MMV_PROC_M,
1748 		    "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1749 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1750 		mdmn_unlog_msg(msg);
1751 		commd_debug(MD_MMV_PROC_M,
1752 		    "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1753 		    MSGID_ELEMS(msg->msg_msgid), msgtype);
1754 	}
1755 
1756 	/*
1757 	 * In case of submessages, we increased the submessage ID in the
1758 	 * result structure. We restore the message ID to the value that
1759 	 * the initiator is waiting for.
1760 	 */
1761 	result->mmr_msgid.mid_smid	= 0;
1762 	result->mmr_msgtype		= orig_type;
1763 	result->mmr_sender		= set_master;
1764 
1765 	/* if we have an inited client, send result */
1766 	ret = (int *)NULL;
1767 
1768 	rw_rdlock(&client_rwlock[setno]);
1769 	if (check_client(setno, sender)) {
1770 		commd_debug(MD_MMV_SYSLOG,
1771 		    "proc_mas: unable to create client for initiator\n");
1772 	} else {
1773 		ret = mdmn_wakeup_initiator_1(result, client[setno][sender]);
1774 	}
1775 	rw_unlock(&client_rwlock[setno]);
1776 
1777 	if (ret == (int *)NULL) {
1778 		commd_debug(MD_MMV_PROC_M,
1779 		    "proc_mas: couldn't wakeup initiator\n");
1780 	} else {
1781 		if (*ret != MDMNE_ACK) {
1782 			commd_debug(MD_MMV_PROC_M,
1783 			    "proc_mas: wakeup_initiator returned %d\n",
1784 			    *ret);
1785 		}
1786 		free(ret);
1787 	}
1788 
1789 	rw_unlock(&set_desc_rwlock[setno]);
1790 	/* Free all submessages, if there were any */
1791 	if (nmsgs > 1) {
1792 		for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1793 			free_msg(msglist[curmsg]);
1794 		}
1795 	}
1796 	/* Free the result */
1797 	free_result(result);
1798 
1799 	mutex_lock(&mdmn_busy_mutex[setno]);
1800 	mdmn_mark_class_unbusy(setno, orig_class);
1801 	mutex_unlock(&mdmn_busy_mutex[setno]);
1802 
1803 
1804 	/*
1805 	 * We use this ioctl just to get the time in the same format as used in
1806 	 * the messageID. If it fails, all we get is a bad runtime output.
1807 	 */
1808 	(void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1809 	secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1810 	usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1811 
1812 	/* catching possible overflow */
1813 	if (usecdiff >= 1000000) {
1814 		usecdiff -= 1000000;
1815 		secdiff++;
1816 	}
1817 
1818 
1819 	commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1820 	    "%5d.%06d secs runtime\n",
1821 	    MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1822 
1823 	/* Free the original message */
1824 	free_msg(msg);
1825 }
1826 
1827 void
1828 mdmn_slave_process_msg(md_mn_msg_t *msg)
1829 {
1830 	int			*ret = NULL;
1831 	int			completed;
1832 	int			retries;
1833 	int			successfully_returned;
1834 	set_t			setno;
1835 	md_mn_result_t		*result;
1836 	md_mn_nodeid_t		sender;
1837 	md_mn_nodeid_t		whoami;
1838 	md_mn_msgtype_t		msgtype;
1839 	md_mn_msgclass_t	class;
1840 
1841 	void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1842 
1843 	setno	= msg->msg_setno;
1844 	sender	= msg->msg_sender; /* this is always the master of the set */
1845 	msgtype	= msg->msg_type;
1846 
1847 	rw_rdlock(&set_desc_rwlock[setno]);
1848 	whoami		= set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1849 	rw_unlock(&set_desc_rwlock[setno]);
1850 
1851 	result = Zalloc(sizeof (md_mn_result_t));
1852 	result->mmr_flags	= msg->msg_flags;
1853 	result->mmr_setno	= setno;
1854 	result->mmr_msgtype	= msgtype;
1855 	result->mmr_sender	= whoami;
1856 	result->mmr_comm_state	= MDMNE_ACK; /* Ok state */
1857 	MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1858 	class = mdmn_get_message_class(msgtype);
1859 
1860 	commd_debug(MD_MMV_PROC_S,
1861 	    "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1862 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
1863 
1864 	handler = mdmn_get_handler(msgtype);
1865 
1866 	if (handler == NULL) {
1867 		result->mmr_exitval = 0;
1868 		/* let the sender decide if this is an error or not */
1869 		result->mmr_comm_state = MDMNE_NO_HANDLER;
1870 		commd_debug(MD_MMV_PROC_S,
1871 		    "proc_sla: No handler for (%d, 0x%llx-%d)\n",
1872 		    MSGID_ELEMS(msg->msg_msgid));
1873 	} else {
1874 
1875 		/* Did we already process this message ? */
1876 		mutex_lock(&mct_mutex[setno][class]);
1877 		completed = mdmn_check_completion(msg, result);
1878 
1879 		if (completed == MDMN_MCT_NOT_DONE) {
1880 			/* message not yet processed locally */
1881 			commd_debug(MD_MMV_PROC_S,
1882 			    "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
1883 			    MSGID_ELEMS(msg->msg_msgid));
1884 
1885 			/*
1886 			 * Mark the message as being currently processed,
1887 			 * so we won't start a second handler for it
1888 			 */
1889 			(void) mdmn_mark_completion(msg, NULL,
1890 			    MDMN_MCT_IN_PROGRESS);
1891 
1892 			mutex_unlock(&mct_mutex[setno][class]);
1893 			(*handler)(msg, MD_MSGF_ON_SLAVE, result);
1894 
1895 			commd_debug(MD_MMV_PROC_S,
1896 			    "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
1897 			    MSGID_ELEMS(msg->msg_msgid));
1898 
1899 			mutex_lock(&mct_mutex[setno][class]);
1900 			/* Mark the message as fully done, store the result */
1901 			(void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1902 
1903 		} else if (completed == MDMN_MCT_DONE) {
1904 			/* message processed previously, got result from MCT */
1905 			commd_debug(MD_MMV_PROC_S,
1906 			    "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
1907 			    MSGID_ELEMS(msg->msg_msgid));
1908 		} else if (completed == MDMN_MCT_IN_PROGRESS) {
1909 			/*
1910 			 * If the message is curruntly being processed,
1911 			 * we can return here, without sending a result back.
1912 			 * This will be done by the initial message handling
1913 			 * thread
1914 			 */
1915 			mutex_unlock(&mct_mutex[setno][class]);
1916 			commd_debug(MD_MMV_PROC_M, "proc_sla: "
1917 			    "(%d, 0x%llx-%d) is currently being processed\n",
1918 			    MSGID_ELEMS(msg->msg_msgid), msgtype);
1919 
1920 			free_msg(msg);
1921 			free_result(result);
1922 			return;
1923 		} else {
1924 			/* MCT error occurred (should never happen) */
1925 			result->mmr_comm_state = MDMNE_LOG_FAIL;
1926 			commd_debug(MD_MMV_PROC_S,
1927 			    "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
1928 			    MSGID_ELEMS(msg->msg_msgid));
1929 		}
1930 		mutex_unlock(&mct_mutex[setno][class]);
1931 	}
1932 
1933 	/*
1934 	 * At this point we have a result (even in an error case)
1935 	 * that we return to the master.
1936 	 */
1937 	rw_rdlock(&set_desc_rwlock[setno]);
1938 	retries = 2; /* we will try two times to send the results */
1939 	successfully_returned = 0;
1940 
1941 	while (!successfully_returned && (retries != 0)) {
1942 		ret = (int *)NULL;
1943 		rw_rdlock(&client_rwlock[setno]);
1944 		if (check_client(setno, sender)) {
1945 			/*
1946 			 * If we cannot setup the rpc connection to the master,
1947 			 * we can't do anything besides logging this fact.
1948 			 */
1949 			commd_debug(MD_MMV_SYSLOG,
1950 			    "proc_mas: unable to create client for master\n");
1951 			rw_unlock(&client_rwlock[setno]);
1952 			break;
1953 		} else {
1954 			ret = mdmn_wakeup_master_1(result,
1955 			    client[setno][sender]);
1956 			/*
1957 			 * if mdmn_wakeup_master_1 returns NULL, it can be that
1958 			 * the master (or the commd on the master) had died.
1959 			 * In that case, we destroy the client to the master
1960 			 * and retry.
1961 			 * If mdmn_wakeup_master_1 doesn't return MDMNE_ACK,
1962 			 * the commd on the master is alive but
1963 			 * something else is wrong,
1964 			 * in that case a retry doesn't make sense => break out
1965 			 */
1966 			if (ret == (int *)NULL) {
1967 				commd_debug(MD_MMV_PROC_S,
1968 				    "proc_sla: wakeup_master returned NULL\n");
1969 				/* release reader lock, grab writer lock */
1970 				rw_unlock(&client_rwlock[setno]);
1971 				rw_wrlock(&client_rwlock[setno]);
1972 				mdmn_clnt_destroy(client[setno][sender]);
1973 				if (client[setno][sender] != (CLIENT *)NULL) {
1974 					client[setno][sender] = (CLIENT *)NULL;
1975 				}
1976 				rw_unlock(&client_rwlock[setno]);
1977 				retries--;
1978 				commd_debug(MD_MMV_PROC_S,
1979 				    "retries = %d\n", retries);
1980 				continue;
1981 			}
1982 			if (*ret != MDMNE_ACK) {
1983 				commd_debug(MD_MMV_PROC_S, "proc_sla: "
1984 				    "wakeup_master returned %d\n", *ret);
1985 				rw_unlock(&client_rwlock[setno]);
1986 				break;
1987 			} else { /* Good case */
1988 				successfully_returned = 1;
1989 				rw_unlock(&client_rwlock[setno]);
1990 			}
1991 		}
1992 	}
1993 
1994 	rw_unlock(&set_desc_rwlock[setno]);
1995 	commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
1996 	    MSGID_ELEMS(msg->msg_msgid));
1997 
1998 	if (ret != (int *)NULL)
1999 		free(ret);
2000 	free_msg(msg);
2001 	free_result(result);
2002 }
2003 
2004 
2005 md_mn_result_t *
2006 mdmn_send_svc_1(md_mn_msg_t *omsg, struct svc_req *rqstp)
2007 {
2008 	int			err;
2009 	set_t			setno;
2010 	SVCXPRT			*transp = rqstp->rq_xprt;
2011 	md_mn_msg_t		*msg;
2012 	md_mn_result_t		*resultp;
2013 	md_mn_msgclass_t	class;
2014 	md_mn_msg_and_transp_t	*matp;
2015 
2016 	msg = copy_msg(omsg, NULL);
2017 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2018 
2019 	setno = msg->msg_setno;
2020 	class = mdmn_get_message_class(msg->msg_type);
2021 
2022 	/* If we are in the abort state, we error out immediately */
2023 	if (md_commd_global_state & MD_CGS_ABORTED) {
2024 		resultp = Zalloc(sizeof (md_mn_result_t));
2025 		resultp->mmr_comm_state = MDMNE_ABORT;
2026 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2027 		free_result(resultp);
2028 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2029 		return (NULL);
2030 	}
2031 
2032 	/* check if the global initialization is done */
2033 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2034 		global_init();
2035 	}
2036 
2037 	commd_debug(MD_MMV_SEND,
2038 	    "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2039 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2040 
2041 	/* Check for verbosity related message */
2042 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2043 		md_mn_verbose_t *d;
2044 
2045 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2046 		md_commd_global_verb = d->mmv_what;
2047 		/* everytime the bitmask is set, we reset the timer */
2048 		__savetime = gethrtime();
2049 		/*
2050 		 * If local-only-flag is set, we are done here,
2051 		 * otherwise we pass that message on to the master.
2052 		 */
2053 		if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2054 			resultp = Zalloc(sizeof (md_mn_result_t));
2055 			resultp->mmr_comm_state = MDMNE_ACK;
2056 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2057 			    (char *)resultp);
2058 			free_result(resultp);
2059 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2060 			return (NULL);
2061 		}
2062 	}
2063 
2064 	/*
2065 	 * Are we entering the abort state?
2066 	 * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2067 	 * this message cannot be distributed anyway.
2068 	 * So, it's safe to return immediately.
2069 	 */
2070 	if (msg->msg_type == MD_MN_MSG_ABORT) {
2071 		md_commd_global_state |= MD_CGS_ABORTED;
2072 		resultp = Zalloc(sizeof (md_mn_result_t));
2073 		resultp->mmr_comm_state = MDMNE_ACK;
2074 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2075 		free_result(resultp);
2076 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2077 		return (NULL);
2078 	}
2079 
2080 
2081 	/*
2082 	 * Is this message type blocked?
2083 	 * If so we return MDMNE_CLASS_LOCKED, immediately
2084 	 */
2085 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2086 		resultp = Zalloc(sizeof (md_mn_result_t));
2087 		resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2088 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2089 		free_result(resultp);
2090 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2091 		commd_debug(MD_MMV_SEND,
2092 			"send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2093 			"type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2094 			msg->msg_type);
2095 		return (NULL);
2096 	}
2097 
2098 
2099 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2100 		/* Can only use the appropriate mutexes if they are inited */
2101 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2102 			rw_wrlock(&set_desc_rwlock[setno]);
2103 			rw_wrlock(&client_rwlock[setno]);
2104 			err = mdmn_init_set(setno, MDMN_SET_READY);
2105 			rw_unlock(&client_rwlock[setno]);
2106 			rw_unlock(&set_desc_rwlock[setno]);
2107 		} else {
2108 			err = mdmn_init_set(setno, MDMN_SET_READY);
2109 		}
2110 
2111 		if (err) {
2112 			/* couldn't initialize connections, cannot proceed */
2113 			resultp = Zalloc(sizeof (md_mn_result_t));
2114 			resultp->mmr_comm_state = err;
2115 			mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2116 			    (char *)resultp);
2117 			svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2118 			free_result(resultp);
2119 			commd_debug(MD_MMV_SEND,
2120 			    "send: init err = %d\n", err);
2121 			return (NULL);
2122 		}
2123 	}
2124 
2125 	mutex_lock(&mdmn_busy_mutex[setno]);
2126 	if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2127 	    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2128 		mutex_unlock(&mdmn_busy_mutex[setno]);
2129 		resultp = Zalloc(sizeof (md_mn_result_t));
2130 		resultp->mmr_comm_state = MDMNE_SUSPENDED;
2131 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2132 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2133 		free_result(resultp);
2134 		commd_debug(MD_MMV_SEND,
2135 			"send: class suspended (%d, 0x%llx-%d), set=%d, "
2136 			"class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2137 			setno, class, msg->msg_type);
2138 		return (NULL);
2139 	}
2140 	mutex_unlock(&mdmn_busy_mutex[setno]);
2141 
2142 	/* is this rpc request coming from the local node? */
2143 	if (check_license(rqstp, 0) == FALSE) {
2144 		svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2145 		commd_debug(MD_MMV_SEND,
2146 			"send: check licence fail(%d, 0x%llx-%d), set=%d, "
2147 			"class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2148 			setno, class, msg->msg_type);
2149 		return (NULL);
2150 	}
2151 
2152 
2153 	/*
2154 	 * We allocate a structure that can take two pointers in order to pass
2155 	 * both the message and the transp into thread_create.
2156 	 * The free for this alloc is done in mdmn_send_to_work()
2157 	 */
2158 	matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2159 	matp->mat_msg = msg;
2160 	matp->mat_transp = transp;
2161 
2162 	/*
2163 	 * create a thread here that calls work on the master.
2164 	 * If we are already on the master, this would block if running
2165 	 * in the same context. (our service is single threaded)(
2166 	 * Make it a detached thread because it will not communicate with
2167 	 * anybody thru thr_* mechanisms
2168 	 */
2169 	thr_create(NULL, 0, mdmn_send_to_work, (void *) matp, THR_DETACHED,
2170 	    NULL);
2171 
2172 	commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2173 	    MSGID_ELEMS(msg->msg_msgid));
2174 	/*
2175 	 * We return here without sending results. This will be done by
2176 	 * mdmn_wakeup_initiator_svc_1() as soon as the results are available.
2177 	 * Until then the calling send_message will be blocked, while we
2178 	 * are able to take calls.
2179 	 */
2180 
2181 	return (NULL);
2182 }
2183 
2184 /* ARGSUSED */
2185 int *
2186 mdmn_work_svc_1(md_mn_msg_t *omsg, struct svc_req *rqstp)
2187 {
2188 	int		err;
2189 	set_t		setno;
2190 	thread_t	tid;
2191 	int		*retval;
2192 	md_mn_msg_t	*msg;
2193 	md_mn_msgclass_t class;
2194 
2195 	retval = Malloc(sizeof (int));
2196 
2197 	/* If we are in the abort state, we error out immediately */
2198 	if (md_commd_global_state & MD_CGS_ABORTED) {
2199 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2200 		*retval = MDMNE_ABORT;
2201 		return (retval);
2202 	}
2203 
2204 	msg = copy_msg(omsg, NULL);
2205 	xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2206 
2207 	/*
2208 	 * Is this message type blocked?
2209 	 * If so we return MDMNE_CLASS_LOCKED, immediately.
2210 	 * This check is performed on master and slave.
2211 	 */
2212 	if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2213 		*retval = MDMNE_CLASS_LOCKED;
2214 		return (retval);
2215 	}
2216 
2217 	/* check if the global initialization is done */
2218 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2219 		global_init();
2220 	}
2221 
2222 	class = mdmn_get_message_class(msg->msg_type);
2223 	setno = msg->msg_setno;
2224 
2225 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2226 		/* Can only use the appropriate mutexes if they are inited */
2227 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2228 			rw_wrlock(&set_desc_rwlock[setno]);
2229 			rw_wrlock(&client_rwlock[setno]);
2230 			err = mdmn_init_set(setno, MDMN_SET_READY);
2231 			rw_unlock(&client_rwlock[setno]);
2232 			rw_unlock(&set_desc_rwlock[setno]);
2233 		} else {
2234 			err = mdmn_init_set(setno, MDMN_SET_READY);
2235 		}
2236 
2237 		if (err) {
2238 			*retval = MDMNE_CANNOT_CONNECT;
2239 			free_msg(msg);
2240 			return (retval);
2241 		}
2242 	}
2243 
2244 	/* is this rpc request coming from a licensed node? */
2245 	if (check_license(rqstp, msg->msg_sender) == FALSE) {
2246 		free_msg(msg);
2247 		*retval = MDMNE_RPC_FAIL;
2248 		return (retval);
2249 	}
2250 
2251 	commd_debug(MD_MMV_WORK,
2252 	    "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2253 	    "flags=0x%x\n",
2254 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2255 	    msg->msg_flags);
2256 
2257 	/* Check for various CLASS0 message types */
2258 	if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2259 		md_mn_verbose_t *d;
2260 
2261 		d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2262 		/* for now we ignore set / class in md_mn_verbose_t */
2263 		md_commd_global_verb = d->mmv_what;
2264 		/* everytime the bitmask is set, we reset the timer */
2265 		__savetime = gethrtime();
2266 	}
2267 
2268 	mutex_lock(&mdmn_busy_mutex[setno]);
2269 
2270 	/* check if class is locked via a call to mdmn_comm_lock_svc_1 */
2271 	if (mdmn_is_class_locked(setno, class) == TRUE) {
2272 		mutex_unlock(&mdmn_busy_mutex[setno]);
2273 		*retval = MDMNE_CLASS_LOCKED;
2274 		free_msg(msg);
2275 		return (retval);
2276 	}
2277 	mutex_unlock(&mdmn_busy_mutex[setno]);
2278 
2279 	/* Check if the class is busy right now. Do it only on the master */
2280 	rw_rdlock(&set_desc_rwlock[setno]);
2281 	if (set_descriptor[setno]->sd_mn_am_i_master) {
2282 		rw_unlock(&set_desc_rwlock[setno]);
2283 		/*
2284 		 * If the class is currently suspended, don't accept new
2285 		 * messages, unless they are flagged with an override bit.
2286 		 */
2287 		mutex_lock(&mdmn_busy_mutex[setno]);
2288 		if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2289 		    ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2290 			mutex_unlock(&mdmn_busy_mutex[setno]);
2291 			*retval = MDMNE_SUSPENDED;
2292 			commd_debug(MD_MMV_SEND,
2293 			    "send: set %d is suspended\n", setno);
2294 			free_msg(msg);
2295 			return (retval);
2296 		}
2297 		if (mdmn_mark_class_busy(setno, class) == FALSE) {
2298 			mutex_unlock(&mdmn_busy_mutex[setno]);
2299 			*retval = MDMNE_CLASS_BUSY;
2300 			free_msg(msg);
2301 			return (retval);
2302 		}
2303 		mutex_unlock(&mdmn_busy_mutex[setno]);
2304 		/*
2305 		 * Because the real processing of the message takes time we
2306 		 * create a thread for it. So the master thread can continue
2307 		 * to run and accept further messages.
2308 		 */
2309 		*retval = thr_create(NULL, 0,
2310 		    (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2311 		    THR_DETACHED|THR_SUSPENDED, &tid);
2312 	} else {
2313 		rw_unlock(&set_desc_rwlock[setno]);
2314 		*retval = thr_create(NULL, 0,
2315 		    (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2316 		    THR_DETACHED|THR_SUSPENDED, &tid);
2317 	}
2318 
2319 	if (*retval != 0) {
2320 		*retval = MDMNE_THR_CREATE_FAIL;
2321 		free_msg(msg);
2322 		return (retval);
2323 	}
2324 
2325 	/* Now run the new thread */
2326 	thr_continue(tid);
2327 
2328 	commd_debug(MD_MMV_WORK,
2329 	    "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2330 	    MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2331 
2332 	*retval = MDMNE_ACK; /* this means success */
2333 	return (retval);
2334 }
2335 
2336 /* ARGSUSED */
2337 int *
2338 mdmn_wakeup_initiator_svc_1(md_mn_result_t *res, struct svc_req *rqstp)
2339 {
2340 
2341 	int		*retval;
2342 	int		err;
2343 	set_t		setno;
2344 	mutex_t		*mx;   /* protection of initiator_table */
2345 	SVCXPRT		*transp;
2346 	md_mn_msgid_t	initiator_table_id;
2347 	md_mn_msgclass_t class;
2348 
2349 	retval = Malloc(sizeof (int));
2350 
2351 	/* check if the global initialization is done */
2352 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2353 		global_init();
2354 	}
2355 
2356 	setno	= res->mmr_setno;
2357 
2358 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2359 		/* set not ready means we just crashed are restarted now */
2360 		/* Can only use the appropriate mutexes if they are inited */
2361 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2362 			rw_wrlock(&set_desc_rwlock[setno]);
2363 			rw_wrlock(&client_rwlock[setno]);
2364 			err = mdmn_init_set(setno, MDMN_SET_READY);
2365 			rw_unlock(&client_rwlock[setno]);
2366 			rw_unlock(&set_desc_rwlock[setno]);
2367 		} else {
2368 			err = mdmn_init_set(setno, MDMN_SET_READY);
2369 		}
2370 
2371 		if (err) {
2372 			*retval = MDMNE_CANNOT_CONNECT;
2373 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2374 			return (retval);
2375 		}
2376 	}
2377 
2378 	/* is this rpc request coming from a licensed node? */
2379 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2380 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2381 		*retval = MDMNE_RPC_FAIL;
2382 		return (retval);
2383 	}
2384 
2385 
2386 	class	= mdmn_get_message_class(res->mmr_msgtype);
2387 	mx	= mdmn_get_initiator_table_mx(setno, class);
2388 
2389 	commd_debug(MD_MMV_WAKE_I,
2390 	    "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2391 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2392 
2393 	mutex_lock(mx);
2394 
2395 	/*
2396 	 * Search the initiator wakeup table.
2397 	 * If we find an entry here (which should always be true)
2398 	 * we are on the initiating node and we wakeup the original
2399 	 * local rpc call
2400 	 */
2401 	mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2402 
2403 	if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2404 		transp = mdmn_get_initiator_table_transp(setno, class);
2405 		mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2406 		mdmn_unregister_initiator_table(setno, class);
2407 		*retval = MDMNE_ACK;
2408 
2409 		commd_debug(MD_MMV_WAKE_I,
2410 		    "wake_ini: replied (%d, 0x%llx-%d)\n",
2411 		    MSGID_ELEMS(res->mmr_msgid));
2412 	} else {
2413 		commd_debug(MD_MMV_WAKE_I,
2414 		    "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2415 		    MSGID_ELEMS(res->mmr_msgid));
2416 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2417 	}
2418 	mutex_unlock(mx);
2419 	/* less work for check_timeouts */
2420 	mutex_lock(&check_timeout_mutex);
2421 	if (messages_on_their_way == 0) {
2422 		commd_debug(MD_MMV_WAKE_I,
2423 		    "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2424 		    MSGID_ELEMS(res->mmr_msgid));
2425 	} else {
2426 		messages_on_their_way--;
2427 	}
2428 	mutex_unlock(&check_timeout_mutex);
2429 	xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2430 
2431 	return (retval);
2432 }
2433 
2434 
2435 /*
2436  * res must be free'd by the thread we wake up
2437  */
2438 /* ARGSUSED */
2439 int *
2440 mdmn_wakeup_master_svc_1(md_mn_result_t *ores, struct svc_req *rqstp)
2441 {
2442 
2443 	int		*retval;
2444 	int		err;
2445 	set_t		setno;
2446 	cond_t		*cv;
2447 	mutex_t		*mx;
2448 	md_mn_msgid_t	master_table_id;
2449 	md_mn_nodeid_t	sender;
2450 	md_mn_result_t	*res;
2451 	md_mn_msgclass_t class;
2452 
2453 	retval = Malloc(sizeof (int));
2454 
2455 	/* check if the global initialization is done */
2456 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2457 		global_init();
2458 	}
2459 
2460 	/* Need to copy the results here, as they are static for RPC */
2461 	res = copy_result(ores);
2462 	xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2463 
2464 	class = mdmn_get_message_class(res->mmr_msgtype);
2465 	setno = res->mmr_setno;
2466 
2467 	if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2468 		/* set not ready means we just crashed are restarted now */
2469 		/* Can only use the appropriate mutexes if they are inited */
2470 		if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2471 			rw_wrlock(&set_desc_rwlock[setno]);
2472 			rw_wrlock(&client_rwlock[setno]);
2473 			err = mdmn_init_set(setno, MDMN_SET_READY);
2474 			rw_unlock(&client_rwlock[setno]);
2475 			rw_unlock(&set_desc_rwlock[setno]);
2476 		} else {
2477 			err = mdmn_init_set(setno, MDMN_SET_READY);
2478 		}
2479 
2480 		if (err) {
2481 			*retval = MDMNE_CANNOT_CONNECT;
2482 			xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2483 			return (retval);
2484 		}
2485 	}
2486 
2487 	/* is this rpc request coming from a licensed node? */
2488 	if (check_license(rqstp, res->mmr_sender) == FALSE) {
2489 		*retval = MDMNE_RPC_FAIL;
2490 		xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2491 		return (retval);
2492 	}
2493 
2494 
2495 	commd_debug(MD_MMV_WAKE_M,
2496 	    "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2497 	    "from %d\n",
2498 	    MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2499 	    res->mmr_sender);
2500 	/*
2501 	 * The mutex and cv are needed for waking up the thread
2502 	 * sleeping in mdmn_master_process_msg()
2503 	 */
2504 	mx = mdmn_get_master_table_mx(setno, class);
2505 	cv = mdmn_get_master_table_cv(setno, class);
2506 
2507 	/*
2508 	 * lookup the master wakeup table
2509 	 * If we find our message, we are on the master and
2510 	 * called by a slave that finished processing a message.
2511 	 * We store the results in the appropriate slot and
2512 	 * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2513 	 */
2514 	mutex_lock(mx);
2515 	mdmn_get_master_table_id(setno, class, &master_table_id);
2516 	sender = mdmn_get_master_table_addr(setno, class);
2517 
2518 	if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2519 		if (sender == res->mmr_sender) {
2520 			mdmn_set_master_table_res(setno, class, res);
2521 			cond_signal(cv);
2522 			*retval = MDMNE_ACK;
2523 		} else {
2524 			/* id is correct but wrong sender (I smell a timeout) */
2525 			commd_debug(MD_MMV_WAKE_M,
2526 			    "wakeup master got unsolicited message: "
2527 			    "(%d, 0x%llx-%d) from %d\n",
2528 			    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2529 			free_result(res);
2530 			*retval = MDMNE_TIMEOUT;
2531 		}
2532 	} else {
2533 		/* id is wrong, smells like a very late timeout */
2534 		commd_debug(MD_MMV_WAKE_M,
2535 		    "wakeup master got unsolicited message: "
2536 		    "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2537 		    MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2538 		    MSGID_ELEMS(master_table_id));
2539 		free_result(res);
2540 		*retval = MDMNE_NO_WAKEUP_ENTRY;
2541 	}
2542 
2543 	mutex_unlock(mx);
2544 
2545 	return (retval);
2546 }
2547 
2548 /*
2549  * Lock a set/class combination.
2550  * This is mainly done for debug purpose.
2551  * This set/class combination immediately is blocked,
2552  * even in the middle of sending messages to multiple slaves.
2553  * This remains until the user issues a mdmn_comm_unlock_svc_1 for the same
2554  * set/class combination.
2555  *
2556  * Special messages of class MD_MSG_CLASS0 can never be locked.
2557  * 	e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2558  *
2559  * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2560  * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2561  *
2562  * set must be between 1 and MD_MAXSETS
2563  * class can be:
2564  *	MD_MSG_CLASS0 which means all other classes in this case
2565  *	or one specific class (< MD_MN_NCLASSES)
2566  *
2567  * Returns:
2568  *	MDMNE_ACK on sucess (locking a locked class is Ok)
2569  *	MDMNE_EINVAL if a parameter is out of range
2570  */
2571 
2572 /* ARGSUSED */
2573 int *
2574 mdmn_comm_lock_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2575 {
2576 	int			*retval;
2577 	set_t			setno = msc->msc_set;
2578 	md_mn_msgclass_t	class = msc->msc_class;
2579 
2580 	retval = Malloc(sizeof (int));
2581 
2582 	/* check if the global initialization is done */
2583 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2584 		global_init();
2585 	}
2586 
2587 	/* is this rpc request coming from the local node ? */
2588 	if (check_license(rqstp, 0) == FALSE) {
2589 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2590 		*retval = MDMNE_RPC_FAIL;
2591 		return (retval);
2592 	}
2593 
2594 	/* Perform some range checking */
2595 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2596 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2597 		*retval = MDMNE_EINVAL;
2598 		return (retval);
2599 	}
2600 
2601 	commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2602 	mutex_lock(&mdmn_busy_mutex[setno]);
2603 	if (class != MD_MSG_CLASS0) {
2604 		mdmn_mark_class_locked(setno, class);
2605 	} else {
2606 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2607 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2608 			mdmn_mark_class_locked(setno, class);
2609 		}
2610 	}
2611 	mutex_unlock(&mdmn_busy_mutex[setno]);
2612 
2613 	*retval = MDMNE_ACK;
2614 	return (retval);
2615 }
2616 
2617 /*
2618  * Unlock a set/class combination.
2619  * set must be between 1 and MD_MAXSETS
2620  * class can be:
2621  *	MD_MSG_CLASS0 which means all other classes in this case (like above)
2622  *	or one specific class (< MD_MN_NCLASSES)
2623  *
2624  * Returns:
2625  *	MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2626  *	MDMNE_EINVAL if a parameter is out of range
2627  */
2628 /* ARGSUSED */
2629 int *
2630 mdmn_comm_unlock_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2631 {
2632 	int			*retval;
2633 	set_t			setno  = msc->msc_set;
2634 	md_mn_msgclass_t	class  = msc->msc_class;
2635 
2636 	retval = Malloc(sizeof (int));
2637 
2638 	/* check if the global initialization is done */
2639 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2640 		global_init();
2641 	}
2642 
2643 	/* is this rpc request coming from the local node ? */
2644 	if (check_license(rqstp, 0) == FALSE) {
2645 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2646 		*retval = MDMNE_RPC_FAIL;
2647 		return (retval);
2648 	}
2649 
2650 	/* Perform some range checking */
2651 	if ((setno == 0) || (setno >= MD_MAXSETS) ||
2652 	    (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2653 		*retval = MDMNE_EINVAL;
2654 		return (retval);
2655 	}
2656 	commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2657 
2658 	mutex_lock(&mdmn_busy_mutex[setno]);
2659 	if (class != MD_MSG_CLASS0) {
2660 		mdmn_mark_class_unlocked(setno, class);
2661 	} else {
2662 		/* MD_MSG_CLASS0 is used as a wild card for all classes */
2663 		for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2664 			mdmn_mark_class_unlocked(setno, class);
2665 		}
2666 	}
2667 	mutex_unlock(&mdmn_busy_mutex[setno]);
2668 
2669 	*retval = MDMNE_ACK;
2670 	return (retval);
2671 }
2672 
2673 /*
2674  * mdmn_comm_suspend_svc_1(setno, class)
2675  *
2676  * Drain all outstanding messages for a given set/class combination
2677  * and don't allow new messages to be processed.
2678  *
2679  * Special messages of class MD_MSG_CLASS0 can never be locked.
2680  * 	e.g. MD_MN_MSG_VERBOSITY
2681  *
2682  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2683  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2684  *
2685  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2686  * one class as being suspended.
2687  * If messages for this class are currently on their way,
2688  * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2689  *
2690  * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2691  * Messages must be generated in ascending order.
2692  * This means, a message cannot create submessages with the same or lower class.
2693  * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2694  * generate a hanging situation here.
2695  * We mark class 1 as being suspended.
2696  * if the class is not busy, we proceed with class 2
2697  * and so on
2698  * if a class *is* busy, we cannot continue here, but return
2699  * MDMNE_SET_NOT_DRAINED.
2700  * We expect the caller to hold on for some seconds and try again.
2701  * When that message, that held the class busy is done in
2702  * mdmn_master_process_msg(), mdmn_mark_class_unbusy() called.
2703  * There it is checked if the class is about to drain.
2704  * In that case it tries to drain all higher classes there.
2705  *
2706  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2707  * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2708  * completely drained.
2709  *
2710  * Returns:
2711  *	MDMNE_ACK on sucess (set is drained, no outstanding messages)
2712  *	MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2713  *		still outstanding messages for this set(s)
2714  *	MDMNE_EINVAL if setno is out of range
2715  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2716  */
2717 
2718 /* ARGSUSED */
2719 int *
2720 mdmn_comm_suspend_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2721 {
2722 	int			*retval;
2723 	int			failure = 0;
2724 	set_t			startset, endset;
2725 	set_t			setno  = msc->msc_set;
2726 	md_mn_msgclass_t	oclass = msc->msc_class;
2727 #ifdef NOT_YET_NEEDED
2728 	uint_t			flags  = msc->msc_flags;
2729 #endif /* NOT_YET_NEEDED */
2730 	md_mn_msgclass_t	class;
2731 
2732 	retval = Malloc(sizeof (int));
2733 
2734 	/* check if the global initialization is done */
2735 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2736 		global_init();
2737 	}
2738 
2739 	/* is this rpc request coming from the local node ? */
2740 	if (check_license(rqstp, 0) == FALSE) {
2741 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2742 		*retval = MDMNE_RPC_FAIL;
2743 		return (retval);
2744 	}
2745 
2746 	commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2747 	    setno, oclass);
2748 
2749 	/* Perform some range checking */
2750 	if (setno >= MD_MAXSETS) {
2751 		*retval = MDMNE_EINVAL;
2752 		commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2753 		return (retval);
2754 	}
2755 
2756 	/*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2757 	if (setno == MD_COMM_ALL_SETS) {
2758 		startset = 1;
2759 		endset = MD_MAXSETS - 1;
2760 	} else {
2761 		startset = setno;
2762 		endset = setno;
2763 	}
2764 
2765 	for (setno = startset; setno <= endset; setno++) {
2766 		/* Here we need the mutexes for the set to be setup */
2767 		if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2768 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2769 		}
2770 
2771 		mutex_lock(&mdmn_busy_mutex[setno]);
2772 		/* shall we drain all classes of this set? */
2773 		if (oclass == MD_COMM_ALL_CLASSES) {
2774 			for (class = 1; class < MD_MN_NCLASSES; class ++) {
2775 				commd_debug(MD_MMV_MISC,
2776 				    "suspend: suspending set %d, class %d\n",
2777 				    setno, class);
2778 				*retval = mdmn_mark_class_suspended(setno,
2779 				    class, MDMN_SUSPEND_ALL);
2780 				if (*retval == MDMNE_SET_NOT_DRAINED) {
2781 					failure++;
2782 				}
2783 			}
2784 		} else {
2785 			/* only drain one specific class */
2786 			commd_debug(MD_MMV_MISC,
2787 			    "suspend: suspending set=%d class=%d\n",
2788 			    setno, oclass);
2789 			*retval = mdmn_mark_class_suspended(setno, oclass,
2790 			    MDMN_SUSPEND_1);
2791 			if (*retval == MDMNE_SET_NOT_DRAINED) {
2792 				failure++;
2793 			}
2794 		}
2795 		mutex_unlock(&mdmn_busy_mutex[setno]);
2796 	}
2797 	/* If one or more sets are not entirely drained, failure is non-zero */
2798 	if (failure != 0) {
2799 		*retval = MDMNE_SET_NOT_DRAINED;
2800 		commd_debug(MD_MMV_MISC,
2801 		    "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2802 	} else {
2803 		*retval = MDMNE_ACK;
2804 	}
2805 
2806 	return (retval);
2807 }
2808 
2809 /*
2810  * mdmn_comm_resume_svc_1(setno, class)
2811  *
2812  * Resume processing messages for a given set.
2813  * This incorporates the repeal of a previous suspend operation.
2814  *
2815  * 1 <= setno < MD_MAXSETS	or setno == MD_COMM_ALL_SETS
2816  * 1 <= class < MD_MN_NCLASSES	or class == MD_COMM_ALL_CLASSES
2817  *
2818  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2819  * one class as being resumed.
2820  *
2821  * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
2822  *
2823  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2824  *
2825  * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
2826  * reset any ABORT flag from the global state.
2827  *
2828  * Returns:
 *	MDMNE_ACK on success (resuming an unlocked set is Ok)
2830  *	MDMNE_EINVAL if setno is out of range
2831  *	MDMNE_NOT_JOINED if the set is not yet initialized on this node
2832  */
2833 /* ARGSUSED */
2834 int *
2835 mdmn_comm_resume_svc_1(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2836 {
2837 	int			*retval;
2838 	set_t			startset, endset;
2839 	set_t			setno  = msc->msc_set;
2840 	md_mn_msgclass_t	oclass = msc->msc_class;
2841 	uint_t			flags  = msc->msc_flags;
2842 	md_mn_msgclass_t	class;
2843 
2844 	retval = Malloc(sizeof (int));
2845 
2846 	/* check if the global initialization is done */
2847 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2848 		global_init();
2849 	}
2850 
2851 	/* is this rpc request coming from the local node ? */
2852 	if (check_license(rqstp, 0) == FALSE) {
2853 		xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2854 		*retval = MDMNE_RPC_FAIL;
2855 		return (retval);
2856 	}
2857 
2858 	commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
2859 	    setno, oclass);
2860 
2861 	/* Perform some range checking */
2862 	if (setno > MD_MAXSETS) {
2863 		*retval = MDMNE_EINVAL;
2864 		return (retval);
2865 	}
2866 
2867 	if (setno == MD_COMM_ALL_SETS) {
2868 		startset = 1;
2869 		endset = MD_MAXSETS - 1;
2870 		if (oclass == MD_COMM_ALL_CLASSES) {
2871 			/* This is the point where we "unabort" the commd */
2872 			commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
2873 			md_commd_global_state &= ~MD_CGS_ABORTED;
2874 		}
2875 	} else {
2876 		startset = setno;
2877 		endset = setno;
2878 	}
2879 
2880 	for (setno = startset; setno <= endset; setno++) {
2881 
2882 		/* Here we need the mutexes for the set to be setup */
2883 		if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
2884 			(void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2885 		}
2886 
2887 		mutex_lock(&mdmn_busy_mutex[setno]);
2888 
2889 		if (oclass == MD_COMM_ALL_CLASSES) {
2890 			int end_class = 1;
2891 			/*
2892 			 * When SUSPENDing all classes, we go
2893 			 * from 1 to MD_MN_NCLASSES-1
2894 			 * The correct reverse action is RESUMing
2895 			 * from MD_MN_NCLASSES-1 to 1 (or 2)
2896 			 */
2897 
2898 			if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
2899 				end_class = 2;
2900 			}
2901 
2902 			/*
2903 			 * Then mark all classes of this set as no longer
2904 			 * suspended. This supersedes any previous suspend(1)
2905 			 * calls and resumes the set entirely.
2906 			 */
2907 			for (class = MD_MN_NCLASSES - 1; class >= end_class;
2908 			    class --) {
2909 				commd_debug(MD_MMV_MISC,
2910 				    "resume: resuming set=%d class=%d\n",
2911 				    setno, class);
2912 				mdmn_mark_class_resumed(setno, class,
2913 				    (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
2914 			}
2915 		} else {
2916 			/*
2917 			 * In this case only one class is marked as not
2918 			 * suspended. If a suspend(all) is currently active for
2919 			 * this set, this class will still be suspended.
2920 			 * That state will be cleared by a suspend(all)
2921 			 * (see above)
2922 			 */
2923 			commd_debug(MD_MMV_MISC,
2924 			    "resume: resuming set=%d class=%d\n",
2925 			    setno, oclass);
2926 			mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
2927 		}
2928 
2929 		mutex_unlock(&mdmn_busy_mutex[setno]);
2930 	}
2931 
2932 	*retval = MDMNE_ACK;
2933 	return (retval);
2934 }
2935 /* ARGSUSED */
2936 int *
2937 mdmn_comm_reinit_set_svc_1(set_t *setnop, struct svc_req *rqstp)
2938 {
2939 	int		*retval;
2940 	md_mnnode_desc	*node;
2941 	set_t		 setno = *setnop;
2942 
2943 	retval = Malloc(sizeof (int));
2944 
2945 	/* check if the global initialization is done */
2946 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2947 		global_init();
2948 	}
2949 
2950 	/* is this rpc request coming from the local node ? */
2951 	if (check_license(rqstp, 0) == FALSE) {
2952 		xdr_free(xdr_set_t, (caddr_t)setnop);
2953 		*retval = MDMNE_RPC_FAIL;
2954 		return (retval);
2955 	}
2956 
2957 	commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
2958 
2959 	rw_rdlock(&set_desc_rwlock[setno]);
2960 	/*
2961 	 * We assume, that all messages have been suspended previously.
2962 	 *
2963 	 * As we are modifying lots of clients here we grab the client_rwlock
2964 	 * in writer mode. This ensures, no new messages come in.
2965 	 */
2966 	rw_wrlock(&client_rwlock[setno]);
2967 	/* This set is no longer initialized */
2968 
2969 	if ((set_descriptor[setno] != NULL) &&
2970 	    (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
2971 		/* destroy all rpc clients from this set */
2972 		for (node = set_descriptor[setno]->sd_nodelist; node;
2973 		    node = node->nd_next) {
2974 			mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
2975 			if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
2976 				client[setno][node->nd_nodeid] = (CLIENT *)NULL;
2977 			}
2978 		}
2979 	md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
2980 	}
2981 
2982 	commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
2983 
2984 	rw_unlock(&client_rwlock[setno]);
2985 	rw_unlock(&set_desc_rwlock[setno]);
2986 	*retval = MDMNE_ACK;
2987 	return (retval);
2988 }
2989 
2990 /*
2991  * This is just an interface for testing purpose.
2992  * Here we can disable single message types.
2993  * If we block a message type, this is valid for all MN sets.
 * If a message arrives later and its message type is blocked, it will
2995  * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
2996  * resend this message over and over again.
2997  */
2998 
2999 /* ARGSUSED */
3000 int *
3001 mdmn_comm_msglock_svc_1(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3002 {
3003 	int			*retval;
3004 	md_mn_msgtype_t		type = mmtl->mmtl_type;
3005 	uint_t			lock = mmtl->mmtl_lock;
3006 
3007 	retval = Malloc(sizeof (int));
3008 
3009 	/* check if the global initialization is done */
3010 	if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3011 		global_init();
3012 	}
3013 
3014 	/* is this rpc request coming from the local node ? */
3015 	if (check_license(rqstp, 0) == FALSE) {
3016 		xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3017 		*retval = MDMNE_RPC_FAIL;
3018 		return (retval);
3019 	}
3020 
3021 	/* Perform some range checking */
3022 	if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3023 		*retval = MDMNE_EINVAL;
3024 		return (retval);
3025 	}
3026 
3027 	commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3028 	msgtype_lock_state[type] = lock;
3029 
3030 	*retval = MDMNE_ACK;
3031 	return (retval);
3032 }
3033