xref: /titanic_50/usr/src/cmd/fm/fmd/common/fmd_xprt.c (revision 53a7b6b6763f5865522a76e5e887390a8f4777d7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * FMD Transport Subsystem
29  *
30  * A transport module uses some underlying mechanism to transport events.
31  * This mechanism may use any underlying link-layer protocol and may support
32  * additional link-layer packets unrelated to FMA.  Some appropriate link-
33  * layer mechanism to create the underlying connection is expected to be
34  * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
35  * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
36  * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
37  * The underlying transport mechanism is *required* to provide ordering: that
38  * is, the sequences of bytes written across the transport must be read by
39  * the remote peer in the order that they are written, even across separate
40  * calls to fmdo_send().  As an example, the Internet TCP protocol would be
41  * a valid transport as it guarantees ordering, whereas the Internet UDP
42  * protocol would not because UDP datagrams may be delivered in any order
43  * as a result of delays introduced when datagrams pass through routers.
44  *
45  * Similar to sending events, a transport module receives events that are from
46  * its peer remote endpoint using some transport-specific mechanism that is
47  * unknown to FMD.  As each event is received, the transport module is
48  * responsible for constructing a valid nvlist_t object from the data and then
49  * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
50  * queue, making it available to all local modules that are not transport
51  * modules that have subscribed to the event.
52  *
53  * The following state machine is used for each transport.  The initial state
54  * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
55  *
56  *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
57  *             |                 |
58  * waiting  +--v--+           +--v--+  waiting
59  * for syn  | SYN |--+     --+| ACK |  for ack
60  * event    +-----+   \   /   +-----+  event
61  *             |       \ /       |
62  * drop all +--v--+     X     +--v--+  send subscriptions,
63  * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
64  *          +-----+           +-----+  wait for run event
65  *             ^                 |
66  *             |     +-----+     |
67  *             +-----| RUN |<----+
68  *                   +--^--+
69  *                      |
70  *               FMD_XPRT_RDONLY
71  *
72  * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
73  * Layer enqueues a "syn" event for the module in its event queue and sets the
74  * state to ACK.  In state ACK, we are waiting for the transport to get an
75  * "ack" event and call fmd_xprt_post() on this event.  Other events will be
76  * discarded.  If an "ack" is received, we transition to state SUB.  If a
77  * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
 * exchange), we transition to state ERR.  Once in state ERR, no further
 * operations are valid except fmd_xprt_close(), and fmd_xprt_error() will
 * return a non-zero value to the caller indicating the transport has failed.
81  *
82  * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
83  * Layer assumes this transport is being used to accept a virtual connection
84  * from a remote peer that is sending a "syn", and sets the initial state to
85  * SYN.  In this state, the transport waits for a "syn" event, validates it,
86  * and then transitions to state SUB if it is valid or state ERR if it is not.
87  *
88  * Once in state SUB, the transport module is expected to receive a sequence of
89  * zero or more "subscribe" events from the remote peer, followed by a "run"
90  * event.  Once in state RUN, the transport is active and any events can be
91  * sent or received.  The transport module is free to call fmd_xprt_close()
92  * from any state.  The fmd_xprt_error() function will return zero if the
93  * transport is not in the ERR state, or non-zero if it is in the ERR state.
94  *
95  * Once the state machine reaches RUN, other FMA protocol events can be sent
96  * and received across the transport in addition to the various control events.
97  *
98  * Table of Common Transport Layer Control Events
99  * ==============================================
100  *
101  * FMA Class                     Payload
102  * ---------                     -------
103  * resource.fm.xprt.uuclose      string (uuid of case)
104  * resource.fm.xprt.subscribe    string (class pattern)
105  * resource.fm.xprt.unsubscribe  string (class pattern)
106  * resource.fm.xprt.unsuback     string (class pattern)
107  * resource.fm.xprt.syn          version information
108  * resource.fm.xprt.ack          version information
109  * resource.fm.xprt.run          version information
110  *
111  * Control events are used to add and delete proxy subscriptions on the remote
112  * transport peer module, and to set up connections.  When a "syn" event is
113  * sent, FMD will include in the payload the highest version of the FMA event
114  * protocol that is supported by the sender.  When a "syn" event is received,
115  * the receiving FMD will use the minimum of this version and its version of
116  * the protocol, and reply with this new minimum version in the "ack" event.
117  * The receiver will then use this new minimum for subsequent event semantics.
118  */
119 
120 #include <sys/fm/protocol.h>
121 #include <strings.h>
122 #include <limits.h>
123 
124 #include <fmd_alloc.h>
125 #include <fmd_error.h>
126 #include <fmd_conf.h>
127 #include <fmd_subr.h>
128 #include <fmd_string.h>
129 #include <fmd_protocol.h>
130 #include <fmd_thread.h>
131 #include <fmd_eventq.h>
132 #include <fmd_dispq.h>
133 #include <fmd_ctl.h>
134 #include <fmd_log.h>
135 #include <fmd_ustat.h>
136 #include <fmd_case.h>
137 #include <fmd_api.h>
138 #include <fmd_fmri.h>
139 #include <fmd_asru.h>
140 #include <fmd_xprt.h>
141 
142 #include <fmd.h>
143 
144 /*
145  * The states shown above in the transport state machine diagram are encoded
146  * using arrays of class patterns and a corresponding action function.  These
147  * arrays are then passed to fmd_xprt_transition() to change transport states.
148  */
149 
/*
 * Rule table for state SYN.  A "resource.fm.xprt.syn" event is handed to
 * fmd_xprt_event_syn(); the "*" pattern hands every other class to
 * fmd_xprt_event_error().  The specific pattern is listed ahead of the
 * wildcard, which suggests first-match-wins evaluation; the code that walks
 * these tables is not in this part of the file -- confirm there.  The trailing
 * { NULL, NULL } entry is the end-of-table marker.
 */
