xref: /titanic_41/usr/src/cmd/fm/fmd/common/fmd_xprt.c (revision 6f8d59d8fcaf391990ca04c7bdcf65ab23320fe0)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * FMD Transport Subsystem
29  *
30  * A transport module uses some underlying mechanism to transport events.
31  * This mechanism may use any underlying link-layer protocol and may support
32  * additional link-layer packets unrelated to FMA.  Some appropriate link-
33  * layer mechanism to create the underlying connection is expected to be
34  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
35  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
36  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
37  * The underlying transport mechanism is *required* to provide ordering: that
38  * is, the sequences of bytes written across the transport must be read by
39  * the remote peer in the order that they are written, even across separate
40  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
41  * a valid transport as it guarantees ordering, whereas the Internet UDP
42  * protocol would not because UDP datagrams may be delivered in any order
43  * as a result of delays introduced when datagrams pass through routers.
44  *
45  * Similar to sending events, a transport module receives events that are from
46  * its peer remote endpoint using some transport-specific mechanism that is
47  * unknown to FMD.  As each event is received, the transport module is
48  * responsible for constructing a valid nvlist_t object from the data and then
49  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
50  * queue, making it available to all local modules that are not transport
51  * modules that have subscribed to the event.
52  *
53  * The following state machine is used for each transport.  The initial state
54  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
55  *
56  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
57  *             |                 |
58  * waiting  +--v--+           +--v--+  waiting
59  * for syn  | SYN |--+     --+| ACK |  for ack
60  * event    +-----+   \   /   +-----+  event
61  *             |       \ /       |
62  * drop all +--v--+     X     +--v--+  send subscriptions,
63  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
64  *          +-----+           +-----+  wait for run event
65  *             ^                 |
66  *             |     +-----+     |
67  *             +-----| RUN |<----+
68  *                   +--^--+
69  *                      |
70  *               FMD_XPRT_RDONLY
71  *
72  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
73  * Layer enqueues a "syn" event for the module in its event queue and sets the
74  * state to ACK.  In state ACK, we are waiting for the transport to get an
75  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
76  * discarded.  If an "ack" is received, we transition to state SUB.  If a
77  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
78  * exchange), we transition to state ERR.  Once in state ERR, no further
79  * operations are valid except fmd_xprt_close() and fmd_xprt_error() will
80  * return a non-zero value to the caller indicating the transport has failed.
81  *
82  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
83  * Layer assumes this transport is being used to accept a virtual connection
84  * from a remote peer that is sending a "syn", and sets the initial state to
85  * SYN.  In this state, the transport waits for a "syn" event, validates it,
86  * and then transitions to state SUB if it is valid or state ERR if it is not.
87  *
88  * Once in state SUB, the transport module is expected to receive a sequence of
89  * zero or more "subscribe" events from the remote peer, followed by a "run"
90  * event.  Once in state RUN, the transport is active and any events can be
91  * sent or received.  The transport module is free to call fmd_xprt_close()
92  * from any state.  The fmd_xprt_error() function will return zero if the
93  * transport is not in the ERR state, or non-zero if it is in the ERR state.
94  *
95  * Once the state machine reaches RUN, other FMA protocol events can be sent
96  * and received across the transport in addition to the various control events.
97  *
98  * Table of Common Transport Layer Control Events
99  * ==============================================
100  *
101  * FMA Class                     Payload
102  * ---------                     -------
103  * resource.fm.xprt.uuclose      string (uuid of case)
104  * resource.fm.xprt.uuresolved   string (uuid of case)
105  * resource.fm.xprt.updated      string (uuid of case)
106  * resource.fm.xprt.subscribe    string (class pattern)
107  * resource.fm.xprt.unsubscribe  string (class pattern)
108  * resource.fm.xprt.unsuback     string (class pattern)
109  * resource.fm.xprt.syn          version information
110  * resource.fm.xprt.ack          version information
111  * resource.fm.xprt.run          version information
112  *
113  * Control events are used to add and delete proxy subscriptions on the remote
114  * transport peer module, and to set up connections.  When a "syn" event is
115  * sent, FMD will include in the payload the highest version of the FMA event
116  * protocol that is supported by the sender.  When a "syn" event is received,
117  * the receiving FMD will use the minimum of this version and its version of
118  * the protocol, and reply with this new minimum version in the "ack" event.
119  * The receiver will then use this new minimum for subsequent event semantics.
120  */
121 
122 #include <sys/fm/protocol.h>
123 #include <strings.h>
124 #include <limits.h>
125 
126 #include <fmd_alloc.h>
127 #include <fmd_error.h>
128 #include <fmd_conf.h>
129 #include <fmd_subr.h>
130 #include <fmd_string.h>
131 #include <fmd_protocol.h>
132 #include <fmd_thread.h>
133 #include <fmd_eventq.h>
134 #include <fmd_dispq.h>
135 #include <fmd_ctl.h>
136 #include <fmd_log.h>
137 #include <fmd_ustat.h>
138 #include <fmd_case.h>
139 #include <fmd_api.h>
140 #include <fmd_fmri.h>
141 #include <fmd_asru.h>
142 #include <fmd_xprt.h>
143 
144 #include <fmd.h>
145 
146 /*
147  * The states shown above in the transport state machine diagram are encoded
148  * using arrays of class patterns and a corresponding action function.  These
149  * arrays are then passed to fmd_xprt_transition() to change transport states.
150  */
151 
152 const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
153 { "resource.fm.xprt.syn", fmd_xprt_event_syn },
154 { "*", fmd_xprt_event_error },
155 { NULL, NULL }
156 };
157 
158 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
159 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
160 { "*", fmd_xprt_event_error },
161 };
162 
163 const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
164 { "*", fmd_xprt_event_drop },
165 { NULL, NULL }
166 };
167 
168 const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
169 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
170 { "resource.fm.xprt.run", fmd_xprt_event_run },
171 { "resource.fm.xprt.*", fmd_xprt_event_error },
172 { "*", fmd_xprt_event_drop },
173 { NULL, NULL }
174 };
175 
176 const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
177 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
178 { "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
179 { "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
180 { "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
181 { "resource.fm.xprt.uuresolved", fmd_xprt_event_uuresolved },
182 { "resource.fm.xprt.updated", fmd_xprt_event_updated },
183 { "resource.fm.xprt.*", fmd_xprt_event_error },
184 { NULL, NULL }
185 };
186 
187 /*
188  * Template for per-transport statistics installed by fmd on behalf of each
189  * transport.  These are used to initialize the per-transport xi_stats.  For
190  * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
191  * transport ID (xi_id) and then are inserted into the per-module stats hash.
192  * The values in this array must match fmd_xprt_stat_t from <fmd_xprt.h>.
193  */
194 static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
195 {
196 { "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
197 { "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
198 { "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
199 { "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
200 { "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
201 { "wtime", FMD_TYPE_TIME, "total wait time on queue" },
202 { "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
203 { "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
204 { "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
205 { "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
206 },
207 { "module", FMD_TYPE_STRING, "module that owns this transport" },
208 { "authority", FMD_TYPE_STRING, "authority associated with this transport" },
209 { "state", FMD_TYPE_STRING, "current transport state" },
210 { "received", FMD_TYPE_UINT64, "events received by transport" },
211 { "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
212 { "retried", FMD_TYPE_UINT64, "retries requested of transport" },
213 { "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
214 { "lost", FMD_TYPE_UINT64, "events lost by transport" },
215 { "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
216 { "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
217 };
218 
219 static void
220 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
221 {
222 	uint_t hashlen = fmd.d_str_buckets;
223 
224 	xch->xch_queue = eq;
225 	xch->xch_hashlen = hashlen;
226 	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
227 }
228 
229 static void
230 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
231 {
232 	fmd_eventq_t *eq = xch->xch_queue;
233 	fmd_xprt_class_t *xcp, *ncp;
234 	uint_t i;
235 
236 	for (i = 0; i < xch->xch_hashlen; i++) {
237 		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
238 			ncp = xcp->xc_next;
239 
240 			if (eq != NULL)
241 				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
242 
243 			fmd_strfree(xcp->xc_class);
244 			fmd_free(xcp, sizeof (fmd_xprt_class_t));
245 		}
246 	}
247 
248 	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
249 }
250 
251 /*
252  * Insert the specified class into the specified class hash, and return the
253  * reference count.  A return value of one indicates this is the first insert.
254  * If an eventq is associated with the hash, insert a dispq subscription for it.
255  */
256 static uint_t
257 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
258     fmd_xprt_class_hash_t *xch, const char *class)
259 {
260 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
261 	fmd_xprt_class_t *xcp;
262 
263 	ASSERT(MUTEX_HELD(&xip->xi_lock));
264 
265 	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
266 		if (strcmp(class, xcp->xc_class) == 0)
267 			return (++xcp->xc_refs);
268 	}
269 
270 	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
271 	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
272 	xcp->xc_next = xch->xch_hash[h];
273 	xcp->xc_refs = 1;
274 	xch->xch_hash[h] = xcp;
275 
276 	if (xch->xch_queue != NULL)
277 		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
278 
279 	return (xcp->xc_refs);
280 }
281 
282 /*
283  * Delete the specified class from the specified class hash, and return the
284  * reference count.  A return value of zero indicates the class was deleted.
285  * If an eventq is associated with the hash, delete the dispq subscription.
286  */
287 static uint_t
288 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
289     fmd_xprt_class_hash_t *xch, const char *class)
290 {
291 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
292 	fmd_xprt_class_t *xcp, **pp;
293 
294 	ASSERT(MUTEX_HELD(&xip->xi_lock));
295 	pp = &xch->xch_hash[h];
296 
297 	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
298 		if (strcmp(class, xcp->xc_class) == 0)
299 			break;
300 		else
301 			pp = &xcp->xc_next;
302 	}
303 
304 	if (xcp == NULL)
305 		return (-1U); /* explicitly permit an invalid delete */
306 
307 	if (--xcp->xc_refs != 0)
308 		return (xcp->xc_refs);
309 
310 	ASSERT(xcp->xc_refs == 0);
311 	*pp = xcp->xc_next;
312 
313 	fmd_strfree(xcp->xc_class);
314 	fmd_free(xcp, sizeof (fmd_xprt_class_t));
315 
316 	if (xch->xch_queue != NULL)
317 		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
318 
319 	return (0);
320 }
321 
322 /*
323  * Queue subscribe events for the specified transport corresponding to all of
324  * the active module subscriptions.  This is an extremely heavyweight operation
325  * that we expect to take place rarely (i.e. when loading a transport module
326  * or when it establishes a connection).  We lock all of the known modules to
327  * prevent them from adding or deleting subscriptions, then snapshot their
328  * subscriptions, and then unlock all of the modules.  We hold the modhash
329  * lock for the duration of this operation to prevent new modules from loading.
330  */
331 static void
332 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
333 {
334 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
335 	const fmd_conf_path_t *pap;
336 	fmd_module_t *mp;
337 	uint_t i, j;
338 
339 	(void) pthread_rwlock_rdlock(&mhp->mh_lock);
340 
341 	for (i = 0; i < mhp->mh_hashlen; i++) {
342 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
343 			fmd_module_lock(mp);
344 	}
345 
346 	(void) pthread_mutex_lock(&xip->xi_lock);
347 	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
348 	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
349 	(void) pthread_mutex_unlock(&xip->xi_lock);
350 
351 	for (i = 0; i < mhp->mh_hashlen; i++) {
352 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
353 			(void) fmd_conf_getprop(mp->mod_conf,
354 			    FMD_PROP_SUBSCRIPTIONS, &pap);
355 			for (j = 0; j < pap->cpa_argc; j++)
356 				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
357 		}
358 	}
359 
360 	for (i = 0; i < mhp->mh_hashlen; i++) {
361 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
362 			fmd_module_unlock(mp);
363 	}
364 
365 	(void) pthread_rwlock_unlock(&mhp->mh_lock);
366 }
367 
368 static void
369 fmd_xprt_transition(fmd_xprt_impl_t *xip,
370     const fmd_xprt_rule_t *state, const char *tag)
371 {
372 	fmd_event_t *e;
373 	nvlist_t *nvl;
374 	char *s;
375 
376 	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
377 
378 	xip->xi_state = state;
379 	s = fmd_strdup(tag, FMD_SLEEP);
380 
381 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
382 	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
383 	xip->xi_stats->xs_state.fmds_value.str = s;
384 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
385 
386 	/*
387 	 * If we've reached the SUB state, take out the big hammer and snapshot
388 	 * all of the subscriptions of all of the loaded modules.  Then queue a
389 	 * run event for our remote peer indicating that it can enter RUN.
390 	 */
391 	if (state == _fmd_xprt_state_sub) {
392 		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
393 
394 		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
395 		    "resource.fm.xprt.run", xip->xi_version);
396 
397 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
398 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
399 		fmd_eventq_insert_at_time(xip->xi_queue, e);
400 	}
401 }
402 
403 static void
404 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
405 {
406 	char *s = fmd_fmri_auth2str(xip->xi_auth);
407 
408 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
409 	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
410 	xip->xi_stats->xs_authority.fmds_value.str = s;
411 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
412 }
413 
414 static int
415 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
416 {
417 	uint8_t rversion;
418 
419 	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
420 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
421 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
422 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
423 
424 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
425 		return (1);
426 	}
427 
428 	if (rversion > xip->xi_version) {
429 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
430 		    xip->xi_id, rversion, xip->xi_version);
431 
432 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
433 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
434 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
435 
436 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
437 		return (1);
438 	}
439 
440 	if (rversionp != NULL)
441 		*rversionp = rversion;
442 
443 	return (0);
444 }
445 
446 void
447 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
448 {
449 	fmd_event_t *e;
450 	uint_t vers;
451 	char *class;
452 
453 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
454 		return; /* transitioned to error state */
455 
456 	/*
457 	 * If the transport module didn't specify an authority, extract the
458 	 * one that is passed along with the xprt.syn event and use that.
459 	 */
460 	if (xip->xi_auth == NULL &&
461 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
462 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
463 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
464 		fmd_xprt_authupdate(xip);
465 	}
466 
467 	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
468 	    "resource.fm.xprt.ack", xip->xi_version);
469 
470 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
471 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
472 	fmd_eventq_insert_at_time(xip->xi_queue, e);
473 
474 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
475 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
476 }
477 
478 void
479 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
480 {
481 	uint_t vers;
482 
483 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
484 		return; /* transitioned to error state */
485 
486 	/*
487 	 * If the transport module didn't specify an authority, extract the
488 	 * one that is passed along with the xprt.syn event and use that.
489 	 */
490 	if (xip->xi_auth == NULL &&
491 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
492 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
493 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
494 		fmd_xprt_authupdate(xip);
495 	}
496 
497 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
498 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
499 }
500 
501 /*
502  * Upon transition to RUN, we take every solved case and resend a list.suspect
503  * event for it to our remote peer.  If a case transitions from solved to a
504  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
505  * the case hash, we will get it as part of examining the resource cache, next.
506  */
507 static void
508 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
509 {
510 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
511 	fmd_xprt_impl_t *xip = arg;
512 
513 	fmd_event_t *e;
514 	nvlist_t *nvl;
515 	char *class;
516 
517 	if (cip->ci_state == FMD_CASE_UNSOLVED)
518 		return;
519 
520 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
521 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
522 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
523 
524 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
525 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
526 
527 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
528 }
529 
530 void
531 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
532 {
533 	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
534 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
535 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
536 	}
537 }
538 
539 void
540 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
541 {
542 	char *class;
543 
544 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
545 		return; /* transitioned to error state */
546 
547 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
548 		return; /* malformed protocol event */
549 
550 	(void) pthread_mutex_lock(&xip->xi_lock);
551 	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
552 	(void) pthread_mutex_unlock(&xip->xi_lock);
553 
554 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
555 	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
556 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
557 }
558 
559 void
560 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
561 {
562 	fmd_event_t *e;
563 	char *class;
564 
565 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
566 		return; /* transitioned to error state */
567 
568 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
569 		return; /* malformed protocol event */
570 
571 	(void) pthread_mutex_lock(&xip->xi_lock);
572 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
573 	(void) pthread_mutex_unlock(&xip->xi_lock);
574 
575 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
576 	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
577 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
578 
579 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
580 	    "resource.fm.xprt.unsuback", xip->xi_version, class);
581 
582 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
583 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
584 	fmd_eventq_insert_at_time(xip->xi_queue, e);
585 }
586 
587 void
588 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
589 {
590 	char *class;
591 
592 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
593 		return; /* transitioned to error state */
594 
595 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
596 		return; /* malformed protocol event */
597 
598 	(void) pthread_mutex_lock(&xip->xi_lock);
599 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
600 	(void) pthread_mutex_unlock(&xip->xi_lock);
601 }
602 
603 /*
604  * on diagnosing side, receive a uuclose from the proxy.
605  */
606 void
607 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
608 {
609 	fmd_case_t *cp;
610 	char *uuid;
611 
612 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
613 		return; /* transitioned to error state */
614 
615 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
616 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
617 		/*
618 		 * update resource cache status and transition case
619 		 */
620 		fmd_case_close_status(cp);
621 		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
622 		fmd_case_rele(cp);
623 	}
624 }
625 
626 /*
627  * on diagnosing side, receive a uuresolved from the proxy.
628  */
629 void
630 fmd_xprt_event_uuresolved(fmd_xprt_impl_t *xip, nvlist_t *nvl)
631 {
632 	fmd_case_t *cp;
633 	char *uuid;
634 
635 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
636 		return; /* transitioned to error state */
637 
638 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
639 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
640 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
641 
642 		fmd_case_transition(cp, (cip->ci_state == FMD_CASE_REPAIRED) ?
643 		    FMD_CASE_RESOLVED : (cip->ci_state == FMD_CASE_CLOSED) ?
644 		    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT, FMD_CF_RESOLVED);
645 		fmd_case_rele(cp);
646 	}
647 }
648 
649 /*
650  * on diagnosing side, receive a repair/acquit from the proxy.
651  */
652 void
653 fmd_xprt_event_updated(fmd_xprt_impl_t *xip, nvlist_t *nvl)
654 {
655 	fmd_case_t *cp;
656 	char *uuid;
657 
658 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
659 		return; /* transitioned to error state */
660 
661 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
662 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
663 		uint8_t *statusp, *proxy_asrup = NULL;
664 		uint_t nelem = 0;
665 
666 		/*
667 		 * Only update status with new repairs if "no remote repair"
668 		 * is not set. Do the case_update anyway though (as this will
669 		 * refresh the status on the proxy side).
670 		 */
671 		if (!(xip->xi_flags & FMD_XPRT_NO_REMOTE_REPAIR)) {
672 			if (nvlist_lookup_uint8_array(nvl,
673 			    FM_RSRC_XPRT_FAULT_STATUS, &statusp, &nelem) == 0 &&
674 			    nelem != 0) {
675 				(void) nvlist_lookup_uint8_array(nvl,
676 				    FM_RSRC_XPRT_FAULT_HAS_ASRU, &proxy_asrup,
677 				    &nelem);
678 				fmd_case_update_status(cp, statusp,
679 				    proxy_asrup, NULL);
680 			}
681 			fmd_case_update_containees(cp);
682 		}
683 		fmd_case_update(cp);
684 		fmd_case_rele(cp);
685 	}
686 }
687 
688 void
689 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
690 {
691 	char *class = "<unknown>";
692 
693 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
694 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
695 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
696 
697 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
698 	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
699 
700 	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
701 }
702 
703 void
704 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
705 {
706 	char *class = "<unknown>";
707 
708 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
709 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
710 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
711 
712 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
713 	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
714 
715 }
716 
717 fmd_xprt_t *
718 fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
719 {
720 	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
721 	fmd_stat_t *statv;
722 	uint_t i, statc;
723 
724 	char buf[PATH_MAX];
725 	fmd_event_t *e;
726 	nvlist_t *nvl;
727 	char *s;
728 
729 	(void) pthread_mutex_init(&xip->xi_lock, NULL);
730 	(void) pthread_cond_init(&xip->xi_cv, NULL);
731 	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);
732 
733 	xip->xi_auth = auth;
734 	xip->xi_data = data;
735 	xip->xi_version = FM_RSRC_XPRT_VERSION;
736 	xip->xi_flags = flags;
737 
738 	/*
739 	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
740 	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
741 	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
742 	 * ensure that this transport will not run until fmd_xprt_resume_all().
743 	 */
744 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
745 	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);
746 
747 	if (fmd.d_xprt_suspend != 0)
748 		xip->xi_flags |= FMD_XPRT_DSUSPENDED;
749 
750 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
751 
752 	/*
753 	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
754 	 * bit so that fmdo_send() is not called until _fmd_init() completes.
755 	 */
756 	if (!(mp->mod_flags & FMD_MOD_INIT))
757 		xip->xi_flags |= FMD_XPRT_ISUSPENDED;
758 
759 	/*
760 	 * Initialize the transport statistics that we keep on behalf of fmd.
761 	 * These are set up using a template defined at the top of this file.
762 	 * We rename each statistic with a prefix ensuring its uniqueness.
763 	 */
764 	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
765 	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
766 	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));
767 
768 	for (i = 0; i < statc; i++) {
769 		(void) snprintf(statv[i].fmds_name,
770 		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
771 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
772 	}
773 
774 	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
775 	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);
776 
777 	if (xip->xi_stats == NULL)
778 		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);
779 
780 	xip->xi_stats->xs_module.fmds_value.str =
781 	    fmd_strdup(mp->mod_name, FMD_SLEEP);
782 
783 	if (xip->xi_auth != NULL)
784 		fmd_xprt_authupdate(xip);
785 
786 	/*
787 	 * Create the outbound eventq for this transport and link to its stats.
788 	 * If any suspend bits were set above, suspend the eventq immediately.
789 	 */
790 	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
791 	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
792 
793 	if (xip->xi_flags & FMD_XPRT_SMASK)
794 		fmd_eventq_suspend(xip->xi_queue);
795 
796 	/*
797 	 * Create our subscription hashes: local subscriptions go to xi_queue,
798 	 * remote subscriptions are tracked only for protocol requests, and
799 	 * pending unsubscriptions are associated with the /dev/null eventq.
800 	 */
801 	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
802 	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
803 	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);
804 
805 	/*
806 	 * Determine our initial state based upon the creation flags.  If we're
807 	 * read-only, go directly to RUN.  If we're accepting a new connection,
808 	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
809 	 */
810 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
811 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
812 	else if (flags & FMD_XPRT_ACCEPT)
813 		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
814 	else
815 		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
816 
817 	/*
818 	 * If client.xprtlog is set to TRUE, create a debugging log for the
819 	 * events received by the transport in var/fm/fmd/xprt/.
820 	 */
821 	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
822 	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);
823 
824 	if (i) {
825 		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
826 		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
827 	}
828 
829 	ASSERT(fmd_module_locked(mp));
830 	fmd_list_append(&mp->mod_transports, xip);
831 
832 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
833 	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
834 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
835 
836 	/*
837 	 * If this is a read-only transport, return without creating a send
838 	 * queue thread and setting up any connection events in our queue.
839 	 */
840 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
841 		goto out;
842 
843 	/*
844 	 * Once the transport is fully initialized, create a send queue thread
845 	 * and start any connect events flowing to complete our initialization.
846 	 */
847 	if ((xip->xi_thread = fmd_thread_create(mp,
848 	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {
849 
850 		fmd_error(EFMD_XPRT_THR,
851 		    "failed to create thread for transport %u", xip->xi_id);
852 
853 		fmd_xprt_destroy((fmd_xprt_t *)xip);
854 		(void) fmd_set_errno(EFMD_XPRT_THR);
855 		return (NULL);
856 	}
857 
858 	/*
859 	 * If the transport is not being opened to accept an inbound connect,
860 	 * start an outbound connection by enqueuing a SYN event for our peer.
861 	 */
862 	if (!(flags & FMD_XPRT_ACCEPT)) {
863 		nvl = fmd_protocol_xprt_ctl(mp,
864 		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);
865 
866 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
867 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
868 		fmd_eventq_insert_at_time(xip->xi_queue, e);
869 	}
870 out:
871 	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
872 	return ((fmd_xprt_t *)xip);
873 }
874 
875 void
876 fmd_xprt_destroy(fmd_xprt_t *xp)
877 {
878 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
879 	fmd_module_t *mp = xip->xi_queue->eq_mod;
880 	uint_t id = xip->xi_id;
881 
882 	fmd_case_impl_t *cip, *nip;
883 	fmd_stat_t *sp;
884 	uint_t i, n;
885 
886 	ASSERT(fmd_module_locked(mp));
887 	fmd_list_delete(&mp->mod_transports, xip);
888 
889 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
890 	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
891 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
892 
893 	(void) pthread_mutex_lock(&xip->xi_lock);
894 
895 	while (xip->xi_busy != 0)
896 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
897 
898 	/*
899 	 * Remove the transport from global visibility, cancel its send-side
900 	 * thread, join with it, and then remove the transport from module
901 	 * visibility.  Once all this is done, destroy and free the transport.
902 	 */
903 	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
904 
905 	if (xip->xi_thread != NULL) {
906 		fmd_eventq_abort(xip->xi_queue);
907 		fmd_module_unlock(mp);
908 		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
909 		fmd_module_lock(mp);
910 	}
911 
912 	if (xip->xi_log != NULL)
913 		fmd_log_rele(xip->xi_log);
914 
915 	/*
916 	 * Release every case handle in the module that was cached by this
917 	 * transport.  This will result in these cases disappearing from the
918 	 * local case hash so that fmd_case_uuclose() and fmd_case_repaired()
919 	 * etc can no longer be used.
920 	 */
921 	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
922 		nip = fmd_list_next(cip);
923 		if (cip->ci_xprt == xp)
924 			fmd_case_discard((fmd_case_t *)cip, B_TRUE);
925 	}
926 
927 	/*
928 	 * Destroy every class in the various subscription hashes and remove
929 	 * any corresponding subscriptions from the event dispatch queue.
930 	 */
931 	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
932 	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
933 	fmd_xprt_class_hash_destroy(&xip->xi_usub);
934 
935 	/*
936 	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
937 	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
938 	 * won't find the entries in the hash table.
939 	 */
940 	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
941 	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
942 	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
943 	for (i = 0; i < n; i++) {
944 		(void) snprintf(sp[i].fmds_name,
945 		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
946 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
947 	}
948 	fmd_ustat_delete(mp->mod_ustat, n, sp);
949 	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
950 
951 	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
952 	fmd_eventq_destroy(xip->xi_queue);
953 	nvlist_free(xip->xi_auth);
954 	fmd_free(xip, sizeof (fmd_xprt_impl_t));
955 
956 	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
957 }
958 
959 void
960 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
961 {
962 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
963 	uint_t oflags;
964 
965 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
966 	(void) pthread_mutex_lock(&xip->xi_lock);
967 
968 	oflags = xip->xi_flags;
969 	xip->xi_flags |= flags;
970 
971 	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
972 		fmd_eventq_suspend(xip->xi_queue);
973 
974 	(void) pthread_cond_broadcast(&xip->xi_cv);
975 
976 	while (xip->xi_busy != 0)
977 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
978 
979 	(void) pthread_mutex_unlock(&xip->xi_lock);
980 }
981 
982 void
983 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
984 {
985 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
986 	uint_t oflags;
987 
988 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
989 	(void) pthread_mutex_lock(&xip->xi_lock);
990 
991 	oflags = xip->xi_flags;
992 	xip->xi_flags &= ~flags;
993 
994 	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
995 		fmd_eventq_resume(xip->xi_queue);
996 
997 	(void) pthread_cond_broadcast(&xip->xi_cv);
998 	(void) pthread_mutex_unlock(&xip->xi_lock);
999 }
1000 
1001 void
1002 fmd_xprt_send(fmd_xprt_t *xp)
1003 {
1004 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1005 	fmd_module_t *mp = xip->xi_queue->eq_mod;
1006 	fmd_event_t *ep;
1007 	int err;
1008 
1009 	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
1010 		if (FMD_EVENT_TTL(ep) == 0) {
1011 			fmd_event_rele(ep);
1012 			continue;
1013 		}
1014 
1015 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
1016 		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));
1017 
1018 		err = mp->mod_ops->mop_transport(mp, xp, ep);
1019 		fmd_eventq_done(xip->xi_queue);
1020 
1021 		if (err == FMD_SEND_RETRY) {
1022 			fmd_eventq_insert_at_time(xip->xi_queue, ep);
1023 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1024 			xip->xi_stats->xs_retried.fmds_value.ui64++;
1025 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1026 		}
1027 
1028 		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
1029 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1030 			xip->xi_stats->xs_lost.fmds_value.ui64++;
1031 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1032 		}
1033 
1034 		fmd_event_rele(ep);
1035 	}
1036 }
1037 
1038 /*
1039  * This function creates a local suspect list. This is used when a suspect list
1040  * is created directly by an external source like fminject.
1041  */
1042 static void
1043 fmd_xprt_list_suspect_local(fmd_xprt_t *xp, nvlist_t *nvl)
1044 {
1045 	nvlist_t **nvlp;
1046 	nvlist_t *de_fmri, *de_fmri_dup = NULL;
1047 	int64_t *diag_time;
1048 	char *code = NULL;
1049 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1050 	fmd_case_t *cp;
1051 	uint_t nelem = 0, nelem2 = 0, i;
1052 
1053 	fmd_module_lock(xip->xi_queue->eq_mod);
1054 	cp = fmd_case_create(xip->xi_queue->eq_mod, NULL);
1055 	if (cp == NULL) {
1056 		fmd_module_unlock(xip->xi_queue->eq_mod);
1057 		return;
1058 	}
1059 
1060 	/*
1061 	 * copy diag_code if present
1062 	 */
1063 	(void) nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code);
1064 	if (code != NULL) {
1065 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1066 
1067 		cip->ci_precanned = 1;
1068 		fmd_case_setcode(cp, code);
1069 	}
1070 
1071 	/*
1072 	 * copy suspects
1073 	 */
1074 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1075 	    &nelem);
1076 	for (i = 0; i < nelem; i++) {
1077 		nvlist_t *flt_copy, *asru = NULL, *fru = NULL, *rsrc = NULL;
1078 		topo_hdl_t *thp;
1079 		char *loc = NULL;
1080 		int err;
1081 
1082 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1083 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1084 		(void) nvlist_lookup_nvlist(nvlp[i], FM_FAULT_RESOURCE, &rsrc);
1085 
1086 		/*
1087 		 * If no fru specified, get it from topo
1088 		 */
1089 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_FRU, &fru) != 0 &&
1090 		    rsrc && topo_fmri_fru(thp, rsrc, &fru, &err) == 0)
1091 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_FRU, fru);
1092 		/*
1093 		 * If no asru specified, get it from topo
1094 		 */
1095 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU, &asru) != 0 &&
1096 		    rsrc && topo_fmri_asru(thp, rsrc, &asru, &err) == 0)
1097 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU, asru);
1098 		/*
1099 		 * If no location specified, get it from topo
1100 		 */
1101 		if (nvlist_lookup_string(nvlp[i], FM_FAULT_LOCATION,
1102 		    &loc) != 0) {
1103 			if (fru && topo_fmri_label(thp, fru, &loc, &err) == 0)
1104 				(void) nvlist_add_string(flt_copy,
1105 				    FM_FAULT_LOCATION, loc);
1106 			else if (rsrc && topo_fmri_label(thp, rsrc, &loc,
1107 			    &err) == 0)
1108 				(void) nvlist_add_string(flt_copy,
1109 				    FM_FAULT_LOCATION, loc);
1110 			if (loc)
1111 				topo_hdl_strfree(thp, loc);
1112 		}
1113 		if (fru)
1114 			nvlist_free(fru);
1115 		if (asru)
1116 			nvlist_free(asru);
1117 		if (rsrc)
1118 			nvlist_free(rsrc);
1119 		fmd_fmri_topo_rele(thp);
1120 		fmd_case_insert_suspect(cp, flt_copy);
1121 	}
1122 
1123 	/*
1124 	 * copy diag_time if present
1125 	 */
1126 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1127 	    &nelem2) == 0 && nelem2 >= 2)
1128 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1129 
1130 	/*
1131 	 * copy DE fmri if present
1132 	 */
1133 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1134 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1135 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1136 	}
1137 
1138 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1139 	fmd_module_unlock(xip->xi_queue->eq_mod);
1140 }
1141 
1142 /*
1143  * This function is called to create a proxy case on receipt of a list.suspect
1144  * from the diagnosing side of the transport.
1145  */
1146 static void
1147 fmd_xprt_list_suspect(fmd_xprt_t *xp, nvlist_t *nvl)
1148 {
1149 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1150 	nvlist_t **nvlp;
1151 	uint_t nelem = 0, nelem2 = 0, i;
1152 	int64_t *diag_time;
1153 	topo_hdl_t *thp;
1154 	char *class;
1155 	nvlist_t *rsrc, *asru, *de_fmri, *de_fmri_dup = NULL;
1156 	nvlist_t *flt_copy;
1157 	int err;
1158 	nvlist_t **asrua;
1159 	uint8_t *proxy_asru = NULL;
1160 	int got_proxy_asru = 0;
1161 	int got_hc_rsrc = 0;
1162 	int got_present_rsrc = 0;
1163 	uint8_t *diag_asru = NULL;
1164 	char *scheme;
1165 	uint8_t *statusp;
1166 	char *uuid, *code;
1167 	fmd_case_t *cp;
1168 	fmd_case_impl_t *cip;
1169 	int need_update = 0;
1170 
1171 	if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) != 0)
1172 		return;
1173 	if (nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) != 0)
1174 		return;
1175 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1176 	    &nelem);
1177 
1178 	/*
1179 	 * In order to implement FMD_XPRT_HCONLY and FMD_XPRT_HC_PRESENT_ONLY
1180 	 * etc we first scan the suspects to see if
1181 	 * - there was an asru in the received fault
1182 	 * - there was an hc-scheme resource in the received fault
1183 	 * - any hc-scheme resource in the received fault is present in the
1184 	 *   local topology
1185 	 * - any hc-scheme resource in the received fault has an asru in the
1186 	 *   local topology
1187 	 */
1188 	if (nelem > 0) {
1189 		asrua = fmd_zalloc(sizeof (nvlist_t *) * nelem, FMD_SLEEP);
1190 		proxy_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1191 		diag_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1192 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1193 		for (i = 0; i < nelem; i++) {
1194 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1195 			    &asru) == 0 && asru != NULL)
1196 				diag_asru[i] = 1;
1197 			if (nvlist_lookup_string(nvlp[i], FM_CLASS,
1198 			    &class) != 0 || strncmp(class, "fault", 5) != 0)
1199 				continue;
1200 			/*
1201 			 * If there is an hc-scheme asru, use that to find the
1202 			 * real asru. Otherwise if there is an hc-scheme
1203 			 * resource, work out the old asru from that.
1204 			 * This order is to allow a two stage evaluation
1205 			 * of the asru where a fault in the diagnosing side
1206 			 * is in a component not visible to the proxy side,
1207 			 * but prevents a component that is visible from
1208 			 * working. So the diagnosing side sets the asru to
1209 			 * the latter component (in hc-scheme as the diagnosing
1210 			 * side doesn't know about the proxy side's virtual
1211 			 * schemes), and then the proxy side can convert that
1212 			 * to a suitable virtual scheme asru.
1213 			 */
1214 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1215 			    &asru) == 0 && asru != NULL &&
1216 			    nvlist_lookup_string(asru, FM_FMRI_SCHEME,
1217 			    &scheme) == 0 &&
1218 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1219 				got_hc_rsrc = 1;
1220 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1221 					continue;
1222 				if (topo_fmri_present(thp, asru, &err) == 0)
1223 					got_present_rsrc = 1;
1224 				if (topo_fmri_asru(thp, asru, &asrua[i],
1225 				    &err) == 0) {
1226 					proxy_asru[i] =
1227 					    FMD_PROXY_ASRU_FROM_ASRU;
1228 					got_proxy_asru = 1;
1229 				}
1230 			} else if (nvlist_lookup_nvlist(nvlp[i],
1231 			    FM_FAULT_RESOURCE, &rsrc) == 0 && rsrc != NULL &&
1232 			    nvlist_lookup_string(rsrc, FM_FMRI_SCHEME,
1233 			    &scheme) == 0 &&
1234 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1235 				got_hc_rsrc = 1;
1236 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1237 					continue;
1238 				if (topo_fmri_present(thp, rsrc, &err) == 0)
1239 					got_present_rsrc = 1;
1240 				if (topo_fmri_asru(thp, rsrc, &asrua[i],
1241 				    &err) == 0) {
1242 					proxy_asru[i] =
1243 					    FMD_PROXY_ASRU_FROM_RSRC;
1244 					got_proxy_asru = 1;
1245 				}
1246 			}
1247 		}
1248 		fmd_fmri_topo_rele(thp);
1249 	}
1250 
1251 	/*
1252 	 * If we're set up only to report hc-scheme faults, and
1253 	 * there aren't any, then just drop the event.
1254 	 */
1255 	if (got_hc_rsrc == 0 && (xip->xi_flags & FMD_XPRT_HCONLY)) {
1256 		if (nelem > 0) {
1257 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1258 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1259 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1260 		}
1261 		return;
1262 	}
1263 
1264 	/*
1265 	 * If we're set up only to report locally present hc-scheme
1266 	 * faults, and there aren't any, then just drop the event.
1267 	 */
1268 	if (got_present_rsrc == 0 &&
1269 	    (xip->xi_flags & FMD_XPRT_HC_PRESENT_ONLY)) {
1270 		if (nelem > 0) {
1271 			for (i = 0; i < nelem; i++)
1272 				if (asrua[i])
1273 					nvlist_free(asrua[i]);
1274 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1275 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1276 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1277 		}
1278 		return;
1279 	}
1280 
1281 	/*
1282 	 * If fmd_case_recreate() returns NULL, UUID is already known.
1283 	 */
1284 	fmd_module_lock(xip->xi_queue->eq_mod);
1285 	if ((cp = fmd_case_recreate(xip->xi_queue->eq_mod, xp,
1286 	    FMD_CASE_UNSOLVED, uuid, code)) == NULL) {
1287 		if (nelem > 0) {
1288 			for (i = 0; i < nelem; i++)
1289 				if (asrua[i])
1290 					nvlist_free(asrua[i]);
1291 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1292 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1293 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1294 		}
1295 		fmd_module_unlock(xip->xi_queue->eq_mod);
1296 		return;
1297 	}
1298 
1299 	cip = (fmd_case_impl_t *)cp;
1300 	cip->ci_diag_asru = diag_asru;
1301 	cip->ci_proxy_asru = proxy_asru;
1302 	for (i = 0; i < nelem; i++) {
1303 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1304 		if (proxy_asru[i] != FMD_PROXY_ASRU_NOT_NEEDED) {
1305 			/*
1306 			 * Copy suspects, but remove/replace asru first. Also if
1307 			 * the original asru was hc-scheme use that as resource.
1308 			 */
1309 			if (proxy_asru[i] == FMD_PROXY_ASRU_FROM_ASRU) {
1310 				(void) nvlist_remove(flt_copy,
1311 				    FM_FAULT_RESOURCE, DATA_TYPE_NVLIST);
1312 				(void) nvlist_lookup_nvlist(flt_copy,
1313 				    FM_FAULT_ASRU, &asru);
1314 				(void) nvlist_add_nvlist(flt_copy,
1315 				    FM_FAULT_RESOURCE, asru);
1316 			}
1317 			(void) nvlist_remove(flt_copy, FM_FAULT_ASRU,
1318 			    DATA_TYPE_NVLIST);
1319 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU,
1320 			    asrua[i]);
1321 			nvlist_free(asrua[i]);
1322 		} else if (nvlist_lookup_nvlist(flt_copy, FM_FAULT_ASRU,
1323 		    &asru) == 0 && asru != NULL) {
1324 			/*
1325 			 * keep asru from diag side, but but mark as no retire
1326 			 */
1327 			(void) nvlist_add_boolean_value(flt_copy,
1328 			    FM_SUSPECT_RETIRE, B_FALSE);
1329 		}
1330 		fmd_case_insert_suspect(cp, flt_copy);
1331 	}
1332 	/*
1333 	 * copy diag_time
1334 	 */
1335 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1336 	    &nelem2) == 0 && nelem2 >= 2)
1337 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1338 	/*
1339 	 * copy DE fmri
1340 	 */
1341 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1342 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1343 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1344 	}
1345 
1346 	/*
1347 	 * Transition to solved. This will log the suspect list and create
1348 	 * the resource cache entries.
1349 	 */
1350 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1351 
1352 	/*
1353 	 * Update status if it is not simply "all faulty" (can happen if
1354 	 * list.suspects are being re-sent when the transport has reconnected).
1355 	 */
1356 	(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS, &statusp,
1357 	    &nelem);
1358 	for (i = 0; i < nelem; i++) {
1359 		if ((statusp[i] & (FM_SUSPECT_FAULTY | FM_SUSPECT_UNUSABLE |
1360 		    FM_SUSPECT_NOT_PRESENT | FM_SUSPECT_DEGRADED)) !=
1361 		    FM_SUSPECT_FAULTY)
1362 			need_update = 1;
1363 	}
1364 	if (need_update) {
1365 		fmd_case_update_status(cp, statusp, cip->ci_proxy_asru,
1366 		    cip->ci_diag_asru);
1367 		fmd_case_update_containees(cp);
1368 		fmd_case_update(cp);
1369 	}
1370 
1371 	/*
1372 	 * if asru on proxy side, send an update back to the diagnosing side to
1373 	 * update UNUSABLE/DEGRADED.
1374 	 */
1375 	if (got_proxy_asru)
1376 		fmd_case_xprt_updated(cp);
1377 
1378 	if (nelem > 0)
1379 		fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1380 	fmd_module_unlock(xip->xi_queue->eq_mod);
1381 }
1382 
1383 void
1384 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
1385 {
1386 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1387 	const fmd_xprt_rule_t *xrp;
1388 	fmd_t *dp = &fmd;
1389 
1390 	fmd_event_t *e;
1391 	char *class, *uuid;
1392 	boolean_t isproto, isereport;
1393 
1394 	uint64_t *tod;
1395 	uint8_t ttl;
1396 	uint_t n;
1397 	fmd_case_t *cp;
1398 
1399 	/*
1400 	 * Grab the transport lock and set the busy flag to indicate we are
1401 	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1402 	 * resumes the transport before continuing on with the receive.
1403 	 */
1404 	(void) pthread_mutex_lock(&xip->xi_lock);
1405 
1406 	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1407 
1408 		if (fmd.d_signal != 0) {
1409 			(void) pthread_mutex_unlock(&xip->xi_lock);
1410 			return; /* fmd_destroy() is in progress */
1411 		}
1412 
1413 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1414 	}
1415 
1416 	xip->xi_busy++;
1417 	ASSERT(xip->xi_busy != 0);
1418 
1419 	(void) pthread_mutex_unlock(&xip->xi_lock);
1420 
1421 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
1422 	xip->xi_stats->xs_received.fmds_value.ui64++;
1423 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1424 
1425 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1426 		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1427 		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1428 
1429 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1430 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1431 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1432 
1433 		nvlist_free(nvl);
1434 		goto done;
1435 	}
1436 
1437 	fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
1438 	    ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);
1439 
1440 	isereport = (strncmp(class, FM_EREPORT_CLASS,
1441 	    sizeof (FM_EREPORT_CLASS - 1)) == 0) ? FMD_B_TRUE : FMD_B_FALSE;
1442 
1443 	/*
1444 	 * The logonly flag should only be set for ereports.
1445 	 */
1446 	if ((logonly == FMD_B_TRUE) && (isereport == FMD_B_FALSE)) {
1447 		fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
1448 		    "logonly flag is not valid for class %s",
1449 		    (void *)nvl, class);
1450 
1451 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1452 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1453 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1454 
1455 		nvlist_free(nvl);
1456 		goto done;
1457 	}
1458 
1459 	/*
1460 	 * If a time-to-live value is present in the event and is zero, drop
1461 	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1462 	 */
1463 	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1464 		if (ttl == 0) {
1465 			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1466 			    "timeout: event received with ttl=0\n",
1467 			    xip->xi_id, (void *)nvl, class);
1468 
1469 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1470 			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1471 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1472 
1473 			nvlist_free(nvl);
1474 			goto done;
1475 		}
1476 		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1477 		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1478 	}
1479 
1480 	/*
1481 	 * If we are using the native system clock, the underlying transport
1482 	 * code can provide a tighter event time bound by telling us when the
1483 	 * event was enqueued.  If we're using simulated clocks, this time
1484 	 * has no meaning to us, so just reset the value to use HRT_NOW.
1485 	 */
1486 	if (dp->d_clockops != &fmd_timeops_native)
1487 		hrt = FMD_HRT_NOW;
1488 
1489 	/*
1490 	 * If an event's class is in the FMD_CTL_CLASS family, then create a
1491 	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
1492 	 * event using this time.  Otherwise create a protocol event using hrt.
1493 	 */
1494 	isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
1495 	    FMD_B_FALSE : FMD_B_TRUE;
1496 	if (isproto == FMD_B_FALSE)
1497 		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1498 	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1499 		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1500 	else {
1501 		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1502 		    NULL, nvl, class, NULL, 0, 0);
1503 	}
1504 
1505 	/*
1506 	 * If the debug log is enabled, create a temporary event, log it to the
1507 	 * debug log, and then reset the underlying state of the event.
1508 	 */
1509 	if (xip->xi_log != NULL) {
1510 		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1511 
1512 		fmd_log_append(xip->xi_log, e, NULL);
1513 
1514 		ep->ev_flags |= FMD_EVF_VOLATILE;
1515 		ep->ev_off = 0;
1516 		ep->ev_len = 0;
1517 
1518 		if (ep->ev_log != NULL) {
1519 			fmd_log_rele(ep->ev_log);
1520 			ep->ev_log = NULL;
1521 		}
1522 	}
1523 
1524 	/*
1525 	 * Iterate over the rules for the current state trying to match the
1526 	 * event class to one of our special rules.  If a rule is matched, the
1527 	 * event is consumed and not dispatched to other modules.  If the rule
1528 	 * set ends without matching an event, we fall through to dispatching.
1529 	 */
1530 	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1531 		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1532 			fmd_event_hold(e);
1533 			xrp->xr_func(xip, nvl);
1534 			fmd_event_rele(e);
1535 			goto done;
1536 		}
1537 	}
1538 
1539 	/*
1540 	 * Record the event in the errlog if it is an ereport.  This code will
1541 	 * be replaced later with a per-transport intent log instead.
1542 	 */
1543 	if (isereport == FMD_B_TRUE) {
1544 		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
1545 		fmd_log_append(dp->d_errlog, e, NULL);
1546 		(void) pthread_rwlock_unlock(&dp->d_log_lock);
1547 	}
1548 
1549 	/*
1550 	 * If a list.suspect event is received, create a case for the specified
1551 	 * UUID in the case hash, with the transport module as its owner.
1552 	 */
1553 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS)) {
1554 		if (xip->xi_flags & FMD_XPRT_CACHE_AS_LOCAL)
1555 			fmd_xprt_list_suspect_local(xp, nvl);
1556 		else
1557 			fmd_xprt_list_suspect(xp, nvl);
1558 		fmd_event_hold(e);
1559 		fmd_event_rele(e);
1560 		goto done;
1561 	}
1562 
1563 	/*
1564 	 * If a list.updated or list.repaired event is received, update the
1565 	 * resource cache status and the local case.
1566 	 */
1567 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_REPAIRED_CLASS) ||
1568 	    fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_UPDATED_CLASS)) {
1569 		uint8_t *statusp;
1570 		uint_t nelem = 0;
1571 
1572 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1573 		    &statusp, &nelem);
1574 		fmd_module_lock(xip->xi_queue->eq_mod);
1575 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1576 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1577 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1578 			if (cip->ci_xprt != NULL) {
1579 				fmd_case_update_status(cp, statusp,
1580 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1581 				fmd_case_update_containees(cp);
1582 				fmd_case_update(cp);
1583 			}
1584 			fmd_case_rele(cp);
1585 		}
1586 		fmd_module_unlock(xip->xi_queue->eq_mod);
1587 		fmd_event_hold(e);
1588 		fmd_event_rele(e);
1589 		goto done;
1590 	}
1591 
1592 	/*
1593 	 * If a list.isolated event is received, update resource cache status
1594 	 */
1595 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_ISOLATED_CLASS)) {
1596 		uint8_t *statusp;
1597 		uint_t nelem = 0;
1598 
1599 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1600 		    &statusp, &nelem);
1601 		fmd_module_lock(xip->xi_queue->eq_mod);
1602 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1603 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1604 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1605 			if (cip->ci_xprt != NULL)
1606 				fmd_case_update_status(cp, statusp,
1607 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1608 			fmd_case_rele(cp);
1609 		}
1610 		fmd_module_unlock(xip->xi_queue->eq_mod);
1611 		fmd_event_hold(e);
1612 		fmd_event_rele(e);
1613 		goto done;
1614 	}
1615 
1616 	/*
1617 	 * If a list.resolved event is received, resolve the local case.
1618 	 */
1619 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_RESOLVED_CLASS)) {
1620 		fmd_module_lock(xip->xi_queue->eq_mod);
1621 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1622 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1623 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1624 			if (cip->ci_xprt != NULL)
1625 				fmd_case_transition(cp, (cip->ci_state ==
1626 				    FMD_CASE_REPAIRED) ? FMD_CASE_RESOLVED :
1627 				    (cip->ci_state == FMD_CASE_CLOSED) ?
1628 				    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT,
1629 				    FMD_CF_RESOLVED);
1630 			fmd_case_rele(cp);
1631 		}
1632 		fmd_module_unlock(xip->xi_queue->eq_mod);
1633 		fmd_event_hold(e);
1634 		fmd_event_rele(e);
1635 		goto done;
1636 	}
1637 
1638 	if (logonly == FMD_B_TRUE || (xip->xi_flags & FMD_XPRT_EXTERNAL)) {
1639 		/*
1640 		 * Don't proxy ereports on an EXTERNAL transport - we won't
1641 		 * know how to diagnose them with the wrong topology. Note
1642 		 * that here (and above) we have to hold/release the event in
1643 		 * order for it to be freed.
1644 		 */
1645 		fmd_event_hold(e);
1646 		fmd_event_rele(e);
1647 	} else if (isproto == FMD_B_TRUE)
1648 		fmd_dispq_dispatch(dp->d_disp, e, class);
1649 	else
1650 		fmd_modhash_dispatch(dp->d_mod_hash, e);
1651 done:
1652 	(void) pthread_mutex_lock(&xip->xi_lock);
1653 
1654 	ASSERT(xip->xi_busy != 0);
1655 	xip->xi_busy--;
1656 
1657 	(void) pthread_cond_broadcast(&xip->xi_cv);
1658 	(void) pthread_mutex_unlock(&xip->xi_lock);
1659 }
1660 
1661 void
1662 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1663 {
1664 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1665 
1666 	fmd_event_t *e;
1667 	nvlist_t *nvl;
1668 	char *s;
1669 
1670 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1671 		return; /* read-only transports do not proxy uuclose */
1672 
1673 	TRACE((FMD_DBG_XPRT, "xprt %u closing case %s\n", xip->xi_id, uuid));
1674 
1675 	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1676 	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1677 
1678 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1679 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1680 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1681 }
1682 
1683 /*
1684  * On proxy side, send back uuresolved request to diagnosing side
1685  */
1686 void
1687 fmd_xprt_uuresolved(fmd_xprt_t *xp, const char *uuid)
1688 {
1689 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1690 
1691 	fmd_event_t *e;
1692 	nvlist_t *nvl;
1693 	char *s;
1694 
1695 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1696 		return; /* read-only transports do not proxy uuresolved */
1697 
1698 	TRACE((FMD_DBG_XPRT, "xprt %u resolving case %s\n", xip->xi_id, uuid));
1699 
1700 	nvl = fmd_protocol_xprt_uuresolved(xip->xi_queue->eq_mod,
1701 	    "resource.fm.xprt.uuresolved", xip->xi_version, uuid);
1702 
1703 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1704 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1705 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1706 }
1707 
1708 /*
1709  * On proxy side, send back repair/acquit/etc request to diagnosing side
1710  */
1711 void
1712 fmd_xprt_updated(fmd_xprt_t *xp, const char *uuid, uint8_t *statusp,
1713 	uint8_t *has_asrup, uint_t nelem)
1714 {
1715 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1716 
1717 	fmd_event_t *e;
1718 	nvlist_t *nvl;
1719 	char *s;
1720 
1721 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1722 		return; /* read-only transports do not support remote repairs */
1723 
1724 	TRACE((FMD_DBG_XPRT, "xprt %u updating case %s\n", xip->xi_id, uuid));
1725 
1726 	nvl = fmd_protocol_xprt_updated(xip->xi_queue->eq_mod,
1727 	    "resource.fm.xprt.updated", xip->xi_version, uuid, statusp,
1728 	    has_asrup, nelem);
1729 
1730 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1731 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1732 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1733 }
1734 
1735 /*
1736  * Insert the specified class into our remote subscription hash.  If the class
1737  * is already present, bump the reference count; otherwise add it to the hash
1738  * and then enqueue an event for our remote peer to proxy our subscription.
1739  */
1740 void
1741 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1742 {
1743 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1744 
1745 	uint_t refs;
1746 	nvlist_t *nvl;
1747 	fmd_event_t *e;
1748 	char *s;
1749 
1750 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1751 		return; /* read-only transports do not proxy subscriptions */
1752 
1753 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1754 		return; /* transport is not yet an active subscriber */
1755 
1756 	(void) pthread_mutex_lock(&xip->xi_lock);
1757 	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1758 	(void) pthread_mutex_unlock(&xip->xi_lock);
1759 
1760 	if (refs > 1)
1761 		return; /* we've already asked our peer for this subscription */
1762 
1763 	fmd_dprintf(FMD_DBG_XPRT,
1764 	    "xprt %u subscribing to %s\n", xip->xi_id, class);
1765 
1766 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1767 	    "resource.fm.xprt.subscribe", xip->xi_version, class);
1768 
1769 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1770 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1771 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1772 }
1773 
1774 /*
1775  * Delete the specified class from the remote subscription hash.  If the
1776  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1777  */
1778 void
1779 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1780 {
1781 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1782 
1783 	uint_t refs;
1784 	nvlist_t *nvl;
1785 	fmd_event_t *e;
1786 	char *s;
1787 
1788 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1789 		return; /* read-only transports do not proxy subscriptions */
1790 
1791 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1792 		return; /* transport is not yet an active subscriber */
1793 
1794 	/*
1795 	 * If the subscription reference count drops to zero in xi_rsub, insert
1796 	 * an entry into the xi_usub hash indicating we await an unsuback event.
1797 	 */
1798 	(void) pthread_mutex_lock(&xip->xi_lock);
1799 
1800 	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1801 		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1802 
1803 	(void) pthread_mutex_unlock(&xip->xi_lock);
1804 
1805 	if (refs != 0)
1806 		return; /* other subscriptions for this class still active */
1807 
1808 	fmd_dprintf(FMD_DBG_XPRT,
1809 	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1810 
1811 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1812 	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1813 
1814 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1815 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1816 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1817 }
1818 
1819 static void
1820 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1821 {
1822 	fmd_xprt_t *xp;
1823 
1824 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1825 		fmd_xprt_subscribe(xp, class);
1826 		fmd_idspace_rele(ids, id);
1827 	}
1828 }
1829 
1830 void
1831 fmd_xprt_subscribe_all(const char *class)
1832 {
1833 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1834 
1835 	if (ids->ids_count != 0)
1836 		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1837 }
1838 
1839 static void
1840 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1841 {
1842 	fmd_xprt_t *xp;
1843 
1844 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1845 		fmd_xprt_unsubscribe(xp, class);
1846 		fmd_idspace_rele(ids, id);
1847 	}
1848 }
1849 
1850 void
1851 fmd_xprt_unsubscribe_all(const char *class)
1852 {
1853 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1854 
1855 	if (ids->ids_count != 0)
1856 		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1857 }
1858 
1859 /*ARGSUSED*/
1860 static void
1861 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1862 {
1863 	fmd_xprt_t *xp;
1864 
1865 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1866 		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1867 		fmd_idspace_rele(ids, id);
1868 	}
1869 }
1870 
1871 void
1872 fmd_xprt_suspend_all(void)
1873 {
1874 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1875 
1876 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1877 
1878 	if (fmd.d_xprt_suspend++ != 0) {
1879 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1880 		return; /* already suspended */
1881 	}
1882 
1883 	if (ids->ids_count != 0)
1884 		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1885 
1886 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1887 }
1888 
1889 /*ARGSUSED*/
1890 static void
1891 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1892 {
1893 	fmd_xprt_t *xp;
1894 
1895 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1896 		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1897 		fmd_idspace_rele(ids, id);
1898 	}
1899 }
1900 
1901 void
1902 fmd_xprt_resume_all(void)
1903 {
1904 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1905 
1906 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1907 
1908 	if (fmd.d_xprt_suspend == 0)
1909 		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
1910 
1911 	if (--fmd.d_xprt_suspend != 0) {
1912 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1913 		return; /* not ready to be resumed */
1914 	}
1915 
1916 	if (ids->ids_count != 0)
1917 		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
1918 
1919 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1920 }
1921