xref: /titanic_44/usr/src/cmd/fm/fmd/common/fmd_xprt.c (revision 6185db853e024a486ff8837e6784dd290d866112)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * FMD Transport Subsystem
31  *
32  * A transport module uses some underlying mechanism to transport events.
33  * This mechanism may use any underlying link-layer protocol and may support
34  * additional link-layer packets unrelated to FMA.  Some appropriate link-
35  * layer mechanism to create the underlying connection is expected to be
36  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
37  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
38  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
39  * The underlying transport mechanism is *required* to provide ordering: that
40  * is, the sequences of bytes written across the transport must be read by
41  * the remote peer in the order that they are written, even across separate
42  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
43  * a valid transport as it guarantees ordering, whereas the Internet UDP
44  * protocol would not because UDP datagrams may be delivered in any order
45  * as a result of delays introduced when datagrams pass through routers.
46  *
47  * Similar to sending events, a transport module receives events that are from
48  * its peer remote endpoint using some transport-specific mechanism that is
49  * unknown to FMD.  As each event is received, the transport module is
50  * responsible for constructing a valid nvlist_t object from the data and then
51  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
52  * queue, making it available to all local modules that are not transport
53  * modules that have subscribed to the event.
54  *
55  * The following state machine is used for each transport.  The initial state
56  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
57  *
58  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
59  *             |                 |
60  * waiting  +--v--+           +--v--+  waiting
61  * for syn  | SYN |--+     --+| ACK |  for ack
62  * event    +-----+   \   /   +-----+  event
63  *             |       \ /       |
64  * drop all +--v--+     X     +--v--+  send subscriptions,
65  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
66  *          +-----+           +-----+  wait for run event
67  *             ^                 |
68  *             |     +-----+     |
69  *             +-----| RUN |<----+
70  *                   +--^--+
71  *                      |
72  *               FMD_XPRT_RDONLY
73  *
74  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
75  * Layer enqueues a "syn" event for the module in its event queue and sets the
76  * state to ACK.  In state ACK, we are waiting for the transport to get an
77  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
78  * discarded.  If an "ack" is received, we transition to state SUB.  If a
79  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
80  * exchange), we transition to state ERR.  Once in state ERR, no further
81  * operations are valid except fmd_xprt_close() and fmd_xprt_error() will
82  * return a non-zero value to the caller indicating the transport has failed.
83  *
84  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
85  * Layer assumes this transport is being used to accept a virtual connection
86  * from a remote peer that is sending a "syn", and sets the initial state to
87  * SYN.  In this state, the transport waits for a "syn" event, validates it,
88  * and then transitions to state SUB if it is valid or state ERR if it is not.
89  *
90  * Once in state SUB, the transport module is expected to receive a sequence of
91  * zero or more "subscribe" events from the remote peer, followed by a "run"
92  * event.  Once in state RUN, the transport is active and any events can be
93  * sent or received.  The transport module is free to call fmd_xprt_close()
94  * from any state.  The fmd_xprt_error() function will return zero if the
95  * transport is not in the ERR state, or non-zero if it is in the ERR state.
96  *
97  * Once the state machine reaches RUN, other FMA protocol events can be sent
98  * and received across the transport in addition to the various control events.
99  *
100  * Table of Common Transport Layer Control Events
101  * ==============================================
102  *
103  * FMA Class                     Payload
104  * ---------                     -------
105  * resource.fm.xprt.uuclose      string (uuid of case)
106  * resource.fm.xprt.subscribe    string (class pattern)
107  * resource.fm.xprt.unsubscribe  string (class pattern)
108  * resource.fm.xprt.unsuback     string (class pattern)
109  * resource.fm.xprt.syn          version information
110  * resource.fm.xprt.ack          version information
111  * resource.fm.xprt.run          version information
112  *
113  * Control events are used to add and delete proxy subscriptions on the remote
114  * transport peer module, and to set up connections.  When a "syn" event is
115  * sent, FMD will include in the payload the highest version of the FMA event
116  * protocol that is supported by the sender.  When a "syn" event is received,
117  * the receiving FMD will use the minimum of this version and its version of
118  * the protocol, and reply with this new minimum version in the "ack" event.
119  * The receiver will then use this new minimum for subsequent event semantics.
120  */
121 
122 #include <sys/fm/protocol.h>
123 #include <strings.h>
124 #include <limits.h>
125 
126 #include <fmd_alloc.h>
127 #include <fmd_error.h>
128 #include <fmd_conf.h>
129 #include <fmd_subr.h>
130 #include <fmd_string.h>
131 #include <fmd_protocol.h>
132 #include <fmd_thread.h>
133 #include <fmd_eventq.h>
134 #include <fmd_dispq.h>
135 #include <fmd_ctl.h>
136 #include <fmd_log.h>
137 #include <fmd_ustat.h>
138 #include <fmd_case.h>
139 #include <fmd_api.h>
140 #include <fmd_fmri.h>
141 #include <fmd_asru.h>
142 #include <fmd_xprt.h>
143 
144 #include <fmd.h>
145 
146 /*
147  * The states shown above in the transport state machine diagram are encoded
148  * using arrays of class patterns and a corresponding action function.  These
149  * arrays are then passed to fmd_xprt_transition() to change transport states.
150  */
151 
152 const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
153 { "resource.fm.xprt.syn", fmd_xprt_event_syn },
154 { "*", fmd_xprt_event_error },
155 { NULL, NULL }
156 };
157 
158 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
159 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
160 { "*", fmd_xprt_event_error },
161 };
162 
163 const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
164 { "*", fmd_xprt_event_drop },
165 { NULL, NULL }
166 };
167 
168 const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
169 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
170 { "resource.fm.xprt.run", fmd_xprt_event_run },
171 { "resource.fm.xprt.*", fmd_xprt_event_error },
172 { "*", fmd_xprt_event_drop },
173 { NULL, NULL }
174 };
175 
176 const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
177 { "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
178 { "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
179 { "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
180 { "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
181 { "resource.fm.xprt.*", fmd_xprt_event_error },
182 { NULL, NULL }
183 };
184 
185 /*
186  * Template for per-transport statistics installed by fmd on behalf of each
187  * transport.  These are used to initialize the per-transport xi_stats.  For
188  * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
189  * transport ID (xi_id) and then are inserted into the per-module stats hash.
190  * The values in this array must match fmd_xprt_stat_t from <fmd_xprt.h>.
191  */
192 static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
193 {
194 { "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
195 { "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
196 { "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
197 { "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
198 { "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
199 { "wtime", FMD_TYPE_TIME, "total wait time on queue" },
200 { "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
201 { "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
202 { "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
203 { "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
204 },
205 { "module", FMD_TYPE_STRING, "module that owns this transport" },
206 { "authority", FMD_TYPE_STRING, "authority associated with this transport" },
207 { "state", FMD_TYPE_STRING, "current transport state" },
208 { "received", FMD_TYPE_UINT64, "events received by transport" },
209 { "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
210 { "retried", FMD_TYPE_UINT64, "retries requested of transport" },
211 { "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
212 { "lost", FMD_TYPE_UINT64, "events lost by transport" },
213 { "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
214 { "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
215 };
216 
217 static void
218 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
219 {
220 	uint_t hashlen = fmd.d_str_buckets;
221 
222 	xch->xch_queue = eq;
223 	xch->xch_hashlen = hashlen;
224 	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
225 }
226 
227 static void
228 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
229 {
230 	fmd_eventq_t *eq = xch->xch_queue;
231 	fmd_xprt_class_t *xcp, *ncp;
232 	uint_t i;
233 
234 	for (i = 0; i < xch->xch_hashlen; i++) {
235 		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
236 			ncp = xcp->xc_next;
237 
238 			if (eq != NULL)
239 				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
240 
241 			fmd_strfree(xcp->xc_class);
242 			fmd_free(xcp, sizeof (fmd_xprt_class_t));
243 		}
244 	}
245 
246 	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
247 }
248 
249 /*
250  * Insert the specified class into the specified class hash, and return the
251  * reference count.  A return value of one indicates this is the first insert.
252  * If an eventq is associated with the hash, insert a dispq subscription for it.
253  */
254 static uint_t
255 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
256     fmd_xprt_class_hash_t *xch, const char *class)
257 {
258 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
259 	fmd_xprt_class_t *xcp;
260 
261 	ASSERT(MUTEX_HELD(&xip->xi_lock));
262 
263 	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
264 		if (strcmp(class, xcp->xc_class) == 0)
265 			return (++xcp->xc_refs);
266 	}
267 
268 	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
269 	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
270 	xcp->xc_next = xch->xch_hash[h];
271 	xcp->xc_refs = 1;
272 	xch->xch_hash[h] = xcp;
273 
274 	if (xch->xch_queue != NULL)
275 		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
276 
277 	return (xcp->xc_refs);
278 }
279 
280 /*
281  * Delete the specified class from the specified class hash, and return the
282  * reference count.  A return value of zero indicates the class was deleted.
283  * If an eventq is associated with the hash, delete the dispq subscription.
284  */
285 static uint_t
286 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
287     fmd_xprt_class_hash_t *xch, const char *class)
288 {
289 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
290 	fmd_xprt_class_t *xcp, **pp;
291 
292 	ASSERT(MUTEX_HELD(&xip->xi_lock));
293 	pp = &xch->xch_hash[h];
294 
295 	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
296 		if (strcmp(class, xcp->xc_class) == 0)
297 			break;
298 		else
299 			pp = &xcp->xc_next;
300 	}
301 
302 	if (xcp == NULL)
303 		return (-1U); /* explicitly permit an invalid delete */
304 
305 	if (--xcp->xc_refs != 0)
306 		return (xcp->xc_refs);
307 
308 	ASSERT(xcp->xc_refs == 0);
309 	*pp = xcp->xc_next;
310 
311 	fmd_strfree(xcp->xc_class);
312 	fmd_free(xcp, sizeof (fmd_xprt_class_t));
313 
314 	if (xch->xch_queue != NULL)
315 		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
316 
317 	return (0);
318 }
319 
320 /*
321  * Queue subscribe events for the specified transport corresponding to all of
322  * the active module subscriptions.  This is an extremely heavyweight operation
323  * that we expect to take place rarely (i.e. when loading a transport module
324  * or when it establishes a connection).  We lock all of the known modules to
325  * prevent them from adding or deleting subscriptions, then snapshot their
326  * subscriptions, and then unlock all of the modules.  We hold the modhash
327  * lock for the duration of this operation to prevent new modules from loading.
328  */
329 static void
330 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
331 {
332 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
333 	const fmd_conf_path_t *pap;
334 	fmd_module_t *mp;
335 	uint_t i, j;
336 
337 	(void) pthread_rwlock_rdlock(&mhp->mh_lock);
338 
339 	for (i = 0; i < mhp->mh_hashlen; i++) {
340 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
341 			fmd_module_lock(mp);
342 	}
343 
344 	(void) pthread_mutex_lock(&xip->xi_lock);
345 	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
346 	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
347 	(void) pthread_mutex_unlock(&xip->xi_lock);
348 
349 	for (i = 0; i < mhp->mh_hashlen; i++) {
350 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
351 			(void) fmd_conf_getprop(mp->mod_conf,
352 			    FMD_PROP_SUBSCRIPTIONS, &pap);
353 			for (j = 0; j < pap->cpa_argc; j++)
354 				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
355 		}
356 	}
357 
358 	for (i = 0; i < mhp->mh_hashlen; i++) {
359 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
360 			fmd_module_unlock(mp);
361 	}
362 
363 	(void) pthread_rwlock_unlock(&mhp->mh_lock);
364 }
365 
366 static void
367 fmd_xprt_transition(fmd_xprt_impl_t *xip,
368     const fmd_xprt_rule_t *state, const char *tag)
369 {
370 	fmd_event_t *e;
371 	nvlist_t *nvl;
372 	char *s;
373 
374 	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
375 
376 	xip->xi_state = state;
377 	s = fmd_strdup(tag, FMD_SLEEP);
378 
379 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
380 	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
381 	xip->xi_stats->xs_state.fmds_value.str = s;
382 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
383 
384 	/*
385 	 * If we've reached the SUB state, take out the big hammer and snapshot
386 	 * all of the subscriptions of all of the loaded modules.  Then queue a
387 	 * run event for our remote peer indicating that it can enter RUN.
388 	 */
389 	if (state == _fmd_xprt_state_sub) {
390 		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
391 
392 		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
393 		    "resource.fm.xprt.run", xip->xi_version);
394 
395 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
396 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
397 		fmd_eventq_insert_at_time(xip->xi_queue, e);
398 	}
399 }
400 
401 static void
402 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
403 {
404 	char *s = fmd_fmri_auth2str(xip->xi_auth);
405 
406 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
407 	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
408 	xip->xi_stats->xs_authority.fmds_value.str = s;
409 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
410 }
411 
412 static int
413 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
414 {
415 	uint8_t rversion;
416 
417 	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
418 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
419 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
420 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
421 
422 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
423 		return (1);
424 	}
425 
426 	if (rversion > xip->xi_version) {
427 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
428 		    xip->xi_id, rversion, xip->xi_version);
429 
430 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
431 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
432 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
433 
434 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
435 		return (1);
436 	}
437 
438 	if (rversionp != NULL)
439 		*rversionp = rversion;
440 
441 	return (0);
442 }
443 
444 void
445 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
446 {
447 	fmd_event_t *e;
448 	uint_t vers;
449 	char *class;
450 
451 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
452 		return; /* transitioned to error state */
453 
454 	/*
455 	 * If the transport module didn't specify an authority, extract the
456 	 * one that is passed along with the xprt.syn event and use that.
457 	 */
458 	if (xip->xi_auth == NULL &&
459 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
460 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
461 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
462 		fmd_xprt_authupdate(xip);
463 	}
464 
465 	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
466 	    "resource.fm.xprt.ack", xip->xi_version);
467 
468 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
469 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
470 	fmd_eventq_insert_at_time(xip->xi_queue, e);
471 
472 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
473 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
474 }
475 
476 void
477 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
478 {
479 	uint_t vers;
480 
481 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
482 		return; /* transitioned to error state */
483 
484 	/*
485 	 * If the transport module didn't specify an authority, extract the
486 	 * one that is passed along with the xprt.syn event and use that.
487 	 */
488 	if (xip->xi_auth == NULL &&
489 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
490 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
491 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
492 		fmd_xprt_authupdate(xip);
493 	}
494 
495 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
496 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
497 }
498 
499 /*
500  * Upon transition to RUN, we take every solved case and resend a list.suspect
501  * event for it to our remote peer.  If a case transitions from solved to a
502  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
503  * the case hash, we will get it as part of examining the resource cache, next.
504  */
505 static void
506 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
507 {
508 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
509 	fmd_xprt_impl_t *xip = arg;
510 
511 	fmd_event_t *e;
512 	nvlist_t *nvl;
513 	char *class;
514 
515 	if (cip->ci_state != FMD_CASE_SOLVED)
516 		return; /* unsolved, or we'll get it during the ASRU pass */
517 
518 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
519 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
520 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
521 
522 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
523 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
524 
525 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
526 }
527 
528 /*
529  * Upon transition to RUN, we take every ASRU which is in the degraded state
530  * and resend a fault.* event for it to our remote peer, in case the peer is
531  * running in the fault manager that knows how to disable this resource.  If
532  * any new resources are added to the cache during our iteration, this is no
533  * problem because our subscriptions are already proxied and so any new cases
534  * will result in a list.suspect event being transported if that is needed.
535  */
536 static void
537 fmd_xprt_send_asru(fmd_asru_t *ap, void *arg)
538 {
539 	fmd_xprt_impl_t *xip = arg;
540 	nvlist_t *nvl = NULL;
541 	fmd_event_t *e;
542 	char *class;
543 
544 	(void) pthread_mutex_lock(&ap->asru_lock);
545 
546 	if ((ap->asru_flags & (FMD_ASRU_INTERNAL | FMD_ASRU_STATE)) ==
547 	    FMD_ASRU_FAULTY && fmd_case_orphaned(ap->asru_case))
548 		(void) nvlist_xdup(ap->asru_event, &nvl, &fmd.d_nva);
549 
550 	(void) pthread_mutex_unlock(&ap->asru_lock);
551 
552 	if (nvl == NULL)
553 		return; /* asru is internal, unusable, or not faulty */
554 
555 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
556 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
557 
558 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
559 	    class, ap->asru_name, xip->xi_id);
560 
561 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
562 }
563 
564 void
565 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
566 {
567 	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
568 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
569 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
570 		fmd_asru_hash_apply(fmd.d_asrus, fmd_xprt_send_asru, xip);
571 	}
572 }
573 
574 void
575 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
576 {
577 	char *class;
578 
579 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
580 		return; /* transitioned to error state */
581 
582 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
583 		return; /* malformed protocol event */
584 
585 	(void) pthread_mutex_lock(&xip->xi_lock);
586 	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
587 	(void) pthread_mutex_unlock(&xip->xi_lock);
588 
589 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
590 	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
591 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
592 }
593 
594 void
595 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
596 {
597 	fmd_event_t *e;
598 	char *class;
599 
600 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
601 		return; /* transitioned to error state */
602 
603 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
604 		return; /* malformed protocol event */
605 
606 	(void) pthread_mutex_lock(&xip->xi_lock);
607 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
608 	(void) pthread_mutex_unlock(&xip->xi_lock);
609 
610 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
611 	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
612 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
613 
614 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
615 	    "resource.fm.xprt.unsuback", xip->xi_version, class);
616 
617 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
618 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
619 	fmd_eventq_insert_at_time(xip->xi_queue, e);
620 }
621 
622 void
623 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
624 {
625 	char *class;
626 
627 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
628 		return; /* transitioned to error state */
629 
630 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
631 		return; /* malformed protocol event */
632 
633 	(void) pthread_mutex_lock(&xip->xi_lock);
634 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
635 	(void) pthread_mutex_unlock(&xip->xi_lock);
636 }
637 
638 void
639 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
640 {
641 	fmd_case_t *cp;
642 	char *uuid;
643 
644 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
645 		return; /* transitioned to error state */
646 
647 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
648 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
649 		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
650 		fmd_case_rele(cp);
651 	}
652 }
653 
654 void
655 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
656 {
657 	char *class = "<unknown>";
658 
659 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
660 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
661 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
662 
663 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
664 	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
665 
666 	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
667 }
668 
669 void
670 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
671 {
672 	char *class = "<unknown>";
673 
674 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
675 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
676 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
677 
678 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
679 	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
680 
681 }
682 
683 fmd_xprt_t *
684 fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
685 {
686 	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
687 	fmd_stat_t *statv;
688 	uint_t i, statc;
689 
690 	char buf[PATH_MAX];
691 	fmd_event_t *e;
692 	nvlist_t *nvl;
693 	char *s;
694 
695 	(void) pthread_mutex_init(&xip->xi_lock, NULL);
696 	(void) pthread_cond_init(&xip->xi_cv, NULL);
697 	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);
698 
699 	xip->xi_auth = auth;
700 	xip->xi_data = data;
701 	xip->xi_version = FM_RSRC_XPRT_VERSION;
702 	xip->xi_flags = flags;
703 
704 	/*
705 	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
706 	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
707 	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
708 	 * ensure that this transport will not run until fmd_xprt_resume_all().
709 	 */
710 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
711 	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);
712 
713 	if (fmd.d_xprt_suspend != 0)
714 		xip->xi_flags |= FMD_XPRT_DSUSPENDED;
715 
716 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
717 
718 	/*
719 	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
720 	 * bit so that fmdo_send() is not called until _fmd_init() completes.
721 	 */
722 	if (!(mp->mod_flags & FMD_MOD_INIT))
723 		xip->xi_flags |= FMD_XPRT_ISUSPENDED;
724 
725 	/*
726 	 * Initialize the transport statistics that we keep on behalf of fmd.
727 	 * These are set up using a template defined at the top of this file.
728 	 * We rename each statistic with a prefix ensuring its uniqueness.
729 	 */
730 	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
731 	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
732 	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));
733 
734 	for (i = 0; i < statc; i++) {
735 		(void) snprintf(statv[i].fmds_name,
736 		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
737 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
738 	}
739 
740 	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
741 	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);
742 
743 	if (xip->xi_stats == NULL)
744 		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);
745 
746 	xip->xi_stats->xs_module.fmds_value.str =
747 	    fmd_strdup(mp->mod_name, FMD_SLEEP);
748 
749 	if (xip->xi_auth != NULL)
750 		fmd_xprt_authupdate(xip);
751 
752 	/*
753 	 * Create the outbound eventq for this transport and link to its stats.
754 	 * If any suspend bits were set above, suspend the eventq immediately.
755 	 */
756 	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
757 	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);
758 
759 	if (xip->xi_flags & FMD_XPRT_SMASK)
760 		fmd_eventq_suspend(xip->xi_queue);
761 
762 	/*
763 	 * Create our subscription hashes: local subscriptions go to xi_queue,
764 	 * remote subscriptions are tracked only for protocol requests, and
765 	 * pending unsubscriptions are associated with the /dev/null eventq.
766 	 */
767 	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
768 	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
769 	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);
770 
771 	/*
772 	 * Determine our initial state based upon the creation flags.  If we're
773 	 * read-only, go directly to RUN.  If we're accepting a new connection,
774 	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
775 	 */
776 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
777 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
778 	else if (flags & FMD_XPRT_ACCEPT)
779 		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
780 	else
781 		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
782 
783 	/*
784 	 * If client.xprtlog is set to TRUE, create a debugging log for the
785 	 * events received by the transport in var/fm/fmd/xprt/.
786 	 */
787 	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
788 	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);
789 
790 	if (i) {
791 		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
792 		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
793 	}
794 
795 	ASSERT(fmd_module_locked(mp));
796 	fmd_list_append(&mp->mod_transports, xip);
797 
798 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
799 	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
800 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
801 
802 	/*
803 	 * If this is a read-only transport, return without creating a send
804 	 * queue thread and setting up any connection events in our queue.
805 	 */
806 	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
807 		goto out;
808 
809 	/*
810 	 * Once the transport is fully initialized, create a send queue thread
811 	 * and start any connect events flowing to complete our initialization.
812 	 */
813 	if ((xip->xi_thread = fmd_thread_create(mp,
814 	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {
815 
816 		fmd_error(EFMD_XPRT_THR,
817 		    "failed to create thread for transport %u", xip->xi_id);
818 
819 		fmd_xprt_destroy((fmd_xprt_t *)xip);
820 		(void) fmd_set_errno(EFMD_XPRT_THR);
821 		return (NULL);
822 	}
823 
824 	/*
825 	 * If the transport is not being opened to accept an inbound connect,
826 	 * start an outbound connection by enqueuing a SYN event for our peer.
827 	 */
828 	if (!(flags & FMD_XPRT_ACCEPT)) {
829 		nvl = fmd_protocol_xprt_ctl(mp,
830 		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);
831 
832 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
833 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
834 		fmd_eventq_insert_at_time(xip->xi_queue, e);
835 	}
836 out:
837 	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
838 	return ((fmd_xprt_t *)xip);
839 }
840 
841 void
842 fmd_xprt_destroy(fmd_xprt_t *xp)
843 {
844 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
845 	fmd_module_t *mp = xip->xi_queue->eq_mod;
846 	uint_t id = xip->xi_id;
847 
848 	fmd_case_impl_t *cip, *nip;
849 	fmd_stat_t *sp;
850 	uint_t i, n;
851 
852 	ASSERT(fmd_module_locked(mp));
853 	fmd_list_delete(&mp->mod_transports, xip);
854 
855 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
856 	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
857 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
858 
859 	(void) pthread_mutex_lock(&xip->xi_lock);
860 
861 	while (xip->xi_busy != 0)
862 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
863 
864 	/*
865 	 * Remove the transport from global visibility, cancel its send-side
866 	 * thread, join with it, and then remove the transport from module
867 	 * visibility.  Once all this is done, destroy and free the transport.
868 	 */
869 	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
870 
871 	if (xip->xi_thread != NULL) {
872 		fmd_eventq_abort(xip->xi_queue);
873 		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
874 	}
875 
876 	if (xip->xi_log != NULL)
877 		fmd_log_rele(xip->xi_log);
878 
879 	/*
880 	 * Release every case handle in the module that was cached by this
881 	 * transport.  This will result in these cases disappearing from the
882 	 * local case hash so that fmd_case_uuclose() can no longer be used.
883 	 */
884 	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
885 		nip = fmd_list_next(cip);
886 		if (cip->ci_xprt == xp)
887 			fmd_case_discard((fmd_case_t *)cip);
888 	}
889 
890 	/*
891 	 * Destroy every class in the various subscription hashes and remove
892 	 * any corresponding subscriptions from the event dispatch queue.
893 	 */
894 	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
895 	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
896 	fmd_xprt_class_hash_destroy(&xip->xi_usub);
897 
898 	/*
899 	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
900 	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
901 	 * won't find the entries in the hash table.
902 	 */
903 	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
904 	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
905 	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
906 	for (i = 0; i < n; i++) {
907 		(void) snprintf(sp[i].fmds_name,
908 		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
909 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
910 	}
911 	fmd_ustat_delete(mp->mod_ustat, n, sp);
912 	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
913 
914 	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
915 	fmd_eventq_destroy(xip->xi_queue);
916 	nvlist_free(xip->xi_auth);
917 	fmd_free(xip, sizeof (fmd_xprt_impl_t));
918 
919 	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
920 }
921 
922 void
923 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
924 {
925 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
926 	uint_t oflags;
927 
928 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
929 	(void) pthread_mutex_lock(&xip->xi_lock);
930 
931 	oflags = xip->xi_flags;
932 	xip->xi_flags |= flags;
933 
934 	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
935 		fmd_eventq_suspend(xip->xi_queue);
936 
937 	(void) pthread_cond_broadcast(&xip->xi_cv);
938 
939 	while (xip->xi_busy != 0)
940 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
941 
942 	(void) pthread_mutex_unlock(&xip->xi_lock);
943 }
944 
945 void
946 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
947 {
948 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
949 	uint_t oflags;
950 
951 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
952 	(void) pthread_mutex_lock(&xip->xi_lock);
953 
954 	oflags = xip->xi_flags;
955 	xip->xi_flags &= ~flags;
956 
957 	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
958 		fmd_eventq_resume(xip->xi_queue);
959 
960 	(void) pthread_cond_broadcast(&xip->xi_cv);
961 	(void) pthread_mutex_unlock(&xip->xi_lock);
962 }
963 
964 void
965 fmd_xprt_send(fmd_xprt_t *xp)
966 {
967 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
968 	fmd_module_t *mp = xip->xi_queue->eq_mod;
969 	fmd_event_t *ep;
970 	int err;
971 
972 	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
973 		if (FMD_EVENT_TTL(ep) == 0) {
974 			fmd_event_rele(ep);
975 			continue;
976 		}
977 
978 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
979 		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));
980 
981 		err = mp->mod_ops->mop_transport(mp, xp, ep);
982 		fmd_eventq_done(xip->xi_queue);
983 
984 		if (err == FMD_SEND_RETRY) {
985 			fmd_eventq_insert_at_time(xip->xi_queue, ep);
986 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
987 			xip->xi_stats->xs_retried.fmds_value.ui64++;
988 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
989 		}
990 
991 		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
992 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
993 			xip->xi_stats->xs_lost.fmds_value.ui64++;
994 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
995 		}
996 
997 		fmd_event_rele(ep);
998 	}
999 }
1000 
1001 void
1002 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt)
1003 {
1004 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1005 	const fmd_xprt_rule_t *xrp;
1006 	fmd_t *dp = &fmd;
1007 
1008 	fmd_event_t *e;
1009 	char *class, *uuid, *code;
1010 	int isproto;
1011 
1012 	uint64_t *tod;
1013 	uint8_t ttl;
1014 	uint_t n;
1015 
1016 	/*
1017 	 * Grab the transport lock and set the busy flag to indicate we are
1018 	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1019 	 * resumes the transport before continuing on with the receive.
1020 	 */
1021 	(void) pthread_mutex_lock(&xip->xi_lock);
1022 
1023 	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1024 
1025 		if (fmd.d_signal != 0) {
1026 			(void) pthread_mutex_unlock(&xip->xi_lock);
1027 			return; /* fmd_destroy() is in progress */
1028 		}
1029 
1030 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1031 	}
1032 
1033 	xip->xi_busy++;
1034 	ASSERT(xip->xi_busy != 0);
1035 
1036 	(void) pthread_mutex_unlock(&xip->xi_lock);
1037 
1038 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
1039 	xip->xi_stats->xs_received.fmds_value.ui64++;
1040 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1041 
1042 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1043 		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1044 		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1045 
1046 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1047 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1048 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1049 
1050 		nvlist_free(nvl);
1051 		goto done;
1052 	}
1053 
1054 	fmd_dprintf(FMD_DBG_XPRT, "xprt %u posting %s\n", xip->xi_id, class);
1055 
1056 	/*
1057 	 * If a time-to-live value is present in the event and is zero, drop
1058 	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1059 	 */
1060 	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1061 		if (ttl == 0) {
1062 			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1063 			    "timeout: event received with ttl=0\n",
1064 			    xip->xi_id, (void *)nvl, class);
1065 
1066 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1067 			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1068 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1069 
1070 			nvlist_free(nvl);
1071 			goto done;
1072 		}
1073 		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1074 		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1075 	}
1076 
1077 	/*
1078 	 * If we are using the native system clock, the underlying transport
1079 	 * code can provide a tighter event time bound by telling us when the
1080 	 * event was enqueued.  If we're using simulated clocks, this time
1081 	 * has no meaning to us, so just reset the value to use HRT_NOW.
1082 	 */
1083 	if (dp->d_clockops != &fmd_timeops_native)
1084 		hrt = FMD_HRT_NOW;
1085 
1086 	/*
1087 	 * If an event's class is in the FMD_CTL_CLASS family, then create a
1088 	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
1089 	 * event using this time.  Otherwise create a protocol event using hrt.
1090 	 */
1091 	if ((isproto = strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN)) == 0)
1092 		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1093 	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1094 		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1095 	else {
1096 		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1097 		    NULL, nvl, class, NULL, 0, 0);
1098 	}
1099 
1100 	/*
1101 	 * If the debug log is enabled, create a temporary event, log it to the
1102 	 * debug log, and then reset the underlying state of the event.
1103 	 */
1104 	if (xip->xi_log != NULL) {
1105 		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1106 
1107 		fmd_log_append(xip->xi_log, e, NULL);
1108 
1109 		ep->ev_flags |= FMD_EVF_VOLATILE;
1110 		ep->ev_off = 0;
1111 		ep->ev_len = 0;
1112 
1113 		if (ep->ev_log != NULL) {
1114 			fmd_log_rele(ep->ev_log);
1115 			ep->ev_log = NULL;
1116 		}
1117 	}
1118 
1119 	/*
1120 	 * Iterate over the rules for the current state trying to match the
1121 	 * event class to one of our special rules.  If a rule is matched, the
1122 	 * event is consumed and not dispatched to other modules.  If the rule
1123 	 * set ends without matching an event, we fall through to dispatching.
1124 	 */
1125 	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1126 		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1127 			fmd_event_hold(e);
1128 			xrp->xr_func(xip, nvl);
1129 			fmd_event_rele(e);
1130 			goto done;
1131 		}
1132 	}
1133 
1134 	/*
1135 	 * Record the event in the errlog if it is an ereport.  This code will
1136 	 * be replaced later with a per-transport intent log instead.
1137 	 */
1138 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_EREPORT_CLASS ".*")) {
1139 		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
1140 		fmd_log_append(dp->d_errlog, e, NULL);
1141 		(void) pthread_rwlock_unlock(&dp->d_log_lock);
1142 	}
1143 
1144 	/*
1145 	 * If a list.suspect event is received, create a case for the specified
1146 	 * UUID in the case hash, with the transport module as its owner.  If
1147 	 * the UUID is already known, fmd_case_recreate() will return NULL and
1148 	 * we simply proceed to our normal event handling regardless.
1149 	 */
1150 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS) &&
1151 	    nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1152 	    nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) == 0) {
1153 		fmd_module_lock(xip->xi_queue->eq_mod);
1154 		(void) fmd_case_recreate(xip->xi_queue->eq_mod,
1155 		    xp, FMD_CASE_SOLVED, uuid, code);
1156 		fmd_module_unlock(xip->xi_queue->eq_mod);
1157 	}
1158 
1159 	if (isproto)
1160 		fmd_dispq_dispatch(dp->d_disp, e, class);
1161 	else
1162 		fmd_modhash_dispatch(dp->d_mod_hash, e);
1163 done:
1164 	(void) pthread_mutex_lock(&xip->xi_lock);
1165 
1166 	ASSERT(xip->xi_busy != 0);
1167 	xip->xi_busy--;
1168 
1169 	(void) pthread_cond_broadcast(&xip->xi_cv);
1170 	(void) pthread_mutex_unlock(&xip->xi_lock);
1171 }
1172 
1173 void
1174 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1175 {
1176 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1177 
1178 	fmd_event_t *e;
1179 	nvlist_t *nvl;
1180 	char *s;
1181 
1182 	fmd_dprintf(FMD_DBG_XPRT,
1183 	    "xprt %u closing case %s\n", xip->xi_id, uuid);
1184 
1185 	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1186 	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1187 
1188 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1189 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1190 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1191 }
1192 
1193 /*
1194  * Insert the specified class into our remote subscription hash.  If the class
1195  * is already present, bump the reference count; otherwise add it to the hash
1196  * and then enqueue an event for our remote peer to proxy our subscription.
1197  */
1198 void
1199 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1200 {
1201 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1202 
1203 	uint_t refs;
1204 	nvlist_t *nvl;
1205 	fmd_event_t *e;
1206 	char *s;
1207 
1208 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1209 		return; /* read-only transports do not proxy subscriptions */
1210 
1211 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1212 		return; /* transport is not yet an active subscriber */
1213 
1214 	(void) pthread_mutex_lock(&xip->xi_lock);
1215 	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1216 	(void) pthread_mutex_unlock(&xip->xi_lock);
1217 
1218 	if (refs > 1)
1219 		return; /* we've already asked our peer for this subscription */
1220 
1221 	fmd_dprintf(FMD_DBG_XPRT,
1222 	    "xprt %u subscribing to %s\n", xip->xi_id, class);
1223 
1224 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1225 	    "resource.fm.xprt.subscribe", xip->xi_version, class);
1226 
1227 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1228 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1229 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1230 }
1231 
1232 /*
1233  * Delete the specified class from the remote subscription hash.  If the
1234  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1235  */
1236 void
1237 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1238 {
1239 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1240 
1241 	uint_t refs;
1242 	nvlist_t *nvl;
1243 	fmd_event_t *e;
1244 	char *s;
1245 
1246 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1247 		return; /* read-only transports do not proxy subscriptions */
1248 
1249 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1250 		return; /* transport is not yet an active subscriber */
1251 
1252 	/*
1253 	 * If the subscription reference count drops to zero in xi_rsub, insert
1254 	 * an entry into the xi_usub hash indicating we await an unsuback event.
1255 	 */
1256 	(void) pthread_mutex_lock(&xip->xi_lock);
1257 
1258 	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1259 		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1260 
1261 	(void) pthread_mutex_unlock(&xip->xi_lock);
1262 
1263 	if (refs != 0)
1264 		return; /* other subscriptions for this class still active */
1265 
1266 	fmd_dprintf(FMD_DBG_XPRT,
1267 	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1268 
1269 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1270 	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1271 
1272 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1273 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1274 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1275 }
1276 
1277 static void
1278 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1279 {
1280 	fmd_xprt_t *xp;
1281 
1282 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1283 		fmd_xprt_subscribe(xp, class);
1284 		fmd_idspace_rele(ids, id);
1285 	}
1286 }
1287 
1288 void
1289 fmd_xprt_subscribe_all(const char *class)
1290 {
1291 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1292 
1293 	if (ids->ids_count != 0)
1294 		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1295 }
1296 
1297 static void
1298 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1299 {
1300 	fmd_xprt_t *xp;
1301 
1302 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1303 		fmd_xprt_unsubscribe(xp, class);
1304 		fmd_idspace_rele(ids, id);
1305 	}
1306 }
1307 
1308 void
1309 fmd_xprt_unsubscribe_all(const char *class)
1310 {
1311 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1312 
1313 	if (ids->ids_count != 0)
1314 		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1315 }
1316 
1317 /*ARGSUSED*/
1318 static void
1319 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1320 {
1321 	fmd_xprt_t *xp;
1322 
1323 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1324 		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1325 		fmd_idspace_rele(ids, id);
1326 	}
1327 }
1328 
1329 void
1330 fmd_xprt_suspend_all(void)
1331 {
1332 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1333 
1334 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1335 
1336 	if (fmd.d_xprt_suspend++ != 0) {
1337 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1338 		return; /* already suspended */
1339 	}
1340 
1341 	if (ids->ids_count != 0)
1342 		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1343 
1344 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1345 }
1346 
1347 /*ARGSUSED*/
1348 static void
1349 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1350 {
1351 	fmd_xprt_t *xp;
1352 
1353 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1354 		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1355 		fmd_idspace_rele(ids, id);
1356 	}
1357 }
1358 
1359 void
1360 fmd_xprt_resume_all(void)
1361 {
1362 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1363 
1364 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1365 
1366 	if (fmd.d_xprt_suspend == 0)
1367 		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
1368 
1369 	if (--fmd.d_xprt_suspend != 0) {
1370 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1371 		return; /* not ready to be resumed */
1372 	}
1373 
1374 	if (ids->ids_count != 0)
1375 		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
1376 
1377 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1378 }
1379