const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
{ "resource.fm.xprt.syn", fmd_xprt_event_syn },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};
155 
156 const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
157 { "resource.fm.xprt.ack", fmd_xprt_event_ack },
158 { "*", fmd_xprt_event_error },
159 };
160 
/*
 * Rule table for state ERR.  The single "*" pattern hands every event class
 * to fmd_xprt_event_drop(), matching the "drop all events" box in the state
 * diagram at the top of this file.  The { NULL, NULL } entry ends the table.
 */
const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};
165 
/*
 * Rule table for state SUB.  "subscribe" events go to fmd_xprt_event_sub()
 * and the "run" event goes to fmd_xprt_event_run().  Any other class under
 * resource.fm.xprt.* goes to fmd_xprt_event_error(), and every remaining
 * class goes to fmd_xprt_event_drop().  Specific patterns are listed ahead of
 * the wildcards, which suggests first-match-wins evaluation; the table walker
 * is not in this part of the file -- confirm there.
 */
const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.run", fmd_xprt_event_run },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};
173 
/*
 * Rule table for state RUN.  The four control events "subscribe",
 * "unsubscribe", "unsuback" and "uuclose" each have a dedicated handler; any
 * other class under resource.fm.xprt.* goes to fmd_xprt_event_error().
 * There is deliberately no "*" rule here: classes outside resource.fm.xprt.*
 * match nothing in this table, and what happens to them is decided by the
 * code that walks the table (not visible in this part of the file).
 */
