xref: /titanic_52/usr/src/cmd/fm/fmd/common/fmd_xprt.c (revision f9e0b1dc6f25356fd35837dc9e1124b2d91ed6f3)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
24  */
25 
26 /*
27  * FMD Transport Subsystem
28  *
29  * A transport module uses some underlying mechanism to transport events.
30  * This mechanism may use any underlying link-layer protocol and may support
31  * additional link-layer packets unrelated to FMA.  Some appropriate link-
32  * layer mechanism to create the underlying connection is expected to be
33  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
34  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
35  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
36  * The underlying transport mechanism is *required* to provide ordering: that
37  * is, the sequences of bytes written across the transport must be read by
38  * the remote peer in the order that they are written, even across separate
39  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
40  * a valid transport as it guarantees ordering, whereas the Internet UDP
41  * protocol would not because UDP datagrams may be delivered in any order
42  * as a result of delays introduced when datagrams pass through routers.
43  *
44  * Similar to sending events, a transport module receives events that are from
45  * its peer remote endpoint using some transport-specific mechanism that is
46  * unknown to FMD.  As each event is received, the transport module is
47  * responsible for constructing a valid nvlist_t object from the data and then
48  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
49  * queue, making it available to all local modules that are not transport
50  * modules that have subscribed to the event.
51  *
52  * The following state machine is used for each transport.  The initial state
53  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
54  *
55  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
56  *             |                 |
57  * waiting  +--v--+           +--v--+  waiting
58  * for syn  | SYN |--+     --+| ACK |  for ack
59  * event    +-----+   \   /   +-----+  event
60  *             |       \ /       |
61  * drop all +--v--+     X     +--v--+  send subscriptions,
62  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
63  *          +-----+           +-----+  wait for run event
64  *             ^                 |
65  *             |     +-----+     |
66  *             +-----| RUN |<----+
67  *                   +--^--+
68  *                      |
69  *               FMD_XPRT_RDONLY
70  *
71  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
72  * Layer enqueues a "syn" event for the module in its event queue and sets the
73  * state to ACK.  In state ACK, we are waiting for the transport to get an
74  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
75  * discarded.  If an "ack" is received, we transition to state SUB.  If a
76  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
77  * exchange), we transition to state ERR.  Once in state ERR, no further
78  * operations are valid except fmd_xprt_close() and fmd_xprt_error() will
79  * return a non-zero value to the caller indicating the transport has failed.
80  *
81  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
82  * Layer assumes this transport is being used to accept a virtual connection
83  * from a remote peer that is sending a "syn", and sets the initial state to
84  * SYN.  In this state, the transport waits for a "syn" event, validates it,
85  * and then transitions to state SUB if it is valid or state ERR if it is not.
86  *
87  * Once in state SUB, the transport module is expected to receive a sequence of
88  * zero or more "subscribe" events from the remote peer, followed by a "run"
89  * event.  Once in state RUN, the transport is active and any events can be
90  * sent or received.  The transport module is free to call fmd_xprt_close()
91  * from any state.  The fmd_xprt_error() function will return zero if the
92  * transport is not in the ERR state, or non-zero if it is in the ERR state.
93  *
94  * Once the state machine reaches RUN, other FMA protocol events can be sent
95  * and received across the transport in addition to the various control events.
96  *
97  * Table of Common Transport Layer Control Events
98  * ==============================================
99  *
100  * FMA Class                     Payload
101  * ---------                     -------
102  * resource.fm.xprt.uuclose      string (uuid of case)
103  * resource.fm.xprt.uuresolved   string (uuid of case)
104  * resource.fm.xprt.updated      string (uuid of case)
105  * resource.fm.xprt.subscribe    string (class pattern)
106  * resource.fm.xprt.unsubscribe  string (class pattern)
107  * resource.fm.xprt.unsuback     string (class pattern)
108  * resource.fm.xprt.syn          version information
109  * resource.fm.xprt.ack          version information
110  * resource.fm.xprt.run          version information
111  *
112  * Control events are used to add and delete proxy subscriptions on the remote
113  * transport peer module, and to set up connections.  When a "syn" event is
114  * sent, FMD will include in the payload the highest version of the FMA event
115  * protocol that is supported by the sender.  When a "syn" event is received,
116  * the receiving FMD will use the minimum of this version and its version of
117  * the protocol, and reply with this new minimum version in the "ack" event.
118  * The receiver will then use this new minimum for subsequent event semantics.
119  */
120 
121 #include <sys/fm/protocol.h>
122 #include <strings.h>
123 #include <limits.h>
124 
125 #include <fmd_alloc.h>
126 #include <fmd_error.h>
127 #include <fmd_conf.h>
128 #include <fmd_subr.h>
129 #include <fmd_string.h>
130 #include <fmd_protocol.h>
131 #include <fmd_thread.h>
132 #include <fmd_eventq.h>
133 #include <fmd_dispq.h>
134 #include <fmd_ctl.h>
135 #include <fmd_log.h>
136 #include <fmd_ustat.h>
137 #include <fmd_case.h>
138 #include <fmd_api.h>
139 #include <fmd_fmri.h>
140 #include <fmd_asru.h>
141 #include <fmd_xprt.h>
142 
143 #include <fmd.h>
144 
145 /*
146  * The states shown above in the transport state machine diagram are encoded
147  * using arrays of class patterns and a corresponding action function.  These
148  * arrays are then passed to fmd_xprt_transition() to change transport states.
149  */
150 
151 const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
152 { "resource.fm.xprt.syn", fmd_xprt_event_syn },
153 { "*", fmd_xprt_event_error },
154 { NULL, NULL }
155 };
156 
157 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
158 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
159 { "*", fmd_xprt_event_error },
160 };
161 
162 const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
163 { "*", fmd_xprt_event_drop },
164 { NULL, NULL }
165 };
166 
167 const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
168 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
169 { "resource.fm.xprt.run", fmd_xprt_event_run },
170 { "resource.fm.xprt.*", fmd_xprt_event_error },
171 { "*", fmd_xprt_event_drop },
172 { NULL, NULL }
173 };
174 
175 const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
176 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
177 { "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
178 { "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
179 { "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
180 { "resource.fm.xprt.uuresolved", fmd_xprt_event_uuresolved },
181 { "resource.fm.xprt.updated", fmd_xprt_event_updated },
182 { "resource.fm.xprt.*", fmd_xprt_event_error },
183 { NULL, NULL }
184 };
185 
186 /*
187  * Template for per-transport statistics installed by fmd on behalf of each
188  * transport.  These are used to initialize the per-transport xi_stats.  For
189  * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
190  * transport ID (xi_id) and then are inserted into the per-module stats hash.
191  * The values in this array must match fmd_xprt_stat_t from <fmd_xprt.h>.
192  */
193 static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
194 {
195 { "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
196 { "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
197 { "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
198 { "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
199 { "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
200 { "wtime", FMD_TYPE_TIME, "total wait time on queue" },
201 { "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
202 { "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
203 { "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
204 { "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
205 },
206 { "module", FMD_TYPE_STRING, "module that owns this transport" },
207 { "authority", FMD_TYPE_STRING, "authority associated with this transport" },
208 { "state", FMD_TYPE_STRING, "current transport state" },
209 { "received", FMD_TYPE_UINT64, "events received by transport" },
210 { "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
211 { "retried", FMD_TYPE_UINT64, "retries requested of transport" },
212 { "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
213 { "lost", FMD_TYPE_UINT64, "events lost by transport" },
214 { "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
215 { "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
216 };
217 
218 static void
219 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
220 {
221 	uint_t hashlen = fmd.d_str_buckets;
222 
223 	xch->xch_queue = eq;
224 	xch->xch_hashlen = hashlen;
225 	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
226 }
227 
228 static void
229 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
230 {
231 	fmd_eventq_t *eq = xch->xch_queue;
232 	fmd_xprt_class_t *xcp, *ncp;
233 	uint_t i;
234 
235 	for (i = 0; i < xch->xch_hashlen; i++) {
236 		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
237 			ncp = xcp->xc_next;
238 
239 			if (eq != NULL)
240 				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
241 
242 			fmd_strfree(xcp->xc_class);
243 			fmd_free(xcp, sizeof (fmd_xprt_class_t));
244 		}
245 	}
246 
247 	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
248 }
249 
250 /*
251  * Insert the specified class into the specified class hash, and return the
252  * reference count.  A return value of one indicates this is the first insert.
253  * If an eventq is associated with the hash, insert a dispq subscription for it.
254  */
255 static uint_t
256 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
257     fmd_xprt_class_hash_t *xch, const char *class)
258 {
259 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
260 	fmd_xprt_class_t *xcp;
261 
262 	ASSERT(MUTEX_HELD(&xip->xi_lock));
263 
264 	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
265 		if (strcmp(class, xcp->xc_class) == 0)
266 			return (++xcp->xc_refs);
267 	}
268 
269 	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
270 	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
271 	xcp->xc_next = xch->xch_hash[h];
272 	xcp->xc_refs = 1;
273 	xch->xch_hash[h] = xcp;
274 
275 	if (xch->xch_queue != NULL)
276 		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
277 
278 	return (xcp->xc_refs);
279 }
280 
281 /*
282  * Delete the specified class from the specified class hash, and return the
283  * reference count.  A return value of zero indicates the class was deleted.
284  * If an eventq is associated with the hash, delete the dispq subscription.
285  */
286 static uint_t
287 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
288     fmd_xprt_class_hash_t *xch, const char *class)
289 {
290 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
291 	fmd_xprt_class_t *xcp, **pp;
292 
293 	ASSERT(MUTEX_HELD(&xip->xi_lock));
294 	pp = &xch->xch_hash[h];
295 
296 	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
297 		if (strcmp(class, xcp->xc_class) == 0)
298 			break;
299 		else
300 			pp = &xcp->xc_next;
301 	}
302 
303 	if (xcp == NULL)
304 		return (-1U); /* explicitly permit an invalid delete */
305 
306 	if (--xcp->xc_refs != 0)
307 		return (xcp->xc_refs);
308 
309 	ASSERT(xcp->xc_refs == 0);
310 	*pp = xcp->xc_next;
311 
312 	fmd_strfree(xcp->xc_class);
313 	fmd_free(xcp, sizeof (fmd_xprt_class_t));
314 
315 	if (xch->xch_queue != NULL)
316 		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
317 
318 	return (0);
319 }
320 
321 /*
322  * Queue subscribe events for the specified transport corresponding to all of
323  * the active module subscriptions.  This is an extremely heavyweight operation
324  * that we expect to take place rarely (i.e. when loading a transport module
325  * or when it establishes a connection).  We lock all of the known modules to
326  * prevent them from adding or deleting subscriptions, then snapshot their
327  * subscriptions, and then unlock all of the modules.  We hold the modhash
328  * lock for the duration of this operation to prevent new modules from loading.
329  */
330 static void
331 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
332 {
333 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
334 	const fmd_conf_path_t *pap;
335 	fmd_module_t *mp;
336 	uint_t i, j;
337 
338 	(void) pthread_rwlock_rdlock(&mhp->mh_lock);
339 
340 	for (i = 0; i < mhp->mh_hashlen; i++) {
341 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
342 			fmd_module_lock(mp);
343 	}
344 
345 	(void) pthread_mutex_lock(&xip->xi_lock);
346 	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
347 	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
348 	(void) pthread_mutex_unlock(&xip->xi_lock);
349 
350 	for (i = 0; i < mhp->mh_hashlen; i++) {
351 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
352 			(void) fmd_conf_getprop(mp->mod_conf,
353 			    FMD_PROP_SUBSCRIPTIONS, &pap);
354 			for (j = 0; j < pap->cpa_argc; j++)
355 				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
356 		}
357 	}
358 
359 	for (i = 0; i < mhp->mh_hashlen; i++) {
360 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
361 			fmd_module_unlock(mp);
362 	}
363 
364 	(void) pthread_rwlock_unlock(&mhp->mh_lock);
365 }
366 
367 static void
368 fmd_xprt_transition(fmd_xprt_impl_t *xip,
369     const fmd_xprt_rule_t *state, const char *tag)
370 {
371 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
372 	fmd_event_t *e;
373 	nvlist_t *nvl;
374 	char *s;
375 
376 	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
377 
378 	xip->xi_state = state;
379 	s = fmd_strdup(tag, FMD_SLEEP);
380 
381 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
382 	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
383 	xip->xi_stats->xs_state.fmds_value.str = s;
384 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
385 
386 	/*
387 	 * If we've reached the SUB state, take out the big hammer and snapshot
388 	 * all of the subscriptions of all of the loaded modules.  Then queue a
389 	 * run event for our remote peer indicating that it can enter RUN.
390 	 */
391 	if (state == _fmd_xprt_state_sub) {
392 		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
393 
394 		/*
395 		 * For read-write transports, we always want to set up remote
396 		 * subscriptions to the bultin list.* events, regardless of
397 		 * whether any agents have subscribed to them.
398 		 */
399 		if (xip->xi_flags & FMD_XPRT_RDWR) {
400 			fmd_xprt_subscribe(xp, FM_LIST_SUSPECT_CLASS);
401 			fmd_xprt_subscribe(xp, FM_LIST_ISOLATED_CLASS);
402 			fmd_xprt_subscribe(xp, FM_LIST_UPDATED_CLASS);
403 			fmd_xprt_subscribe(xp, FM_LIST_RESOLVED_CLASS);
404 			fmd_xprt_subscribe(xp, FM_LIST_REPAIRED_CLASS);
405 		}
406 
407 		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
408 		    "resource.fm.xprt.run", xip->xi_version);
409 
410 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
411 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
412 		fmd_eventq_insert_at_time(xip->xi_queue, e);
413 	}
414 }
415 
416 static void
417 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
418 {
419 	char *s = fmd_fmri_auth2str(xip->xi_auth);
420 
421 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
422 	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
423 	xip->xi_stats->xs_authority.fmds_value.str = s;
424 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
425 }
426 
427 static int
428 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
429 {
430 	uint8_t rversion;
431 
432 	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
433 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
434 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
435 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
436 
437 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
438 		return (1);
439 	}
440 
441 	if (rversion > xip->xi_version) {
442 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
443 		    xip->xi_id, rversion, xip->xi_version);
444 
445 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
446 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
447 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
448 
449 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
450 		return (1);
451 	}
452 
453 	if (rversionp != NULL)
454 		*rversionp = rversion;
455 
456 	return (0);
457 }
458 
459 void
460 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
461 {
462 	fmd_event_t *e;
463 	uint_t vers;
464 	char *class;
465 
466 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
467 		return; /* transitioned to error state */
468 
469 	/*
470 	 * If the transport module didn't specify an authority, extract the
471 	 * one that is passed along with the xprt.syn event and use that.
472 	 */
473 	if (xip->xi_auth == NULL &&
474 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
475 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
476 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
477 		fmd_xprt_authupdate(xip);
478 	}
479 
480 	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
481 	    "resource.fm.xprt.ack", xip->xi_version);
482 
483 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
484 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
485 	fmd_eventq_insert_at_time(xip->xi_queue, e);
486 
487 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
488 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
489 }
490 
491 void
492 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
493 {
494 	uint_t vers;
495 
496 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
497 		return; /* transitioned to error state */
498 
499 	/*
500 	 * If the transport module didn't specify an authority, extract the
501 	 * one that is passed along with the xprt.syn event and use that.
502 	 */
503 	if (xip->xi_auth == NULL &&
504 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
505 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
506 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
507 		fmd_xprt_authupdate(xip);
508 	}
509 
510 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
511 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
512 }
513 
514 /*
515  * Upon transition to RUN, we take every solved case and resend a list.suspect
516  * event for it to our remote peer.  If a case transitions from solved to a
517  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
518  * the case hash, we will get it as part of examining the resource cache, next.
519  */
520 static void
521 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
522 {
523 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
524 	fmd_xprt_impl_t *xip = arg;
525 
526 	fmd_event_t *e;
527 	nvlist_t *nvl;
528 	char *class;
529 
530 	if (cip->ci_state == FMD_CASE_UNSOLVED)
531 		return;
532 
533 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
534 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
535 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
536 
537 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
538 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
539 
540 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
541 }
542 
543 /*
544  * Similar to the above function, but for use with readonly transport. Puts
545  * the event on the module's queue such that it's fmdo_recv function can pick
546  * it up and send it if appropriate.
547  */
548 static void
549 fmd_xprt_send_case_ro(fmd_case_t *cp, void *arg)
550 {
551 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
552 	fmd_module_t *mp = arg;
553 
554 	fmd_event_t *e;
555 	nvlist_t *nvl;
556 	char *class;
557 
558 	if (cip->ci_state == FMD_CASE_UNSOLVED)
559 		return;
560 
561 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
562 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
563 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
564 
565 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to rdonly transport %s\n",
566 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, mp->mod_name);
567 
568 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, mp->mod_queue->eq_sgid);
569 }
570 
571 void
572 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
573 {
574 	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
575 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
576 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
577 	}
578 }
579 
580 void
581 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
582 {
583 	char *class;
584 
585 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
586 		return; /* transitioned to error state */
587 
588 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
589 		return; /* malformed protocol event */
590 
591 	(void) pthread_mutex_lock(&xip->xi_lock);
592 	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
593 	(void) pthread_mutex_unlock(&xip->xi_lock);
594 
595 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
596 	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
597 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
598 }
599 
600 void
601 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
602 {
603 	fmd_event_t *e;
604 	char *class;
605 
606 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
607 		return; /* transitioned to error state */
608 
609 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
610 		return; /* malformed protocol event */
611 
612 	(void) pthread_mutex_lock(&xip->xi_lock);
613 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
614 	(void) pthread_mutex_unlock(&xip->xi_lock);
615 
616 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
617 	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
618 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
619 
620 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
621 	    "resource.fm.xprt.unsuback", xip->xi_version, class);
622 
623 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
624 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
625 	fmd_eventq_insert_at_time(xip->xi_queue, e);
626 }
627 
628 void
629 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
630 {
631 	char *class;
632 
633 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
634 		return; /* transitioned to error state */
635 
636 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
637 		return; /* malformed protocol event */
638 
639 	(void) pthread_mutex_lock(&xip->xi_lock);
640 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
641 	(void) pthread_mutex_unlock(&xip->xi_lock);
642 }
643 
644 /*
645  * on diagnosing side, receive a uuclose from the proxy.
646  */
647 void
648 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
649 {
650 	fmd_case_t *cp;
651 	char *uuid;
652 
653 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
654 		return; /* transitioned to error state */
655 
656 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
657 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
658 		/*
659 		 * update resource cache status and transition case
660 		 */
661 		fmd_case_close_status(cp);
662 		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
663 		fmd_case_rele(cp);
664 	}
665 }
666 
667 /*
668  * on diagnosing side, receive a uuresolved from the proxy.
669  */
670 void
671 fmd_xprt_event_uuresolved(fmd_xprt_impl_t *xip, nvlist_t *nvl)
672 {
673 	fmd_case_t *cp;
674 	char *uuid;
675 
676 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
677 		return; /* transitioned to error state */
678 
679 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
680 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
681 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
682 
683 		fmd_case_transition(cp, (cip->ci_state == FMD_CASE_REPAIRED) ?
684 		    FMD_CASE_RESOLVED : (cip->ci_state == FMD_CASE_CLOSED) ?
685 		    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT, FMD_CF_RESOLVED);
686 		fmd_case_rele(cp);
687 	}
688 }
689 
690 /*
691  * on diagnosing side, receive a repair/acquit from the proxy.
692  */
693 void
694 fmd_xprt_event_updated(fmd_xprt_impl_t *xip, nvlist_t *nvl)
695 {
696 	fmd_case_t *cp;
697 	char *uuid;
698 
699 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
700 		return; /* transitioned to error state */
701 
702 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
703 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
704 		uint8_t *statusp, *proxy_asrup = NULL;
705 		uint_t nelem = 0;
706 
707 		/*
708 		 * Only update status with new repairs if "no remote repair"
709 		 * is not set. Do the case_update anyway though (as this will
710 		 * refresh the status on the proxy side).
711 		 */
712 		if (!(xip->xi_flags & FMD_XPRT_NO_REMOTE_REPAIR)) {
713 			if (nvlist_lookup_uint8_array(nvl,
714 			    FM_RSRC_XPRT_FAULT_STATUS, &statusp, &nelem) == 0 &&
715 			    nelem != 0) {
716 				(void) nvlist_lookup_uint8_array(nvl,
717 				    FM_RSRC_XPRT_FAULT_HAS_ASRU, &proxy_asrup,
718 				    &nelem);
719 				fmd_case_update_status(cp, statusp,
720 				    proxy_asrup, NULL);
721 			}
722 			fmd_case_update_containees(cp);
723 		}
724 		fmd_case_update(cp);
725 		fmd_case_rele(cp);
726 	}
727 }
728 
729 void
730 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
731 {
732 	char *class = "<unknown>";
733 
734 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
735 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
736 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
737 
738 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
739 	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
740 
741 	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
742 }
743 
744 void
745 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
746 {
747 	char *class = "<unknown>";
748 
749 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
750 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
751 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
752 
753 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
754 	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
755 
756 }
757 
758 fmd_xprt_t *
759 fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
760 {
761 	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
762 	fmd_stat_t *statv;
763 	uint_t i, statc;
764 
765 	char buf[PATH_MAX];
766 	fmd_event_t *e;
767 	nvlist_t *nvl;
768 	char *s;
769 
770 	(void) pthread_mutex_init(&xip->xi_lock, NULL);
771 	(void) pthread_cond_init(&xip->xi_cv, NULL);
772 	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);
773 
774 	xip->xi_auth = auth;
775 	xip->xi_data = data;
776 	xip->xi_version = FM_RSRC_XPRT_VERSION;
777 	xip->xi_flags = flags;
778 
779 	/*
780 	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
781 	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
782 	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
783 	 * ensure that this transport will not run until fmd_xprt_resume_all().
784 	 */
785 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
786 	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);
787 
788 	if (fmd.d_xprt_suspend != 0)
789 		xip->xi_flags |= FMD_XPRT_DSUSPENDED;
790 
791 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
792 
793 	/*
794 	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
795 	 * bit so that fmdo_send() is not called until _fmd_init() completes.
796 	 */
797 	if (!(mp->mod_flags & FMD_MOD_INIT))
798 		xip->xi_flags |= FMD_XPRT_ISUSPENDED;
799 
800 	/*
801 	 * Initialize the transport statistics that we keep on behalf of fmd.
802 	 * These are set up using a template defined at the top of this file.
803 	 * We rename each statistic with a prefix ensuring its uniqueness.
804 	 */
805 	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
806 	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
807 	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));
808 
809 	for (i = 0; i < statc; i++) {
810 		(void) snprintf(statv[i].fmds_name,
811 		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
812 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
813 	}
814 
815 	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
816 	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);
817 
818 	if (xip->xi_stats == NULL)
819 		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);
820 
821 	xip->xi_stats->xs_module.fmds_value.str =
822 	    fmd_strdup(mp->mod_name, FMD_SLEEP);
823 
824 	if (xip->xi_auth != NULL)
825 		fmd_xprt_authupdate(xip);
826 
827 	/*
828 	 * Create the outbound eventq for this transport and link to its stats.
829 	 * If any suspend bits were set above, suspend the eventq immediately.
830 	 */
831 	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
832 	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
833 
834 	if (xip->xi_flags & FMD_XPRT_SMASK)
835 		fmd_eventq_suspend(xip->xi_queue);
836 
837 	/*
838 	 * Create our subscription hashes: local subscriptions go to xi_queue,
839 	 * remote subscriptions are tracked only for protocol requests, and
840 	 * pending unsubscriptions are associated with the /dev/null eventq.
841 	 */
842 	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
843 	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
844 	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);
845 
846 	/*
847 	 * Determine our initial state based upon the creation flags.  If we're
848 	 * read-only, go directly to RUN.  If we're accepting a new connection,
849 	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
850 	 */
851 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY) {
852 		/*
853 		 * Send the list.suspects across here for readonly transports.
854 		 * For read-write transport they will be sent on transition to
855 		 * RUN state in fmd_xprt_event_run().
856 		 */
857 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case_ro, mp);
858 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
859 	} else if (flags & FMD_XPRT_ACCEPT)
860 		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
861 	else
862 		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
863 
864 	/*
865 	 * If client.xprtlog is set to TRUE, create a debugging log for the
866 	 * events received by the transport in var/fm/fmd/xprt/.
867 	 */
868 	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
869 	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);
870 
871 	if (i) {
872 		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
873 		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
874 	}
875 
876 	ASSERT(fmd_module_locked(mp));
877 	fmd_list_append(&mp->mod_transports, xip);
878 
879 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
880 	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
881 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
882 
883 	/*
884 	 * If this is a read-only transport, return without creating a send
885 	 * queue thread and setting up any connection events in our queue.
886 	 */
887 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
888 		goto out;
889 
890 	/*
891 	 * Once the transport is fully initialized, create a send queue thread
892 	 * and start any connect events flowing to complete our initialization.
893 	 */
894 	if ((xip->xi_thread = fmd_thread_create(mp,
895 	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {
896 
897 		fmd_error(EFMD_XPRT_THR,
898 		    "failed to create thread for transport %u", xip->xi_id);
899 
900 		fmd_xprt_destroy((fmd_xprt_t *)xip);
901 		(void) fmd_set_errno(EFMD_XPRT_THR);
902 		return (NULL);
903 	}
904 
905 	/*
906 	 * If the transport is not being opened to accept an inbound connect,
907 	 * start an outbound connection by enqueuing a SYN event for our peer.
908 	 */
909 	if (!(flags & FMD_XPRT_ACCEPT)) {
910 		nvl = fmd_protocol_xprt_ctl(mp,
911 		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);
912 
913 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
914 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
915 		fmd_eventq_insert_at_time(xip->xi_queue, e);
916 	}
917 out:
918 	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
919 	return ((fmd_xprt_t *)xip);
920 }
921 
922 void
923 fmd_xprt_destroy(fmd_xprt_t *xp)
924 {
925 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
926 	fmd_module_t *mp = xip->xi_queue->eq_mod;
927 	uint_t id = xip->xi_id;
928 
929 	fmd_case_impl_t *cip, *nip;
930 	fmd_stat_t *sp;
931 	uint_t i, n;
932 
933 	ASSERT(fmd_module_locked(mp));
934 	fmd_list_delete(&mp->mod_transports, xip);
935 
936 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
937 	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
938 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
939 
940 	(void) pthread_mutex_lock(&xip->xi_lock);
941 
942 	while (xip->xi_busy != 0)
943 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
944 
945 	/*
946 	 * Remove the transport from global visibility, cancel its send-side
947 	 * thread, join with it, and then remove the transport from module
948 	 * visibility.  Once all this is done, destroy and free the transport.
949 	 */
950 	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
951 
952 	if (xip->xi_thread != NULL) {
953 		fmd_eventq_abort(xip->xi_queue);
954 		fmd_module_unlock(mp);
955 		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
956 		fmd_module_lock(mp);
957 	}
958 
959 	if (xip->xi_log != NULL)
960 		fmd_log_rele(xip->xi_log);
961 
962 	/*
963 	 * Release every case handle in the module that was cached by this
964 	 * transport.  This will result in these cases disappearing from the
965 	 * local case hash so that fmd_case_uuclose() and fmd_case_repaired()
966 	 * etc can no longer be used.
967 	 */
968 	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
969 		nip = fmd_list_next(cip);
970 		if (cip->ci_xprt == xp)
971 			fmd_case_discard((fmd_case_t *)cip, B_TRUE);
972 	}
973 
974 	/*
975 	 * Destroy every class in the various subscription hashes and remove
976 	 * any corresponding subscriptions from the event dispatch queue.
977 	 */
978 	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
979 	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
980 	fmd_xprt_class_hash_destroy(&xip->xi_usub);
981 
982 	/*
983 	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
984 	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
985 	 * won't find the entries in the hash table.
986 	 */
987 	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
988 	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
989 	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
990 	for (i = 0; i < n; i++) {
991 		(void) snprintf(sp[i].fmds_name,
992 		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
993 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
994 	}
995 	fmd_ustat_delete(mp->mod_ustat, n, sp);
996 	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
997 
998 	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
999 	fmd_eventq_destroy(xip->xi_queue);
1000 	nvlist_free(xip->xi_auth);
1001 	fmd_free(xip, sizeof (fmd_xprt_impl_t));
1002 
1003 	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
1004 }
1005 
1006 void
1007 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
1008 {
1009 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1010 	uint_t oflags;
1011 
1012 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
1013 	(void) pthread_mutex_lock(&xip->xi_lock);
1014 
1015 	oflags = xip->xi_flags;
1016 	xip->xi_flags |= flags;
1017 
1018 	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
1019 		fmd_eventq_suspend(xip->xi_queue);
1020 
1021 	(void) pthread_cond_broadcast(&xip->xi_cv);
1022 
1023 	while (xip->xi_busy != 0)
1024 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1025 
1026 	(void) pthread_mutex_unlock(&xip->xi_lock);
1027 }
1028 
1029 void
1030 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
1031 {
1032 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1033 	uint_t oflags;
1034 
1035 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
1036 	(void) pthread_mutex_lock(&xip->xi_lock);
1037 
1038 	oflags = xip->xi_flags;
1039 	xip->xi_flags &= ~flags;
1040 
1041 	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
1042 		fmd_eventq_resume(xip->xi_queue);
1043 
1044 	(void) pthread_cond_broadcast(&xip->xi_cv);
1045 	(void) pthread_mutex_unlock(&xip->xi_lock);
1046 }
1047 
1048 void
1049 fmd_xprt_send(fmd_xprt_t *xp)
1050 {
1051 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1052 	fmd_module_t *mp = xip->xi_queue->eq_mod;
1053 	fmd_event_t *ep;
1054 	int err;
1055 
1056 	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
1057 		if (FMD_EVENT_TTL(ep) == 0) {
1058 			fmd_event_rele(ep);
1059 			continue;
1060 		}
1061 
1062 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
1063 		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));
1064 
1065 		err = mp->mod_ops->mop_transport(mp, xp, ep);
1066 		fmd_eventq_done(xip->xi_queue);
1067 
1068 		if (err == FMD_SEND_RETRY) {
1069 			fmd_eventq_insert_at_time(xip->xi_queue, ep);
1070 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1071 			xip->xi_stats->xs_retried.fmds_value.ui64++;
1072 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1073 		}
1074 
1075 		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
1076 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1077 			xip->xi_stats->xs_lost.fmds_value.ui64++;
1078 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1079 		}
1080 
1081 		fmd_event_rele(ep);
1082 	}
1083 }
1084 
1085 /*
1086  * This function creates a local suspect list. This is used when a suspect list
1087  * is created directly by an external source like fminject.
1088  */
1089 static void
1090 fmd_xprt_list_suspect_local(fmd_xprt_t *xp, nvlist_t *nvl)
1091 {
1092 	nvlist_t **nvlp;
1093 	nvlist_t *de_fmri, *de_fmri_dup = NULL;
1094 	int64_t *diag_time;
1095 	char *code = NULL;
1096 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1097 	fmd_case_t *cp;
1098 	uint_t nelem = 0, nelem2 = 0, i;
1099 	boolean_t injected;
1100 
1101 	fmd_module_lock(xip->xi_queue->eq_mod);
1102 	cp = fmd_case_create(xip->xi_queue->eq_mod, NULL);
1103 	if (cp == NULL) {
1104 		fmd_module_unlock(xip->xi_queue->eq_mod);
1105 		return;
1106 	}
1107 
1108 	/*
1109 	 * copy diag_code if present
1110 	 */
1111 	(void) nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code);
1112 	if (code != NULL) {
1113 		fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1114 
1115 		cip->ci_precanned = 1;
1116 		fmd_case_setcode(cp, code);
1117 	}
1118 
1119 	/*
1120 	 * copy suspects
1121 	 */
1122 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1123 	    &nelem);
1124 	for (i = 0; i < nelem; i++) {
1125 		nvlist_t *flt_copy, *asru = NULL, *fru = NULL, *rsrc = NULL;
1126 		topo_hdl_t *thp;
1127 		char *loc = NULL;
1128 		int err;
1129 
1130 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1131 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1132 		(void) nvlist_lookup_nvlist(nvlp[i], FM_FAULT_RESOURCE, &rsrc);
1133 
1134 		/*
1135 		 * If no fru specified, get it from topo
1136 		 */
1137 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_FRU, &fru) != 0 &&
1138 		    rsrc && topo_fmri_fru(thp, rsrc, &fru, &err) == 0)
1139 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_FRU, fru);
1140 		/*
1141 		 * If no asru specified, get it from topo
1142 		 */
1143 		if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU, &asru) != 0 &&
1144 		    rsrc && topo_fmri_asru(thp, rsrc, &asru, &err) == 0)
1145 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU, asru);
1146 		/*
1147 		 * If no location specified, get it from topo
1148 		 */
1149 		if (nvlist_lookup_string(nvlp[i], FM_FAULT_LOCATION,
1150 		    &loc) != 0) {
1151 			if (fru && topo_fmri_label(thp, fru, &loc, &err) == 0)
1152 				(void) nvlist_add_string(flt_copy,
1153 				    FM_FAULT_LOCATION, loc);
1154 			else if (rsrc && topo_fmri_label(thp, rsrc, &loc,
1155 			    &err) == 0)
1156 				(void) nvlist_add_string(flt_copy,
1157 				    FM_FAULT_LOCATION, loc);
1158 			if (loc)
1159 				topo_hdl_strfree(thp, loc);
1160 		}
1161 		if (fru)
1162 			nvlist_free(fru);
1163 		if (asru)
1164 			nvlist_free(asru);
1165 		if (rsrc)
1166 			nvlist_free(rsrc);
1167 		fmd_fmri_topo_rele(thp);
1168 		fmd_case_insert_suspect(cp, flt_copy);
1169 	}
1170 
1171 	/*
1172 	 * copy diag_time if present
1173 	 */
1174 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1175 	    &nelem2) == 0 && nelem2 >= 2)
1176 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1177 
1178 	/*
1179 	 * copy DE fmri if present
1180 	 */
1181 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1182 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1183 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1184 	}
1185 
1186 	/*
1187 	 * copy injected if present
1188 	 */
1189 	if (nvlist_lookup_boolean_value(nvl, FM_SUSPECT_INJECTED,
1190 	    &injected) == 0 && injected)
1191 		fmd_case_set_injected(cp);
1192 
1193 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1194 	fmd_module_unlock(xip->xi_queue->eq_mod);
1195 }
1196 
1197 /*
1198  * This function is called to create a proxy case on receipt of a list.suspect
1199  * from the diagnosing side of the transport.
1200  */
1201 static void
1202 fmd_xprt_list_suspect(fmd_xprt_t *xp, nvlist_t *nvl)
1203 {
1204 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1205 	nvlist_t **nvlp;
1206 	uint_t nelem = 0, nelem2 = 0, i;
1207 	int64_t *diag_time;
1208 	topo_hdl_t *thp;
1209 	char *class;
1210 	nvlist_t *rsrc, *asru, *de_fmri, *de_fmri_dup = NULL;
1211 	nvlist_t *flt_copy;
1212 	int err;
1213 	nvlist_t **asrua;
1214 	uint8_t *proxy_asru = NULL;
1215 	int got_proxy_asru = 0;
1216 	int got_hc_rsrc = 0;
1217 	int got_hc_asru = 0;
1218 	int got_present_rsrc = 0;
1219 	uint8_t *diag_asru = NULL;
1220 	char *scheme;
1221 	uint8_t *statusp;
1222 	char *uuid, *code;
1223 	fmd_case_t *cp;
1224 	fmd_case_impl_t *cip;
1225 	int need_update = 0;
1226 	boolean_t injected;
1227 
1228 	if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) != 0)
1229 		return;
1230 	if (nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) != 0)
1231 		return;
1232 	(void) nvlist_lookup_nvlist_array(nvl, FM_SUSPECT_FAULT_LIST, &nvlp,
1233 	    &nelem);
1234 
1235 	/*
1236 	 * In order to implement FMD_XPRT_HCONLY and FMD_XPRT_HC_PRESENT_ONLY
1237 	 * etc we first scan the suspects to see if
1238 	 * - there was an asru in the received fault
1239 	 * - there was an hc-scheme resource in the received fault
1240 	 * - any hc-scheme resource in the received fault is present in the
1241 	 *   local topology
1242 	 * - any hc-scheme resource in the received fault has an asru in the
1243 	 *   local topology
1244 	 */
1245 	if (nelem > 0) {
1246 		asrua = fmd_zalloc(sizeof (nvlist_t *) * nelem, FMD_SLEEP);
1247 		proxy_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1248 		diag_asru = fmd_zalloc(sizeof (uint8_t) * nelem, FMD_SLEEP);
1249 		thp = fmd_fmri_topo_hold(TOPO_VERSION);
1250 		for (i = 0; i < nelem; i++) {
1251 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1252 			    &asru) == 0 && asru != NULL)
1253 				diag_asru[i] = 1;
1254 			if (nvlist_lookup_string(nvlp[i], FM_CLASS,
1255 			    &class) != 0 || strncmp(class, "fault", 5) != 0)
1256 				continue;
1257 			/*
1258 			 * If there is an hc-scheme asru, use that to find the
1259 			 * real asru. Otherwise if there is an hc-scheme
1260 			 * resource, work out the old asru from that.
1261 			 * This order is to allow a two stage evaluation
1262 			 * of the asru where a fault in the diagnosing side
1263 			 * is in a component not visible to the proxy side,
1264 			 * but prevents a component that is visible from
1265 			 * working. So the diagnosing side sets the asru to
1266 			 * the latter component (in hc-scheme as the diagnosing
1267 			 * side doesn't know about the proxy side's virtual
1268 			 * schemes), and then the proxy side can convert that
1269 			 * to a suitable virtual scheme asru.
1270 			 */
1271 			if (nvlist_lookup_nvlist(nvlp[i], FM_FAULT_ASRU,
1272 			    &asru) == 0 && asru != NULL &&
1273 			    nvlist_lookup_string(asru, FM_FMRI_SCHEME,
1274 			    &scheme) == 0 &&
1275 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1276 				got_hc_asru = 1;
1277 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1278 					continue;
1279 				if (topo_fmri_present(thp, asru, &err) != 0)
1280 					got_present_rsrc = 1;
1281 				if (topo_fmri_asru(thp, asru, &asrua[i],
1282 				    &err) == 0) {
1283 					proxy_asru[i] =
1284 					    FMD_PROXY_ASRU_FROM_ASRU;
1285 					got_proxy_asru = 1;
1286 				}
1287 			} else if (nvlist_lookup_nvlist(nvlp[i],
1288 			    FM_FAULT_RESOURCE, &rsrc) == 0 && rsrc != NULL &&
1289 			    nvlist_lookup_string(rsrc, FM_FMRI_SCHEME,
1290 			    &scheme) == 0 &&
1291 			    strcmp(scheme, FM_FMRI_SCHEME_HC) == 0) {
1292 				got_hc_rsrc = 1;
1293 				if (xip->xi_flags & FMD_XPRT_EXTERNAL)
1294 					continue;
1295 				if (topo_fmri_present(thp, rsrc, &err) != 0)
1296 					got_present_rsrc = 1;
1297 				if (topo_fmri_asru(thp, rsrc, &asrua[i],
1298 				    &err) == 0) {
1299 					proxy_asru[i] =
1300 					    FMD_PROXY_ASRU_FROM_RSRC;
1301 					got_proxy_asru = 1;
1302 				}
1303 			}
1304 		}
1305 		fmd_fmri_topo_rele(thp);
1306 	}
1307 
1308 	/*
1309 	 * If we're set up only to report hc-scheme faults, and
1310 	 * there aren't any, then just drop the event.
1311 	 */
1312 	if (got_hc_rsrc == 0 && got_hc_asru == 0 &&
1313 	    (xip->xi_flags & FMD_XPRT_HCONLY)) {
1314 		if (nelem > 0) {
1315 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1316 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1317 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1318 		}
1319 		return;
1320 	}
1321 
1322 	/*
1323 	 * If we're set up only to report locally present hc-scheme
1324 	 * faults, and there aren't any, then just drop the event.
1325 	 */
1326 	if (got_present_rsrc == 0 &&
1327 	    (xip->xi_flags & FMD_XPRT_HC_PRESENT_ONLY)) {
1328 		if (nelem > 0) {
1329 			for (i = 0; i < nelem; i++)
1330 				if (asrua[i])
1331 					nvlist_free(asrua[i]);
1332 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1333 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1334 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1335 		}
1336 		return;
1337 	}
1338 
1339 	/*
1340 	 * If fmd_case_recreate() returns NULL, UUID is already known.
1341 	 */
1342 	fmd_module_lock(xip->xi_queue->eq_mod);
1343 	if ((cp = fmd_case_recreate(xip->xi_queue->eq_mod, xp,
1344 	    FMD_CASE_UNSOLVED, uuid, code)) == NULL) {
1345 		if (nelem > 0) {
1346 			for (i = 0; i < nelem; i++)
1347 				if (asrua[i])
1348 					nvlist_free(asrua[i]);
1349 			fmd_free(proxy_asru, sizeof (uint8_t) * nelem);
1350 			fmd_free(diag_asru, sizeof (uint8_t) * nelem);
1351 			fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1352 		}
1353 		fmd_module_unlock(xip->xi_queue->eq_mod);
1354 		return;
1355 	}
1356 
1357 	cip = (fmd_case_impl_t *)cp;
1358 	cip->ci_diag_asru = diag_asru;
1359 	cip->ci_proxy_asru = proxy_asru;
1360 	for (i = 0; i < nelem; i++) {
1361 		(void) nvlist_xdup(nvlp[i], &flt_copy, &fmd.d_nva);
1362 		if (proxy_asru[i] != FMD_PROXY_ASRU_NOT_NEEDED) {
1363 			/*
1364 			 * Copy suspects, but remove/replace asru first. Also if
1365 			 * the original asru was hc-scheme use that as resource.
1366 			 */
1367 			if (proxy_asru[i] == FMD_PROXY_ASRU_FROM_ASRU) {
1368 				(void) nvlist_remove(flt_copy,
1369 				    FM_FAULT_RESOURCE, DATA_TYPE_NVLIST);
1370 				(void) nvlist_lookup_nvlist(flt_copy,
1371 				    FM_FAULT_ASRU, &asru);
1372 				(void) nvlist_add_nvlist(flt_copy,
1373 				    FM_FAULT_RESOURCE, asru);
1374 			}
1375 			(void) nvlist_remove(flt_copy, FM_FAULT_ASRU,
1376 			    DATA_TYPE_NVLIST);
1377 			(void) nvlist_add_nvlist(flt_copy, FM_FAULT_ASRU,
1378 			    asrua[i]);
1379 			nvlist_free(asrua[i]);
1380 		} else if (got_hc_asru == 0 &&
1381 		    nvlist_lookup_nvlist(flt_copy, FM_FAULT_ASRU,
1382 		    &asru) == 0 && asru != NULL) {
1383 			/*
1384 			 * If we have an asru from diag side, but it's not
1385 			 * in hc scheme, then we can't be sure what it
1386 			 * represents, so mark as no retire.
1387 			 */
1388 			(void) nvlist_add_boolean_value(flt_copy,
1389 			    FM_SUSPECT_RETIRE, B_FALSE);
1390 		}
1391 		fmd_case_insert_suspect(cp, flt_copy);
1392 	}
1393 	/*
1394 	 * copy diag_time
1395 	 */
1396 	if (nvlist_lookup_int64_array(nvl, FM_SUSPECT_DIAG_TIME, &diag_time,
1397 	    &nelem2) == 0 && nelem2 >= 2)
1398 		fmd_case_settime(cp, diag_time[0], diag_time[1]);
1399 	/*
1400 	 * copy DE fmri
1401 	 */
1402 	if (nvlist_lookup_nvlist(nvl, FM_SUSPECT_DE, &de_fmri) == 0) {
1403 		(void) nvlist_xdup(de_fmri, &de_fmri_dup, &fmd.d_nva);
1404 		fmd_case_set_de_fmri(cp, de_fmri_dup);
1405 	}
1406 
1407 	/*
1408 	 * copy injected if present
1409 	 */
1410 	if (nvlist_lookup_boolean_value(nvl, FM_SUSPECT_INJECTED,
1411 	    &injected) == 0 && injected)
1412 		fmd_case_set_injected(cp);
1413 
1414 	/*
1415 	 * Transition to solved. This will log the suspect list and create
1416 	 * the resource cache entries.
1417 	 */
1418 	fmd_case_transition(cp, FMD_CASE_SOLVED, FMD_CF_SOLVED);
1419 
1420 	/*
1421 	 * Update status if it is not simply "all faulty" (can happen if
1422 	 * list.suspects are being re-sent when the transport has reconnected).
1423 	 */
1424 	(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS, &statusp,
1425 	    &nelem);
1426 	for (i = 0; i < nelem; i++) {
1427 		if ((statusp[i] & (FM_SUSPECT_FAULTY | FM_SUSPECT_UNUSABLE |
1428 		    FM_SUSPECT_NOT_PRESENT | FM_SUSPECT_DEGRADED)) !=
1429 		    FM_SUSPECT_FAULTY)
1430 			need_update = 1;
1431 	}
1432 	if (need_update) {
1433 		fmd_case_update_status(cp, statusp, cip->ci_proxy_asru,
1434 		    cip->ci_diag_asru);
1435 		fmd_case_update_containees(cp);
1436 		fmd_case_update(cp);
1437 	}
1438 
1439 	/*
1440 	 * if asru on proxy side, send an update back to the diagnosing side to
1441 	 * update UNUSABLE/DEGRADED.
1442 	 */
1443 	if (got_proxy_asru)
1444 		fmd_case_xprt_updated(cp);
1445 
1446 	if (nelem > 0)
1447 		fmd_free(asrua, sizeof (nvlist_t *) * nelem);
1448 	fmd_module_unlock(xip->xi_queue->eq_mod);
1449 }
1450 
1451 void
1452 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
1453 {
1454 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1455 	const fmd_xprt_rule_t *xrp;
1456 	fmd_t *dp = &fmd;
1457 
1458 	fmd_event_t *e;
1459 	char *class, *uuid;
1460 	boolean_t isproto, isereport;
1461 
1462 	uint64_t *tod;
1463 	uint8_t ttl;
1464 	uint_t n;
1465 	fmd_case_t *cp;
1466 
1467 	/*
1468 	 * Grab the transport lock and set the busy flag to indicate we are
1469 	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1470 	 * resumes the transport before continuing on with the receive.
1471 	 */
1472 	(void) pthread_mutex_lock(&xip->xi_lock);
1473 
1474 	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1475 
1476 		if (fmd.d_signal != 0) {
1477 			(void) pthread_mutex_unlock(&xip->xi_lock);
1478 			return; /* fmd_destroy() is in progress */
1479 		}
1480 
1481 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1482 	}
1483 
1484 	xip->xi_busy++;
1485 	ASSERT(xip->xi_busy != 0);
1486 
1487 	(void) pthread_mutex_unlock(&xip->xi_lock);
1488 
1489 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
1490 	xip->xi_stats->xs_received.fmds_value.ui64++;
1491 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1492 
1493 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1494 		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1495 		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1496 
1497 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1498 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1499 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1500 
1501 		nvlist_free(nvl);
1502 		goto done;
1503 	}
1504 
1505 	fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
1506 	    ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);
1507 
1508 	isereport = (strncmp(class, FM_EREPORT_CLASS,
1509 	    sizeof (FM_EREPORT_CLASS - 1)) == 0) ? FMD_B_TRUE : FMD_B_FALSE;
1510 
1511 	/*
1512 	 * The logonly flag should only be set for ereports.
1513 	 */
1514 	if ((logonly == FMD_B_TRUE) && (isereport == FMD_B_FALSE)) {
1515 		fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
1516 		    "logonly flag is not valid for class %s",
1517 		    (void *)nvl, class);
1518 
1519 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1520 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1521 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1522 
1523 		nvlist_free(nvl);
1524 		goto done;
1525 	}
1526 
1527 	/*
1528 	 * If a time-to-live value is present in the event and is zero, drop
1529 	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1530 	 */
1531 	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1532 		if (ttl == 0) {
1533 			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1534 			    "timeout: event received with ttl=0\n",
1535 			    xip->xi_id, (void *)nvl, class);
1536 
1537 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1538 			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1539 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1540 
1541 			nvlist_free(nvl);
1542 			goto done;
1543 		}
1544 		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1545 		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1546 	}
1547 
1548 	/*
1549 	 * If we are using the native system clock, the underlying transport
1550 	 * code can provide a tighter event time bound by telling us when the
1551 	 * event was enqueued.  If we're using simulated clocks, this time
1552 	 * has no meaning to us, so just reset the value to use HRT_NOW.
1553 	 */
1554 	if (dp->d_clockops != &fmd_timeops_native)
1555 		hrt = FMD_HRT_NOW;
1556 
1557 	/*
1558 	 * If an event's class is in the FMD_CTL_CLASS family, then create a
1559 	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
1560 	 * event using this time.  Otherwise create a protocol event using hrt.
1561 	 */
1562 	isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
1563 	    FMD_B_FALSE : FMD_B_TRUE;
1564 	if (isproto == FMD_B_FALSE)
1565 		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1566 	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1567 		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1568 	else {
1569 		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1570 		    NULL, nvl, class, NULL, 0, 0);
1571 	}
1572 
1573 	/*
1574 	 * If the debug log is enabled, create a temporary event, log it to the
1575 	 * debug log, and then reset the underlying state of the event.
1576 	 */
1577 	if (xip->xi_log != NULL) {
1578 		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1579 
1580 		fmd_log_append(xip->xi_log, e, NULL);
1581 
1582 		ep->ev_flags |= FMD_EVF_VOLATILE;
1583 		ep->ev_off = 0;
1584 		ep->ev_len = 0;
1585 
1586 		if (ep->ev_log != NULL) {
1587 			fmd_log_rele(ep->ev_log);
1588 			ep->ev_log = NULL;
1589 		}
1590 	}
1591 
1592 	/*
1593 	 * Iterate over the rules for the current state trying to match the
1594 	 * event class to one of our special rules.  If a rule is matched, the
1595 	 * event is consumed and not dispatched to other modules.  If the rule
1596 	 * set ends without matching an event, we fall through to dispatching.
1597 	 */
1598 	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1599 		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1600 			fmd_event_hold(e);
1601 			xrp->xr_func(xip, nvl);
1602 			fmd_event_rele(e);
1603 			goto done;
1604 		}
1605 	}
1606 
1607 	/*
1608 	 * Record the event in the errlog if it is an ereport.  This code will
1609 	 * be replaced later with a per-transport intent log instead.
1610 	 */
1611 	if (isereport == FMD_B_TRUE) {
1612 		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
1613 		fmd_log_append(dp->d_errlog, e, NULL);
1614 		(void) pthread_rwlock_unlock(&dp->d_log_lock);
1615 	}
1616 
1617 	/*
1618 	 * If a list.suspect event is received, create a case for the specified
1619 	 * UUID in the case hash, with the transport module as its owner.
1620 	 */
1621 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS)) {
1622 		if (xip->xi_flags & FMD_XPRT_CACHE_AS_LOCAL)
1623 			fmd_xprt_list_suspect_local(xp, nvl);
1624 		else
1625 			fmd_xprt_list_suspect(xp, nvl);
1626 		fmd_event_hold(e);
1627 		fmd_event_rele(e);
1628 		goto done;
1629 	}
1630 
1631 	/*
1632 	 * If a list.updated or list.repaired event is received, update the
1633 	 * resource cache status and the local case.
1634 	 */
1635 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_REPAIRED_CLASS) ||
1636 	    fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_UPDATED_CLASS)) {
1637 		uint8_t *statusp;
1638 		uint_t nelem = 0;
1639 
1640 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1641 		    &statusp, &nelem);
1642 		fmd_module_lock(xip->xi_queue->eq_mod);
1643 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1644 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1645 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1646 			if (cip->ci_xprt != NULL) {
1647 				fmd_case_update_status(cp, statusp,
1648 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1649 				fmd_case_update_containees(cp);
1650 				fmd_case_update(cp);
1651 			}
1652 			fmd_case_rele(cp);
1653 		}
1654 		fmd_module_unlock(xip->xi_queue->eq_mod);
1655 		fmd_event_hold(e);
1656 		fmd_event_rele(e);
1657 		goto done;
1658 	}
1659 
1660 	/*
1661 	 * If a list.isolated event is received, update resource cache status
1662 	 */
1663 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_ISOLATED_CLASS)) {
1664 		uint8_t *statusp;
1665 		uint_t nelem = 0;
1666 
1667 		(void) nvlist_lookup_uint8_array(nvl, FM_SUSPECT_FAULT_STATUS,
1668 		    &statusp, &nelem);
1669 		fmd_module_lock(xip->xi_queue->eq_mod);
1670 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1671 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1672 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1673 			if (cip->ci_xprt != NULL)
1674 				fmd_case_update_status(cp, statusp,
1675 				    cip->ci_proxy_asru, cip->ci_diag_asru);
1676 			fmd_case_rele(cp);
1677 		}
1678 		fmd_module_unlock(xip->xi_queue->eq_mod);
1679 		fmd_event_hold(e);
1680 		fmd_event_rele(e);
1681 		goto done;
1682 	}
1683 
1684 	/*
1685 	 * If a list.resolved event is received, resolve the local case.
1686 	 */
1687 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_RESOLVED_CLASS)) {
1688 		fmd_module_lock(xip->xi_queue->eq_mod);
1689 		if (nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1690 		    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
1691 			fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
1692 			if (cip->ci_xprt != NULL)
1693 				fmd_case_transition(cp, (cip->ci_state ==
1694 				    FMD_CASE_REPAIRED) ? FMD_CASE_RESOLVED :
1695 				    (cip->ci_state == FMD_CASE_CLOSED) ?
1696 				    FMD_CASE_REPAIRED : FMD_CASE_CLOSE_WAIT,
1697 				    FMD_CF_RESOLVED);
1698 			fmd_case_rele(cp);
1699 		}
1700 		fmd_module_unlock(xip->xi_queue->eq_mod);
1701 		fmd_event_hold(e);
1702 		fmd_event_rele(e);
1703 		goto done;
1704 	}
1705 
1706 	if (logonly == FMD_B_TRUE || (xip->xi_flags & FMD_XPRT_EXTERNAL)) {
1707 		/*
1708 		 * Don't proxy ereports on an EXTERNAL transport - we won't
1709 		 * know how to diagnose them with the wrong topology. Note
1710 		 * that here (and above) we have to hold/release the event in
1711 		 * order for it to be freed.
1712 		 */
1713 		fmd_event_hold(e);
1714 		fmd_event_rele(e);
1715 	} else if (isproto == FMD_B_TRUE)
1716 		fmd_dispq_dispatch(dp->d_disp, e, class);
1717 	else
1718 		fmd_modhash_dispatch(dp->d_mod_hash, e);
1719 done:
1720 	(void) pthread_mutex_lock(&xip->xi_lock);
1721 
1722 	ASSERT(xip->xi_busy != 0);
1723 	xip->xi_busy--;
1724 
1725 	(void) pthread_cond_broadcast(&xip->xi_cv);
1726 	(void) pthread_mutex_unlock(&xip->xi_lock);
1727 }
1728 
1729 void
1730 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1731 {
1732 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1733 
1734 	fmd_event_t *e;
1735 	nvlist_t *nvl;
1736 	char *s;
1737 
1738 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1739 		return; /* read-only transports do not proxy uuclose */
1740 
1741 	TRACE((FMD_DBG_XPRT, "xprt %u closing case %s\n", xip->xi_id, uuid));
1742 
1743 	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1744 	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1745 
1746 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1747 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1748 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1749 }
1750 
1751 /*
1752  * On proxy side, send back uuresolved request to diagnosing side
1753  */
1754 void
1755 fmd_xprt_uuresolved(fmd_xprt_t *xp, const char *uuid)
1756 {
1757 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1758 
1759 	fmd_event_t *e;
1760 	nvlist_t *nvl;
1761 	char *s;
1762 
1763 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1764 		return; /* read-only transports do not proxy uuresolved */
1765 
1766 	TRACE((FMD_DBG_XPRT, "xprt %u resolving case %s\n", xip->xi_id, uuid));
1767 
1768 	nvl = fmd_protocol_xprt_uuresolved(xip->xi_queue->eq_mod,
1769 	    "resource.fm.xprt.uuresolved", xip->xi_version, uuid);
1770 
1771 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1772 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1773 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1774 }
1775 
1776 /*
1777  * On proxy side, send back repair/acquit/etc request to diagnosing side
1778  */
1779 void
1780 fmd_xprt_updated(fmd_xprt_t *xp, const char *uuid, uint8_t *statusp,
1781 	uint8_t *has_asrup, uint_t nelem)
1782 {
1783 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1784 
1785 	fmd_event_t *e;
1786 	nvlist_t *nvl;
1787 	char *s;
1788 
1789 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1790 		return; /* read-only transports do not support remote repairs */
1791 
1792 	TRACE((FMD_DBG_XPRT, "xprt %u updating case %s\n", xip->xi_id, uuid));
1793 
1794 	nvl = fmd_protocol_xprt_updated(xip->xi_queue->eq_mod,
1795 	    "resource.fm.xprt.updated", xip->xi_version, uuid, statusp,
1796 	    has_asrup, nelem);
1797 
1798 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1799 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1800 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1801 }
1802 
1803 /*
1804  * Insert the specified class into our remote subscription hash.  If the class
1805  * is already present, bump the reference count; otherwise add it to the hash
1806  * and then enqueue an event for our remote peer to proxy our subscription.
1807  */
1808 void
1809 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1810 {
1811 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1812 
1813 	uint_t refs;
1814 	nvlist_t *nvl;
1815 	fmd_event_t *e;
1816 	char *s;
1817 
1818 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1819 		return; /* read-only transports do not proxy subscriptions */
1820 
1821 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1822 		return; /* transport is not yet an active subscriber */
1823 
1824 	(void) pthread_mutex_lock(&xip->xi_lock);
1825 	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1826 	(void) pthread_mutex_unlock(&xip->xi_lock);
1827 
1828 	if (refs > 1)
1829 		return; /* we've already asked our peer for this subscription */
1830 
1831 	fmd_dprintf(FMD_DBG_XPRT,
1832 	    "xprt %u subscribing to %s\n", xip->xi_id, class);
1833 
1834 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1835 	    "resource.fm.xprt.subscribe", xip->xi_version, class);
1836 
1837 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1838 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1839 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1840 }
1841 
1842 /*
1843  * Delete the specified class from the remote subscription hash.  If the
1844  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1845  */
1846 void
1847 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1848 {
1849 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1850 
1851 	uint_t refs;
1852 	nvlist_t *nvl;
1853 	fmd_event_t *e;
1854 	char *s;
1855 
1856 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1857 		return; /* read-only transports do not proxy subscriptions */
1858 
1859 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1860 		return; /* transport is not yet an active subscriber */
1861 
1862 	/*
1863 	 * If the subscription reference count drops to zero in xi_rsub, insert
1864 	 * an entry into the xi_usub hash indicating we await an unsuback event.
1865 	 */
1866 	(void) pthread_mutex_lock(&xip->xi_lock);
1867 
1868 	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1869 		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1870 
1871 	(void) pthread_mutex_unlock(&xip->xi_lock);
1872 
1873 	if (refs != 0)
1874 		return; /* other subscriptions for this class still active */
1875 
1876 	fmd_dprintf(FMD_DBG_XPRT,
1877 	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1878 
1879 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1880 	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1881 
1882 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1883 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1884 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1885 }
1886 
1887 static void
1888 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1889 {
1890 	fmd_xprt_t *xp;
1891 
1892 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1893 		fmd_xprt_subscribe(xp, class);
1894 		fmd_idspace_rele(ids, id);
1895 	}
1896 }
1897 
1898 void
1899 fmd_xprt_subscribe_all(const char *class)
1900 {
1901 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1902 
1903 	if (ids->ids_count != 0)
1904 		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1905 }
1906 
1907 static void
1908 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1909 {
1910 	fmd_xprt_t *xp;
1911 
1912 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1913 		fmd_xprt_unsubscribe(xp, class);
1914 		fmd_idspace_rele(ids, id);
1915 	}
1916 }
1917 
1918 void
1919 fmd_xprt_unsubscribe_all(const char *class)
1920 {
1921 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1922 
1923 	if (ids->ids_count != 0)
1924 		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1925 }
1926 
1927 /*ARGSUSED*/
1928 static void
1929 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1930 {
1931 	fmd_xprt_t *xp;
1932 
1933 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1934 		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1935 		fmd_idspace_rele(ids, id);
1936 	}
1937 }
1938 
1939 void
1940 fmd_xprt_suspend_all(void)
1941 {
1942 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1943 
1944 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1945 
1946 	if (fmd.d_xprt_suspend++ != 0) {
1947 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1948 		return; /* already suspended */
1949 	}
1950 
1951 	if (ids->ids_count != 0)
1952 		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1953 
1954 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1955 }
1956 
1957 /*ARGSUSED*/
1958 static void
1959 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1960 {
1961 	fmd_xprt_t *xp;
1962 
1963 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1964 		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1965 		fmd_idspace_rele(ids, id);
1966 	}
1967 }
1968 
1969 void
1970 fmd_xprt_resume_all(void)
1971 {
1972 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1973 
1974 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1975 
1976 	if (fmd.d_xprt_suspend == 0)
1977 		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
1978 
1979 	if (--fmd.d_xprt_suspend != 0) {
1980 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1981 		return; /* not ready to be resumed */
1982 	}
1983 
1984 	if (ids->ids_count != 0)
1985 		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
1986 
1987 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1988 }
1989