xref: /titanic_41/usr/src/cmd/fm/fmd/common/fmd_xprt.c (revision 3c112a2b34403220c06c3e2fcac403358cfba168)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * FMD Transport Subsystem
29  *
30  * A transport module uses some underlying mechanism to transport events.
31  * This mechanism may use any underlying link-layer protocol and may support
32  * additional link-layer packets unrelated to FMA.  Some appropriate link-
33  * layer mechanism to create the underlying connection is expected to be
34  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
35  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
36  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
37  * The underlying transport mechanism is *required* to provide ordering: that
38  * is, the sequences of bytes written across the transport must be read by
39  * the remote peer in the order that they are written, even across separate
40  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
41  * a valid transport as it guarantees ordering, whereas the Internet UDP
42  * protocol would not because UDP datagrams may be delivered in any order
43  * as a result of delays introduced when datagrams pass through routers.
44  *
45  * Similar to sending events, a transport module receives events that are from
46  * its peer remote endpoint using some transport-specific mechanism that is
47  * unknown to FMD.  As each event is received, the transport module is
48  * responsible for constructing a valid nvlist_t object from the data and then
49  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
50  * queue, making it available to all local modules that are not transport
51  * modules that have subscribed to the event.
52  *
53  * The following state machine is used for each transport.  The initial state
54  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
55  *
56  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
57  *             |                 |
58  * waiting  +--v--+           +--v--+  waiting
59  * for syn  | SYN |--+     --+| ACK |  for ack
60  * event    +-----+   \   /   +-----+  event
61  *             |       \ /       |
62  * drop all +--v--+     X     +--v--+  send subscriptions,
63  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
64  *          +-----+           +-----+  wait for run event
65  *             ^                 |
66  *             |     +-----+     |
67  *             +-----| RUN |<----+
68  *                   +--^--+
69  *                      |
70  *               FMD_XPRT_RDONLY
71  *
72  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
73  * Layer enqueues a "syn" event for the module in its event queue and sets the
74  * state to ACK.  In state ACK, we are waiting for the transport to get an
75  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
76  * discarded.  If an "ack" is received, we transition to state SUB.  If a
77  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
78  * exchange), we transition to state ERR.  Once in state ERR, no further
79  * operations are valid except fmd_xprt_close() and fmd_xprt_error() will
80  * return a non-zero value to the caller indicating the transport has failed.
81  *
82  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
83  * Layer assumes this transport is being used to accept a virtual connection
84  * from a remote peer that is sending a "syn", and sets the initial state to
85  * SYN.  In this state, the transport waits for a "syn" event, validates it,
86  * and then transitions to state SUB if it is valid or state ERR if it is not.
87  *
88  * Once in state SUB, the transport module is expected to receive a sequence of
89  * zero or more "subscribe" events from the remote peer, followed by a "run"
90  * event.  Once in state RUN, the transport is active and any events can be
91  * sent or received.  The transport module is free to call fmd_xprt_close()
92  * from any state.  The fmd_xprt_error() function will return zero if the
93  * transport is not in the ERR state, or non-zero if it is in the ERR state.
94  *
95  * Once the state machine reaches RUN, other FMA protocol events can be sent
96  * and received across the transport in addition to the various control events.
97  *
98  * Table of Common Transport Layer Control Events
99  * ==============================================
100  *
101  * FMA Class                     Payload
102  * ---------                     -------
103  * resource.fm.xprt.uuclose      string (uuid of case)
104  * resource.fm.xprt.uuresolved   string (uuid of case)
105  * resource.fm.xprt.updated      string (uuid of case)
106  * resource.fm.xprt.subscribe    string (class pattern)
107  * resource.fm.xprt.unsubscribe  string (class pattern)
108  * resource.fm.xprt.unsuback     string (class pattern)
109  * resource.fm.xprt.syn          version information
110  * resource.fm.xprt.ack          version information
111  * resource.fm.xprt.run          version information
112  *
113  * Control events are used to add and delete proxy subscriptions on the remote
114  * transport peer module, and to set up connections.  When a "syn" event is
115  * sent, FMD will include in the payload the highest version of the FMA event
116  * protocol that is supported by the sender.  When a "syn" event is received,
117  * the receiving FMD will use the minimum of this version and its version of
118  * the protocol, and reply with this new minimum version in the "ack" event.
119  * The receiver will then use this new minimum for subsequent event semantics.
120  */
121 
122 #include <sys/fm/protocol.h>
123 #include <strings.h>
124 #include <limits.h>
125 
126 #include <fmd_alloc.h>
127 #include <fmd_error.h>
128 #include <fmd_conf.h>
129 #include <fmd_subr.h>
130 #include <fmd_string.h>
131 #include <fmd_protocol.h>
132 #include <fmd_thread.h>
133 #include <fmd_eventq.h>
134 #include <fmd_dispq.h>
135 #include <fmd_ctl.h>
136 #include <fmd_log.h>
137 #include <fmd_ustat.h>
138 #include <fmd_case.h>
139 #include <fmd_api.h>
140 #include <fmd_fmri.h>
141 #include <fmd_asru.h>
142 #include <fmd_xprt.h>
143 
144 #include <fmd.h>
145 
146 /*
147  * The states shown above in the transport state machine diagram are encoded
148  * using arrays of class patterns and a corresponding action function.  These
149  * arrays are then passed to fmd_xprt_transition() to change transport states.
150  */
151 
152 const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
153 { "resource.fm.xprt.syn", fmd_xprt_event_syn },
154 { "*", fmd_xprt_event_error },
155 { NULL, NULL }
156 };
157 
158 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
159 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
160 { "*", fmd_xprt_event_error },
161 };
162 
163 const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
164 { "*", fmd_xprt_event_drop },
165 { NULL, NULL }
166 };
167 
168 const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
169 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
170 { "resource.fm.xprt.run", fmd_xprt_event_run },
171 { "resource.fm.xprt.*", fmd_xprt_event_error },
172 { "*", fmd_xprt_event_drop },
173 { NULL, NULL }
174 };
175 
176 const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
177 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
178 { "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
179 { "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
180 { "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
181 { "resource.fm.xprt.uuresolved", fmd_xprt_event_uuresolved },
182 { "resource.fm.xprt.updated", fmd_xprt_event_updated },
183 { "resource.fm.xprt.*", fmd_xprt_event_error },
184 { NULL, NULL }
185 };
186 
187 /*
188  * Template for per-transport statistics installed by fmd on behalf of each
189  * transport.  These are used to initialize the per-transport xi_stats.  For
190  * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
191  * transport ID (xi_id) and then are inserted into the per-module stats hash.
192  * The values in this array must match fmd_xprt_stat_t from <fmd_xprt.h>.
193  */
194 static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
195 {
196 { "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
197 { "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
198 { "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
199 { "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
200 { "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
201 { "wtime", FMD_TYPE_TIME, "total wait time on queue" },
202 { "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
203 { "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
204 { "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
205 { "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
206 },
207 { "module", FMD_TYPE_STRING, "module that owns this transport" },
208 { "authority", FMD_TYPE_STRING, "authority associated with this transport" },
209 { "state", FMD_TYPE_STRING, "current transport state" },
210 { "received", FMD_TYPE_UINT64, "events received by transport" },
211 { "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
212 { "retried", FMD_TYPE_UINT64, "retries requested of transport" },
213 { "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
214 { "lost", FMD_TYPE_UINT64, "events lost by transport" },
215 { "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
216 { "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
217 };
218 
219 static void
220 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
221 {
222 	uint_t hashlen = fmd.d_str_buckets;
223 
224 	xch->xch_queue = eq;
225 	xch->xch_hashlen = hashlen;
226 	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
227 }
228 
229 static void
230 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
231 {
232 	fmd_eventq_t *eq = xch->xch_queue;
233 	fmd_xprt_class_t *xcp, *ncp;
234 	uint_t i;
235 
236 	for (i = 0; i < xch->xch_hashlen; i++) {
237 		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
238 			ncp = xcp->xc_next;
239 
240 			if (eq != NULL)
241 				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
242 
243 			fmd_strfree(xcp->xc_class);
244 			fmd_free(xcp, sizeof (fmd_xprt_class_t));
245 		}
246 	}
247 
248 	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
249 }
250 
251 /*
252  * Insert the specified class into the specified class hash, and return the
253  * reference count.  A return value of one indicates this is the first insert.
254  * If an eventq is associated with the hash, insert a dispq subscription for it.
255  */
256 static uint_t
257 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
258     fmd_xprt_class_hash_t *xch, const char *class)
259 {
260 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
261 	fmd_xprt_class_t *xcp;
262 
263 	ASSERT(MUTEX_HELD(&xip->xi_lock));
264 
265 	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
266 		if (strcmp(class, xcp->xc_class) == 0)
267 			return (++xcp->xc_refs);
268 	}
269 
270 	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
271 	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
272 	xcp->xc_next = xch->xch_hash[h];
273 	xcp->xc_refs = 1;
274 	xch->xch_hash[h] = xcp;
275 
276 	if (xch->xch_queue != NULL)
277 		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
278 
279 	return (xcp->xc_refs);
280 }
281 
282 /*
283  * Delete the specified class from the specified class hash, and return the
284  * reference count.  A return value of zero indicates the class was deleted.
285  * If an eventq is associated with the hash, delete the dispq subscription.
286  */
287 static uint_t
288 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
289     fmd_xprt_class_hash_t *xch, const char *class)
290 {
291 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
292 	fmd_xprt_class_t *xcp, **pp;
293 
294 	ASSERT(MUTEX_HELD(&xip->xi_lock));
295 	pp = &xch->xch_hash[h];
296 
297 	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
298 		if (strcmp(class, xcp->xc_class) == 0)
299 			break;
300 		else
301 			pp = &xcp->xc_next;
302 	}
303 
304 	if (xcp == NULL)
305 		return (-1U); /* explicitly permit an invalid delete */
306 
307 	if (--xcp->xc_refs != 0)
308 		return (xcp->xc_refs);
309 
310 	ASSERT(xcp->xc_refs == 0);
311 	*pp = xcp->xc_next;
312 
313 	fmd_strfree(xcp->xc_class);
314 	fmd_free(xcp, sizeof (fmd_xprt_class_t));
315 
316 	if (xch->xch_queue != NULL)
317 		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
318 
319 	return (0);
320 }
321 
322 /*
323  * Queue subscribe events for the specified transport corresponding to all of
324  * the active module subscriptions.  This is an extremely heavyweight operation
325  * that we expect to take place rarely (i.e. when loading a transport module
326  * or when it establishes a connection).  We lock all of the known modules to
327  * prevent them from adding or deleting subscriptions, then snapshot their
328  * subscriptions, and then unlock all of the modules.  We hold the modhash
329  * lock for the duration of this operation to prevent new modules from loading.
330  */
331 static void
332 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
333 {
334 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
335 	const fmd_conf_path_t *pap;
336 	fmd_module_t *mp;
337 	uint_t i, j;
338 
339 	(void) pthread_rwlock_rdlock(&mhp->mh_lock);
340 
341 	for (i = 0; i < mhp->mh_hashlen; i++) {
342 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
343 			fmd_module_lock(mp);
344 	}
345 
346 	(void) pthread_mutex_lock(&xip->xi_lock);
347 	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
348 	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
349 	(void) pthread_mutex_unlock(&xip->xi_lock);
350 
351 	for (i = 0; i < mhp->mh_hashlen; i++) {
352 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
353 			(void) fmd_conf_getprop(mp->mod_conf,
354 			    FMD_PROP_SUBSCRIPTIONS, &pap);
355 			for (j = 0; j < pap->cpa_argc; j++)
356 				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
357 		}
358 	}
359 
360 	for (i = 0; i < mhp->mh_hashlen; i++) {
361 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
362 			fmd_module_unlock(mp);
363 	}
364 
365 	(void) pthread_rwlock_unlock(&mhp->mh_lock);
366 }
367 
368 static void
369 fmd_xprt_transition(fmd_xprt_impl_t *xip,
370     const fmd_xprt_rule_t *state, const char *tag)
371 {
372 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
373 	fmd_event_t *e;
374 	nvlist_t *nvl;
375 	char *s;
376 
377 	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
378 
379 	xip->xi_state = state;
380 	s = fmd_strdup(tag, FMD_SLEEP);
381 
382 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
383 	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
384 	xip->xi_stats->xs_state.fmds_value.str = s;
385 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
386 
387 	/*
388 	 * If we've reached the SUB state, take out the big hammer and snapshot
389 	 * all of the subscriptions of all of the loaded modules.  Then queue a
390 	 * run event for our remote peer indicating that it can enter RUN.
391 	 */
392 	if (state == _fmd_xprt_state_sub) {
393 		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
394 
395 		/*
396 		 * For read-write transports, we always want to set up remote
397 		 * subscriptions to the bultin list.* events, regardless of
398 		 * whether any agents have subscribed to them.
399 		 */
400 		if (xip->xi_flags & FMD_XPRT_RDWR) {
401 			fmd_xprt_subscribe(xp, FM_LIST_SUSPECT_CLASS);
402 			fmd_xprt_subscribe(xp, FM_LIST_ISOLATED_CLASS);
403 			fmd_xprt_subscribe(xp, FM_LIST_UPDATED_CLASS);
404 			fmd_xprt_subscribe(xp, FM_LIST_RESOLVED_CLASS);
405 			fmd_xprt_subscribe(xp, FM_LIST_REPAIRED_CLASS);
406 		}
407 
408 		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
409 		    "resource.fm.xprt.run", xip->xi_version);
410 
411 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
412 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
413 		fmd_eventq_insert_at_time(xip->xi_queue, e);
414 	}
415 }
416 
417 static void
418 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
419 {
420 	char *s = fmd_fmri_auth2str(xip->xi_auth);
421 
422 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
423 	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
424 	xip->xi_stats->xs_authority.fmds_value.str = s;
425 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
426 }
427 
428 static int
429 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
430 {
431 	uint8_t rversion;
432 
433 	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
434 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
435 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
436 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
437 
438 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
439 		return (1);
440 	}
441 
442 	if (rversion > xip->xi_version) {
443 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
444 		    xip->xi_id, rversion, xip->xi_version);
445 
446 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
447 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
448 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
449 
450 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
451 		return (1);
452 	}
453 
454 	if (rversionp != NULL)
455 		*rversionp = rversion;
456 
457 	return (0);
458 }
459 
460 void
461 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
462 {
463 	fmd_event_t *e;
464 	uint_t vers;
465 	char *class;
466 
467 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
468 		return; /* transitioned to error state */
469 
470 	/*
471 	 * If the transport module didn't specify an authority, extract the
472 	 * one that is passed along with the xprt.syn event and use that.
473 	 */
474 	if (xip->xi_auth == NULL &&
475 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
476 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
477 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
478 		fmd_xprt_authupdate(xip);
479 	}
480 
481 	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
482 	    "resource.fm.xprt.ack", xip->xi_version);
483 
484 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
485 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
486 	fmd_eventq_insert_at_time(xip->xi_queue, e);
487 
488 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
489 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
490 }
491 
492 void
493 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
494 {
495 	uint_t vers;
496 
497 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
498 		return; /* transitioned to error state */
499 
500 	/*
501 	 * If the transport module didn't specify an authority, extract the
502 	 * one that is passed along with the xprt.syn event and use that.
503 	 */
504 	if (xip->xi_auth == NULL &&
505 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
506 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
507 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
508 		fmd_xprt_authupdate(xip);
509 	}
510 
511 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
512 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
513 }
514 
515 /*
516  * Upon transition to RUN, we take every solved case and resend a list.suspect
517  * event for it to our remote peer.  If a case transitions from solved to a
518  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
519  * the case hash, we will get it as part of examining the resource cache, next.
520  */
521 static void
522 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
523 {
524 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
525 	fmd_xprt_impl_t *xip = arg;
526 
527 	fmd_event_t *e;
528 	nvlist_t *nvl;
529 	char *class;
530 
531 	if (cip->ci_state == FMD_CASE_UNSOLVED)
532 		return;
533 
534 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
535 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
536 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
537 
538 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
539 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
540 
541 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
542 }
543 
544 void
545 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
546 {
547 	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
548 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
549 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
550 	}
551 }
552 
553 void
554 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
555 {
556 	char *class;
557 
558 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
559 		return; /* transitioned to error state */
560 
561 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
562 		return; /* malformed protocol event */
563 
564 	(void) pthread_mutex_lock(&xip->xi_lock);
565 	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
566 	(void) pthread_mutex_unlock(&xip->xi_lock);
567 
568 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
569 	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
570 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
571 }
572 
573 void
574 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
575 {
576 	fmd_event_t *e;
577 	char *class;
578 
579 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
580 		return; /* transitioned to error state */
581 
582 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
583 		return; /* malformed protocol event */
584 
585 	(void) pthread_mutex_lock(&xip->xi_lock);
586 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
587 	(void) pthread_mutex_unlock(&xip->xi_lock);
588 
589 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
590 	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
591 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
592 
593 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
594 	    "resource.fm.xprt.unsuback", xip->xi_version, class);
595 
596 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
597 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
598 	fmd_eventq_insert_at_time(xip->xi_queue, e);
599 }
600 
601 void
602 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
603 {
604 	char *class;
605 
606 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
607 		return; /* transitioned to error state */
608 
609 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
610 		return; /* malformed protocol event */
611 
612 	(void) pthread_mutex_lock(&xip->xi_lock);
613 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
614 	(void) pthread_mutex_unlock(&xip->xi_lock);
615 }
616 
617 /*
618  * on diagnosing side, receive a uuclose from the proxy.
619  */
620 void
621 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
622 {
623 	fmd_case_t *cp;
624 	char *uuid;
625 
626 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
627 		return; /* transitioned to error state */
628 
629 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
630 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
631 		/*
632 		 * update resource cache status and transition case
633 		 */
634 		fmd_case_close_status(cp);
635 		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
636 		fmd_case_rele(cp);
637 	}
638 }
639 
640 /*
641  * on diagnosing side, receive a uuresolved from the proxy.
642  */
643 void
644 fmd_xprt_event_uuresolved(fmd_xprt_impl_t *xip, nvlist_t *nvl)
645 {
646 	fmd_case_t *cp;
647 	char *uuid;
648 
649 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
650 		return; /* transitioned to error state */
651 
652 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
653 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
654 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
655 
656 		fmd_case_transition(cp, (cip->ci_state == FMD_CASE_REPAIRED) ?
657 		    FMD_CASE_RESOLVED : (cip->ci_state == FMD_CASE_CLOSED) ?
658 		    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT, FMD_CF_RESOLVED);
659 		fmd_case_rele(cp);
660 	}
661 }
662 
663 /*
664  * on diagnosing side, receive a repair/acquit from the proxy.
665  */
666 void
667 fmd_xprt_event_updated(fmd_xprt_impl_t *xip, nvlist_t *nvl)
668 {
669 	fmd_case_t *cp;
670 	char *uuid;
671 
672 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
673 		return; /* transitioned to error state */
674 
675 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
676 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
677 		uint8_t *statusp, *proxy_asrup = NULL;
678 		uint_t nelem = 0;
679 
680 		/*
681 		 * Only update status with new repairs if "no remote repair"
682 		 * is not set. Do the case_update anyway though (as this will
683 		 * refresh the status on the proxy side).
684 		 */
685 		if (!(xip->xi_flags & FMD_XPRT_NO_REMOTE_REPAIR)) {
686 			if (nvlist_lookup_uint8_array(nvl,
687 			    FM_RSRC_XPRT_FAULT_STATUS, &statusp, &nelem) == 0 &&
688 			    nelem != 0) {
689 				(void) nvlist_lookup_uint8_array(nvl,
690 				    FM_RSRC_XPRT_FAULT_HAS_ASRU, &proxy_asrup,
691 				    &nelem);
692 				fmd_case_update_status(cp, statusp,
693 				    proxy_asrup, NULL);
694 			}
695 			fmd_case_update_containees(cp);
696 		}
697 		fmd_case_update(cp);
698 		fmd_case_rele(cp);
699 	}
700 }
701 
702 void
703 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
704 {
705 	char *class = "<unknown>";
706 
707 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
708 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
709 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
710 
711 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
712 	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
713 
714 	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
715 }
716 
717 void
718 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
719 {
720 	char *class = "<unknown>";
721 
722 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
723 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
724 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
725 
726 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
727 	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
728 
729 }
730 
731 fmd_xprt_t *
732 fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
733 {
734 	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
735 	fmd_stat_t *statv;
736 	uint_t i, statc;
737 
738 	char buf[PATH_MAX];
739 	fmd_event_t *e;
740 	nvlist_t *nvl;
741 	char *s;
742 
743 	(void) pthread_mutex_init(&xip->xi_lock, NULL);
744 	(void) pthread_cond_init(&xip->xi_cv, NULL);
745 	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);
746 
747 	xip->xi_auth = auth;
748 	xip->xi_data = data;
749 	xip->xi_version = FM_RSRC_XPRT_VERSION;
750 	xip->xi_flags = flags;
751 
752 	/*
753 	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
754 	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
755 	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
756 	 * ensure that this transport will not run until fmd_xprt_resume_all().
757 	 */
758 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
759 	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);
760 
761 	if (fmd.d_xprt_suspend != 0)
762 		xip->xi_flags |= FMD_XPRT_DSUSPENDED;
763 
764 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
765 
766 	/*
767 	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
768 	 * bit so that fmdo_send() is not called until _fmd_init() completes.
769 	 */
770 	if (!(mp->mod_flags & FMD_MOD_INIT))
771 		xip->xi_flags |= FMD_XPRT_ISUSPENDED;
772 
773 	/*
774 	 * Initialize the transport statistics that we keep on behalf of fmd.
775 	 * These are set up using a template defined at the top of this file.
776 	 * We rename each statistic with a prefix ensuring its uniqueness.
777 	 */
778 	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
779 	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
780 	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));
781 
782 	for (i = 0; i < statc; i++) {
783 		(void) snprintf(statv[i].fmds_name,
784 		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
785 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
786 	}
787 
788 	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
789 	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);
790 
791 	if (xip->xi_stats == NULL)
792 		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);
793 
794 	xip->xi_stats->xs_module.fmds_value.str =
795 	    fmd_strdup(mp->mod_name, FMD_SLEEP);
796 
797 	if (xip->xi_auth != NULL)
798 		fmd_xprt_authupdate(xip);
799 
800 	/*
801 	 * Create the outbound eventq for this transport and link to its stats.
802 	 * If any suspend bits were set above, suspend the eventq immediately.
803 	 */
804 	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
805 	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
806 
807 	if (xip->xi_flags & FMD_XPRT_SMASK)
808 		fmd_eventq_suspend(xip->xi_queue);
809 
810 	/*
811 	 * Create our subscription hashes: local subscriptions go to xi_queue,
812 	 * remote subscriptions are tracked only for protocol requests, and
813 	 * pending unsubscriptions are associated with the /dev/null eventq.
814 	 */
815 	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
816 	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
817 	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);
818 
819 	/*
820 	 * Determine our initial state based upon the creation flags.  If we're
821 	 * read-only, go directly to RUN.  If we're accepting a new connection,
822 	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
823 	 */
824 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
825 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
826 	else if (flags & FMD_XPRT_ACCEPT)
827 		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
828 	else
829 		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
830 
831 	/*
832 	 * If client.xprtlog is set to TRUE, create a debugging log for the
833 	 * events received by the transport in var/fm/fmd/xprt/.
834 	 */
835 	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
836 	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);
837 
838 	if (i) {
839 		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
840 		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
841 	}
842 
843 	ASSERT(fmd_module_locked(mp));
844 	fmd_list_append(&mp->mod_transports, xip);
845 
846 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
847 	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
848 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
849 
850 	/*
851 	 * If this is a read-only transport, return without creating a send
852 	 * queue thread and setting up any connection events in our queue.
853 	 */
854 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
855 		goto out;
856 
857 	/*
858 	 * Once the transport is fully initialized, create a send queue thread
859 	 * and start any connect events flowing to complete our initialization.
860 	 */
861 	if ((xip->xi_thread = fmd_thread_create(mp,
862 	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {
863 
864 		fmd_error(EFMD_XPRT_THR,
865 		    "failed to create thread for transport %u", xip->xi_id);
866 
867 		fmd_xprt_destroy((fmd_xprt_t *)xip);
868 		(void) fmd_set_errno(EFMD_XPRT_THR);
869 		return (NULL);
870 	}
871 
872 	/*
873 	 * If the transport is not being opened to accept an inbound connect,
874 	 * start an outbound connection by enqueuing a SYN event for our peer.
875 	 */
876 	if (!(flags & FMD_XPRT_ACCEPT)) {
877 		nvl = fmd_protocol_xprt_ctl(mp,
878 		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);
879 
880 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
881 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
882 		fmd_eventq_insert_at_time(xip->xi_queue, e);
883 	}
884 out:
885 	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
886 	return ((fmd_xprt_t *)xip);
887 }
888 
889 void
890 fmd_xprt_destroy(fmd_xprt_t *xp)
891 {
892 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
893 	fmd_module_t *mp = xip->xi_queue->eq_mod;
894 	uint_t id = xip->xi_id;
895 
896 	fmd_case_impl_t *cip, *nip;
897 	fmd_stat_t *sp;
898 	uint_t i, n;
899 
900 	ASSERT(fmd_module_locked(mp));
901 	fmd_list_delete(&mp->mod_transports, xip);
902 
903 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
904 	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
905 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
906 
907 	(void) pthread_mutex_lock(&xip->xi_lock);
908 
909 	while (xip->xi_busy != 0)
910 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
911 
912 	/*
913 	 * Remove the transport from global visibility, cancel its send-side
914 	 * thread, join with it, and then remove the transport from module
915 	 * visibility.  Once all this is done, destroy and free the transport.
916 	 */
917 	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
918 
919 	if (xip->xi_thread != NULL) {
920 		fmd_eventq_abort(xip->xi_queue);
921 		fmd_module_unlock(mp);
922 		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
923 		fmd_module_lock(mp);
924 	}
925 
926 	if (xip->xi_log != NULL)
927 		fmd_log_rele(xip->xi_log);
928 
929 	/*
930 	 * Release every case handle in the module that was cached by this
931 	 * transport.  This will result in these cases disappearing from the
932 	 * local case hash so that fmd_case_uuclose() and fmd_case_repaired()
933 	 * etc can no longer be used.
934 	 */
935 	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
936 		nip = fmd_list_next(cip);
937 		if (cip->ci_xprt == xp)
938 			fmd_case_discard((fmd_case_t *)cip, B_TRUE);
939 	}
940 
941 	/*
942 	 * Destroy every class in the various subscription hashes and remove
943 	 * any corresponding subscriptions from the event dispatch queue.
944 	 */
945 	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
946 	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
947 	fmd_xprt_class_hash_destroy(&xip->xi_usub);
948 
949 	/*
950 	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
951 	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
952 	 * won't find the entries in the hash table.
953 	 */
954 	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
955 	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
956 	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
957 	for (i = 0; i < n; i++) {
958 		(void) snprintf(sp[i].fmds_name,
959 		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
960 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
961 	}
962 	fmd_ustat_delete(mp->mod_ustat, n, sp);
963 	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
964 
965 	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
966 	fmd_eventq_destroy(xip->xi_queue);
967 	nvlist_free(xip->xi_auth);
968 	fmd_free(xip, sizeof (fmd_xprt_impl_t));
969 
970 	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
971 }
972 
973 void
974 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
975 {
976 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
977 	uint_t oflags;
978 
979 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
980 	(void) pthread_mutex_lock(&xip->xi_lock);
981 
982 	oflags = xip->xi_flags;
983 	xip->xi_flags |= flags;
984 
985 	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
986 		fmd_eventq_suspend(xip->xi_queue);
987 
988 	(void) pthread_cond_broadcast(&xip->xi_cv);
989 
990 	while (xip->xi_busy != 0)
991 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
992 
993 	(void) pthread_mutex_unlock(&xip->xi_lock);
994 }
995 
996 void
997 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
998 {
999 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1000 	uint_t oflags;
1001 
1002 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
1003 	(void) pthread_mutex_lock(&xip->xi_lock);
1004 
1005 	oflags = xip->xi_flags;
1006 	xip->xi_flags &= ~flags;
1007 
1008 	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
1009 		fmd_eventq_resume(xip->xi_queue);
1010 
1011 	(void) pthread_cond_broadcast(&xip->xi_cv);
1012 	(void) pthread_mutex_unlock(&xip->xi_lock);
1013 }
1014 
1015 void
1016 fmd_xprt_send(fmd_xprt_t *xp)
1017 {
1018 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1019 	fmd_module_t *mp = xip->xi_queue->eq_mod;
1020 	fmd_event_t *ep;
1021 	int err;
1022 
1023 	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
1024 		if (FMD_EVENT_TTL(ep) == 0) {
1025 			fmd_event_rele(ep);
1026 			continue;
1027 		}
1028 
1029 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
1030 		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));
1031 
1032 		err = mp->mod_ops->mop_transport(mp, xp, ep);
1033 		fmd_eventq_done(xip->xi_queue);
1034 
1035 		if (err == FMD_SEND_RETRY) {
1036 			fmd_eventq_insert_at_time(xip->xi_queue, ep);
1037 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1038 			xip->xi_stats->xs_retried.fmds_value.ui64++;
1039 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1040 		}
1041 
1042 		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
1043 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1044 			xip->xi_stats->xs_lost.fmds_value.ui64++;
1045 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1046 		}
1047 
1048 		fmd_event_rele(ep);
1049 	}
1050 }
1051 
1052 /*
1053  * This function creates a local suspect list. This is used when a suspect list
1054  * is created directly by an external source like fminject.
1055  */
1056 static void
1057 fmd_xprt_list_suspect_local(fmd_xprt_t *xp, nvlist_t *nvl)
1058 {
1059 	nvlist_t **nvlp;
1060 	nvlist_t *de_fmri, *de_fmri_dup = NULL;
1061 	int64_t *diag_time;
1062 	char *code = NULL;
1063 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1064 	fmd_case_t *cp;
1065 	uint_t nelem = 0, nelem2 = 0, i;
1066 	boolean_t injected;
1067 
1068 	fmd_module_lock(xip->xi_queue->eq_mod);
1069 	cp = fmd_case_create(xip->xi_queue->eq_mod, NULL);
1070 	if (cp == NULL) {
1071 		fmd_module_unlock(xip->xi_queue->eq_mod);
1072 		return;
1073 	}
1074 
1075 	/*
1076 	 * copy diag_code if present
1077 	 */
1078 	(void) nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code);
1079 	if (code != NULL) {
1080 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1081 
1082 		cip->ci_precanned = 1;
1083 		fmd_case_setcode(cp, code);
1084 	}
1085 
1086 	/*
1087 	 * copy suspects
1088 	 */
1089 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1090 	    &nelem);
1091 	for (i = 0; i < nelem; i++) {
1092 		nvlist_t *flt_copy, *asru = NULL, *fru = NULL, *rsrc = NULL;
1093 		topo_hdl_t *thp;
1094 		char *loc = NULL;
1095 		int err;
1096 
1097 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1098 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1099 		(void) nvlist_lookup_nvlist(nvlp[i], FM_FAULT_RESOURCE, &rsrc);
1100 
1101 		/*
1102 		 * If no fru specified, get it from topo
1103 		 */
1104 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_FRU, &fru) != 0 &&
1105 		    rsrc && topo_fmri_fru(thp, rsrc, &fru, &err) == 0)
1106 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_FRU, fru);
1107 		/*
1108 		 * If no asru specified, get it from topo
1109 		 */
1110 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU, &asru) != 0 &&
1111 		    rsrc && topo_fmri_asru(thp, rsrc, &asru, &err) == 0)
1112 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU, asru);
1113 		/*
1114 		 * If no location specified, get it from topo
1115 		 */
1116 		if (nvlist_lookup_string(nvlp[i], FM_FAULT_LOCATION,
1117 		    &loc) != 0) {
1118 			if (fru && topo_fmri_label(thp, fru, &loc, &err) == 0)
1119 				(void) nvlist_add_string(flt_copy,
1120 				    FM_FAULT_LOCATION, loc);
1121 			else if (rsrc && topo_fmri_label(thp, rsrc, &loc,
1122 			    &err) == 0)
1123 				(void) nvlist_add_string(flt_copy,
1124 				    FM_FAULT_LOCATION, loc);
1125 			if (loc)
1126 				topo_hdl_strfree(thp, loc);
1127 		}
1128 		if (fru)
1129 			nvlist_free(fru);
1130 		if (asru)
1131 			nvlist_free(asru);
1132 		if (rsrc)
1133 			nvlist_free(rsrc);
1134 		fmd_fmri_topo_rele(thp);
1135 		fmd_case_insert_suspect(cp, flt_copy);
1136 	}
1137 
1138 	/*
1139 	 * copy diag_time if present
1140 	 */
1141 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1142 	    &nelem2) == 0 && nelem2 >= 2)
1143 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1144 
1145 	/*
1146 	 * copy DE fmri if present
1147 	 */
1148 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1149 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1150 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1151 	}
1152 
1153 	/*
1154 	 * copy injected if present
1155 	 */
1156 	if (nvlist_lookup_boolean_value(nvl, FM_SUSPECT_INJECTED,
1157 	    &injected) == 0 && injected)
1158 		fmd_case_set_injected(cp);
1159 
1160 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1161 	fmd_module_unlock(xip->xi_queue->eq_mod);
1162 }
1163 
1164 /*
1165  * This function is called to create a proxy case on receipt of a list.suspect
1166  * from the diagnosing side of the transport.
1167  */
1168 static void
1169 fmd_xprt_list_suspect(fmd_xprt_t *xp, nvlist_t *nvl)
1170 {
1171 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1172 	nvlist_t **nvlp;
1173 	uint_t nelem = 0, nelem2 = 0, i;
1174 	int64_t *diag_time;
1175 	topo_hdl_t *thp;
1176 	char *class;
1177 	nvlist_t *rsrc, *asru, *de_fmri, *de_fmri_dup = NULL;
1178 	nvlist_t *flt_copy;
1179 	int err;
1180 	nvlist_t **asrua;
1181 	uint8_t *proxy_asru = NULL;
1182 	int got_proxy_asru = 0;
1183 	int got_hc_rsrc = 0;
1184 	int got_hc_asru = 0;
1185 	int got_present_rsrc = 0;
1186 	uint8_t *diag_asru = NULL;
1187 	char *scheme;
1188 	uint8_t *statusp;
1189 	char *uuid, *code;
1190 	fmd_case_t *cp;
1191 	fmd_case_impl_t *cip;
1192 	int need_update = 0;
1193 	boolean_t injected;
1194 
1195 	if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) != 0)
1196 		return;
1197 	if (nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) != 0)
1198 		return;
1199 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1200 	    &nelem);
1201 
1202 	/*
1203 	 * In order to implement FMD_XPRT_HCONLY and FMD_XPRT_HC_PRESENT_ONLY
1204 	 * etc we first scan the suspects to see if
1205 	 * - there was an asru in the received fault
1206 	 * - there was an hc-scheme resource in the received fault
1207 	 * - any hc-scheme resource in the received fault is present in the
1208 	 *   local topology
1209 	 * - any hc-scheme resource in the received fault has an asru in the
1210 	 *   local topology
1211 	 */
1212 	if (nelem > 0) {
1213 		asrua = fmd_zalloc(sizeof (nvlist_t *) * nelem, FMD_SLEEP);
1214 		proxy_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1215 		diag_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1216 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1217 		for (i = 0; i < nelem; i++) {
1218 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1219 			    &asru) == 0 && asru != NULL)
1220 				diag_asru[i] = 1;
1221 			if (nvlist_lookup_string(nvlp[i], FM_CLASS,
1222 			    &class) != 0 || strncmp(class, "fault", 5) != 0)
1223 				continue;
1224 			/*
1225 			 * If there is an hc-scheme asru, use that to find the
1226 			 * real asru. Otherwise if there is an hc-scheme
1227 			 * resource, work out the old asru from that.
1228 			 * This order is to allow a two stage evaluation
1229 			 * of the asru where a fault in the diagnosing side
1230 			 * is in a component not visible to the proxy side,
1231 			 * but prevents a component that is visible from
1232 			 * working. So the diagnosing side sets the asru to
1233 			 * the latter component (in hc-scheme as the diagnosing
1234 			 * side doesn't know about the proxy side's virtual
1235 			 * schemes), and then the proxy side can convert that
1236 			 * to a suitable virtual scheme asru.
1237 			 */
1238 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1239 			    &asru) == 0 && asru != NULL &&
1240 			    nvlist_lookup_string(asru, FM_FMRI_SCHEME,
1241 			    &scheme) == 0 &&
1242 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1243 				got_hc_asru = 1;
1244 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1245 					continue;
1246 				if (topo_fmri_present(thp, asru, &err) != 0)
1247 					got_present_rsrc = 1;
1248 				if (topo_fmri_asru(thp, asru, &asrua[i],
1249 				    &err) == 0) {
1250 					proxy_asru[i] =
1251 					    FMD_PROXY_ASRU_FROM_ASRU;
1252 					got_proxy_asru = 1;
1253 				}
1254 			} else if (nvlist_lookup_nvlist(nvlp[i],
1255 			    FM_FAULT_RESOURCE, &rsrc) == 0 && rsrc != NULL &&
1256 			    nvlist_lookup_string(rsrc, FM_FMRI_SCHEME,
1257 			    &scheme) == 0 &&
1258 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1259 				got_hc_rsrc = 1;
1260 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1261 					continue;
1262 				if (topo_fmri_present(thp, rsrc, &err) != 0)
1263 					got_present_rsrc = 1;
1264 				if (topo_fmri_asru(thp, rsrc, &asrua[i],
1265 				    &err) == 0) {
1266 					proxy_asru[i] =
1267 					    FMD_PROXY_ASRU_FROM_RSRC;
1268 					got_proxy_asru = 1;
1269 				}
1270 			}
1271 		}
1272 		fmd_fmri_topo_rele(thp);
1273 	}
1274 
1275 	/*
1276 	 * If we're set up only to report hc-scheme faults, and
1277 	 * there aren't any, then just drop the event.
1278 	 */
1279 	if (got_hc_rsrc == 0 && got_hc_asru == 0 &&
1280 	    (xip->xi_flags & FMD_XPRT_HCONLY)) {
1281 		if (nelem > 0) {
1282 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1283 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1284 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1285 		}
1286 		return;
1287 	}
1288 
1289 	/*
1290 	 * If we're set up only to report locally present hc-scheme
1291 	 * faults, and there aren't any, then just drop the event.
1292 	 */
1293 	if (got_present_rsrc == 0 &&
1294 	    (xip->xi_flags & FMD_XPRT_HC_PRESENT_ONLY)) {
1295 		if (nelem > 0) {
1296 			for (i = 0; i < nelem; i++)
1297 				if (asrua[i])
1298 					nvlist_free(asrua[i]);
1299 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1300 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1301 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1302 		}
1303 		return;
1304 	}
1305 
1306 	/*
1307 	 * If fmd_case_recreate() returns NULL, UUID is already known.
1308 	 */
1309 	fmd_module_lock(xip->xi_queue->eq_mod);
1310 	if ((cp = fmd_case_recreate(xip->xi_queue->eq_mod, xp,
1311 	    FMD_CASE_UNSOLVED, uuid, code)) == NULL) {
1312 		if (nelem > 0) {
1313 			for (i = 0; i < nelem; i++)
1314 				if (asrua[i])
1315 					nvlist_free(asrua[i]);
1316 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1317 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1318 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1319 		}
1320 		fmd_module_unlock(xip->xi_queue->eq_mod);
1321 		return;
1322 	}
1323 
1324 	cip = (fmd_case_impl_t *)cp;
1325 	cip->ci_diag_asru = diag_asru;
1326 	cip->ci_proxy_asru = proxy_asru;
1327 	for (i = 0; i < nelem; i++) {
1328 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1329 		if (proxy_asru[i] != FMD_PROXY_ASRU_NOT_NEEDED) {
1330 			/*
1331 			 * Copy suspects, but remove/replace asru first. Also if
1332 			 * the original asru was hc-scheme use that as resource.
1333 			 */
1334 			if (proxy_asru[i] == FMD_PROXY_ASRU_FROM_ASRU) {
1335 				(void) nvlist_remove(flt_copy,
1336 				    FM_FAULT_RESOURCE, DATA_TYPE_NVLIST);
1337 				(void) nvlist_lookup_nvlist(flt_copy,
1338 				    FM_FAULT_ASRU, &asru);
1339 				(void) nvlist_add_nvlist(flt_copy,
1340 				    FM_FAULT_RESOURCE, asru);
1341 			}
1342 			(void) nvlist_remove(flt_copy, FM_FAULT_ASRU,
1343 			    DATA_TYPE_NVLIST);
1344 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU,
1345 			    asrua[i]);
1346 			nvlist_free(asrua[i]);
1347 		} else if (got_hc_asru == 0 &&
1348 		    nvlist_lookup_nvlist(flt_copy, FM_FAULT_ASRU,
1349 		    &asru) == 0 && asru != NULL) {
1350 			/*
1351 			 * If we have an asru from diag side, but it's not
1352 			 * in hc scheme, then we can't be sure what it
1353 			 * represents, so mark as no retire.
1354 			 */
1355 			(void) nvlist_add_boolean_value(flt_copy,
1356 			    FM_SUSPECT_RETIRE, B_FALSE);
1357 		}
1358 		fmd_case_insert_suspect(cp, flt_copy);
1359 	}
1360 	/*
1361 	 * copy diag_time
1362 	 */
1363 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1364 	    &nelem2) == 0 && nelem2 >= 2)
1365 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1366 	/*
1367 	 * copy DE fmri
1368 	 */
1369 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1370 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1371 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1372 	}
1373 
1374 	/*
1375 	 * copy injected if present
1376 	 */
1377 	if (nvlist_lookup_boolean_value(nvl, FM_SUSPECT_INJECTED,
1378 	    &injected) == 0 && injected)
1379 		fmd_case_set_injected(cp);
1380 
1381 	/*
1382 	 * Transition to solved. This will log the suspect list and create
1383 	 * the resource cache entries.
1384 	 */
1385 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1386 
1387 	/*
1388 	 * Update status if it is not simply "all faulty" (can happen if
1389 	 * list.suspects are being re-sent when the transport has reconnected).
1390 	 */
1391 	(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS, &statusp,
1392 	    &nelem);
1393 	for (i = 0; i < nelem; i++) {
1394 		if ((statusp[i] & (FM_SUSPECT_FAULTY | FM_SUSPECT_UNUSABLE |
1395 		    FM_SUSPECT_NOT_PRESENT | FM_SUSPECT_DEGRADED)) !=
1396 		    FM_SUSPECT_FAULTY)
1397 			need_update = 1;
1398 	}
1399 	if (need_update) {
1400 		fmd_case_update_status(cp, statusp, cip->ci_proxy_asru,
1401 		    cip->ci_diag_asru);
1402 		fmd_case_update_containees(cp);
1403 		fmd_case_update(cp);
1404 	}
1405 
1406 	/*
1407 	 * if asru on proxy side, send an update back to the diagnosing side to
1408 	 * update UNUSABLE/DEGRADED.
1409 	 */
1410 	if (got_proxy_asru)
1411 		fmd_case_xprt_updated(cp);
1412 
1413 	if (nelem > 0)
1414 		fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1415 	fmd_module_unlock(xip->xi_queue->eq_mod);
1416 }
1417 
1418 void
1419 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
1420 {
1421 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1422 	const fmd_xprt_rule_t *xrp;
1423 	fmd_t *dp = &fmd;
1424 
1425 	fmd_event_t *e;
1426 	char *class, *uuid;
1427 	boolean_t isproto, isereport;
1428 
1429 	uint64_t *tod;
1430 	uint8_t ttl;
1431 	uint_t n;
1432 	fmd_case_t *cp;
1433 
1434 	/*
1435 	 * Grab the transport lock and set the busy flag to indicate we are
1436 	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1437 	 * resumes the transport before continuing on with the receive.
1438 	 */
1439 	(void) pthread_mutex_lock(&xip->xi_lock);
1440 
1441 	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1442 
1443 		if (fmd.d_signal != 0) {
1444 			(void) pthread_mutex_unlock(&xip->xi_lock);
1445 			return; /* fmd_destroy() is in progress */
1446 		}
1447 
1448 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1449 	}
1450 
1451 	xip->xi_busy++;
1452 	ASSERT(xip->xi_busy != 0);
1453 
1454 	(void) pthread_mutex_unlock(&xip->xi_lock);
1455 
1456 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
1457 	xip->xi_stats->xs_received.fmds_value.ui64++;
1458 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1459 
1460 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1461 		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1462 		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1463 
1464 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1465 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1466 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1467 
1468 		nvlist_free(nvl);
1469 		goto done;
1470 	}
1471 
1472 	fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
1473 	    ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);
1474 
1475 	isereport = (strncmp(class, FM_EREPORT_CLASS,
1476 	    sizeof (FM_EREPORT_CLASS - 1)) == 0) ? FMD_B_TRUE : FMD_B_FALSE;
1477 
1478 	/*
1479 	 * The logonly flag should only be set for ereports.
1480 	 */
1481 	if ((logonly == FMD_B_TRUE) && (isereport == FMD_B_FALSE)) {
1482 		fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
1483 		    "logonly flag is not valid for class %s",
1484 		    (void *)nvl, class);
1485 
1486 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1487 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1488 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1489 
1490 		nvlist_free(nvl);
1491 		goto done;
1492 	}
1493 
1494 	/*
1495 	 * If a time-to-live value is present in the event and is zero, drop
1496 	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1497 	 */
1498 	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1499 		if (ttl == 0) {
1500 			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1501 			    "timeout: event received with ttl=0\n",
1502 			    xip->xi_id, (void *)nvl, class);
1503 
1504 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1505 			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1506 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1507 
1508 			nvlist_free(nvl);
1509 			goto done;
1510 		}
1511 		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1512 		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1513 	}
1514 
1515 	/*
1516 	 * If we are using the native system clock, the underlying transport
1517 	 * code can provide a tighter event time bound by telling us when the
1518 	 * event was enqueued.  If we're using simulated clocks, this time
1519 	 * has no meaning to us, so just reset the value to use HRT_NOW.
1520 	 */
1521 	if (dp->d_clockops != &fmd_timeops_native)
1522 		hrt = FMD_HRT_NOW;
1523 
1524 	/*
1525 	 * If an event's class is in the FMD_CTL_CLASS family, then create a
1526 	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
1527 	 * event using this time.  Otherwise create a protocol event using hrt.
1528 	 */
1529 	isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
1530 	    FMD_B_FALSE : FMD_B_TRUE;
1531 	if (isproto == FMD_B_FALSE)
1532 		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1533 	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1534 		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1535 	else {
1536 		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1537 		    NULL, nvl, class, NULL, 0, 0);
1538 	}
1539 
1540 	/*
1541 	 * If the debug log is enabled, create a temporary event, log it to the
1542 	 * debug log, and then reset the underlying state of the event.
1543 	 */
1544 	if (xip->xi_log != NULL) {
1545 		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1546 
1547 		fmd_log_append(xip->xi_log, e, NULL);
1548 
1549 		ep->ev_flags |= FMD_EVF_VOLATILE;
1550 		ep->ev_off = 0;
1551 		ep->ev_len = 0;
1552 
1553 		if (ep->ev_log != NULL) {
1554 			fmd_log_rele(ep->ev_log);
1555 			ep->ev_log = NULL;
1556 		}
1557 	}
1558 
1559 	/*
1560 	 * Iterate over the rules for the current state trying to match the
1561 	 * event class to one of our special rules.  If a rule is matched, the
1562 	 * event is consumed and not dispatched to other modules.  If the rule
1563 	 * set ends without matching an event, we fall through to dispatching.
1564 	 */
1565 	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1566 		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1567 			fmd_event_hold(e);
1568 			xrp->xr_func(xip, nvl);
1569 			fmd_event_rele(e);
1570 			goto done;
1571 		}
1572 	}
1573 
1574 	/*
1575 	 * Record the event in the errlog if it is an ereport.  This code will
1576 	 * be replaced later with a per-transport intent log instead.
1577 	 */
1578 	if (isereport == FMD_B_TRUE) {
1579 		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
1580 		fmd_log_append(dp->d_errlog, e, NULL);
1581 		(void) pthread_rwlock_unlock(&dp->d_log_lock);
1582 	}
1583 
1584 	/*
1585 	 * If a list.suspect event is received, create a case for the specified
1586 	 * UUID in the case hash, with the transport module as its owner.
1587 	 */
1588 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS)) {
1589 		if (xip->xi_flags & FMD_XPRT_CACHE_AS_LOCAL)
1590 			fmd_xprt_list_suspect_local(xp, nvl);
1591 		else
1592 			fmd_xprt_list_suspect(xp, nvl);
1593 		fmd_event_hold(e);
1594 		fmd_event_rele(e);
1595 		goto done;
1596 	}
1597 
1598 	/*
1599 	 * If a list.updated or list.repaired event is received, update the
1600 	 * resource cache status and the local case.
1601 	 */
1602 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_REPAIRED_CLASS) ||
1603 	    fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_UPDATED_CLASS)) {
1604 		uint8_t *statusp;
1605 		uint_t nelem = 0;
1606 
1607 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1608 		    &statusp, &nelem);
1609 		fmd_module_lock(xip->xi_queue->eq_mod);
1610 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1611 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1612 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1613 			if (cip->ci_xprt != NULL) {
1614 				fmd_case_update_status(cp, statusp,
1615 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1616 				fmd_case_update_containees(cp);
1617 				fmd_case_update(cp);
1618 			}
1619 			fmd_case_rele(cp);
1620 		}
1621 		fmd_module_unlock(xip->xi_queue->eq_mod);
1622 		fmd_event_hold(e);
1623 		fmd_event_rele(e);
1624 		goto done;
1625 	}
1626 
1627 	/*
1628 	 * If a list.isolated event is received, update resource cache status
1629 	 */
1630 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_ISOLATED_CLASS)) {
1631 		uint8_t *statusp;
1632 		uint_t nelem = 0;
1633 
1634 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1635 		    &statusp, &nelem);
1636 		fmd_module_lock(xip->xi_queue->eq_mod);
1637 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1638 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1639 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1640 			if (cip->ci_xprt != NULL)
1641 				fmd_case_update_status(cp, statusp,
1642 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1643 			fmd_case_rele(cp);
1644 		}
1645 		fmd_module_unlock(xip->xi_queue->eq_mod);
1646 		fmd_event_hold(e);
1647 		fmd_event_rele(e);
1648 		goto done;
1649 	}
1650 
1651 	/*
1652 	 * If a list.resolved event is received, resolve the local case.
1653 	 */
1654 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_RESOLVED_CLASS)) {
1655 		fmd_module_lock(xip->xi_queue->eq_mod);
1656 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1657 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1658 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1659 			if (cip->ci_xprt != NULL)
1660 				fmd_case_transition(cp, (cip->ci_state ==
1661 				    FMD_CASE_REPAIRED) ? FMD_CASE_RESOLVED :
1662 				    (cip->ci_state == FMD_CASE_CLOSED) ?
1663 				    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT,
1664 				    FMD_CF_RESOLVED);
1665 			fmd_case_rele(cp);
1666 		}
1667 		fmd_module_unlock(xip->xi_queue->eq_mod);
1668 		fmd_event_hold(e);
1669 		fmd_event_rele(e);
1670 		goto done;
1671 	}
1672 
1673 	if (logonly == FMD_B_TRUE || (xip->xi_flags & FMD_XPRT_EXTERNAL)) {
1674 		/*
1675 		 * Don't proxy ereports on an EXTERNAL transport - we won't
1676 		 * know how to diagnose them with the wrong topology. Note
1677 		 * that here (and above) we have to hold/release the event in
1678 		 * order for it to be freed.
1679 		 */
1680 		fmd_event_hold(e);
1681 		fmd_event_rele(e);
1682 	} else if (isproto == FMD_B_TRUE)
1683 		fmd_dispq_dispatch(dp->d_disp, e, class);
1684 	else
1685 		fmd_modhash_dispatch(dp->d_mod_hash, e);
1686 done:
1687 	(void) pthread_mutex_lock(&xip->xi_lock);
1688 
1689 	ASSERT(xip->xi_busy != 0);
1690 	xip->xi_busy--;
1691 
1692 	(void) pthread_cond_broadcast(&xip->xi_cv);
1693 	(void) pthread_mutex_unlock(&xip->xi_lock);
1694 }
1695 
1696 void
1697 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1698 {
1699 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1700 
1701 	fmd_event_t *e;
1702 	nvlist_t *nvl;
1703 	char *s;
1704 
1705 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1706 		return; /* read-only transports do not proxy uuclose */
1707 
1708 	TRACE((FMD_DBG_XPRT, "xprt %u closing case %s\n", xip->xi_id, uuid));
1709 
1710 	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1711 	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1712 
1713 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1714 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1715 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1716 }
1717 
1718 /*
1719  * On proxy side, send back uuresolved request to diagnosing side
1720  */
1721 void
1722 fmd_xprt_uuresolved(fmd_xprt_t *xp, const char *uuid)
1723 {
1724 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1725 
1726 	fmd_event_t *e;
1727 	nvlist_t *nvl;
1728 	char *s;
1729 
1730 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1731 		return; /* read-only transports do not proxy uuresolved */
1732 
1733 	TRACE((FMD_DBG_XPRT, "xprt %u resolving case %s\n", xip->xi_id, uuid));
1734 
1735 	nvl = fmd_protocol_xprt_uuresolved(xip->xi_queue->eq_mod,
1736 	    "resource.fm.xprt.uuresolved", xip->xi_version, uuid);
1737 
1738 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1739 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1740 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1741 }
1742 
1743 /*
1744  * On proxy side, send back repair/acquit/etc request to diagnosing side
1745  */
1746 void
1747 fmd_xprt_updated(fmd_xprt_t *xp, const char *uuid, uint8_t *statusp,
1748 	uint8_t *has_asrup, uint_t nelem)
1749 {
1750 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1751 
1752 	fmd_event_t *e;
1753 	nvlist_t *nvl;
1754 	char *s;
1755 
1756 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1757 		return; /* read-only transports do not support remote repairs */
1758 
1759 	TRACE((FMD_DBG_XPRT, "xprt %u updating case %s\n", xip->xi_id, uuid));
1760 
1761 	nvl = fmd_protocol_xprt_updated(xip->xi_queue->eq_mod,
1762 	    "resource.fm.xprt.updated", xip->xi_version, uuid, statusp,
1763 	    has_asrup, nelem);
1764 
1765 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1766 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1767 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1768 }
1769 
1770 /*
1771  * Insert the specified class into our remote subscription hash.  If the class
1772  * is already present, bump the reference count; otherwise add it to the hash
1773  * and then enqueue an event for our remote peer to proxy our subscription.
1774  */
1775 void
1776 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1777 {
1778 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1779 
1780 	uint_t refs;
1781 	nvlist_t *nvl;
1782 	fmd_event_t *e;
1783 	char *s;
1784 
1785 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1786 		return; /* read-only transports do not proxy subscriptions */
1787 
1788 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1789 		return; /* transport is not yet an active subscriber */
1790 
1791 	(void) pthread_mutex_lock(&xip->xi_lock);
1792 	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1793 	(void) pthread_mutex_unlock(&xip->xi_lock);
1794 
1795 	if (refs > 1)
1796 		return; /* we've already asked our peer for this subscription */
1797 
1798 	fmd_dprintf(FMD_DBG_XPRT,
1799 	    "xprt %u subscribing to %s\n", xip->xi_id, class);
1800 
1801 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1802 	    "resource.fm.xprt.subscribe", xip->xi_version, class);
1803 
1804 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1805 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1806 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1807 }
1808 
1809 /*
1810  * Delete the specified class from the remote subscription hash.  If the
1811  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1812  */
1813 void
1814 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1815 {
1816 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1817 
1818 	uint_t refs;
1819 	nvlist_t *nvl;
1820 	fmd_event_t *e;
1821 	char *s;
1822 
1823 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1824 		return; /* read-only transports do not proxy subscriptions */
1825 
1826 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1827 		return; /* transport is not yet an active subscriber */
1828 
1829 	/*
1830 	 * If the subscription reference count drops to zero in xi_rsub, insert
1831 	 * an entry into the xi_usub hash indicating we await an unsuback event.
1832 	 */
1833 	(void) pthread_mutex_lock(&xip->xi_lock);
1834 
1835 	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1836 		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1837 
1838 	(void) pthread_mutex_unlock(&xip->xi_lock);
1839 
1840 	if (refs != 0)
1841 		return; /* other subscriptions for this class still active */
1842 
1843 	fmd_dprintf(FMD_DBG_XPRT,
1844 	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1845 
1846 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1847 	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1848 
1849 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1850 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1851 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1852 }
1853 
1854 static void
1855 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1856 {
1857 	fmd_xprt_t *xp;
1858 
1859 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1860 		fmd_xprt_subscribe(xp, class);
1861 		fmd_idspace_rele(ids, id);
1862 	}
1863 }
1864 
1865 void
1866 fmd_xprt_subscribe_all(const char *class)
1867 {
1868 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1869 
1870 	if (ids->ids_count != 0)
1871 		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1872 }
1873 
1874 static void
1875 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1876 {
1877 	fmd_xprt_t *xp;
1878 
1879 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1880 		fmd_xprt_unsubscribe(xp, class);
1881 		fmd_idspace_rele(ids, id);
1882 	}
1883 }
1884 
1885 void
1886 fmd_xprt_unsubscribe_all(const char *class)
1887 {
1888 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1889 
1890 	if (ids->ids_count != 0)
1891 		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1892 }
1893 
1894 /*ARGSUSED*/
1895 static void
1896 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1897 {
1898 	fmd_xprt_t *xp;
1899 
1900 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1901 		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1902 		fmd_idspace_rele(ids, id);
1903 	}
1904 }
1905 
1906 void
1907 fmd_xprt_suspend_all(void)
1908 {
1909 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1910 
1911 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1912 
1913 	if (fmd.d_xprt_suspend++ != 0) {
1914 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1915 		return; /* already suspended */
1916 	}
1917 
1918 	if (ids->ids_count != 0)
1919 		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1920 
1921 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1922 }
1923 
1924 /*ARGSUSED*/
1925 static void
1926 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1927 {
1928 	fmd_xprt_t *xp;
1929 
1930 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1931 		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1932 		fmd_idspace_rele(ids, id);
1933 	}
1934 }
1935 
1936 void
1937 fmd_xprt_resume_all(void)
1938 {
1939 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1940 
1941 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1942 
1943 	if (fmd.d_xprt_suspend == 0)
1944 		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
1945 
1946 	if (--fmd.d_xprt_suspend != 0) {
1947 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1948 		return; /* not ready to be resumed */
1949 	}
1950 
1951 	if (ids->ids_count != 0)
1952 		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
1953 
1954 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1955 }
1956