const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
{ "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
{ "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ NULL, NULL }
};
182 
183 /*
184  * Template for per-transport statistics installed by fmd on behalf of each
185  * transport.  These are used to initialize the per-transport xi_stats.  For
186  * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
187  * transport ID (xi_id) and then are inserted into the per-module stats hash.
188  * The values in this array must match fmd_xprt_stat_t from <fmd_xprt.h>.
189  */
static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
/*
 * Nested initializer: the event queue statistics group.  Presumably this is
 * the xs_evqstat member that fmd_xprt_create() hands to fmd_eventq_create()
 * -- confirm against fmd_xprt_stat_t in <fmd_xprt.h>.
 */
{
{ "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
{ "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
{ "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
{ "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
{ "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
{ "wtime", FMD_TYPE_TIME, "total wait time on queue" },
{ "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
{ "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
{ "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
{ "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
},
/*
 * Transport-level statistics.  The initializer is positional, so the order
 * of these entries must follow the member order of fmd_xprt_stat_t.  The
 * three string-valued entries are filled in at run time (module name,
 * authority string, and state tag).
 */
{ "module", FMD_TYPE_STRING, "module that owns this transport" },
{ "authority", FMD_TYPE_STRING, "authority associated with this transport" },
{ "state", FMD_TYPE_STRING, "current transport state" },
{ "received", FMD_TYPE_UINT64, "events received by transport" },
{ "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
{ "retried", FMD_TYPE_UINT64, "retries requested of transport" },
{ "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
{ "lost", FMD_TYPE_UINT64, "events lost by transport" },
{ "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
{ "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
};
214 
215 static void
216 fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
217 {
218 	uint_t hashlen = fmd.d_str_buckets;
219 
220 	xch->xch_queue = eq;
221 	xch->xch_hashlen = hashlen;
222 	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
223 }
224 
225 static void
226 fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
227 {
228 	fmd_eventq_t *eq = xch->xch_queue;
229 	fmd_xprt_class_t *xcp, *ncp;
230 	uint_t i;
231 
232 	for (i = 0; i < xch->xch_hashlen; i++) {
233 		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
234 			ncp = xcp->xc_next;
235 
236 			if (eq != NULL)
237 				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);
238 
239 			fmd_strfree(xcp->xc_class);
240 			fmd_free(xcp, sizeof (fmd_xprt_class_t));
241 		}
242 	}
243 
244 	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
245 }
246 
247 /*
248  * Insert the specified class into the specified class hash, and return the
249  * reference count.  A return value of one indicates this is the first insert.
250  * If an eventq is associated with the hash, insert a dispq subscription for it.
251  */
252 static uint_t
253 fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
254     fmd_xprt_class_hash_t *xch, const char *class)
255 {
256 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
257 	fmd_xprt_class_t *xcp;
258 
259 	ASSERT(MUTEX_HELD(&xip->xi_lock));
260 
261 	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
262 		if (strcmp(class, xcp->xc_class) == 0)
263 			return (++xcp->xc_refs);
264 	}
265 
266 	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
267 	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
268 	xcp->xc_next = xch->xch_hash[h];
269 	xcp->xc_refs = 1;
270 	xch->xch_hash[h] = xcp;
271 
272 	if (xch->xch_queue != NULL)
273 		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);
274 
275 	return (xcp->xc_refs);
276 }
277 
278 /*
279  * Delete the specified class from the specified class hash, and return the
280  * reference count.  A return value of zero indicates the class was deleted.
281  * If an eventq is associated with the hash, delete the dispq subscription.
282  */
283 static uint_t
284 fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
285     fmd_xprt_class_hash_t *xch, const char *class)
286 {
287 	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
288 	fmd_xprt_class_t *xcp, **pp;
289 
290 	ASSERT(MUTEX_HELD(&xip->xi_lock));
291 	pp = &xch->xch_hash[h];
292 
293 	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
294 		if (strcmp(class, xcp->xc_class) == 0)
295 			break;
296 		else
297 			pp = &xcp->xc_next;
298 	}
299 
300 	if (xcp == NULL)
301 		return (-1U); /* explicitly permit an invalid delete */
302 
303 	if (--xcp->xc_refs != 0)
304 		return (xcp->xc_refs);
305 
306 	ASSERT(xcp->xc_refs == 0);
307 	*pp = xcp->xc_next;
308 
309 	fmd_strfree(xcp->xc_class);
310 	fmd_free(xcp, sizeof (fmd_xprt_class_t));
311 
312 	if (xch->xch_queue != NULL)
313 		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);
314 
315 	return (0);
316 }
317 
318 /*
319  * Queue subscribe events for the specified transport corresponding to all of
320  * the active module subscriptions.  This is an extremely heavyweight operation
321  * that we expect to take place rarely (i.e. when loading a transport module
322  * or when it establishes a connection).  We lock all of the known modules to
323  * prevent them from adding or deleting subscriptions, then snapshot their
324  * subscriptions, and then unlock all of the modules.  We hold the modhash
325  * lock for the duration of this operation to prevent new modules from loading.
326  */
327 static void
328 fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
329 {
330 	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
331 	const fmd_conf_path_t *pap;
332 	fmd_module_t *mp;
333 	uint_t i, j;
334 
335 	(void) pthread_rwlock_rdlock(&mhp->mh_lock);
336 
337 	for (i = 0; i < mhp->mh_hashlen; i++) {
338 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
339 			fmd_module_lock(mp);
340 	}
341 
342 	(void) pthread_mutex_lock(&xip->xi_lock);
343 	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
344 	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
345 	(void) pthread_mutex_unlock(&xip->xi_lock);
346 
347 	for (i = 0; i < mhp->mh_hashlen; i++) {
348 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
349 			(void) fmd_conf_getprop(mp->mod_conf,
350 			    FMD_PROP_SUBSCRIPTIONS, &pap);
351 			for (j = 0; j < pap->cpa_argc; j++)
352 				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
353 		}
354 	}
355 
356 	for (i = 0; i < mhp->mh_hashlen; i++) {
357 		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
358 			fmd_module_unlock(mp);
359 	}
360 
361 	(void) pthread_rwlock_unlock(&mhp->mh_lock);
362 }
363 
364 static void
365 fmd_xprt_transition(fmd_xprt_impl_t *xip,
366     const fmd_xprt_rule_t *state, const char *tag)
367 {
368 	fmd_event_t *e;
369 	nvlist_t *nvl;
370 	char *s;
371 
372 	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));
373 
374 	xip->xi_state = state;
375 	s = fmd_strdup(tag, FMD_SLEEP);
376 
377 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
378 	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
379 	xip->xi_stats->xs_state.fmds_value.str = s;
380 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
381 
382 	/*
383 	 * If we've reached the SUB state, take out the big hammer and snapshot
384 	 * all of the subscriptions of all of the loaded modules.  Then queue a
385 	 * run event for our remote peer indicating that it can enter RUN.
386 	 */
387 	if (state == _fmd_xprt_state_sub) {
388 		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);
389 
390 		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
391 		    "resource.fm.xprt.run", xip->xi_version);
392 
393 		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
394 		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
395 		fmd_eventq_insert_at_time(xip->xi_queue, e);
396 	}
397 }
398 
399 static void
400 fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
401 {
402 	char *s = fmd_fmri_auth2str(xip->xi_auth);
403 
404 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
405 	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
406 	xip->xi_stats->xs_authority.fmds_value.str = s;
407 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
408 }
409 
410 static int
411 fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
412 {
413 	uint8_t rversion;
414 
415 	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
416 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
417 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
418 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
419 
420 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
421 		return (1);
422 	}
423 
424 	if (rversion > xip->xi_version) {
425 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
426 		    xip->xi_id, rversion, xip->xi_version);
427 
428 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
429 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
430 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
431 
432 		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
433 		return (1);
434 	}
435 
436 	if (rversionp != NULL)
437 		*rversionp = rversion;
438 
439 	return (0);
440 }
441 
442 void
443 fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
444 {
445 	fmd_event_t *e;
446 	uint_t vers;
447 	char *class;
448 
449 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
450 		return; /* transitioned to error state */
451 
452 	/*
453 	 * If the transport module didn't specify an authority, extract the
454 	 * one that is passed along with the xprt.syn event and use that.
455 	 */
456 	if (xip->xi_auth == NULL &&
457 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
458 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
459 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
460 		fmd_xprt_authupdate(xip);
461 	}
462 
463 	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
464 	    "resource.fm.xprt.ack", xip->xi_version);
465 
466 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
467 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
468 	fmd_eventq_insert_at_time(xip->xi_queue, e);
469 
470 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
471 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
472 }
473 
474 void
475 fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
476 {
477 	uint_t vers;
478 
479 	if (fmd_xprt_vmismatch(xip, nvl, &vers))
480 		return; /* transitioned to error state */
481 
482 	/*
483 	 * If the transport module didn't specify an authority, extract the
484 	 * one that is passed along with the xprt.syn event and use that.
485 	 */
486 	if (xip->xi_auth == NULL &&
487 	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
488 	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
489 		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
490 		fmd_xprt_authupdate(xip);
491 	}
492 
493 	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
494 	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
495 }
496 
497 /*
498  * Upon transition to RUN, we take every solved case and resend a list.suspect
499  * event for it to our remote peer.  If a case transitions from solved to a
500  * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
501  * the case hash, we will get it as part of examining the resource cache, next.
502  */
503 static void
504 fmd_xprt_send_case(fmd_case_t *cp, void *arg)
505 {
506 	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
507 	fmd_xprt_impl_t *xip = arg;
508 
509 	fmd_event_t *e;
510 	nvlist_t *nvl;
511 	char *class;
512 
513 	if (cip->ci_state != FMD_CASE_SOLVED)
514 		return; /* unsolved, or we'll get it during the ASRU pass */
515 
516 	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
517 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
518 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
519 
520 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
521 	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);
522 
523 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
524 }
525 
526 /*
527  * Upon transition to RUN, we take every ASRU which is in the degraded state
528  * and resend a fault.* event for it to our remote peer, in case the peer is
529  * running in the fault manager that knows how to disable this resource.  If
530  * any new resources are added to the cache during our iteration, this is no
531  * problem because our subscriptions are already proxied and so any new cases
532  * will result in a list.suspect event being transported if that is needed.
533  */
534 static void
535 fmd_xprt_send_asru(fmd_asru_t *ap, void *arg)
536 {
537 	fmd_xprt_impl_t *xip = arg;
538 	nvlist_t *nvl = NULL;
539 	fmd_event_t *e;
540 	char *class;
541 
542 	(void) pthread_mutex_lock(&ap->asru_lock);
543 
544 	if ((ap->asru_flags & (FMD_ASRU_INTERNAL | FMD_ASRU_STATE)) ==
545 	    FMD_ASRU_FAULTY && fmd_case_orphaned(ap->asru_case))
546 		(void) nvlist_xdup(ap->asru_event, &nvl, &fmd.d_nva);
547 
548 	(void) pthread_mutex_unlock(&ap->asru_lock);
549 
550 	if (nvl == NULL)
551 		return; /* asru is internal, unusable, or not faulty */
552 
553 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
554 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
555 
556 	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
557 	    class, ap->asru_name, xip->xi_id);
558 
559 	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
560 }
561 
562 void
563 fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
564 {
565 	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
566 		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
567 		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
568 		fmd_asru_hash_apply(fmd.d_asrus, fmd_xprt_send_asru, xip);
569 	}
570 }
571 
572 void
573 fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
574 {
575 	char *class;
576 
577 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
578 		return; /* transitioned to error state */
579 
580 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
581 		return; /* malformed protocol event */
582 
583 	(void) pthread_mutex_lock(&xip->xi_lock);
584 	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
585 	(void) pthread_mutex_unlock(&xip->xi_lock);
586 
587 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
588 	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
589 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
590 }
591 
592 void
593 fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
594 {
595 	fmd_event_t *e;
596 	char *class;
597 
598 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
599 		return; /* transitioned to error state */
600 
601 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
602 		return; /* malformed protocol event */
603 
604 	(void) pthread_mutex_lock(&xip->xi_lock);
605 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
606 	(void) pthread_mutex_unlock(&xip->xi_lock);
607 
608 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
609 	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
610 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
611 
612 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
613 	    "resource.fm.xprt.unsuback", xip->xi_version, class);
614 
615 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
616 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
617 	fmd_eventq_insert_at_time(xip->xi_queue, e);
618 }
619 
620 void
621 fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
622 {
623 	char *class;
624 
625 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
626 		return; /* transitioned to error state */
627 
628 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
629 		return; /* malformed protocol event */
630 
631 	(void) pthread_mutex_lock(&xip->xi_lock);
632 	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
633 	(void) pthread_mutex_unlock(&xip->xi_lock);
634 }
635 
636 void
637 fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
638 {
639 	fmd_case_t *cp;
640 	char *uuid;
641 
642 	if (fmd_xprt_vmismatch(xip, nvl, NULL))
643 		return; /* transitioned to error state */
644 
645 	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
646 	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
647 		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
648 		fmd_case_rele(cp);
649 	}
650 }
651 
652 void
653 fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
654 {
655 	char *class = "<unknown>";
656 
657 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
658 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
659 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
660 
661 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
662 	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));
663 
664 	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
665 }
666 
667 void
668 fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
669 {
670 	char *class = "<unknown>";
671 
672 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
673 	xip->xi_stats->xs_discarded.fmds_value.ui64++;
674 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
675 
676 	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
677 	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
678 
679 }
680 
/*
 * Create a new transport owned by module 'mp' and return a handle to it.
 *
 *   mp    - owning module; the caller must hold the module lock (asserted
 *           below before the transport is linked onto mp->mod_transports).
 *   flags - FMD_XPRT_* creation flags.  They select the initial state (RUN
 *           for read-only, SYN for FMD_XPRT_ACCEPT, otherwise ACK) and are
 *           stored in xi_flags, to which suspend bits may be added here.
 *   auth  - authority nvlist, or NULL.  The pointer is stored as-is in
 *           xi_auth; no copy is made here.
 *   data  - opaque caller pointer stored in xi_data.
 *
 * Returns the new transport.  For a read-write transport a send thread is
 * also created; if that fails, the partially built transport is destroyed,
 * the errno is set to EFMD_XPRT_THR, and NULL is returned.
 */
