xref: /titanic_51/usr/src/cmd/fm/fmd/common/fmd_xprt.c (revision 4a8d0ea71c9a4e51c6a916a083ced6b499eb207f)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * FMD Transport Subsystem
29  *
30  * A transport module uses some underlying mechanism to transport events.
31  * This mechanism may use any underlying link-layer protocol and may support
32  * additional link-layer packets unrelated to FMA.  Some appropriate link-
33  * layer mechanism to create the underlying connection is expected to be
34  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
35  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
36  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
37  * The underlying transport mechanism is *required* to provide ordering: that
38  * is, the sequences of bytes written across the transport must be read by
39  * the remote peer in the order that they are written, even across separate
40  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
41  * a valid transport as it guarantees ordering, whereas the Internet UDP
42  * protocol would not because UDP datagrams may be delivered in any order
43  * as a result of delays introduced when datagrams pass through routers.
44  *
45  * Similar to sending events, a transport module receives events that are from
46  * its peer remote endpoint using some transport-specific mechanism that is
47  * unknown to FMD.  As each event is received, the transport module is
48  * responsible for constructing a valid nvlist_t object from the data and then
49  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
50  * queue, making it available to all local modules that are not transport
51  * modules that have subscribed to the event.
52  *
53  * The following state machine is used for each transport.  The initial state
54  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
55  *
56  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
57  *             |                 |
58  * waiting  +--v--+           +--v--+  waiting
59  * for syn  | SYN |--+     --+| ACK |  for ack
60  * event    +-----+   \   /   +-----+  event
61  *             |       \ /       |
62  * drop all +--v--+     X     +--v--+  send subscriptions,
63  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
64  *          +-----+           +-----+  wait for run event
65  *             ^                 |
66  *             |     +-----+     |
67  *             +-----| RUN |<----+
68  *                   +--^--+
69  *                      |
70  *               FMD_XPRT_RDONLY
71  *
72  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
73  * Layer enqueues a "syn" event for the module in its event queue and sets the
74  * state to ACK.  In state ACK, we are waiting for the transport to get an
75  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
76  * discarded.  If an "ack" is received, we transition to state SUB.  If a
77  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
78  * exchange), we transition to state ERR.  Once in state ERR, no further
79  * operations are valid except fmd_xprt_close() and fmd_xprt_error() will
80  * return a non-zero value to the caller indicating the transport has failed.
81  *
82  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
83  * Layer assumes this transport is being used to accept a virtual connection
84  * from a remote peer that is sending a "syn", and sets the initial state to
85  * SYN.  In this state, the transport waits for a "syn" event, validates it,
86  * and then transitions to state SUB if it is valid or state ERR if it is not.
87  *
88  * Once in state SUB, the transport module is expected to receive a sequence of
89  * zero or more "subscribe" events from the remote peer, followed by a "run"
90  * event.  Once in state RUN, the transport is active and any events can be
91  * sent or received.  The transport module is free to call fmd_xprt_close()
92  * from any state.  The fmd_xprt_error() function will return zero if the
93  * transport is not in the ERR state, or non-zero if it is in the ERR state.
94  *
95  * Once the state machine reaches RUN, other FMA protocol events can be sent
96  * and received across the transport in addition to the various control events.
97  *
98  * Table of Common Transport Layer Control Events
99  * ==============================================
100  *
101  * FMA Class                     Payload
102  * ---------                     -------
103  * resource.fm.xprt.uuclose      string (uuid of case)
104  * resource.fm.xprt.uuresolved   string (uuid of case)
105  * resource.fm.xprt.updated      string (uuid of case)
106  * resource.fm.xprt.subscribe    string (class pattern)
107  * resource.fm.xprt.unsubscribe  string (class pattern)
108  * resource.fm.xprt.unsuback     string (class pattern)
109  * resource.fm.xprt.syn          version information
110  * resource.fm.xprt.ack          version information
111  * resource.fm.xprt.run          version information
112  *
113  * Control events are used to add and delete proxy subscriptions on the remote
114  * transport peer module, and to set up connections.  When a "syn" event is
115  * sent, FMD will include in the payload the highest version of the FMA event
116  * protocol that is supported by the sender.  When a "syn" event is received,
117  * the receiving FMD will use the minimum of this version and its version of
118  * the protocol, and reply with this new minimum version in the "ack" event.
119  * The receiver will then use this new minimum for subsequent event semantics.
120  */
121 
122 #include <sys/fm/protocol.h>
123 #include <strings.h>
124 #include <limits.h>
125 
126 #include <fmd_alloc.h>
127 #include <fmd_error.h>
128 #include <fmd_conf.h>
129 #include <fmd_subr.h>
130 #include <fmd_string.h>
131 #include <fmd_protocol.h>
132 #include <fmd_thread.h>
133 #include <fmd_eventq.h>
134 #include <fmd_dispq.h>
135 #include <fmd_ctl.h>
136 #include <fmd_log.h>
137 #include <fmd_ustat.h>
138 #include <fmd_case.h>
139 #include <fmd_api.h>
140 #include <fmd_fmri.h>
141 #include <fmd_asru.h>
142 #include <fmd_xprt.h>
143 
144 #include <fmd.h>
145 
146 /*
147  * The states shown above in the transport state machine diagram are encoded
148  * using arrays of class patterns and a corresponding action function.  These
149  * arrays are then passed to fmd_xprt_transition() to change transport states.
150  */
151 
152 const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
153 { "resource.fm.xprt.syn", fmd_xprt_event_syn },
154 { "*", fmd_xprt_event_error },
155 { NULL, NULL }
156 };
157 
158 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
159 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
160 { "*", fmd_xprt_event_error },
161 };
162 
163 const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
164 { "*", fmd_xprt_event_drop },
165 { NULL, NULL }
166 };
167 
168 const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
169 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
170 { "resource.fm.xprt.run", fmd_xprt_event_run },
171 { "resource.fm.xprt.*", fmd_xprt_event_error },
172 { "*", fmd_xprt_event_drop },
173 { NULL, NULL }
174 };
175 
176 const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
177 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
178 { "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
179 { "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
180 { "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
181 { "resource.fm.xprt.uuresolved", fmd_xprt_event_uuresolved },
182 { "resource.fm.xprt.updated", fmd_xprt_event_updated },
183 { "resource.fm.xprt.*", fmd_xprt_event_error },
184 { NULL, NULL }
185 };
186 
187 /*
188  * Template for per-transport statistics installed by fmd on behalf of each
189  * transport.  These are used to initialize the per-transport xi_stats.  For
190  * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
191  * transport ID (xi_id) and then are inserted into the per-module stats hash.
192  * The values in this array must match fmd_xprt_stat_t from <fmd_xprt.h>.
193  */
194 static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
195 {
196 { "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
197 { "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
198 { "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
199 { "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
200 { "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
201 { "wtime", FMD_TYPE_TIME, "total wait time on queue" },
202 { "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
203 { "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
204 { "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
205 { "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
206 },
207 { "module", FMD_TYPE_STRING, "module that owns this transport" },
208 { "authority", FMD_TYPE_STRING, "authority associated with this transport" },
209 { "state", FMD_TYPE_STRING, "current transport state" },
210 { "received", FMD_TYPE_UINT64, "events received by transport" },
211 { "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
212 { "retried", FMD_TYPE_UINT64, "retries requested of transport" },
213 { "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
214 { "lost", FMD_TYPE_UINT64, "events lost by transport" },
215 { "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
216 { "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
217 };
218 
219 static void
220 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
221 {
222 	uint_t hashlen = fmd.d_str_buckets;
223 
224 	xch->xch_queue = eq;
225 	xch->xch_hashlen = hashlen;
226 	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
227 }
228 
229 static void
230 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
231 {
232 	fmd_eventq_t *eq = xch->xch_queue;
233 	fmd_xprt_class_t *xcp, *ncp;
234 	uint_t i;
235 
236 	for (i = 0; i < xch->xch_hashlen; i++) {
237 		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
238 			ncp = xcp->xc_next;
239 
240 			if (eq != NULL)
241 				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
242 
243 			fmd_strfree(xcp->xc_class);
244 			fmd_free(xcp, sizeof (fmd_xprt_class_t));
245 		}
246 	}
247 
248 	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
249 }
250 
251 /*
252  * Insert the specified class into the specified class hash, and return the
253  * reference count.  A return value of one indicates this is the first insert.
254  * If an eventq is associated with the hash, insert a dispq subscription for it.
255  */
256 static uint_t
257 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
258     fmd_xprt_class_hash_t *xch, const char *class)
259 {
260 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
261 	fmd_xprt_class_t *xcp;
262 
263 	ASSERT(MUTEX_HELD(&xip->xi_lock));
264 
265 	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
266 		if (strcmp(class, xcp->xc_class) == 0)
267 			return (++xcp->xc_refs);
268 	}
269 
270 	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
271 	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
272 	xcp->xc_next = xch->xch_hash[h];
273 	xcp->xc_refs = 1;
274 	xch->xch_hash[h] = xcp;
275 
276 	if (xch->xch_queue != NULL)
277 		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
278 
279 	return (xcp->xc_refs);
280 }
281 
282 /*
283  * Delete the specified class from the specified class hash, and return the
284  * reference count.  A return value of zero indicates the class was deleted.
285  * If an eventq is associated with the hash, delete the dispq subscription.
286  */
287 static uint_t
288 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
289     fmd_xprt_class_hash_t *xch, const char *class)
290 {
291 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
292 	fmd_xprt_class_t *xcp, **pp;
293 
294 	ASSERT(MUTEX_HELD(&xip->xi_lock));
295 	pp = &xch->xch_hash[h];
296 
297 	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
298 		if (strcmp(class, xcp->xc_class) == 0)
299 			break;
300 		else
301 			pp = &xcp->xc_next;
302 	}
303 
304 	if (xcp == NULL)
305 		return (-1U); /* explicitly permit an invalid delete */
306 
307 	if (--xcp->xc_refs != 0)
308 		return (xcp->xc_refs);
309 
310 	ASSERT(xcp->xc_refs == 0);
311 	*pp = xcp->xc_next;
312 
313 	fmd_strfree(xcp->xc_class);
314 	fmd_free(xcp, sizeof (fmd_xprt_class_t));
315 
316 	if (xch->xch_queue != NULL)
317 		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
318 
319 	return (0);
320 }
321 
322 /*
323  * Queue subscribe events for the specified transport corresponding to all of
324  * the active module subscriptions.  This is an extremely heavyweight operation
325  * that we expect to take place rarely (i.e. when loading a transport module
326  * or when it establishes a connection).  We lock all of the known modules to
327  * prevent them from adding or deleting subscriptions, then snapshot their
328  * subscriptions, and then unlock all of the modules.  We hold the modhash
329  * lock for the duration of this operation to prevent new modules from loading.
330  */
331 static void
332 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
333 {
334 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
335 	const fmd_conf_path_t *pap;
336 	fmd_module_t *mp;
337 	uint_t i, j;
338 
339 	(void) pthread_rwlock_rdlock(&mhp->mh_lock);
340 
341 	for (i = 0; i < mhp->mh_hashlen; i++) {
342 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
343 			fmd_module_lock(mp);
344 	}
345 
346 	(void) pthread_mutex_lock(&xip->xi_lock);
347 	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
348 	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
349 	(void) pthread_mutex_unlock(&xip->xi_lock);
350 
351 	for (i = 0; i < mhp->mh_hashlen; i++) {
352 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
353 			(void) fmd_conf_getprop(mp->mod_conf,
354 			    FMD_PROP_SUBSCRIPTIONS, &pap);
355 			for (j = 0; j < pap->cpa_argc; j++)
356 				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
357 		}
358 	}
359 
360 	for (i = 0; i < mhp->mh_hashlen; i++) {
361 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
362 			fmd_module_unlock(mp);
363 	}
364 
365 	(void) pthread_rwlock_unlock(&mhp->mh_lock);
366 }
367 
368 static void
369 fmd_xprt_transition(fmd_xprt_impl_t *xip,
370     const fmd_xprt_rule_t *state, const char *tag)
371 {
372 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
373 	fmd_event_t *e;
374 	nvlist_t *nvl;
375 	char *s;
376 
377 	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
378 
379 	xip->xi_state = state;
380 	s = fmd_strdup(tag, FMD_SLEEP);
381 
382 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
383 	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
384 	xip->xi_stats->xs_state.fmds_value.str = s;
385 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
386 
387 	/*
388 	 * If we've reached the SUB state, take out the big hammer and snapshot
389 	 * all of the subscriptions of all of the loaded modules.  Then queue a
390 	 * run event for our remote peer indicating that it can enter RUN.
391 	 */
392 	if (state == _fmd_xprt_state_sub) {
393 		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
394 
395 		/*
396 		 * For read-write transports, we always want to set up remote
397 		 * subscriptions to the bultin list.* events, regardless of
398 		 * whether any agents have subscribed to them.
399 		 */
400 		if (xip->xi_flags & FMD_XPRT_RDWR) {
401 			fmd_xprt_subscribe(xp, FM_LIST_SUSPECT_CLASS);
402 			fmd_xprt_subscribe(xp, FM_LIST_ISOLATED_CLASS);
403 			fmd_xprt_subscribe(xp, FM_LIST_UPDATED_CLASS);
404 			fmd_xprt_subscribe(xp, FM_LIST_RESOLVED_CLASS);
405 			fmd_xprt_subscribe(xp, FM_LIST_REPAIRED_CLASS);
406 		}
407 
408 		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
409 		    "resource.fm.xprt.run", xip->xi_version);
410 
411 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
412 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
413 		fmd_eventq_insert_at_time(xip->xi_queue, e);
414 	}
415 }
416 
417 static void
418 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
419 {
420 	char *s = fmd_fmri_auth2str(xip->xi_auth);
421 
422 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
423 	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
424 	xip->xi_stats->xs_authority.fmds_value.str = s;
425 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
426 }
427 
428 static int
429 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
430 {
431 	uint8_t rversion;
432 
433 	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
434 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
435 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
436 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
437 
438 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
439 		return (1);
440 	}
441 
442 	if (rversion > xip->xi_version) {
443 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
444 		    xip->xi_id, rversion, xip->xi_version);
445 
446 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
447 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
448 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
449 
450 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
451 		return (1);
452 	}
453 
454 	if (rversionp != NULL)
455 		*rversionp = rversion;
456 
457 	return (0);
458 }
459 
460 void
461 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
462 {
463 	fmd_event_t *e;
464 	uint_t vers;
465 	char *class;
466 
467 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
468 		return; /* transitioned to error state */
469 
470 	/*
471 	 * If the transport module didn't specify an authority, extract the
472 	 * one that is passed along with the xprt.syn event and use that.
473 	 */
474 	if (xip->xi_auth == NULL &&
475 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
476 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
477 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
478 		fmd_xprt_authupdate(xip);
479 	}
480 
481 	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
482 	    "resource.fm.xprt.ack", xip->xi_version);
483 
484 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
485 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
486 	fmd_eventq_insert_at_time(xip->xi_queue, e);
487 
488 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
489 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
490 }
491 
492 void
493 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
494 {
495 	uint_t vers;
496 
497 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
498 		return; /* transitioned to error state */
499 
500 	/*
501 	 * If the transport module didn't specify an authority, extract the
502 	 * one that is passed along with the xprt.syn event and use that.
503 	 */
504 	if (xip->xi_auth == NULL &&
505 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
506 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
507 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
508 		fmd_xprt_authupdate(xip);
509 	}
510 
511 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
512 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
513 }
514 
515 /*
516  * Upon transition to RUN, we take every solved case and resend a list.suspect
517  * event for it to our remote peer.  If a case transitions from solved to a
518  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
519  * the case hash, we will get it as part of examining the resource cache, next.
520  */
521 static void
522 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
523 {
524 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
525 	fmd_xprt_impl_t *xip = arg;
526 
527 	fmd_event_t *e;
528 	nvlist_t *nvl;
529 	char *class;
530 
531 	if (cip->ci_state == FMD_CASE_UNSOLVED)
532 		return;
533 
534 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
535 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
536 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
537 
538 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
539 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
540 
541 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
542 }
543 
544 void
545 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
546 {
547 	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
548 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
549 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
550 	}
551 }
552 
553 void
554 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
555 {
556 	char *class;
557 
558 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
559 		return; /* transitioned to error state */
560 
561 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
562 		return; /* malformed protocol event */
563 
564 	(void) pthread_mutex_lock(&xip->xi_lock);
565 	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
566 	(void) pthread_mutex_unlock(&xip->xi_lock);
567 
568 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
569 	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
570 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
571 }
572 
573 void
574 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
575 {
576 	fmd_event_t *e;
577 	char *class;
578 
579 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
580 		return; /* transitioned to error state */
581 
582 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
583 		return; /* malformed protocol event */
584 
585 	(void) pthread_mutex_lock(&xip->xi_lock);
586 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
587 	(void) pthread_mutex_unlock(&xip->xi_lock);
588 
589 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
590 	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
591 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
592 
593 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
594 	    "resource.fm.xprt.unsuback", xip->xi_version, class);
595 
596 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
597 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
598 	fmd_eventq_insert_at_time(xip->xi_queue, e);
599 }
600 
601 void
602 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
603 {
604 	char *class;
605 
606 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
607 		return; /* transitioned to error state */
608 
609 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
610 		return; /* malformed protocol event */
611 
612 	(void) pthread_mutex_lock(&xip->xi_lock);
613 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
614 	(void) pthread_mutex_unlock(&xip->xi_lock);
615 }
616 
617 /*
618  * on diagnosing side, receive a uuclose from the proxy.
619  */
620 void
621 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
622 {
623 	fmd_case_t *cp;
624 	char *uuid;
625 
626 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
627 		return; /* transitioned to error state */
628 
629 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
630 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
631 		/*
632 		 * update resource cache status and transition case
633 		 */
634 		fmd_case_close_status(cp);
635 		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
636 		fmd_case_rele(cp);
637 	}
638 }
639 
640 /*
641  * on diagnosing side, receive a uuresolved from the proxy.
642  */
643 void
644 fmd_xprt_event_uuresolved(fmd_xprt_impl_t *xip, nvlist_t *nvl)
645 {
646 	fmd_case_t *cp;
647 	char *uuid;
648 
649 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
650 		return; /* transitioned to error state */
651 
652 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
653 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
654 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
655 
656 		fmd_case_transition(cp, (cip->ci_state == FMD_CASE_REPAIRED) ?
657 		    FMD_CASE_RESOLVED : (cip->ci_state == FMD_CASE_CLOSED) ?
658 		    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT, FMD_CF_RESOLVED);
659 		fmd_case_rele(cp);
660 	}
661 }
662 
663 /*
664  * on diagnosing side, receive a repair/acquit from the proxy.
665  */
666 void
667 fmd_xprt_event_updated(fmd_xprt_impl_t *xip, nvlist_t *nvl)
668 {
669 	fmd_case_t *cp;
670 	char *uuid;
671 
672 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
673 		return; /* transitioned to error state */
674 
675 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
676 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
677 		uint8_t *statusp, *proxy_asrup = NULL;
678 		uint_t nelem = 0;
679 
680 		/*
681 		 * Only update status with new repairs if "no remote repair"
682 		 * is not set. Do the case_update anyway though (as this will
683 		 * refresh the status on the proxy side).
684 		 */
685 		if (!(xip->xi_flags & FMD_XPRT_NO_REMOTE_REPAIR)) {
686 			if (nvlist_lookup_uint8_array(nvl,
687 			    FM_RSRC_XPRT_FAULT_STATUS, &statusp, &nelem) == 0 &&
688 			    nelem != 0) {
689 				(void) nvlist_lookup_uint8_array(nvl,
690 				    FM_RSRC_XPRT_FAULT_HAS_ASRU, &proxy_asrup,
691 				    &nelem);
692 				fmd_case_update_status(cp, statusp,
693 				    proxy_asrup, NULL);
694 			}
695 			fmd_case_update_containees(cp);
696 		}
697 		fmd_case_update(cp);
698 		fmd_case_rele(cp);
699 	}
700 }
701 
702 void
703 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
704 {
705 	char *class = "<unknown>";
706 
707 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
708 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
709 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
710 
711 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
712 	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
713 
714 	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
715 }
716 
717 void
718 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
719 {
720 	char *class = "<unknown>";
721 
722 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
723 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
724 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
725 
726 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
727 	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
728 
729 }
730 
731 fmd_xprt_t *
732 fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
733 {
734 	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
735 	fmd_stat_t *statv;
736 	uint_t i, statc;
737 
738 	char buf[PATH_MAX];
739 	fmd_event_t *e;
740 	nvlist_t *nvl;
741 	char *s;
742 
743 	(void) pthread_mutex_init(&xip->xi_lock, NULL);
744 	(void) pthread_cond_init(&xip->xi_cv, NULL);
745 	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);
746 
747 	xip->xi_auth = auth;
748 	xip->xi_data = data;
749 	xip->xi_version = FM_RSRC_XPRT_VERSION;
750 	xip->xi_flags = flags;
751 
752 	/*
753 	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
754 	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
755 	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
756 	 * ensure that this transport will not run until fmd_xprt_resume_all().
757 	 */
758 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
759 	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);
760 
761 	if (fmd.d_xprt_suspend != 0)
762 		xip->xi_flags |= FMD_XPRT_DSUSPENDED;
763 
764 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
765 
766 	/*
767 	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
768 	 * bit so that fmdo_send() is not called until _fmd_init() completes.
769 	 */
770 	if (!(mp->mod_flags & FMD_MOD_INIT))
771 		xip->xi_flags |= FMD_XPRT_ISUSPENDED;
772 
773 	/*
774 	 * Initialize the transport statistics that we keep on behalf of fmd.
775 	 * These are set up using a template defined at the top of this file.
776 	 * We rename each statistic with a prefix ensuring its uniqueness.
777 	 */
778 	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
779 	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
780 	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));
781 
782 	for (i = 0; i < statc; i++) {
783 		(void) snprintf(statv[i].fmds_name,
784 		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
785 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
786 	}
787 
788 	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
789 	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);
790 
791 	if (xip->xi_stats == NULL)
792 		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);
793 
794 	xip->xi_stats->xs_module.fmds_value.str =
795 	    fmd_strdup(mp->mod_name, FMD_SLEEP);
796 
797 	if (xip->xi_auth != NULL)
798 		fmd_xprt_authupdate(xip);
799 
800 	/*
801 	 * Create the outbound eventq for this transport and link to its stats.
802 	 * If any suspend bits were set above, suspend the eventq immediately.
803 	 */
804 	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
805 	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
806 
807 	if (xip->xi_flags & FMD_XPRT_SMASK)
808 		fmd_eventq_suspend(xip->xi_queue);
809 
810 	/*
811 	 * Create our subscription hashes: local subscriptions go to xi_queue,
812 	 * remote subscriptions are tracked only for protocol requests, and
813 	 * pending unsubscriptions are associated with the /dev/null eventq.
814 	 */
815 	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
816 	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
817 	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);
818 
819 	/*
820 	 * Determine our initial state based upon the creation flags.  If we're
821 	 * read-only, go directly to RUN.  If we're accepting a new connection,
822 	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
823 	 */
824 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
825 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
826 	else if (flags & FMD_XPRT_ACCEPT)
827 		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
828 	else
829 		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
830 
831 	/*
832 	 * If client.xprtlog is set to TRUE, create a debugging log for the
833 	 * events received by the transport in var/fm/fmd/xprt/.
834 	 */
835 	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
836 	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);
837 
838 	if (i) {
839 		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
840 		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
841 	}
842 
843 	ASSERT(fmd_module_locked(mp));
844 	fmd_list_append(&mp->mod_transports, xip);
845 
846 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
847 	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
848 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
849 
850 	/*
851 	 * If this is a read-only transport, return without creating a send
852 	 * queue thread and setting up any connection events in our queue.
853 	 */
854 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
855 		goto out;
856 
857 	/*
858 	 * Once the transport is fully initialized, create a send queue thread
859 	 * and start any connect events flowing to complete our initialization.
860 	 */
861 	if ((xip->xi_thread = fmd_thread_create(mp,
862 	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {
863 
864 		fmd_error(EFMD_XPRT_THR,
865 		    "failed to create thread for transport %u", xip->xi_id);
866 
867 		fmd_xprt_destroy((fmd_xprt_t *)xip);
868 		(void) fmd_set_errno(EFMD_XPRT_THR);
869 		return (NULL);
870 	}
871 
872 	/*
873 	 * If the transport is not being opened to accept an inbound connect,
874 	 * start an outbound connection by enqueuing a SYN event for our peer.
875 	 */
876 	if (!(flags & FMD_XPRT_ACCEPT)) {
877 		nvl = fmd_protocol_xprt_ctl(mp,
878 		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);
879 
880 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
881 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
882 		fmd_eventq_insert_at_time(xip->xi_queue, e);
883 	}
884 out:
885 	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
886 	return ((fmd_xprt_t *)xip);
887 }
888 
889 void
890 fmd_xprt_destroy(fmd_xprt_t *xp)
891 {
892 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
893 	fmd_module_t *mp = xip->xi_queue->eq_mod;
894 	uint_t id = xip->xi_id;
895 
896 	fmd_case_impl_t *cip, *nip;
897 	fmd_stat_t *sp;
898 	uint_t i, n;
899 
900 	ASSERT(fmd_module_locked(mp));
901 	fmd_list_delete(&mp->mod_transports, xip);
902 
903 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
904 	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
905 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
906 
907 	(void) pthread_mutex_lock(&xip->xi_lock);
908 
909 	while (xip->xi_busy != 0)
910 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
911 
912 	/*
913 	 * Remove the transport from global visibility, cancel its send-side
914 	 * thread, join with it, and then remove the transport from module
915 	 * visibility.  Once all this is done, destroy and free the transport.
916 	 */
917 	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
918 
919 	if (xip->xi_thread != NULL) {
920 		fmd_eventq_abort(xip->xi_queue);
921 		fmd_module_unlock(mp);
922 		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
923 		fmd_module_lock(mp);
924 	}
925 
926 	if (xip->xi_log != NULL)
927 		fmd_log_rele(xip->xi_log);
928 
929 	/*
930 	 * Release every case handle in the module that was cached by this
931 	 * transport.  This will result in these cases disappearing from the
932 	 * local case hash so that fmd_case_uuclose() and fmd_case_repaired()
933 	 * etc can no longer be used.
934 	 */
935 	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
936 		nip = fmd_list_next(cip);
937 		if (cip->ci_xprt == xp)
938 			fmd_case_discard((fmd_case_t *)cip, B_TRUE);
939 	}
940 
941 	/*
942 	 * Destroy every class in the various subscription hashes and remove
943 	 * any corresponding subscriptions from the event dispatch queue.
944 	 */
945 	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
946 	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
947 	fmd_xprt_class_hash_destroy(&xip->xi_usub);
948 
949 	/*
950 	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
951 	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
952 	 * won't find the entries in the hash table.
953 	 */
954 	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
955 	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
956 	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
957 	for (i = 0; i < n; i++) {
958 		(void) snprintf(sp[i].fmds_name,
959 		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
960 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
961 	}
962 	fmd_ustat_delete(mp->mod_ustat, n, sp);
963 	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
964 
965 	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
966 	fmd_eventq_destroy(xip->xi_queue);
967 	nvlist_free(xip->xi_auth);
968 	fmd_free(xip, sizeof (fmd_xprt_impl_t));
969 
970 	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
971 }
972 
973 void
974 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
975 {
976 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
977 	uint_t oflags;
978 
979 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
980 	(void) pthread_mutex_lock(&xip->xi_lock);
981 
982 	oflags = xip->xi_flags;
983 	xip->xi_flags |= flags;
984 
985 	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
986 		fmd_eventq_suspend(xip->xi_queue);
987 
988 	(void) pthread_cond_broadcast(&xip->xi_cv);
989 
990 	while (xip->xi_busy != 0)
991 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
992 
993 	(void) pthread_mutex_unlock(&xip->xi_lock);
994 }
995 
996 void
997 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
998 {
999 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1000 	uint_t oflags;
1001 
1002 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
1003 	(void) pthread_mutex_lock(&xip->xi_lock);
1004 
1005 	oflags = xip->xi_flags;
1006 	xip->xi_flags &= ~flags;
1007 
1008 	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
1009 		fmd_eventq_resume(xip->xi_queue);
1010 
1011 	(void) pthread_cond_broadcast(&xip->xi_cv);
1012 	(void) pthread_mutex_unlock(&xip->xi_lock);
1013 }
1014 
1015 void
1016 fmd_xprt_send(fmd_xprt_t *xp)
1017 {
1018 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1019 	fmd_module_t *mp = xip->xi_queue->eq_mod;
1020 	fmd_event_t *ep;
1021 	int err;
1022 
1023 	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
1024 		if (FMD_EVENT_TTL(ep) == 0) {
1025 			fmd_event_rele(ep);
1026 			continue;
1027 		}
1028 
1029 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
1030 		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));
1031 
1032 		err = mp->mod_ops->mop_transport(mp, xp, ep);
1033 		fmd_eventq_done(xip->xi_queue);
1034 
1035 		if (err == FMD_SEND_RETRY) {
1036 			fmd_eventq_insert_at_time(xip->xi_queue, ep);
1037 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1038 			xip->xi_stats->xs_retried.fmds_value.ui64++;
1039 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1040 		}
1041 
1042 		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
1043 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1044 			xip->xi_stats->xs_lost.fmds_value.ui64++;
1045 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1046 		}
1047 
1048 		fmd_event_rele(ep);
1049 	}
1050 }
1051 
1052 /*
1053  * This function creates a local suspect list. This is used when a suspect list
1054  * is created directly by an external source like fminject.
1055  */
1056 static void
1057 fmd_xprt_list_suspect_local(fmd_xprt_t *xp, nvlist_t *nvl)
1058 {
1059 	nvlist_t **nvlp;
1060 	nvlist_t *de_fmri, *de_fmri_dup = NULL;
1061 	int64_t *diag_time;
1062 	char *code = NULL;
1063 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1064 	fmd_case_t *cp;
1065 	uint_t nelem = 0, nelem2 = 0, i;
1066 
1067 	fmd_module_lock(xip->xi_queue->eq_mod);
1068 	cp = fmd_case_create(xip->xi_queue->eq_mod, NULL);
1069 	if (cp == NULL) {
1070 		fmd_module_unlock(xip->xi_queue->eq_mod);
1071 		return;
1072 	}
1073 
1074 	/*
1075 	 * copy diag_code if present
1076 	 */
1077 	(void) nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code);
1078 	if (code != NULL) {
1079 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1080 
1081 		cip->ci_precanned = 1;
1082 		fmd_case_setcode(cp, code);
1083 	}
1084 
1085 	/*
1086 	 * copy suspects
1087 	 */
1088 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1089 	    &nelem);
1090 	for (i = 0; i < nelem; i++) {
1091 		nvlist_t *flt_copy, *asru = NULL, *fru = NULL, *rsrc = NULL;
1092 		topo_hdl_t *thp;
1093 		char *loc = NULL;
1094 		int err;
1095 
1096 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1097 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1098 		(void) nvlist_lookup_nvlist(nvlp[i], FM_FAULT_RESOURCE, &rsrc);
1099 
1100 		/*
1101 		 * If no fru specified, get it from topo
1102 		 */
1103 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_FRU, &fru) != 0 &&
1104 		    rsrc && topo_fmri_fru(thp, rsrc, &fru, &err) == 0)
1105 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_FRU, fru);
1106 		/*
1107 		 * If no asru specified, get it from topo
1108 		 */
1109 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU, &asru) != 0 &&
1110 		    rsrc && topo_fmri_asru(thp, rsrc, &asru, &err) == 0)
1111 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU, asru);
1112 		/*
1113 		 * If no location specified, get it from topo
1114 		 */
1115 		if (nvlist_lookup_string(nvlp[i], FM_FAULT_LOCATION,
1116 		    &loc) != 0) {
1117 			if (fru && topo_fmri_label(thp, fru, &loc, &err) == 0)
1118 				(void) nvlist_add_string(flt_copy,
1119 				    FM_FAULT_LOCATION, loc);
1120 			else if (rsrc && topo_fmri_label(thp, rsrc, &loc,
1121 			    &err) == 0)
1122 				(void) nvlist_add_string(flt_copy,
1123 				    FM_FAULT_LOCATION, loc);
1124 			if (loc)
1125 				topo_hdl_strfree(thp, loc);
1126 		}
1127 		if (fru)
1128 			nvlist_free(fru);
1129 		if (asru)
1130 			nvlist_free(asru);
1131 		if (rsrc)
1132 			nvlist_free(rsrc);
1133 		fmd_fmri_topo_rele(thp);
1134 		fmd_case_insert_suspect(cp, flt_copy);
1135 	}
1136 
1137 	/*
1138 	 * copy diag_time if present
1139 	 */
1140 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1141 	    &nelem2) == 0 && nelem2 >= 2)
1142 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1143 
1144 	/*
1145 	 * copy DE fmri if present
1146 	 */
1147 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1148 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1149 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1150 	}
1151 
1152 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1153 	fmd_module_unlock(xip->xi_queue->eq_mod);
1154 }
1155 
1156 /*
1157  * This function is called to create a proxy case on receipt of a list.suspect
1158  * from the diagnosing side of the transport.
1159  */
1160 static void
1161 fmd_xprt_list_suspect(fmd_xprt_t *xp, nvlist_t *nvl)
1162 {
1163 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1164 	nvlist_t **nvlp;
1165 	uint_t nelem = 0, nelem2 = 0, i;
1166 	int64_t *diag_time;
1167 	topo_hdl_t *thp;
1168 	char *class;
1169 	nvlist_t *rsrc, *asru, *de_fmri, *de_fmri_dup = NULL;
1170 	nvlist_t *flt_copy;
1171 	int err;
1172 	nvlist_t **asrua;
1173 	uint8_t *proxy_asru = NULL;
1174 	int got_proxy_asru = 0;
1175 	int got_hc_rsrc = 0;
1176 	int got_present_rsrc = 0;
1177 	uint8_t *diag_asru = NULL;
1178 	char *scheme;
1179 	uint8_t *statusp;
1180 	char *uuid, *code;
1181 	fmd_case_t *cp;
1182 	fmd_case_impl_t *cip;
1183 	int need_update = 0;
1184 
1185 	if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) != 0)
1186 		return;
1187 	if (nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) != 0)
1188 		return;
1189 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1190 	    &nelem);
1191 
1192 	/*
1193 	 * In order to implement FMD_XPRT_HCONLY and FMD_XPRT_HC_PRESENT_ONLY
1194 	 * etc we first scan the suspects to see if
1195 	 * - there was an asru in the received fault
1196 	 * - there was an hc-scheme resource in the received fault
1197 	 * - any hc-scheme resource in the received fault is present in the
1198 	 *   local topology
1199 	 * - any hc-scheme resource in the received fault has an asru in the
1200 	 *   local topology
1201 	 */
1202 	if (nelem > 0) {
1203 		asrua = fmd_zalloc(sizeof (nvlist_t *) * nelem, FMD_SLEEP);
1204 		proxy_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1205 		diag_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1206 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1207 		for (i = 0; i < nelem; i++) {
1208 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1209 			    &asru) == 0 && asru != NULL)
1210 				diag_asru[i] = 1;
1211 			if (nvlist_lookup_string(nvlp[i], FM_CLASS,
1212 			    &class) != 0 || strncmp(class, "fault", 5) != 0)
1213 				continue;
1214 			/*
1215 			 * If there is an hc-scheme asru, use that to find the
1216 			 * real asru. Otherwise if there is an hc-scheme
1217 			 * resource, work out the old asru from that.
1218 			 * This order is to allow a two stage evaluation
1219 			 * of the asru where a fault in the diagnosing side
1220 			 * is in a component not visible to the proxy side,
1221 			 * but prevents a component that is visible from
1222 			 * working. So the diagnosing side sets the asru to
1223 			 * the latter component (in hc-scheme as the diagnosing
1224 			 * side doesn't know about the proxy side's virtual
1225 			 * schemes), and then the proxy side can convert that
1226 			 * to a suitable virtual scheme asru.
1227 			 */
1228 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1229 			    &asru) == 0 && asru != NULL &&
1230 			    nvlist_lookup_string(asru, FM_FMRI_SCHEME,
1231 			    &scheme) == 0 &&
1232 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1233 				got_hc_rsrc = 1;
1234 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1235 					continue;
1236 				if (topo_fmri_present(thp, asru, &err) != 0)
1237 					got_present_rsrc = 1;
1238 				if (topo_fmri_asru(thp, asru, &asrua[i],
1239 				    &err) == 0) {
1240 					proxy_asru[i] =
1241 					    FMD_PROXY_ASRU_FROM_ASRU;
1242 					got_proxy_asru = 1;
1243 				}
1244 			} else if (nvlist_lookup_nvlist(nvlp[i],
1245 			    FM_FAULT_RESOURCE, &rsrc) == 0 && rsrc != NULL &&
1246 			    nvlist_lookup_string(rsrc, FM_FMRI_SCHEME,
1247 			    &scheme) == 0 &&
1248 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1249 				got_hc_rsrc = 1;
1250 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1251 					continue;
1252 				if (topo_fmri_present(thp, rsrc, &err) != 0)
1253 					got_present_rsrc = 1;
1254 				if (topo_fmri_asru(thp, rsrc, &asrua[i],
1255 				    &err) == 0) {
1256 					proxy_asru[i] =
1257 					    FMD_PROXY_ASRU_FROM_RSRC;
1258 					got_proxy_asru = 1;
1259 				}
1260 			}
1261 		}
1262 		fmd_fmri_topo_rele(thp);
1263 	}
1264 
1265 	/*
1266 	 * If we're set up only to report hc-scheme faults, and
1267 	 * there aren't any, then just drop the event.
1268 	 */
1269 	if (got_hc_rsrc == 0 && (xip->xi_flags & FMD_XPRT_HCONLY)) {
1270 		if (nelem > 0) {
1271 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1272 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1273 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1274 		}
1275 		return;
1276 	}
1277 
1278 	/*
1279 	 * If we're set up only to report locally present hc-scheme
1280 	 * faults, and there aren't any, then just drop the event.
1281 	 */
1282 	if (got_present_rsrc == 0 &&
1283 	    (xip->xi_flags & FMD_XPRT_HC_PRESENT_ONLY)) {
1284 		if (nelem > 0) {
1285 			for (i = 0; i < nelem; i++)
1286 				if (asrua[i])
1287 					nvlist_free(asrua[i]);
1288 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1289 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1290 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1291 		}
1292 		return;
1293 	}
1294 
1295 	/*
1296 	 * If fmd_case_recreate() returns NULL, UUID is already known.
1297 	 */
1298 	fmd_module_lock(xip->xi_queue->eq_mod);
1299 	if ((cp = fmd_case_recreate(xip->xi_queue->eq_mod, xp,
1300 	    FMD_CASE_UNSOLVED, uuid, code)) == NULL) {
1301 		if (nelem > 0) {
1302 			for (i = 0; i < nelem; i++)
1303 				if (asrua[i])
1304 					nvlist_free(asrua[i]);
1305 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1306 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1307 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1308 		}
1309 		fmd_module_unlock(xip->xi_queue->eq_mod);
1310 		return;
1311 	}
1312 
1313 	cip = (fmd_case_impl_t *)cp;
1314 	cip->ci_diag_asru = diag_asru;
1315 	cip->ci_proxy_asru = proxy_asru;
1316 	for (i = 0; i < nelem; i++) {
1317 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1318 		if (proxy_asru[i] != FMD_PROXY_ASRU_NOT_NEEDED) {
1319 			/*
1320 			 * Copy suspects, but remove/replace asru first. Also if
1321 			 * the original asru was hc-scheme use that as resource.
1322 			 */
1323 			if (proxy_asru[i] == FMD_PROXY_ASRU_FROM_ASRU) {
1324 				(void) nvlist_remove(flt_copy,
1325 				    FM_FAULT_RESOURCE, DATA_TYPE_NVLIST);
1326 				(void) nvlist_lookup_nvlist(flt_copy,
1327 				    FM_FAULT_ASRU, &asru);
1328 				(void) nvlist_add_nvlist(flt_copy,
1329 				    FM_FAULT_RESOURCE, asru);
1330 			}
1331 			(void) nvlist_remove(flt_copy, FM_FAULT_ASRU,
1332 			    DATA_TYPE_NVLIST);
1333 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU,
1334 			    asrua[i]);
1335 			nvlist_free(asrua[i]);
1336 		} else if (nvlist_lookup_nvlist(flt_copy, FM_FAULT_ASRU,
1337 		    &asru) == 0 && asru != NULL) {
1338 			/*
1339 			 * keep asru from diag side, but but mark as no retire
1340 			 */
1341 			(void) nvlist_add_boolean_value(flt_copy,
1342 			    FM_SUSPECT_RETIRE, B_FALSE);
1343 		}
1344 		fmd_case_insert_suspect(cp, flt_copy);
1345 	}
1346 	/*
1347 	 * copy diag_time
1348 	 */
1349 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1350 	    &nelem2) == 0 && nelem2 >= 2)
1351 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1352 	/*
1353 	 * copy DE fmri
1354 	 */
1355 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1356 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1357 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1358 	}
1359 
1360 	/*
1361 	 * Transition to solved. This will log the suspect list and create
1362 	 * the resource cache entries.
1363 	 */
1364 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1365 
1366 	/*
1367 	 * Update status if it is not simply "all faulty" (can happen if
1368 	 * list.suspects are being re-sent when the transport has reconnected).
1369 	 */
1370 	(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS, &statusp,
1371 	    &nelem);
1372 	for (i = 0; i < nelem; i++) {
1373 		if ((statusp[i] & (FM_SUSPECT_FAULTY | FM_SUSPECT_UNUSABLE |
1374 		    FM_SUSPECT_NOT_PRESENT | FM_SUSPECT_DEGRADED)) !=
1375 		    FM_SUSPECT_FAULTY)
1376 			need_update = 1;
1377 	}
1378 	if (need_update) {
1379 		fmd_case_update_status(cp, statusp, cip->ci_proxy_asru,
1380 		    cip->ci_diag_asru);
1381 		fmd_case_update_containees(cp);
1382 		fmd_case_update(cp);
1383 	}
1384 
1385 	/*
1386 	 * if asru on proxy side, send an update back to the diagnosing side to
1387 	 * update UNUSABLE/DEGRADED.
1388 	 */
1389 	if (got_proxy_asru)
1390 		fmd_case_xprt_updated(cp);
1391 
1392 	if (nelem > 0)
1393 		fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1394 	fmd_module_unlock(xip->xi_queue->eq_mod);
1395 }
1396 
1397 void
1398 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
1399 {
1400 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1401 	const fmd_xprt_rule_t *xrp;
1402 	fmd_t *dp = &fmd;
1403 
1404 	fmd_event_t *e;
1405 	char *class, *uuid;
1406 	boolean_t isproto, isereport;
1407 
1408 	uint64_t *tod;
1409 	uint8_t ttl;
1410 	uint_t n;
1411 	fmd_case_t *cp;
1412 
1413 	/*
1414 	 * Grab the transport lock and set the busy flag to indicate we are
1415 	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1416 	 * resumes the transport before continuing on with the receive.
1417 	 */
1418 	(void) pthread_mutex_lock(&xip->xi_lock);
1419 
1420 	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1421 
1422 		if (fmd.d_signal != 0) {
1423 			(void) pthread_mutex_unlock(&xip->xi_lock);
1424 			return; /* fmd_destroy() is in progress */
1425 		}
1426 
1427 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1428 	}
1429 
1430 	xip->xi_busy++;
1431 	ASSERT(xip->xi_busy != 0);
1432 
1433 	(void) pthread_mutex_unlock(&xip->xi_lock);
1434 
1435 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
1436 	xip->xi_stats->xs_received.fmds_value.ui64++;
1437 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1438 
1439 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1440 		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1441 		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1442 
1443 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1444 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1445 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1446 
1447 		nvlist_free(nvl);
1448 		goto done;
1449 	}
1450 
1451 	fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
1452 	    ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);
1453 
1454 	isereport = (strncmp(class, FM_EREPORT_CLASS,
1455 	    sizeof (FM_EREPORT_CLASS - 1)) == 0) ? FMD_B_TRUE : FMD_B_FALSE;
1456 
1457 	/*
1458 	 * The logonly flag should only be set for ereports.
1459 	 */
1460 	if ((logonly == FMD_B_TRUE) && (isereport == FMD_B_FALSE)) {
1461 		fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
1462 		    "logonly flag is not valid for class %s",
1463 		    (void *)nvl, class);
1464 
1465 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1466 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1467 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1468 
1469 		nvlist_free(nvl);
1470 		goto done;
1471 	}
1472 
1473 	/*
1474 	 * If a time-to-live value is present in the event and is zero, drop
1475 	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1476 	 */
1477 	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1478 		if (ttl == 0) {
1479 			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1480 			    "timeout: event received with ttl=0\n",
1481 			    xip->xi_id, (void *)nvl, class);
1482 
1483 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1484 			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1485 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1486 
1487 			nvlist_free(nvl);
1488 			goto done;
1489 		}
1490 		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1491 		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1492 	}
1493 
1494 	/*
1495 	 * If we are using the native system clock, the underlying transport
1496 	 * code can provide a tighter event time bound by telling us when the
1497 	 * event was enqueued.  If we're using simulated clocks, this time
1498 	 * has no meaning to us, so just reset the value to use HRT_NOW.
1499 	 */
1500 	if (dp->d_clockops != &fmd_timeops_native)
1501 		hrt = FMD_HRT_NOW;
1502 
1503 	/*
1504 	 * If an event's class is in the FMD_CTL_CLASS family, then create a
1505 	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
1506 	 * event using this time.  Otherwise create a protocol event using hrt.
1507 	 */
1508 	isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
1509 	    FMD_B_FALSE : FMD_B_TRUE;
1510 	if (isproto == FMD_B_FALSE)
1511 		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1512 	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1513 		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1514 	else {
1515 		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1516 		    NULL, nvl, class, NULL, 0, 0);
1517 	}
1518 
1519 	/*
1520 	 * If the debug log is enabled, create a temporary event, log it to the
1521 	 * debug log, and then reset the underlying state of the event.
1522 	 */
1523 	if (xip->xi_log != NULL) {
1524 		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1525 
1526 		fmd_log_append(xip->xi_log, e, NULL);
1527 
1528 		ep->ev_flags |= FMD_EVF_VOLATILE;
1529 		ep->ev_off = 0;
1530 		ep->ev_len = 0;
1531 
1532 		if (ep->ev_log != NULL) {
1533 			fmd_log_rele(ep->ev_log);
1534 			ep->ev_log = NULL;
1535 		}
1536 	}
1537 
1538 	/*
1539 	 * Iterate over the rules for the current state trying to match the
1540 	 * event class to one of our special rules.  If a rule is matched, the
1541 	 * event is consumed and not dispatched to other modules.  If the rule
1542 	 * set ends without matching an event, we fall through to dispatching.
1543 	 */
1544 	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1545 		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1546 			fmd_event_hold(e);
1547 			xrp->xr_func(xip, nvl);
1548 			fmd_event_rele(e);
1549 			goto done;
1550 		}
1551 	}
1552 
1553 	/*
1554 	 * Record the event in the errlog if it is an ereport.  This code will
1555 	 * be replaced later with a per-transport intent log instead.
1556 	 */
1557 	if (isereport == FMD_B_TRUE) {
1558 		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
1559 		fmd_log_append(dp->d_errlog, e, NULL);
1560 		(void) pthread_rwlock_unlock(&dp->d_log_lock);
1561 	}
1562 
1563 	/*
1564 	 * If a list.suspect event is received, create a case for the specified
1565 	 * UUID in the case hash, with the transport module as its owner.
1566 	 */
1567 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS)) {
1568 		if (xip->xi_flags & FMD_XPRT_CACHE_AS_LOCAL)
1569 			fmd_xprt_list_suspect_local(xp, nvl);
1570 		else
1571 			fmd_xprt_list_suspect(xp, nvl);
1572 		fmd_event_hold(e);
1573 		fmd_event_rele(e);
1574 		goto done;
1575 	}
1576 
1577 	/*
1578 	 * If a list.updated or list.repaired event is received, update the
1579 	 * resource cache status and the local case.
1580 	 */
1581 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_REPAIRED_CLASS) ||
1582 	    fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_UPDATED_CLASS)) {
1583 		uint8_t *statusp;
1584 		uint_t nelem = 0;
1585 
1586 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1587 		    &statusp, &nelem);
1588 		fmd_module_lock(xip->xi_queue->eq_mod);
1589 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1590 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1591 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1592 			if (cip->ci_xprt != NULL) {
1593 				fmd_case_update_status(cp, statusp,
1594 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1595 				fmd_case_update_containees(cp);
1596 				fmd_case_update(cp);
1597 			}
1598 			fmd_case_rele(cp);
1599 		}
1600 		fmd_module_unlock(xip->xi_queue->eq_mod);
1601 		fmd_event_hold(e);
1602 		fmd_event_rele(e);
1603 		goto done;
1604 	}
1605 
1606 	/*
1607 	 * If a list.isolated event is received, update resource cache status
1608 	 */
1609 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_ISOLATED_CLASS)) {
1610 		uint8_t *statusp;
1611 		uint_t nelem = 0;
1612 
1613 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1614 		    &statusp, &nelem);
1615 		fmd_module_lock(xip->xi_queue->eq_mod);
1616 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1617 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1618 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1619 			if (cip->ci_xprt != NULL)
1620 				fmd_case_update_status(cp, statusp,
1621 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1622 			fmd_case_rele(cp);
1623 		}
1624 		fmd_module_unlock(xip->xi_queue->eq_mod);
1625 		fmd_event_hold(e);
1626 		fmd_event_rele(e);
1627 		goto done;
1628 	}
1629 
1630 	/*
1631 	 * If a list.resolved event is received, resolve the local case.
1632 	 */
1633 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_RESOLVED_CLASS)) {
1634 		fmd_module_lock(xip->xi_queue->eq_mod);
1635 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1636 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1637 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1638 			if (cip->ci_xprt != NULL)
1639 				fmd_case_transition(cp, (cip->ci_state ==
1640 				    FMD_CASE_REPAIRED) ? FMD_CASE_RESOLVED :
1641 				    (cip->ci_state == FMD_CASE_CLOSED) ?
1642 				    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT,
1643 				    FMD_CF_RESOLVED);
1644 			fmd_case_rele(cp);
1645 		}
1646 		fmd_module_unlock(xip->xi_queue->eq_mod);
1647 		fmd_event_hold(e);
1648 		fmd_event_rele(e);
1649 		goto done;
1650 	}
1651 
1652 	if (logonly == FMD_B_TRUE || (xip->xi_flags & FMD_XPRT_EXTERNAL)) {
1653 		/*
1654 		 * Don't proxy ereports on an EXTERNAL transport - we won't
1655 		 * know how to diagnose them with the wrong topology. Note
1656 		 * that here (and above) we have to hold/release the event in
1657 		 * order for it to be freed.
1658 		 */
1659 		fmd_event_hold(e);
1660 		fmd_event_rele(e);
1661 	} else if (isproto == FMD_B_TRUE)
1662 		fmd_dispq_dispatch(dp->d_disp, e, class);
1663 	else
1664 		fmd_modhash_dispatch(dp->d_mod_hash, e);
1665 done:
1666 	(void) pthread_mutex_lock(&xip->xi_lock);
1667 
1668 	ASSERT(xip->xi_busy != 0);
1669 	xip->xi_busy--;
1670 
1671 	(void) pthread_cond_broadcast(&xip->xi_cv);
1672 	(void) pthread_mutex_unlock(&xip->xi_lock);
1673 }
1674 
1675 void
1676 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1677 {
1678 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1679 
1680 	fmd_event_t *e;
1681 	nvlist_t *nvl;
1682 	char *s;
1683 
1684 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1685 		return; /* read-only transports do not proxy uuclose */
1686 
1687 	TRACE((FMD_DBG_XPRT, "xprt %u closing case %s\n", xip->xi_id, uuid));
1688 
1689 	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1690 	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1691 
1692 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1693 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1694 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1695 }
1696 
1697 /*
1698  * On proxy side, send back uuresolved request to diagnosing side
1699  */
1700 void
1701 fmd_xprt_uuresolved(fmd_xprt_t *xp, const char *uuid)
1702 {
1703 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1704 
1705 	fmd_event_t *e;
1706 	nvlist_t *nvl;
1707 	char *s;
1708 
1709 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1710 		return; /* read-only transports do not proxy uuresolved */
1711 
1712 	TRACE((FMD_DBG_XPRT, "xprt %u resolving case %s\n", xip->xi_id, uuid));
1713 
1714 	nvl = fmd_protocol_xprt_uuresolved(xip->xi_queue->eq_mod,
1715 	    "resource.fm.xprt.uuresolved", xip->xi_version, uuid);
1716 
1717 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1718 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1719 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1720 }
1721 
1722 /*
1723  * On proxy side, send back repair/acquit/etc request to diagnosing side
1724  */
1725 void
1726 fmd_xprt_updated(fmd_xprt_t *xp, const char *uuid, uint8_t *statusp,
1727 	uint8_t *has_asrup, uint_t nelem)
1728 {
1729 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1730 
1731 	fmd_event_t *e;
1732 	nvlist_t *nvl;
1733 	char *s;
1734 
1735 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1736 		return; /* read-only transports do not support remote repairs */
1737 
1738 	TRACE((FMD_DBG_XPRT, "xprt %u updating case %s\n", xip->xi_id, uuid));
1739 
1740 	nvl = fmd_protocol_xprt_updated(xip->xi_queue->eq_mod,
1741 	    "resource.fm.xprt.updated", xip->xi_version, uuid, statusp,
1742 	    has_asrup, nelem);
1743 
1744 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1745 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1746 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1747 }
1748 
1749 /*
1750  * Insert the specified class into our remote subscription hash.  If the class
1751  * is already present, bump the reference count; otherwise add it to the hash
1752  * and then enqueue an event for our remote peer to proxy our subscription.
1753  */
1754 void
1755 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1756 {
1757 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1758 
1759 	uint_t refs;
1760 	nvlist_t *nvl;
1761 	fmd_event_t *e;
1762 	char *s;
1763 
1764 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1765 		return; /* read-only transports do not proxy subscriptions */
1766 
1767 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1768 		return; /* transport is not yet an active subscriber */
1769 
1770 	(void) pthread_mutex_lock(&xip->xi_lock);
1771 	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1772 	(void) pthread_mutex_unlock(&xip->xi_lock);
1773 
1774 	if (refs > 1)
1775 		return; /* we've already asked our peer for this subscription */
1776 
1777 	fmd_dprintf(FMD_DBG_XPRT,
1778 	    "xprt %u subscribing to %s\n", xip->xi_id, class);
1779 
1780 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1781 	    "resource.fm.xprt.subscribe", xip->xi_version, class);
1782 
1783 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1784 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1785 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1786 }
1787 
1788 /*
1789  * Delete the specified class from the remote subscription hash.  If the
1790  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1791  */
1792 void
1793 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1794 {
1795 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1796 
1797 	uint_t refs;
1798 	nvlist_t *nvl;
1799 	fmd_event_t *e;
1800 	char *s;
1801 
1802 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1803 		return; /* read-only transports do not proxy subscriptions */
1804 
1805 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1806 		return; /* transport is not yet an active subscriber */
1807 
1808 	/*
1809 	 * If the subscription reference count drops to zero in xi_rsub, insert
1810 	 * an entry into the xi_usub hash indicating we await an unsuback event.
1811 	 */
1812 	(void) pthread_mutex_lock(&xip->xi_lock);
1813 
1814 	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1815 		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1816 
1817 	(void) pthread_mutex_unlock(&xip->xi_lock);
1818 
1819 	if (refs != 0)
1820 		return; /* other subscriptions for this class still active */
1821 
1822 	fmd_dprintf(FMD_DBG_XPRT,
1823 	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1824 
1825 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1826 	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1827 
1828 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1829 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1830 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1831 }
1832 
1833 static void
1834 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1835 {
1836 	fmd_xprt_t *xp;
1837 
1838 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1839 		fmd_xprt_subscribe(xp, class);
1840 		fmd_idspace_rele(ids, id);
1841 	}
1842 }
1843 
1844 void
1845 fmd_xprt_subscribe_all(const char *class)
1846 {
1847 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1848 
1849 	if (ids->ids_count != 0)
1850 		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1851 }
1852 
1853 static void
1854 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1855 {
1856 	fmd_xprt_t *xp;
1857 
1858 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1859 		fmd_xprt_unsubscribe(xp, class);
1860 		fmd_idspace_rele(ids, id);
1861 	}
1862 }
1863 
1864 void
1865 fmd_xprt_unsubscribe_all(const char *class)
1866 {
1867 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1868 
1869 	if (ids->ids_count != 0)
1870 		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1871 }
1872 
1873 /*ARGSUSED*/
1874 static void
1875 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1876 {
1877 	fmd_xprt_t *xp;
1878 
1879 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1880 		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1881 		fmd_idspace_rele(ids, id);
1882 	}
1883 }
1884 
1885 void
1886 fmd_xprt_suspend_all(void)
1887 {
1888 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1889 
1890 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1891 
1892 	if (fmd.d_xprt_suspend++ != 0) {
1893 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1894 		return; /* already suspended */
1895 	}
1896 
1897 	if (ids->ids_count != 0)
1898 		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1899 
1900 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1901 }
1902 
1903 /*ARGSUSED*/
1904 static void
1905 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1906 {
1907 	fmd_xprt_t *xp;
1908 
1909 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1910 		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1911 		fmd_idspace_rele(ids, id);
1912 	}
1913 }
1914 
1915 void
1916 fmd_xprt_resume_all(void)
1917 {
1918 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1919 
1920 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1921 
1922 	if (fmd.d_xprt_suspend == 0)
1923 		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
1924 
1925 	if (--fmd.d_xprt_suspend != 0) {
1926 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1927 		return; /* not ready to be resumed */
1928 	}
1929 
1930 	if (ids->ids_count != 0)
1931 		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
1932 
1933 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1934 }
1935