xref: /titanic_41/usr/src/cmd/fm/fmd/common/fmd_xprt.c (revision 450396635f70344c58b6b1e4db38cf17ff34445c)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * FMD Transport Subsystem
29  *
30  * A transport module uses some underlying mechanism to transport events.
31  * This mechanism may use any underlying link-layer protocol and may support
32  * additional link-layer packets unrelated to FMA.  Some appropriate link-
33  * layer mechanism to create the underlying connection is expected to be
34  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
35  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
36  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
37  * The underlying transport mechanism is *required* to provide ordering: that
38  * is, the sequences of bytes written across the transport must be read by
39  * the remote peer in the order that they are written, even across separate
40  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
41  * a valid transport as it guarantees ordering, whereas the Internet UDP
42  * protocol would not because UDP datagrams may be delivered in any order
43  * as a result of delays introduced when datagrams pass through routers.
44  *
45  * Similar to sending events, a transport module receives events that are from
46  * its peer remote endpoint using some transport-specific mechanism that is
47  * unknown to FMD.  As each event is received, the transport module is
48  * responsible for constructing a valid nvlist_t object from the data and then
49  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
50  * queue, making it available to all local modules that are not transport
51  * modules that have subscribed to the event.
52  *
53  * The following state machine is used for each transport.  The initial state
54  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
55  *
56  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
57  *             |                 |
58  * waiting  +--v--+           +--v--+  waiting
59  * for syn  | SYN |--+     --+| ACK |  for ack
60  * event    +-----+   \   /   +-----+  event
61  *             |       \ /       |
62  * drop all +--v--+     X     +--v--+  send subscriptions,
63  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
64  *          +-----+           +-----+  wait for run event
65  *             ^                 |
66  *             |     +-----+     |
67  *             +-----| RUN |<----+
68  *                   +--^--+
69  *                      |
70  *               FMD_XPRT_RDONLY
71  *
72  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
73  * Layer enqueues a "syn" event for the module in its event queue and sets the
74  * state to ACK.  In state ACK, we are waiting for the transport to get an
75  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
76  * discarded.  If an "ack" is received, we transition to state SUB.  If a
77  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
78  * exchange), we transition to state ERR.  Once in state ERR, no further
79  * operations are valid except fmd_xprt_close() and fmd_xprt_error() will
80  * return a non-zero value to the caller indicating the transport has failed.
81  *
82  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
83  * Layer assumes this transport is being used to accept a virtual connection
84  * from a remote peer that is sending a "syn", and sets the initial state to
85  * SYN.  In this state, the transport waits for a "syn" event, validates it,
86  * and then transitions to state SUB if it is valid or state ERR if it is not.
87  *
88  * Once in state SUB, the transport module is expected to receive a sequence of
89  * zero or more "subscribe" events from the remote peer, followed by a "run"
90  * event.  Once in state RUN, the transport is active and any events can be
91  * sent or received.  The transport module is free to call fmd_xprt_close()
92  * from any state.  The fmd_xprt_error() function will return zero if the
93  * transport is not in the ERR state, or non-zero if it is in the ERR state.
94  *
95  * Once the state machine reaches RUN, other FMA protocol events can be sent
96  * and received across the transport in addition to the various control events.
97  *
98  * Table of Common Transport Layer Control Events
99  * ==============================================
100  *
101  * FMA Class                     Payload
102  * ---------                     -------
103  * resource.fm.xprt.uuclose      string (uuid of case)
104  * resource.fm.xprt.uuresolved   string (uuid of case)
105  * resource.fm.xprt.updated      string (uuid of case)
106  * resource.fm.xprt.subscribe    string (class pattern)
107  * resource.fm.xprt.unsubscribe  string (class pattern)
108  * resource.fm.xprt.unsuback     string (class pattern)
109  * resource.fm.xprt.syn          version information
110  * resource.fm.xprt.ack          version information
111  * resource.fm.xprt.run          version information
112  *
113  * Control events are used to add and delete proxy subscriptions on the remote
114  * transport peer module, and to set up connections.  When a "syn" event is
115  * sent, FMD will include in the payload the highest version of the FMA event
116  * protocol that is supported by the sender.  When a "syn" event is received,
117  * the receiving FMD will use the minimum of this version and its version of
118  * the protocol, and reply with this new minimum version in the "ack" event.
119  * The receiver will then use this new minimum for subsequent event semantics.
120  */
121 
122 #include <sys/fm/protocol.h>
123 #include <strings.h>
124 #include <limits.h>
125 
126 #include <fmd_alloc.h>
127 #include <fmd_error.h>
128 #include <fmd_conf.h>
129 #include <fmd_subr.h>
130 #include <fmd_string.h>
131 #include <fmd_protocol.h>
132 #include <fmd_thread.h>
133 #include <fmd_eventq.h>
134 #include <fmd_dispq.h>
135 #include <fmd_ctl.h>
136 #include <fmd_log.h>
137 #include <fmd_ustat.h>
138 #include <fmd_case.h>
139 #include <fmd_api.h>
140 #include <fmd_fmri.h>
141 #include <fmd_asru.h>
142 #include <fmd_xprt.h>
143 
144 #include <fmd.h>
145 
146 /*
147  * The states shown above in the transport state machine diagram are encoded
148  * using arrays of class patterns and a corresponding action function.  These
149  * arrays are then passed to fmd_xprt_transition() to change transport states.
150  */
151 
152 const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
153 { "resource.fm.xprt.syn", fmd_xprt_event_syn },
154 { "*", fmd_xprt_event_error },
155 { NULL, NULL }
156 };
157 
158 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
159 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
160 { "*", fmd_xprt_event_error },
161 };
162 
163 const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
164 { "*", fmd_xprt_event_drop },
165 { NULL, NULL }
166 };
167 
168 const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
169 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
170 { "resource.fm.xprt.run", fmd_xprt_event_run },
171 { "resource.fm.xprt.*", fmd_xprt_event_error },
172 { "*", fmd_xprt_event_drop },
173 { NULL, NULL }
174 };
175 
176 const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
177 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
178 { "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
179 { "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
180 { "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
181 { "resource.fm.xprt.uuresolved", fmd_xprt_event_uuresolved },
182 { "resource.fm.xprt.updated", fmd_xprt_event_updated },
183 { "resource.fm.xprt.*", fmd_xprt_event_error },
184 { NULL, NULL }
185 };
186 
187 /*
188  * Template for per-transport statistics installed by fmd on behalf of each
189  * transport.  These are used to initialize the per-transport xi_stats.  For
190  * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
191  * transport ID (xi_id) and then are inserted into the per-module stats hash.
192  * The values in this array must match fmd_xprt_stat_t from <fmd_xprt.h>.
193  */
194 static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
195 {
196 { "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
197 { "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
198 { "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
199 { "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
200 { "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
201 { "wtime", FMD_TYPE_TIME, "total wait time on queue" },
202 { "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
203 { "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
204 { "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
205 { "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
206 },
207 { "module", FMD_TYPE_STRING, "module that owns this transport" },
208 { "authority", FMD_TYPE_STRING, "authority associated with this transport" },
209 { "state", FMD_TYPE_STRING, "current transport state" },
210 { "received", FMD_TYPE_UINT64, "events received by transport" },
211 { "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
212 { "retried", FMD_TYPE_UINT64, "retries requested of transport" },
213 { "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
214 { "lost", FMD_TYPE_UINT64, "events lost by transport" },
215 { "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
216 { "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
217 };
218 
219 static void
220 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
221 {
222 	uint_t hashlen = fmd.d_str_buckets;
223 
224 	xch->xch_queue = eq;
225 	xch->xch_hashlen = hashlen;
226 	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
227 }
228 
229 static void
230 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
231 {
232 	fmd_eventq_t *eq = xch->xch_queue;
233 	fmd_xprt_class_t *xcp, *ncp;
234 	uint_t i;
235 
236 	for (i = 0; i < xch->xch_hashlen; i++) {
237 		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
238 			ncp = xcp->xc_next;
239 
240 			if (eq != NULL)
241 				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
242 
243 			fmd_strfree(xcp->xc_class);
244 			fmd_free(xcp, sizeof (fmd_xprt_class_t));
245 		}
246 	}
247 
248 	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
249 }
250 
251 /*
252  * Insert the specified class into the specified class hash, and return the
253  * reference count.  A return value of one indicates this is the first insert.
254  * If an eventq is associated with the hash, insert a dispq subscription for it.
255  */
256 static uint_t
257 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
258     fmd_xprt_class_hash_t *xch, const char *class)
259 {
260 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
261 	fmd_xprt_class_t *xcp;
262 
263 	ASSERT(MUTEX_HELD(&xip->xi_lock));
264 
265 	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
266 		if (strcmp(class, xcp->xc_class) == 0)
267 			return (++xcp->xc_refs);
268 	}
269 
270 	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
271 	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
272 	xcp->xc_next = xch->xch_hash[h];
273 	xcp->xc_refs = 1;
274 	xch->xch_hash[h] = xcp;
275 
276 	if (xch->xch_queue != NULL)
277 		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
278 
279 	return (xcp->xc_refs);
280 }
281 
282 /*
283  * Delete the specified class from the specified class hash, and return the
284  * reference count.  A return value of zero indicates the class was deleted.
285  * If an eventq is associated with the hash, delete the dispq subscription.
286  */
287 static uint_t
288 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
289     fmd_xprt_class_hash_t *xch, const char *class)
290 {
291 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
292 	fmd_xprt_class_t *xcp, **pp;
293 
294 	ASSERT(MUTEX_HELD(&xip->xi_lock));
295 	pp = &xch->xch_hash[h];
296 
297 	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
298 		if (strcmp(class, xcp->xc_class) == 0)
299 			break;
300 		else
301 			pp = &xcp->xc_next;
302 	}
303 
304 	if (xcp == NULL)
305 		return (-1U); /* explicitly permit an invalid delete */
306 
307 	if (--xcp->xc_refs != 0)
308 		return (xcp->xc_refs);
309 
310 	ASSERT(xcp->xc_refs == 0);
311 	*pp = xcp->xc_next;
312 
313 	fmd_strfree(xcp->xc_class);
314 	fmd_free(xcp, sizeof (fmd_xprt_class_t));
315 
316 	if (xch->xch_queue != NULL)
317 		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
318 
319 	return (0);
320 }
321 
322 /*
323  * Queue subscribe events for the specified transport corresponding to all of
324  * the active module subscriptions.  This is an extremely heavyweight operation
325  * that we expect to take place rarely (i.e. when loading a transport module
326  * or when it establishes a connection).  We lock all of the known modules to
327  * prevent them from adding or deleting subscriptions, then snapshot their
328  * subscriptions, and then unlock all of the modules.  We hold the modhash
329  * lock for the duration of this operation to prevent new modules from loading.
330  */
331 static void
332 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
333 {
334 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
335 	const fmd_conf_path_t *pap;
336 	fmd_module_t *mp;
337 	uint_t i, j;
338 
339 	(void) pthread_rwlock_rdlock(&mhp->mh_lock);
340 
341 	for (i = 0; i < mhp->mh_hashlen; i++) {
342 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
343 			fmd_module_lock(mp);
344 	}
345 
346 	(void) pthread_mutex_lock(&xip->xi_lock);
347 	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
348 	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
349 	(void) pthread_mutex_unlock(&xip->xi_lock);
350 
351 	for (i = 0; i < mhp->mh_hashlen; i++) {
352 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
353 			(void) fmd_conf_getprop(mp->mod_conf,
354 			    FMD_PROP_SUBSCRIPTIONS, &pap);
355 			for (j = 0; j < pap->cpa_argc; j++)
356 				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
357 		}
358 	}
359 
360 	for (i = 0; i < mhp->mh_hashlen; i++) {
361 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
362 			fmd_module_unlock(mp);
363 	}
364 
365 	(void) pthread_rwlock_unlock(&mhp->mh_lock);
366 }
367 
368 static void
369 fmd_xprt_transition(fmd_xprt_impl_t *xip,
370     const fmd_xprt_rule_t *state, const char *tag)
371 {
372 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
373 	fmd_event_t *e;
374 	nvlist_t *nvl;
375 	char *s;
376 
377 	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
378 
379 	xip->xi_state = state;
380 	s = fmd_strdup(tag, FMD_SLEEP);
381 
382 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
383 	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
384 	xip->xi_stats->xs_state.fmds_value.str = s;
385 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
386 
387 	/*
388 	 * If we've reached the SUB state, take out the big hammer and snapshot
389 	 * all of the subscriptions of all of the loaded modules.  Then queue a
390 	 * run event for our remote peer indicating that it can enter RUN.
391 	 */
392 	if (state == _fmd_xprt_state_sub) {
393 		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
394 
395 		/*
396 		 * For read-write transports, we always want to set up remote
397 		 * subscriptions to the bultin list.* events, regardless of
398 		 * whether any agents have subscribed to them.
399 		 */
400 		if (xip->xi_flags & FMD_XPRT_RDWR) {
401 			fmd_xprt_subscribe(xp, FM_LIST_SUSPECT_CLASS);
402 			fmd_xprt_subscribe(xp, FM_LIST_ISOLATED_CLASS);
403 			fmd_xprt_subscribe(xp, FM_LIST_UPDATED_CLASS);
404 			fmd_xprt_subscribe(xp, FM_LIST_RESOLVED_CLASS);
405 			fmd_xprt_subscribe(xp, FM_LIST_REPAIRED_CLASS);
406 		}
407 
408 		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
409 		    "resource.fm.xprt.run", xip->xi_version);
410 
411 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
412 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
413 		fmd_eventq_insert_at_time(xip->xi_queue, e);
414 	}
415 }
416 
417 static void
418 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
419 {
420 	char *s = fmd_fmri_auth2str(xip->xi_auth);
421 
422 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
423 	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
424 	xip->xi_stats->xs_authority.fmds_value.str = s;
425 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
426 }
427 
428 static int
429 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
430 {
431 	uint8_t rversion;
432 
433 	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
434 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
435 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
436 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
437 
438 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
439 		return (1);
440 	}
441 
442 	if (rversion > xip->xi_version) {
443 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
444 		    xip->xi_id, rversion, xip->xi_version);
445 
446 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
447 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
448 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
449 
450 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
451 		return (1);
452 	}
453 
454 	if (rversionp != NULL)
455 		*rversionp = rversion;
456 
457 	return (0);
458 }
459 
460 void
461 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
462 {
463 	fmd_event_t *e;
464 	uint_t vers;
465 	char *class;
466 
467 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
468 		return; /* transitioned to error state */
469 
470 	/*
471 	 * If the transport module didn't specify an authority, extract the
472 	 * one that is passed along with the xprt.syn event and use that.
473 	 */
474 	if (xip->xi_auth == NULL &&
475 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
476 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
477 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
478 		fmd_xprt_authupdate(xip);
479 	}
480 
481 	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
482 	    "resource.fm.xprt.ack", xip->xi_version);
483 
484 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
485 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
486 	fmd_eventq_insert_at_time(xip->xi_queue, e);
487 
488 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
489 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
490 }
491 
492 void
493 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
494 {
495 	uint_t vers;
496 
497 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
498 		return; /* transitioned to error state */
499 
500 	/*
501 	 * If the transport module didn't specify an authority, extract the
502 	 * one that is passed along with the xprt.syn event and use that.
503 	 */
504 	if (xip->xi_auth == NULL &&
505 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
506 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
507 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
508 		fmd_xprt_authupdate(xip);
509 	}
510 
511 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
512 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
513 }
514 
515 /*
516  * Upon transition to RUN, we take every solved case and resend a list.suspect
517  * event for it to our remote peer.  If a case transitions from solved to a
518  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
519  * the case hash, we will get it as part of examining the resource cache, next.
520  */
521 static void
522 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
523 {
524 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
525 	fmd_xprt_impl_t *xip = arg;
526 
527 	fmd_event_t *e;
528 	nvlist_t *nvl;
529 	char *class;
530 
531 	if (cip->ci_state == FMD_CASE_UNSOLVED)
532 		return;
533 
534 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
535 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
536 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
537 
538 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
539 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
540 
541 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
542 }
543 
544 void
545 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
546 {
547 	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
548 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
549 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
550 	}
551 }
552 
553 void
554 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
555 {
556 	char *class;
557 
558 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
559 		return; /* transitioned to error state */
560 
561 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
562 		return; /* malformed protocol event */
563 
564 	(void) pthread_mutex_lock(&xip->xi_lock);
565 	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
566 	(void) pthread_mutex_unlock(&xip->xi_lock);
567 
568 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
569 	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
570 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
571 }
572 
573 void
574 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
575 {
576 	fmd_event_t *e;
577 	char *class;
578 
579 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
580 		return; /* transitioned to error state */
581 
582 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
583 		return; /* malformed protocol event */
584 
585 	(void) pthread_mutex_lock(&xip->xi_lock);
586 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
587 	(void) pthread_mutex_unlock(&xip->xi_lock);
588 
589 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
590 	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
591 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
592 
593 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
594 	    "resource.fm.xprt.unsuback", xip->xi_version, class);
595 
596 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
597 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
598 	fmd_eventq_insert_at_time(xip->xi_queue, e);
599 }
600 
601 void
602 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
603 {
604 	char *class;
605 
606 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
607 		return; /* transitioned to error state */
608 
609 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
610 		return; /* malformed protocol event */
611 
612 	(void) pthread_mutex_lock(&xip->xi_lock);
613 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
614 	(void) pthread_mutex_unlock(&xip->xi_lock);
615 }
616 
617 /*
618  * on diagnosing side, receive a uuclose from the proxy.
619  */
620 void
621 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
622 {
623 	fmd_case_t *cp;
624 	char *uuid;
625 
626 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
627 		return; /* transitioned to error state */
628 
629 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
630 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
631 		/*
632 		 * update resource cache status and transition case
633 		 */
634 		fmd_case_close_status(cp);
635 		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
636 		fmd_case_rele(cp);
637 	}
638 }
639 
640 /*
641  * on diagnosing side, receive a uuresolved from the proxy.
642  */
643 void
644 fmd_xprt_event_uuresolved(fmd_xprt_impl_t *xip, nvlist_t *nvl)
645 {
646 	fmd_case_t *cp;
647 	char *uuid;
648 
649 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
650 		return; /* transitioned to error state */
651 
652 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
653 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
654 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
655 
656 		fmd_case_transition(cp, (cip->ci_state == FMD_CASE_REPAIRED) ?
657 		    FMD_CASE_RESOLVED : (cip->ci_state == FMD_CASE_CLOSED) ?
658 		    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT, FMD_CF_RESOLVED);
659 		fmd_case_rele(cp);
660 	}
661 }
662 
663 /*
664  * on diagnosing side, receive a repair/acquit from the proxy.
665  */
666 void
667 fmd_xprt_event_updated(fmd_xprt_impl_t *xip, nvlist_t *nvl)
668 {
669 	fmd_case_t *cp;
670 	char *uuid;
671 
672 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
673 		return; /* transitioned to error state */
674 
675 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
676 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
677 		uint8_t *statusp, *proxy_asrup = NULL;
678 		uint_t nelem = 0;
679 
680 		/*
681 		 * Only update status with new repairs if "no remote repair"
682 		 * is not set. Do the case_update anyway though (as this will
683 		 * refresh the status on the proxy side).
684 		 */
685 		if (!(xip->xi_flags & FMD_XPRT_NO_REMOTE_REPAIR)) {
686 			if (nvlist_lookup_uint8_array(nvl,
687 			    FM_RSRC_XPRT_FAULT_STATUS, &statusp, &nelem) == 0 &&
688 			    nelem != 0) {
689 				(void) nvlist_lookup_uint8_array(nvl,
690 				    FM_RSRC_XPRT_FAULT_HAS_ASRU, &proxy_asrup,
691 				    &nelem);
692 				fmd_case_update_status(cp, statusp,
693 				    proxy_asrup, NULL);
694 			}
695 			fmd_case_update_containees(cp);
696 		}
697 		fmd_case_update(cp);
698 		fmd_case_rele(cp);
699 	}
700 }
701 
702 void
703 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
704 {
705 	char *class = "<unknown>";
706 
707 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
708 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
709 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
710 
711 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
712 	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
713 
714 	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
715 }
716 
717 void
718 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
719 {
720 	char *class = "<unknown>";
721 
722 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
723 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
724 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
725 
726 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
727 	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
728 
729 }
730 
731 fmd_xprt_t *
732 fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
733 {
734 	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
735 	fmd_stat_t *statv;
736 	uint_t i, statc;
737 
738 	char buf[PATH_MAX];
739 	fmd_event_t *e;
740 	nvlist_t *nvl;
741 	char *s;
742 
743 	(void) pthread_mutex_init(&xip->xi_lock, NULL);
744 	(void) pthread_cond_init(&xip->xi_cv, NULL);
745 	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);
746 
747 	xip->xi_auth = auth;
748 	xip->xi_data = data;
749 	xip->xi_version = FM_RSRC_XPRT_VERSION;
750 	xip->xi_flags = flags;
751 
752 	/*
753 	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
754 	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
755 	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
756 	 * ensure that this transport will not run until fmd_xprt_resume_all().
757 	 */
758 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
759 	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);
760 
761 	if (fmd.d_xprt_suspend != 0)
762 		xip->xi_flags |= FMD_XPRT_DSUSPENDED;
763 
764 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
765 
766 	/*
767 	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
768 	 * bit so that fmdo_send() is not called until _fmd_init() completes.
769 	 */
770 	if (!(mp->mod_flags & FMD_MOD_INIT))
771 		xip->xi_flags |= FMD_XPRT_ISUSPENDED;
772 
773 	/*
774 	 * Initialize the transport statistics that we keep on behalf of fmd.
775 	 * These are set up using a template defined at the top of this file.
776 	 * We rename each statistic with a prefix ensuring its uniqueness.
777 	 */
778 	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
779 	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
780 	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));
781 
782 	for (i = 0; i < statc; i++) {
783 		(void) snprintf(statv[i].fmds_name,
784 		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
785 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
786 	}
787 
788 	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
789 	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);
790 
791 	if (xip->xi_stats == NULL)
792 		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);
793 
794 	xip->xi_stats->xs_module.fmds_value.str =
795 	    fmd_strdup(mp->mod_name, FMD_SLEEP);
796 
797 	if (xip->xi_auth != NULL)
798 		fmd_xprt_authupdate(xip);
799 
800 	/*
801 	 * Create the outbound eventq for this transport and link to its stats.
802 	 * If any suspend bits were set above, suspend the eventq immediately.
803 	 */
804 	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
805 	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
806 
807 	if (xip->xi_flags & FMD_XPRT_SMASK)
808 		fmd_eventq_suspend(xip->xi_queue);
809 
810 	/*
811 	 * Create our subscription hashes: local subscriptions go to xi_queue,
812 	 * remote subscriptions are tracked only for protocol requests, and
813 	 * pending unsubscriptions are associated with the /dev/null eventq.
814 	 */
815 	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
816 	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
817 	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);
818 
819 	/*
820 	 * Determine our initial state based upon the creation flags.  If we're
821 	 * read-only, go directly to RUN.  If we're accepting a new connection,
822 	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
823 	 */
824 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
825 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
826 	else if (flags & FMD_XPRT_ACCEPT)
827 		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
828 	else
829 		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
830 
831 	/*
832 	 * If client.xprtlog is set to TRUE, create a debugging log for the
833 	 * events received by the transport in var/fm/fmd/xprt/.
834 	 */
835 	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
836 	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);
837 
838 	if (i) {
839 		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
840 		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
841 	}
842 
843 	ASSERT(fmd_module_locked(mp));
844 	fmd_list_append(&mp->mod_transports, xip);
845 
846 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
847 	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
848 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
849 
850 	/*
851 	 * If this is a read-only transport, return without creating a send
852 	 * queue thread and setting up any connection events in our queue.
853 	 */
854 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
855 		goto out;
856 
857 	/*
858 	 * Once the transport is fully initialized, create a send queue thread
859 	 * and start any connect events flowing to complete our initialization.
860 	 */
861 	if ((xip->xi_thread = fmd_thread_create(mp,
862 	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {
863 
864 		fmd_error(EFMD_XPRT_THR,
865 		    "failed to create thread for transport %u", xip->xi_id);
866 
867 		fmd_xprt_destroy((fmd_xprt_t *)xip);
868 		(void) fmd_set_errno(EFMD_XPRT_THR);
869 		return (NULL);
870 	}
871 
872 	/*
873 	 * If the transport is not being opened to accept an inbound connect,
874 	 * start an outbound connection by enqueuing a SYN event for our peer.
875 	 */
876 	if (!(flags & FMD_XPRT_ACCEPT)) {
877 		nvl = fmd_protocol_xprt_ctl(mp,
878 		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);
879 
880 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
881 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
882 		fmd_eventq_insert_at_time(xip->xi_queue, e);
883 	}
884 out:
885 	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
886 	return ((fmd_xprt_t *)xip);
887 }
888 
889 void
890 fmd_xprt_destroy(fmd_xprt_t *xp)
891 {
892 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
893 	fmd_module_t *mp = xip->xi_queue->eq_mod;
894 	uint_t id = xip->xi_id;
895 
896 	fmd_case_impl_t *cip, *nip;
897 	fmd_stat_t *sp;
898 	uint_t i, n;
899 
900 	ASSERT(fmd_module_locked(mp));
901 	fmd_list_delete(&mp->mod_transports, xip);
902 
903 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
904 	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
905 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
906 
907 	(void) pthread_mutex_lock(&xip->xi_lock);
908 
909 	while (xip->xi_busy != 0)
910 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
911 
912 	/*
913 	 * Remove the transport from global visibility, cancel its send-side
914 	 * thread, join with it, and then remove the transport from module
915 	 * visibility.  Once all this is done, destroy and free the transport.
916 	 */
917 	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
918 
919 	if (xip->xi_thread != NULL) {
920 		fmd_eventq_abort(xip->xi_queue);
921 		fmd_module_unlock(mp);
922 		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
923 		fmd_module_lock(mp);
924 	}
925 
926 	if (xip->xi_log != NULL)
927 		fmd_log_rele(xip->xi_log);
928 
929 	/*
930 	 * Release every case handle in the module that was cached by this
931 	 * transport.  This will result in these cases disappearing from the
932 	 * local case hash so that fmd_case_uuclose() and fmd_case_repaired()
933 	 * etc can no longer be used.
934 	 */
935 	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
936 		nip = fmd_list_next(cip);
937 		if (cip->ci_xprt == xp)
938 			fmd_case_discard((fmd_case_t *)cip, B_TRUE);
939 	}
940 
941 	/*
942 	 * Destroy every class in the various subscription hashes and remove
943 	 * any corresponding subscriptions from the event dispatch queue.
944 	 */
945 	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
946 	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
947 	fmd_xprt_class_hash_destroy(&xip->xi_usub);
948 
949 	/*
950 	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
951 	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
952 	 * won't find the entries in the hash table.
953 	 */
954 	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
955 	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
956 	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
957 	for (i = 0; i < n; i++) {
958 		(void) snprintf(sp[i].fmds_name,
959 		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
960 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
961 	}
962 	fmd_ustat_delete(mp->mod_ustat, n, sp);
963 	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
964 
965 	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
966 	fmd_eventq_destroy(xip->xi_queue);
967 	nvlist_free(xip->xi_auth);
968 	fmd_free(xip, sizeof (fmd_xprt_impl_t));
969 
970 	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
971 }
972 
973 void
974 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
975 {
976 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
977 	uint_t oflags;
978 
979 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
980 	(void) pthread_mutex_lock(&xip->xi_lock);
981 
982 	oflags = xip->xi_flags;
983 	xip->xi_flags |= flags;
984 
985 	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
986 		fmd_eventq_suspend(xip->xi_queue);
987 
988 	(void) pthread_cond_broadcast(&xip->xi_cv);
989 
990 	while (xip->xi_busy != 0)
991 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
992 
993 	(void) pthread_mutex_unlock(&xip->xi_lock);
994 }
995 
996 void
997 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
998 {
999 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1000 	uint_t oflags;
1001 
1002 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
1003 	(void) pthread_mutex_lock(&xip->xi_lock);
1004 
1005 	oflags = xip->xi_flags;
1006 	xip->xi_flags &= ~flags;
1007 
1008 	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
1009 		fmd_eventq_resume(xip->xi_queue);
1010 
1011 	(void) pthread_cond_broadcast(&xip->xi_cv);
1012 	(void) pthread_mutex_unlock(&xip->xi_lock);
1013 }
1014 
1015 void
1016 fmd_xprt_send(fmd_xprt_t *xp)
1017 {
1018 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1019 	fmd_module_t *mp = xip->xi_queue->eq_mod;
1020 	fmd_event_t *ep;
1021 	int err;
1022 
1023 	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
1024 		if (FMD_EVENT_TTL(ep) == 0) {
1025 			fmd_event_rele(ep);
1026 			continue;
1027 		}
1028 
1029 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
1030 		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));
1031 
1032 		err = mp->mod_ops->mop_transport(mp, xp, ep);
1033 		fmd_eventq_done(xip->xi_queue);
1034 
1035 		if (err == FMD_SEND_RETRY) {
1036 			fmd_eventq_insert_at_time(xip->xi_queue, ep);
1037 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1038 			xip->xi_stats->xs_retried.fmds_value.ui64++;
1039 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1040 		}
1041 
1042 		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
1043 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1044 			xip->xi_stats->xs_lost.fmds_value.ui64++;
1045 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1046 		}
1047 
1048 		fmd_event_rele(ep);
1049 	}
1050 }
1051 
1052 /*
1053  * This function creates a local suspect list. This is used when a suspect list
1054  * is created directly by an external source like fminject.
1055  */
1056 static void
1057 fmd_xprt_list_suspect_local(fmd_xprt_t *xp, nvlist_t *nvl)
1058 {
1059 	nvlist_t **nvlp;
1060 	nvlist_t *de_fmri, *de_fmri_dup = NULL;
1061 	int64_t *diag_time;
1062 	char *code = NULL;
1063 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1064 	fmd_case_t *cp;
1065 	uint_t nelem = 0, nelem2 = 0, i;
1066 
1067 	fmd_module_lock(xip->xi_queue->eq_mod);
1068 	cp = fmd_case_create(xip->xi_queue->eq_mod, NULL);
1069 	if (cp == NULL) {
1070 		fmd_module_unlock(xip->xi_queue->eq_mod);
1071 		return;
1072 	}
1073 
1074 	/*
1075 	 * copy diag_code if present
1076 	 */
1077 	(void) nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code);
1078 	if (code != NULL) {
1079 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1080 
1081 		cip->ci_precanned = 1;
1082 		fmd_case_setcode(cp, code);
1083 	}
1084 
1085 	/*
1086 	 * copy suspects
1087 	 */
1088 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1089 	    &nelem);
1090 	for (i = 0; i < nelem; i++) {
1091 		nvlist_t *flt_copy, *asru = NULL, *fru = NULL, *rsrc = NULL;
1092 		topo_hdl_t *thp;
1093 		char *loc = NULL;
1094 		int err;
1095 
1096 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1097 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1098 		(void) nvlist_lookup_nvlist(nvlp[i], FM_FAULT_RESOURCE, &rsrc);
1099 
1100 		/*
1101 		 * If no fru specified, get it from topo
1102 		 */
1103 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_FRU, &fru) != 0 &&
1104 		    rsrc && topo_fmri_fru(thp, rsrc, &fru, &err) == 0)
1105 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_FRU, fru);
1106 		/*
1107 		 * If no asru specified, get it from topo
1108 		 */
1109 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU, &asru) != 0 &&
1110 		    rsrc && topo_fmri_asru(thp, rsrc, &asru, &err) == 0)
1111 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU, asru);
1112 		/*
1113 		 * If no location specified, get it from topo
1114 		 */
1115 		if (nvlist_lookup_string(nvlp[i], FM_FAULT_LOCATION,
1116 		    &loc) != 0) {
1117 			if (fru && topo_fmri_label(thp, fru, &loc, &err) == 0)
1118 				(void) nvlist_add_string(flt_copy,
1119 				    FM_FAULT_LOCATION, loc);
1120 			else if (rsrc && topo_fmri_label(thp, rsrc, &loc,
1121 			    &err) == 0)
1122 				(void) nvlist_add_string(flt_copy,
1123 				    FM_FAULT_LOCATION, loc);
1124 			if (loc)
1125 				topo_hdl_strfree(thp, loc);
1126 		}
1127 		if (fru)
1128 			nvlist_free(fru);
1129 		if (asru)
1130 			nvlist_free(asru);
1131 		if (rsrc)
1132 			nvlist_free(rsrc);
1133 		fmd_fmri_topo_rele(thp);
1134 		fmd_case_insert_suspect(cp, flt_copy);
1135 	}
1136 
1137 	/*
1138 	 * copy diag_time if present
1139 	 */
1140 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1141 	    &nelem2) == 0 && nelem2 >= 2)
1142 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1143 
1144 	/*
1145 	 * copy DE fmri if present
1146 	 */
1147 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1148 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1149 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1150 	}
1151 
1152 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1153 	fmd_module_unlock(xip->xi_queue->eq_mod);
1154 }
1155 
1156 /*
1157  * This function is called to create a proxy case on receipt of a list.suspect
1158  * from the diagnosing side of the transport.
1159  */
1160 static void
1161 fmd_xprt_list_suspect(fmd_xprt_t *xp, nvlist_t *nvl)
1162 {
1163 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1164 	nvlist_t **nvlp;
1165 	uint_t nelem = 0, nelem2 = 0, i;
1166 	int64_t *diag_time;
1167 	topo_hdl_t *thp;
1168 	char *class;
1169 	nvlist_t *rsrc, *asru, *de_fmri, *de_fmri_dup = NULL;
1170 	nvlist_t *flt_copy;
1171 	int err;
1172 	nvlist_t **asrua;
1173 	uint8_t *proxy_asru = NULL;
1174 	int got_proxy_asru = 0;
1175 	int got_hc_rsrc = 0;
1176 	int got_hc_asru = 0;
1177 	int got_present_rsrc = 0;
1178 	uint8_t *diag_asru = NULL;
1179 	char *scheme;
1180 	uint8_t *statusp;
1181 	char *uuid, *code;
1182 	fmd_case_t *cp;
1183 	fmd_case_impl_t *cip;
1184 	int need_update = 0;
1185 
1186 	if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) != 0)
1187 		return;
1188 	if (nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) != 0)
1189 		return;
1190 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1191 	    &nelem);
1192 
1193 	/*
1194 	 * In order to implement FMD_XPRT_HCONLY and FMD_XPRT_HC_PRESENT_ONLY
1195 	 * etc we first scan the suspects to see if
1196 	 * - there was an asru in the received fault
1197 	 * - there was an hc-scheme resource in the received fault
1198 	 * - any hc-scheme resource in the received fault is present in the
1199 	 *   local topology
1200 	 * - any hc-scheme resource in the received fault has an asru in the
1201 	 *   local topology
1202 	 */
1203 	if (nelem > 0) {
1204 		asrua = fmd_zalloc(sizeof (nvlist_t *) * nelem, FMD_SLEEP);
1205 		proxy_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1206 		diag_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1207 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1208 		for (i = 0; i < nelem; i++) {
1209 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1210 			    &asru) == 0 && asru != NULL)
1211 				diag_asru[i] = 1;
1212 			if (nvlist_lookup_string(nvlp[i], FM_CLASS,
1213 			    &class) != 0 || strncmp(class, "fault", 5) != 0)
1214 				continue;
1215 			/*
1216 			 * If there is an hc-scheme asru, use that to find the
1217 			 * real asru. Otherwise if there is an hc-scheme
1218 			 * resource, work out the old asru from that.
1219 			 * This order is to allow a two stage evaluation
1220 			 * of the asru where a fault in the diagnosing side
1221 			 * is in a component not visible to the proxy side,
1222 			 * but prevents a component that is visible from
1223 			 * working. So the diagnosing side sets the asru to
1224 			 * the latter component (in hc-scheme as the diagnosing
1225 			 * side doesn't know about the proxy side's virtual
1226 			 * schemes), and then the proxy side can convert that
1227 			 * to a suitable virtual scheme asru.
1228 			 */
1229 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1230 			    &asru) == 0 && asru != NULL &&
1231 			    nvlist_lookup_string(asru, FM_FMRI_SCHEME,
1232 			    &scheme) == 0 &&
1233 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1234 				got_hc_asru = 1;
1235 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1236 					continue;
1237 				if (topo_fmri_present(thp, asru, &err) != 0)
1238 					got_present_rsrc = 1;
1239 				if (topo_fmri_asru(thp, asru, &asrua[i],
1240 				    &err) == 0) {
1241 					proxy_asru[i] =
1242 					    FMD_PROXY_ASRU_FROM_ASRU;
1243 					got_proxy_asru = 1;
1244 				}
1245 			} else if (nvlist_lookup_nvlist(nvlp[i],
1246 			    FM_FAULT_RESOURCE, &rsrc) == 0 && rsrc != NULL &&
1247 			    nvlist_lookup_string(rsrc, FM_FMRI_SCHEME,
1248 			    &scheme) == 0 &&
1249 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1250 				got_hc_rsrc = 1;
1251 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1252 					continue;
1253 				if (topo_fmri_present(thp, rsrc, &err) != 0)
1254 					got_present_rsrc = 1;
1255 				if (topo_fmri_asru(thp, rsrc, &asrua[i],
1256 				    &err) == 0) {
1257 					proxy_asru[i] =
1258 					    FMD_PROXY_ASRU_FROM_RSRC;
1259 					got_proxy_asru = 1;
1260 				}
1261 			}
1262 		}
1263 		fmd_fmri_topo_rele(thp);
1264 	}
1265 
1266 	/*
1267 	 * If we're set up only to report hc-scheme faults, and
1268 	 * there aren't any, then just drop the event.
1269 	 */
1270 	if (got_hc_rsrc == 0 && got_hc_asru == 0 &&
1271 	    (xip->xi_flags & FMD_XPRT_HCONLY)) {
1272 		if (nelem > 0) {
1273 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1274 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1275 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1276 		}
1277 		return;
1278 	}
1279 
1280 	/*
1281 	 * If we're set up only to report locally present hc-scheme
1282 	 * faults, and there aren't any, then just drop the event.
1283 	 */
1284 	if (got_present_rsrc == 0 &&
1285 	    (xip->xi_flags & FMD_XPRT_HC_PRESENT_ONLY)) {
1286 		if (nelem > 0) {
1287 			for (i = 0; i < nelem; i++)
1288 				if (asrua[i])
1289 					nvlist_free(asrua[i]);
1290 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1291 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1292 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1293 		}
1294 		return;
1295 	}
1296 
1297 	/*
1298 	 * If fmd_case_recreate() returns NULL, UUID is already known.
1299 	 */
1300 	fmd_module_lock(xip->xi_queue->eq_mod);
1301 	if ((cp = fmd_case_recreate(xip->xi_queue->eq_mod, xp,
1302 	    FMD_CASE_UNSOLVED, uuid, code)) == NULL) {
1303 		if (nelem > 0) {
1304 			for (i = 0; i < nelem; i++)
1305 				if (asrua[i])
1306 					nvlist_free(asrua[i]);
1307 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1308 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1309 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1310 		}
1311 		fmd_module_unlock(xip->xi_queue->eq_mod);
1312 		return;
1313 	}
1314 
1315 	cip = (fmd_case_impl_t *)cp;
1316 	cip->ci_diag_asru = diag_asru;
1317 	cip->ci_proxy_asru = proxy_asru;
1318 	for (i = 0; i < nelem; i++) {
1319 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1320 		if (proxy_asru[i] != FMD_PROXY_ASRU_NOT_NEEDED) {
1321 			/*
1322 			 * Copy suspects, but remove/replace asru first. Also if
1323 			 * the original asru was hc-scheme use that as resource.
1324 			 */
1325 			if (proxy_asru[i] == FMD_PROXY_ASRU_FROM_ASRU) {
1326 				(void) nvlist_remove(flt_copy,
1327 				    FM_FAULT_RESOURCE, DATA_TYPE_NVLIST);
1328 				(void) nvlist_lookup_nvlist(flt_copy,
1329 				    FM_FAULT_ASRU, &asru);
1330 				(void) nvlist_add_nvlist(flt_copy,
1331 				    FM_FAULT_RESOURCE, asru);
1332 			}
1333 			(void) nvlist_remove(flt_copy, FM_FAULT_ASRU,
1334 			    DATA_TYPE_NVLIST);
1335 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU,
1336 			    asrua[i]);
1337 			nvlist_free(asrua[i]);
1338 		} else if (got_hc_asru == 0 &&
1339 		    nvlist_lookup_nvlist(flt_copy, FM_FAULT_ASRU,
1340 		    &asru) == 0 && asru != NULL) {
1341 			/*
1342 			 * If we have an asru from diag side, but it's not
1343 			 * in hc scheme, then we can't be sure what it
1344 			 * represents, so mark as no retire.
1345 			 */
1346 			(void) nvlist_add_boolean_value(flt_copy,
1347 			    FM_SUSPECT_RETIRE, B_FALSE);
1348 		}
1349 		fmd_case_insert_suspect(cp, flt_copy);
1350 	}
1351 	/*
1352 	 * copy diag_time
1353 	 */
1354 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1355 	    &nelem2) == 0 && nelem2 >= 2)
1356 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1357 	/*
1358 	 * copy DE fmri
1359 	 */
1360 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1361 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1362 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1363 	}
1364 
1365 	/*
1366 	 * Transition to solved. This will log the suspect list and create
1367 	 * the resource cache entries.
1368 	 */
1369 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1370 
1371 	/*
1372 	 * Update status if it is not simply "all faulty" (can happen if
1373 	 * list.suspects are being re-sent when the transport has reconnected).
1374 	 */
1375 	(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS, &statusp,
1376 	    &nelem);
1377 	for (i = 0; i < nelem; i++) {
1378 		if ((statusp[i] & (FM_SUSPECT_FAULTY | FM_SUSPECT_UNUSABLE |
1379 		    FM_SUSPECT_NOT_PRESENT | FM_SUSPECT_DEGRADED)) !=
1380 		    FM_SUSPECT_FAULTY)
1381 			need_update = 1;
1382 	}
1383 	if (need_update) {
1384 		fmd_case_update_status(cp, statusp, cip->ci_proxy_asru,
1385 		    cip->ci_diag_asru);
1386 		fmd_case_update_containees(cp);
1387 		fmd_case_update(cp);
1388 	}
1389 
1390 	/*
1391 	 * if asru on proxy side, send an update back to the diagnosing side to
1392 	 * update UNUSABLE/DEGRADED.
1393 	 */
1394 	if (got_proxy_asru)
1395 		fmd_case_xprt_updated(cp);
1396 
1397 	if (nelem > 0)
1398 		fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1399 	fmd_module_unlock(xip->xi_queue->eq_mod);
1400 }
1401 
1402 void
1403 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
1404 {
1405 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1406 	const fmd_xprt_rule_t *xrp;
1407 	fmd_t *dp = &fmd;
1408 
1409 	fmd_event_t *e;
1410 	char *class, *uuid;
1411 	boolean_t isproto, isereport;
1412 
1413 	uint64_t *tod;
1414 	uint8_t ttl;
1415 	uint_t n;
1416 	fmd_case_t *cp;
1417 
1418 	/*
1419 	 * Grab the transport lock and set the busy flag to indicate we are
1420 	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1421 	 * resumes the transport before continuing on with the receive.
1422 	 */
1423 	(void) pthread_mutex_lock(&xip->xi_lock);
1424 
1425 	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1426 
1427 		if (fmd.d_signal != 0) {
1428 			(void) pthread_mutex_unlock(&xip->xi_lock);
1429 			return; /* fmd_destroy() is in progress */
1430 		}
1431 
1432 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1433 	}
1434 
1435 	xip->xi_busy++;
1436 	ASSERT(xip->xi_busy != 0);
1437 
1438 	(void) pthread_mutex_unlock(&xip->xi_lock);
1439 
1440 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
1441 	xip->xi_stats->xs_received.fmds_value.ui64++;
1442 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1443 
1444 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1445 		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1446 		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1447 
1448 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1449 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1450 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1451 
1452 		nvlist_free(nvl);
1453 		goto done;
1454 	}
1455 
1456 	fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
1457 	    ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);
1458 
1459 	isereport = (strncmp(class, FM_EREPORT_CLASS,
1460 	    sizeof (FM_EREPORT_CLASS - 1)) == 0) ? FMD_B_TRUE : FMD_B_FALSE;
1461 
1462 	/*
1463 	 * The logonly flag should only be set for ereports.
1464 	 */
1465 	if ((logonly == FMD_B_TRUE) && (isereport == FMD_B_FALSE)) {
1466 		fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
1467 		    "logonly flag is not valid for class %s",
1468 		    (void *)nvl, class);
1469 
1470 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1471 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1472 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1473 
1474 		nvlist_free(nvl);
1475 		goto done;
1476 	}
1477 
1478 	/*
1479 	 * If a time-to-live value is present in the event and is zero, drop
1480 	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1481 	 */
1482 	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1483 		if (ttl == 0) {
1484 			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1485 			    "timeout: event received with ttl=0\n",
1486 			    xip->xi_id, (void *)nvl, class);
1487 
1488 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1489 			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1490 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1491 
1492 			nvlist_free(nvl);
1493 			goto done;
1494 		}
1495 		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1496 		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1497 	}
1498 
1499 	/*
1500 	 * If we are using the native system clock, the underlying transport
1501 	 * code can provide a tighter event time bound by telling us when the
1502 	 * event was enqueued.  If we're using simulated clocks, this time
1503 	 * has no meaning to us, so just reset the value to use HRT_NOW.
1504 	 */
1505 	if (dp->d_clockops != &fmd_timeops_native)
1506 		hrt = FMD_HRT_NOW;
1507 
1508 	/*
1509 	 * If an event's class is in the FMD_CTL_CLASS family, then create a
1510 	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
1511 	 * event using this time.  Otherwise create a protocol event using hrt.
1512 	 */
1513 	isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
1514 	    FMD_B_FALSE : FMD_B_TRUE;
1515 	if (isproto == FMD_B_FALSE)
1516 		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1517 	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1518 		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1519 	else {
1520 		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1521 		    NULL, nvl, class, NULL, 0, 0);
1522 	}
1523 
1524 	/*
1525 	 * If the debug log is enabled, create a temporary event, log it to the
1526 	 * debug log, and then reset the underlying state of the event.
1527 	 */
1528 	if (xip->xi_log != NULL) {
1529 		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1530 
1531 		fmd_log_append(xip->xi_log, e, NULL);
1532 
1533 		ep->ev_flags |= FMD_EVF_VOLATILE;
1534 		ep->ev_off = 0;
1535 		ep->ev_len = 0;
1536 
1537 		if (ep->ev_log != NULL) {
1538 			fmd_log_rele(ep->ev_log);
1539 			ep->ev_log = NULL;
1540 		}
1541 	}
1542 
1543 	/*
1544 	 * Iterate over the rules for the current state trying to match the
1545 	 * event class to one of our special rules.  If a rule is matched, the
1546 	 * event is consumed and not dispatched to other modules.  If the rule
1547 	 * set ends without matching an event, we fall through to dispatching.
1548 	 */
1549 	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1550 		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1551 			fmd_event_hold(e);
1552 			xrp->xr_func(xip, nvl);
1553 			fmd_event_rele(e);
1554 			goto done;
1555 		}
1556 	}
1557 
1558 	/*
1559 	 * Record the event in the errlog if it is an ereport.  This code will
1560 	 * be replaced later with a per-transport intent log instead.
1561 	 */
1562 	if (isereport == FMD_B_TRUE) {
1563 		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
1564 		fmd_log_append(dp->d_errlog, e, NULL);
1565 		(void) pthread_rwlock_unlock(&dp->d_log_lock);
1566 	}
1567 
1568 	/*
1569 	 * If a list.suspect event is received, create a case for the specified
1570 	 * UUID in the case hash, with the transport module as its owner.
1571 	 */
1572 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS)) {
1573 		if (xip->xi_flags & FMD_XPRT_CACHE_AS_LOCAL)
1574 			fmd_xprt_list_suspect_local(xp, nvl);
1575 		else
1576 			fmd_xprt_list_suspect(xp, nvl);
1577 		fmd_event_hold(e);
1578 		fmd_event_rele(e);
1579 		goto done;
1580 	}
1581 
1582 	/*
1583 	 * If a list.updated or list.repaired event is received, update the
1584 	 * resource cache status and the local case.
1585 	 */
1586 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_REPAIRED_CLASS) ||
1587 	    fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_UPDATED_CLASS)) {
1588 		uint8_t *statusp;
1589 		uint_t nelem = 0;
1590 
1591 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1592 		    &statusp, &nelem);
1593 		fmd_module_lock(xip->xi_queue->eq_mod);
1594 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1595 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1596 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1597 			if (cip->ci_xprt != NULL) {
1598 				fmd_case_update_status(cp, statusp,
1599 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1600 				fmd_case_update_containees(cp);
1601 				fmd_case_update(cp);
1602 			}
1603 			fmd_case_rele(cp);
1604 		}
1605 		fmd_module_unlock(xip->xi_queue->eq_mod);
1606 		fmd_event_hold(e);
1607 		fmd_event_rele(e);
1608 		goto done;
1609 	}
1610 
1611 	/*
1612 	 * If a list.isolated event is received, update resource cache status
1613 	 */
1614 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_ISOLATED_CLASS)) {
1615 		uint8_t *statusp;
1616 		uint_t nelem = 0;
1617 
1618 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1619 		    &statusp, &nelem);
1620 		fmd_module_lock(xip->xi_queue->eq_mod);
1621 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1622 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1623 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1624 			if (cip->ci_xprt != NULL)
1625 				fmd_case_update_status(cp, statusp,
1626 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1627 			fmd_case_rele(cp);
1628 		}
1629 		fmd_module_unlock(xip->xi_queue->eq_mod);
1630 		fmd_event_hold(e);
1631 		fmd_event_rele(e);
1632 		goto done;
1633 	}
1634 
1635 	/*
1636 	 * If a list.resolved event is received, resolve the local case.
1637 	 */
1638 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_RESOLVED_CLASS)) {
1639 		fmd_module_lock(xip->xi_queue->eq_mod);
1640 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1641 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1642 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1643 			if (cip->ci_xprt != NULL)
1644 				fmd_case_transition(cp, (cip->ci_state ==
1645 				    FMD_CASE_REPAIRED) ? FMD_CASE_RESOLVED :
1646 				    (cip->ci_state == FMD_CASE_CLOSED) ?
1647 				    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT,
1648 				    FMD_CF_RESOLVED);
1649 			fmd_case_rele(cp);
1650 		}
1651 		fmd_module_unlock(xip->xi_queue->eq_mod);
1652 		fmd_event_hold(e);
1653 		fmd_event_rele(e);
1654 		goto done;
1655 	}
1656 
1657 	if (logonly == FMD_B_TRUE || (xip->xi_flags & FMD_XPRT_EXTERNAL)) {
1658 		/*
1659 		 * Don't proxy ereports on an EXTERNAL transport - we won't
1660 		 * know how to diagnose them with the wrong topology. Note
1661 		 * that here (and above) we have to hold/release the event in
1662 		 * order for it to be freed.
1663 		 */
1664 		fmd_event_hold(e);
1665 		fmd_event_rele(e);
1666 	} else if (isproto == FMD_B_TRUE)
1667 		fmd_dispq_dispatch(dp->d_disp, e, class);
1668 	else
1669 		fmd_modhash_dispatch(dp->d_mod_hash, e);
1670 done:
1671 	(void) pthread_mutex_lock(&xip->xi_lock);
1672 
1673 	ASSERT(xip->xi_busy != 0);
1674 	xip->xi_busy--;
1675 
1676 	(void) pthread_cond_broadcast(&xip->xi_cv);
1677 	(void) pthread_mutex_unlock(&xip->xi_lock);
1678 }
1679 
1680 void
1681 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1682 {
1683 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1684 
1685 	fmd_event_t *e;
1686 	nvlist_t *nvl;
1687 	char *s;
1688 
1689 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1690 		return; /* read-only transports do not proxy uuclose */
1691 
1692 	TRACE((FMD_DBG_XPRT, "xprt %u closing case %s\n", xip->xi_id, uuid));
1693 
1694 	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1695 	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1696 
1697 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1698 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1699 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1700 }
1701 
1702 /*
1703  * On proxy side, send back uuresolved request to diagnosing side
1704  */
1705 void
1706 fmd_xprt_uuresolved(fmd_xprt_t *xp, const char *uuid)
1707 {
1708 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1709 
1710 	fmd_event_t *e;
1711 	nvlist_t *nvl;
1712 	char *s;
1713 
1714 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1715 		return; /* read-only transports do not proxy uuresolved */
1716 
1717 	TRACE((FMD_DBG_XPRT, "xprt %u resolving case %s\n", xip->xi_id, uuid));
1718 
1719 	nvl = fmd_protocol_xprt_uuresolved(xip->xi_queue->eq_mod,
1720 	    "resource.fm.xprt.uuresolved", xip->xi_version, uuid);
1721 
1722 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1723 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1724 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1725 }
1726 
1727 /*
1728  * On proxy side, send back repair/acquit/etc request to diagnosing side
1729  */
1730 void
1731 fmd_xprt_updated(fmd_xprt_t *xp, const char *uuid, uint8_t *statusp,
1732 	uint8_t *has_asrup, uint_t nelem)
1733 {
1734 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1735 
1736 	fmd_event_t *e;
1737 	nvlist_t *nvl;
1738 	char *s;
1739 
1740 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1741 		return; /* read-only transports do not support remote repairs */
1742 
1743 	TRACE((FMD_DBG_XPRT, "xprt %u updating case %s\n", xip->xi_id, uuid));
1744 
1745 	nvl = fmd_protocol_xprt_updated(xip->xi_queue->eq_mod,
1746 	    "resource.fm.xprt.updated", xip->xi_version, uuid, statusp,
1747 	    has_asrup, nelem);
1748 
1749 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1750 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1751 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1752 }
1753 
1754 /*
1755  * Insert the specified class into our remote subscription hash.  If the class
1756  * is already present, bump the reference count; otherwise add it to the hash
1757  * and then enqueue an event for our remote peer to proxy our subscription.
1758  */
1759 void
1760 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1761 {
1762 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1763 
1764 	uint_t refs;
1765 	nvlist_t *nvl;
1766 	fmd_event_t *e;
1767 	char *s;
1768 
1769 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1770 		return; /* read-only transports do not proxy subscriptions */
1771 
1772 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1773 		return; /* transport is not yet an active subscriber */
1774 
1775 	(void) pthread_mutex_lock(&xip->xi_lock);
1776 	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1777 	(void) pthread_mutex_unlock(&xip->xi_lock);
1778 
1779 	if (refs > 1)
1780 		return; /* we've already asked our peer for this subscription */
1781 
1782 	fmd_dprintf(FMD_DBG_XPRT,
1783 	    "xprt %u subscribing to %s\n", xip->xi_id, class);
1784 
1785 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1786 	    "resource.fm.xprt.subscribe", xip->xi_version, class);
1787 
1788 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1789 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1790 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1791 }
1792 
1793 /*
1794  * Delete the specified class from the remote subscription hash.  If the
1795  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1796  */
1797 void
1798 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1799 {
1800 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1801 
1802 	uint_t refs;
1803 	nvlist_t *nvl;
1804 	fmd_event_t *e;
1805 	char *s;
1806 
1807 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1808 		return; /* read-only transports do not proxy subscriptions */
1809 
1810 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1811 		return; /* transport is not yet an active subscriber */
1812 
1813 	/*
1814 	 * If the subscription reference count drops to zero in xi_rsub, insert
1815 	 * an entry into the xi_usub hash indicating we await an unsuback event.
1816 	 */
1817 	(void) pthread_mutex_lock(&xip->xi_lock);
1818 
1819 	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1820 		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1821 
1822 	(void) pthread_mutex_unlock(&xip->xi_lock);
1823 
1824 	if (refs != 0)
1825 		return; /* other subscriptions for this class still active */
1826 
1827 	fmd_dprintf(FMD_DBG_XPRT,
1828 	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1829 
1830 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1831 	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1832 
1833 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1834 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1835 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1836 }
1837 
1838 static void
1839 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1840 {
1841 	fmd_xprt_t *xp;
1842 
1843 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1844 		fmd_xprt_subscribe(xp, class);
1845 		fmd_idspace_rele(ids, id);
1846 	}
1847 }
1848 
1849 void
1850 fmd_xprt_subscribe_all(const char *class)
1851 {
1852 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1853 
1854 	if (ids->ids_count != 0)
1855 		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1856 }
1857 
1858 static void
1859 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1860 {
1861 	fmd_xprt_t *xp;
1862 
1863 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1864 		fmd_xprt_unsubscribe(xp, class);
1865 		fmd_idspace_rele(ids, id);
1866 	}
1867 }
1868 
1869 void
1870 fmd_xprt_unsubscribe_all(const char *class)
1871 {
1872 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1873 
1874 	if (ids->ids_count != 0)
1875 		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1876 }
1877 
1878 /*ARGSUSED*/
1879 static void
1880 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1881 {
1882 	fmd_xprt_t *xp;
1883 
1884 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1885 		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1886 		fmd_idspace_rele(ids, id);
1887 	}
1888 }
1889 
1890 void
1891 fmd_xprt_suspend_all(void)
1892 {
1893 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1894 
1895 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1896 
1897 	if (fmd.d_xprt_suspend++ != 0) {
1898 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1899 		return; /* already suspended */
1900 	}
1901 
1902 	if (ids->ids_count != 0)
1903 		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1904 
1905 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1906 }
1907 
1908 /*ARGSUSED*/
1909 static void
1910 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1911 {
1912 	fmd_xprt_t *xp;
1913 
1914 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1915 		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1916 		fmd_idspace_rele(ids, id);
1917 	}
1918 }
1919 
1920 void
1921 fmd_xprt_resume_all(void)
1922 {
1923 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1924 
1925 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1926 
1927 	if (fmd.d_xprt_suspend == 0)
1928 		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
1929 
1930 	if (--fmd.d_xprt_suspend != 0) {
1931 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1932 		return; /* not ready to be resumed */
1933 	}
1934 
1935 	if (ids->ids_count != 0)
1936 		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
1937 
1938 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1939 }
1940