fmd_xprt_t *
fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
{
	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
	fmd_stat_t *statv;
	uint_t i, statc;

	char buf[PATH_MAX];
	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	(void) pthread_mutex_init(&xip->xi_lock, NULL);
	(void) pthread_cond_init(&xip->xi_cv, NULL);
	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);

	xip->xi_auth = auth;
	xip->xi_data = data;
	xip->xi_version = FM_RSRC_XPRT_VERSION;
	xip->xi_flags = flags;

	/*
	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
	 * ensure that this transport will not run until fmd_xprt_resume_all().
	 */
	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);

	if (fmd.d_xprt_suspend != 0)
		xip->xi_flags |= FMD_XPRT_DSUSPENDED;

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);

	/*
	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
	 * bit so that fmdo_send() is not called until _fmd_init() completes.
	 */
	if (!(mp->mod_flags & FMD_MOD_INIT))
		xip->xi_flags |= FMD_XPRT_ISUSPENDED;

	/*
	 * Initialize the transport statistics that we keep on behalf of fmd.
	 * These are set up using a template defined at the top of this file.
	 * We rename each statistic with a prefix ensuring its uniqueness.
	 * The template structure is treated as a flat array of fmd_stat_t
	 * here, which is how statc is derived from its size.
	 */
	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));

	for (i = 0; i < statc; i++) {
		(void) snprintf(statv[i].fmds_name,
		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
	}

	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);

	if (xip->xi_stats == NULL)
		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);

	xip->xi_stats->xs_module.fmds_value.str =
	    fmd_strdup(mp->mod_name, FMD_SLEEP);

	if (xip->xi_auth != NULL)
		fmd_xprt_authupdate(xip);

	/*
	 * Create the outbound eventq for this transport and link to its stats.
	 * If any suspend bits were set above, suspend the eventq immediately.
	 */
	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);

	if (xip->xi_flags & FMD_XPRT_SMASK)
		fmd_eventq_suspend(xip->xi_queue);

	/*
	 * Create our subscription hashes: local subscriptions go to xi_queue,
	 * remote subscriptions are tracked only for protocol requests, and
	 * pending unsubscriptions are associated with the /dev/null eventq.
	 */
	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);

	/*
	 * Determine our initial state based upon the creation flags.  If we're
	 * read-only, go directly to RUN.  If we're accepting a new connection,
	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
	else if (flags & FMD_XPRT_ACCEPT)
		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
	else
		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");

	/*
	 * If client.xprtlog is set to TRUE, create a debugging log for the
	 * events received by the transport in var/fm/fmd/xprt/.  The loop
	 * index 'i' is reused to receive the client.xprtlog value, and 's'
	 * receives the log.xprt directory used to build the log path.
	 */
	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);

	if (i) {
		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
	}

	/* Make the transport visible on its module's transport list. */
	ASSERT(fmd_module_locked(mp));
	fmd_list_append(&mp->mod_transports, xip);

	(void) pthread_mutex_lock(&mp->mod_stats_lock);
	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
	(void) pthread_mutex_unlock(&mp->mod_stats_lock);

	/*
	 * If this is a read-only transport, return without creating a send
	 * queue thread and setting up any connection events in our queue.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		goto out;

	/*
	 * Once the transport is fully initialized, create a send queue thread
	 * and start any connect events flowing to complete our initialization.
	 * On failure, fmd_xprt_destroy() undoes the setup performed above.
	 */
	if ((xip->xi_thread = fmd_thread_create(mp,
	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {

		fmd_error(EFMD_XPRT_THR,
		    "failed to create thread for transport %u", xip->xi_id);

		fmd_xprt_destroy((fmd_xprt_t *)xip);
		(void) fmd_set_errno(EFMD_XPRT_THR);
		return (NULL);
	}

	/*
	 * If the transport is not being opened to accept an inbound connect,
	 * start an outbound connection by enqueuing a SYN event for our peer.
	 */
	if (!(flags & FMD_XPRT_ACCEPT)) {
		nvl = fmd_protocol_xprt_ctl(mp,
		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);

		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
		fmd_eventq_insert_at_time(xip->xi_queue, e);
	}
out:
	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
	return ((fmd_xprt_t *)xip);
}
838 
839 void
840 fmd_xprt_destroy(fmd_xprt_t *xp)
841 {
842 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
843 	fmd_module_t *mp = xip->xi_queue->eq_mod;
844 	uint_t id = xip->xi_id;
845 
846 	fmd_case_impl_t *cip, *nip;
847 	fmd_stat_t *sp;
848 	uint_t i, n;
849 
850 	ASSERT(fmd_module_locked(mp));
851 	fmd_list_delete(&mp->mod_transports, xip);
852 
853 	(void) pthread_mutex_lock(&mp->mod_stats_lock);
854 	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
855 	(void) pthread_mutex_unlock(&mp->mod_stats_lock);
856 
857 	(void) pthread_mutex_lock(&xip->xi_lock);
858 
859 	while (xip->xi_busy != 0)
860 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
861 
862 	/*
863 	 * Remove the transport from global visibility, cancel its send-side
864 	 * thread, join with it, and then remove the transport from module
865 	 * visibility.  Once all this is done, destroy and free the transport.
866 	 */
867 	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);
868 
869 	if (xip->xi_thread != NULL) {
870 		fmd_eventq_abort(xip->xi_queue);
871 		fmd_module_unlock(mp);
872 		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
873 		fmd_module_lock(mp);
874 	}
875 
876 	if (xip->xi_log != NULL)
877 		fmd_log_rele(xip->xi_log);
878 
879 	/*
880 	 * Release every case handle in the module that was cached by this
881 	 * transport.  This will result in these cases disappearing from the
882 	 * local case hash so that fmd_case_uuclose() can no longer be used.
883 	 */
884 	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
885 		nip = fmd_list_next(cip);
886 		if (cip->ci_xprt == xp)
887 			fmd_case_discard((fmd_case_t *)cip);
888 	}
889 
890 	/*
891 	 * Destroy every class in the various subscription hashes and remove
892 	 * any corresponding subscriptions from the event dispatch queue.
893 	 */
894 	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
895 	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
896 	fmd_xprt_class_hash_destroy(&xip->xi_usub);
897 
898 	/*
899 	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
900 	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
901 	 * won't find the entries in the hash table.
902 	 */
903 	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
904 	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
905 	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
906 	for (i = 0; i < n; i++) {
907 		(void) snprintf(sp[i].fmds_name,
908 		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
909 		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
910 	}
911 	fmd_ustat_delete(mp->mod_ustat, n, sp);
912 	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));
913 
914 	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
915 	fmd_eventq_destroy(xip->xi_queue);
916 	nvlist_free(xip->xi_auth);
917 	fmd_free(xip, sizeof (fmd_xprt_impl_t));
918 
919 	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
920 }
921 
922 void
923 fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
924 {
925 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
926 	uint_t oflags;
927 
928 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
929 	(void) pthread_mutex_lock(&xip->xi_lock);
930 
931 	oflags = xip->xi_flags;
932 	xip->xi_flags |= flags;
933 
934 	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
935 		fmd_eventq_suspend(xip->xi_queue);
936 
937 	(void) pthread_cond_broadcast(&xip->xi_cv);
938 
939 	while (xip->xi_busy != 0)
940 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
941 
942 	(void) pthread_mutex_unlock(&xip->xi_lock);
943 }
944 
945 void
946 fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
947 {
948 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
949 	uint_t oflags;
950 
951 	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
952 	(void) pthread_mutex_lock(&xip->xi_lock);
953 
954 	oflags = xip->xi_flags;
955 	xip->xi_flags &= ~flags;
956 
957 	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
958 		fmd_eventq_resume(xip->xi_queue);
959 
960 	(void) pthread_cond_broadcast(&xip->xi_cv);
961 	(void) pthread_mutex_unlock(&xip->xi_lock);
962 }
963 
964 void
965 fmd_xprt_send(fmd_xprt_t *xp)
966 {
967 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
968 	fmd_module_t *mp = xip->xi_queue->eq_mod;
969 	fmd_event_t *ep;
970 	int err;
971 
972 	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
973 		if (FMD_EVENT_TTL(ep) == 0) {
974 			fmd_event_rele(ep);
975 			continue;
976 		}
977 
978 		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
979 		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));
980 
981 		err = mp->mod_ops->mop_transport(mp, xp, ep);
982 		fmd_eventq_done(xip->xi_queue);
983 
984 		if (err == FMD_SEND_RETRY) {
985 			fmd_eventq_insert_at_time(xip->xi_queue, ep);
986 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
987 			xip->xi_stats->xs_retried.fmds_value.ui64++;
988 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
989 		}
990 
991 		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
992 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
993 			xip->xi_stats->xs_lost.fmds_value.ui64++;
994 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
995 		}
996 
997 		fmd_event_rele(ep);
998 	}
999 }
1000 
1001 void
1002 fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
1003 {
1004 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1005 	const fmd_xprt_rule_t *xrp;
1006 	fmd_t *dp = &fmd;
1007 
1008 	fmd_event_t *e;
1009 	char *class, *uuid, *code;
1010 	boolean_t isproto, isereport;
1011 
1012 	uint64_t *tod;
1013 	uint8_t ttl;
1014 	uint_t n;
1015 
1016 	/*
1017 	 * Grab the transport lock and set the busy flag to indicate we are
1018 	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
1019 	 * resumes the transport before continuing on with the receive.
1020 	 */
1021 	(void) pthread_mutex_lock(&xip->xi_lock);
1022 
1023 	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {
1024 
1025 		if (fmd.d_signal != 0) {
1026 			(void) pthread_mutex_unlock(&xip->xi_lock);
1027 			return; /* fmd_destroy() is in progress */
1028 		}
1029 
1030 		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
1031 	}
1032 
1033 	xip->xi_busy++;
1034 	ASSERT(xip->xi_busy != 0);
1035 
1036 	(void) pthread_mutex_unlock(&xip->xi_lock);
1037 
1038 	(void) pthread_mutex_lock(&xip->xi_stats_lock);
1039 	xip->xi_stats->xs_received.fmds_value.ui64++;
1040 	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1041 
1042 	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
1043 		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
1044 		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);
1045 
1046 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1047 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1048 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1049 
1050 		nvlist_free(nvl);
1051 		goto done;
1052 	}
1053 
1054 	fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
1055 	    ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);
1056 
1057 	isereport = (strncmp(class, FM_EREPORT_CLASS,
1058 	    sizeof (FM_EREPORT_CLASS - 1)) == 0) ? FMD_B_TRUE : FMD_B_FALSE;
1059 
1060 	/*
1061 	 * The logonly flag should only be set for ereports.
1062 	 */
1063 	if ((logonly == FMD_B_TRUE) && (isereport == FMD_B_FALSE)) {
1064 		fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
1065 		    "logonly flag is not valid for class %s",
1066 		    (void *)nvl, class);
1067 
1068 		(void) pthread_mutex_lock(&xip->xi_stats_lock);
1069 		xip->xi_stats->xs_discarded.fmds_value.ui64++;
1070 		(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1071 
1072 		nvlist_free(nvl);
1073 		goto done;
1074 	}
1075 
1076 	/*
1077 	 * If a time-to-live value is present in the event and is zero, drop
1078 	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
1079 	 */
1080 	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
1081 		if (ttl == 0) {
1082 			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
1083 			    "timeout: event received with ttl=0\n",
1084 			    xip->xi_id, (void *)nvl, class);
1085 
1086 			(void) pthread_mutex_lock(&xip->xi_stats_lock);
1087 			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
1088 			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
1089 
1090 			nvlist_free(nvl);
1091 			goto done;
1092 		}
1093 		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
1094 		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
1095 	}
1096 
1097 	/*
1098 	 * If we are using the native system clock, the underlying transport
1099 	 * code can provide a tighter event time bound by telling us when the
1100 	 * event was enqueued.  If we're using simulated clocks, this time
1101 	 * has no meaning to us, so just reset the value to use HRT_NOW.
1102 	 */
1103 	if (dp->d_clockops != &fmd_timeops_native)
1104 		hrt = FMD_HRT_NOW;
1105 
1106 	/*
1107 	 * If an event's class is in the FMD_CTL_CLASS family, then create a
1108 	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
1109 	 * event using this time.  Otherwise create a protocol event using hrt.
1110 	 */
1111 	isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
1112 	    FMD_B_FALSE : FMD_B_TRUE;
1113 	if (isproto == FMD_B_FALSE)
1114 		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
1115 	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
1116 		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
1117 	else {
1118 		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
1119 		    NULL, nvl, class, NULL, 0, 0);
1120 	}
1121 
1122 	/*
1123 	 * If the debug log is enabled, create a temporary event, log it to the
1124 	 * debug log, and then reset the underlying state of the event.
1125 	 */
1126 	if (xip->xi_log != NULL) {
1127 		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;
1128 
1129 		fmd_log_append(xip->xi_log, e, NULL);
1130 
1131 		ep->ev_flags |= FMD_EVF_VOLATILE;
1132 		ep->ev_off = 0;
1133 		ep->ev_len = 0;
1134 
1135 		if (ep->ev_log != NULL) {
1136 			fmd_log_rele(ep->ev_log);
1137 			ep->ev_log = NULL;
1138 		}
1139 	}
1140 
1141 	/*
1142 	 * Iterate over the rules for the current state trying to match the
1143 	 * event class to one of our special rules.  If a rule is matched, the
1144 	 * event is consumed and not dispatched to other modules.  If the rule
1145 	 * set ends without matching an event, we fall through to dispatching.
1146 	 */
1147 	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
1148 		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
1149 			fmd_event_hold(e);
1150 			xrp->xr_func(xip, nvl);
1151 			fmd_event_rele(e);
1152 			goto done;
1153 		}
1154 	}
1155 
1156 	/*
1157 	 * Record the event in the errlog if it is an ereport.  This code will
1158 	 * be replaced later with a per-transport intent log instead.
1159 	 */
1160 	if (isereport == FMD_B_TRUE) {
1161 		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
1162 		fmd_log_append(dp->d_errlog, e, NULL);
1163 		(void) pthread_rwlock_unlock(&dp->d_log_lock);
1164 	}
1165 
1166 	/*
1167 	 * If a list.suspect event is received, create a case for the specified
1168 	 * UUID in the case hash, with the transport module as its owner.  If
1169 	 * the UUID is already known, fmd_case_recreate() will return NULL and
1170 	 * we simply proceed to our normal event handling regardless.
1171 	 */
1172 	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS) &&
1173 	    nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
1174 	    nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) == 0) {
1175 		fmd_module_lock(xip->xi_queue->eq_mod);
1176 		(void) fmd_case_recreate(xip->xi_queue->eq_mod,
1177 		    xp, FMD_CASE_SOLVED, uuid, code);
1178 		fmd_module_unlock(xip->xi_queue->eq_mod);
1179 	}
1180 
1181 	if (logonly == FMD_B_TRUE) {
1182 		fmd_event_hold(e);
1183 		fmd_event_rele(e);
1184 	} else if (isproto == FMD_B_TRUE)
1185 		fmd_dispq_dispatch(dp->d_disp, e, class);
1186 	else
1187 		fmd_modhash_dispatch(dp->d_mod_hash, e);
1188 done:
1189 	(void) pthread_mutex_lock(&xip->xi_lock);
1190 
1191 	ASSERT(xip->xi_busy != 0);
1192 	xip->xi_busy--;
1193 
1194 	(void) pthread_cond_broadcast(&xip->xi_cv);
1195 	(void) pthread_mutex_unlock(&xip->xi_lock);
1196 }
1197 
1198 void
1199 fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
1200 {
1201 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1202 
1203 	fmd_event_t *e;
1204 	nvlist_t *nvl;
1205 	char *s;
1206 
1207 	fmd_dprintf(FMD_DBG_XPRT,
1208 	    "xprt %u closing case %s\n", xip->xi_id, uuid);
1209 
1210 	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
1211 	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);
1212 
1213 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1214 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1215 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1216 }
1217 
1218 /*
1219  * Insert the specified class into our remote subscription hash.  If the class
1220  * is already present, bump the reference count; otherwise add it to the hash
1221  * and then enqueue an event for our remote peer to proxy our subscription.
1222  */
1223 void
1224 fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
1225 {
1226 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1227 
1228 	uint_t refs;
1229 	nvlist_t *nvl;
1230 	fmd_event_t *e;
1231 	char *s;
1232 
1233 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1234 		return; /* read-only transports do not proxy subscriptions */
1235 
1236 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1237 		return; /* transport is not yet an active subscriber */
1238 
1239 	(void) pthread_mutex_lock(&xip->xi_lock);
1240 	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
1241 	(void) pthread_mutex_unlock(&xip->xi_lock);
1242 
1243 	if (refs > 1)
1244 		return; /* we've already asked our peer for this subscription */
1245 
1246 	fmd_dprintf(FMD_DBG_XPRT,
1247 	    "xprt %u subscribing to %s\n", xip->xi_id, class);
1248 
1249 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1250 	    "resource.fm.xprt.subscribe", xip->xi_version, class);
1251 
1252 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1253 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1254 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1255 }
1256 
1257 /*
1258  * Delete the specified class from the remote subscription hash.  If the
1259  * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
1260  */
1261 void
1262 fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
1263 {
1264 	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
1265 
1266 	uint_t refs;
1267 	nvlist_t *nvl;
1268 	fmd_event_t *e;
1269 	char *s;
1270 
1271 	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
1272 		return; /* read-only transports do not proxy subscriptions */
1273 
1274 	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
1275 		return; /* transport is not yet an active subscriber */
1276 
1277 	/*
1278 	 * If the subscription reference count drops to zero in xi_rsub, insert
1279 	 * an entry into the xi_usub hash indicating we await an unsuback event.
1280 	 */
1281 	(void) pthread_mutex_lock(&xip->xi_lock);
1282 
1283 	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
1284 		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);
1285 
1286 	(void) pthread_mutex_unlock(&xip->xi_lock);
1287 
1288 	if (refs != 0)
1289 		return; /* other subscriptions for this class still active */
1290 
1291 	fmd_dprintf(FMD_DBG_XPRT,
1292 	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);
1293 
1294 	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
1295 	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);
1296 
1297 	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
1298 	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
1299 	fmd_eventq_insert_at_time(xip->xi_queue, e);
1300 }
1301 
1302 static void
1303 fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1304 {
1305 	fmd_xprt_t *xp;
1306 
1307 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1308 		fmd_xprt_subscribe(xp, class);
1309 		fmd_idspace_rele(ids, id);
1310 	}
1311 }
1312 
1313 void
1314 fmd_xprt_subscribe_all(const char *class)
1315 {
1316 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1317 
1318 	if (ids->ids_count != 0)
1319 		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
1320 }
1321 
1322 static void
1323 fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
1324 {
1325 	fmd_xprt_t *xp;
1326 
1327 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1328 		fmd_xprt_unsubscribe(xp, class);
1329 		fmd_idspace_rele(ids, id);
1330 	}
1331 }
1332 
1333 void
1334 fmd_xprt_unsubscribe_all(const char *class)
1335 {
1336 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1337 
1338 	if (ids->ids_count != 0)
1339 		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
1340 }
1341 
1342 /*ARGSUSED*/
1343 static void
1344 fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
1345 {
1346 	fmd_xprt_t *xp;
1347 
1348 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1349 		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
1350 		fmd_idspace_rele(ids, id);
1351 	}
1352 }
1353 
1354 void
1355 fmd_xprt_suspend_all(void)
1356 {
1357 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1358 
1359 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1360 
1361 	if (fmd.d_xprt_suspend++ != 0) {
1362 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1363 		return; /* already suspended */
1364 	}
1365 
1366 	if (ids->ids_count != 0)
1367 		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);
1368 
1369 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1370 }
1371 
1372 /*ARGSUSED*/
1373 static void
1374 fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
1375 {
1376 	fmd_xprt_t *xp;
1377 
1378 	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
1379 		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
1380 		fmd_idspace_rele(ids, id);
1381 	}
1382 }
1383 
1384 void
1385 fmd_xprt_resume_all(void)
1386 {
1387 	fmd_idspace_t *ids = fmd.d_xprt_ids;
1388 
1389 	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
1390 
1391 	if (fmd.d_xprt_suspend == 0)
1392 		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");
1393 
1394 	if (--fmd.d_xprt_suspend != 0) {
1395 		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1396 		return; /* not ready to be resumed */
1397 	}
1398 
1399 	if (ids->ids_count != 0)
1400 		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);
1401 
1402 	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
1403 }
1404