/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * FMD Transport Subsystem
 *
 * A transport module uses some underlying mechanism to transport events.
 * This mechanism may use any underlying link-layer protocol and may support
 * additional link-layer packets unrelated to FMA.  Some appropriate link-
 * layer mechanism to create the underlying connection is expected to be
 * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
 * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
 * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
 * The underlying transport mechanism is *required* to provide ordering: that
 * is, the sequences of bytes written across the transport must be read by
 * the remote peer in the order that they are written, even across separate
 * calls to fmdo_send().  As an example, the Internet TCP protocol would be
 * a valid transport as it guarantees ordering, whereas the Internet UDP
 * protocol would not because UDP datagrams may be delivered in any order
 * as a result of delays introduced when datagrams pass through routers.
 *
 * Similar to sending events, a transport module receives events from its
 * remote peer endpoint using some transport-specific mechanism that is
 * unknown to FMD.  As each event is received, the transport module is
 * responsible for constructing a valid nvlist_t object from the data and then
 * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
 * queue, making it available to all local non-transport modules that have
 * subscribed to the event.
 *
 * The following state machine is used for each transport.  The initial state
 * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
 *
 *          FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
 *                |                   |
 * waiting     +--v--+             +--v--+     waiting
 * for syn     | SYN |--+       +--| ACK |     for ack
 * event       +-----+   \     /   +-----+     event
 *                |       \   /       |
 * drop all    +--v--+      X      +--v--+     send subscriptions,
 * events      | ERR |<---+   +--->| SUB |     recv subscriptions,
 *             +-----+             +-----+     wait for run event
 *                ^                   |
 *                |      +-----+      |
 *                +------| RUN |<-----+
 *                       +--^--+
 *                          |
 *                   FMD_XPRT_RDONLY
 *
 * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
 * Layer enqueues a "syn" event for the module in its event queue and sets the
 * state to ACK.  In state ACK, we are waiting for the transport to get an
 * "ack" event and call fmd_xprt_post() on this event.
 * Other events will be discarded.  If an "ack" is received, we transition to
 * state SUB.  If a configurable timeout occurs or if the "ack" is invalid
 * (e.g. invalid version exchange), we transition to state ERR.  Once in state
 * ERR, no further operations are valid except fmd_xprt_close(), and
 * fmd_xprt_error() will return a non-zero value to the caller indicating that
 * the transport has failed.
 *
 * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
 * Layer assumes this transport is being used to accept a virtual connection
 * from a remote peer that is sending a "syn", and sets the initial state to
 * SYN.  In this state, the transport waits for a "syn" event, validates it,
 * and then transitions to state SUB if it is valid or state ERR if it is not.
 *
 * Once in state SUB, the transport module is expected to receive a sequence of
 * zero or more "subscribe" events from the remote peer, followed by a "run"
 * event.  Once in state RUN, the transport is active and any events can be
 * sent or received.  The transport module is free to call fmd_xprt_close()
 * from any state.  The fmd_xprt_error() function will return zero if the
 * transport is not in the ERR state, or non-zero if it is in the ERR state.
 *
 * Once the state machine reaches RUN, other FMA protocol events can be sent
 * and received across the transport in addition to the various control events.
 *
 * Table of Common Transport Layer Control Events
 * ==============================================
 *
 * FMA Class                      Payload
 * ---------                      -------
 * resource.fm.xprt.uuclose       string (uuid of case)
 * resource.fm.xprt.subscribe     string (class pattern)
 * resource.fm.xprt.unsubscribe   string (class pattern)
 * resource.fm.xprt.unsuback      string (class pattern)
 * resource.fm.xprt.syn           version information
 * resource.fm.xprt.ack           version information
 * resource.fm.xprt.run           version information
 *
 * Control events are used to add and delete proxy subscriptions on the remote
 * transport peer module, and to set up connections.  When a "syn" event is
 * sent, FMD will include in the payload the highest version of the FMA event
 * protocol that is supported by the sender.  When a "syn" event is received,
 * the receiving FMD will use the minimum of this version and its version of
 * the protocol, and reply with this new minimum version in the "ack" event.
 * The receiver will then use this new minimum for subsequent event semantics.
 */
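/*
 * To make the flow above concrete, a minimal sketch of a transport module's
 * receive path is shown below.  This is illustrative only: the connection
 * type xt_conn_t and the helpers xt_accept() and xt_read() are hypothetical
 * stand-ins for whatever link-layer mechanism a module actually uses; only
 * fmd_xprt_open(), fmd_xprt_post(), fmd_xprt_close(), and nvlist_unpack()
 * are real interfaces here.
 *
 *	void
 *	xt_recv_thread(fmd_hdl_t *hdl, xt_conn_t *cp)
 *	{
 *		fmd_xprt_t *xp;
 *		nvlist_t *nvl;
 *		char *buf;
 *		size_t len;
 *
 *		xt_accept(cp);		(link-layer connection setup)
 *		xp = fmd_xprt_open(hdl, FMD_XPRT_RDWR | FMD_XPRT_ACCEPT,
 *		    NULL, cp);
 *
 *		while (xt_read(cp, &buf, &len) == 0) {
 *			if (nvlist_unpack(buf, len, &nvl, 0) == 0)
 *				fmd_xprt_post(hdl, xp, nvl, gethrtime());
 *		}
 *
 *		fmd_xprt_close(hdl, xp);
 *	}
 */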

#include <sys/fm/protocol.h>
#include <strings.h>
#include <limits.h>

#include <fmd_alloc.h>
#include <fmd_error.h>
#include <fmd_conf.h>
#include <fmd_subr.h>
#include <fmd_string.h>
#include <fmd_protocol.h>
#include <fmd_thread.h>
#include <fmd_eventq.h>
#include <fmd_dispq.h>
#include <fmd_ctl.h>
#include <fmd_log.h>
#include <fmd_ustat.h>
#include <fmd_case.h>
#include <fmd_api.h>
#include <fmd_fmri.h>
#include <fmd_asru.h>
#include <fmd_xprt.h>

#include <fmd.h>

/*
 * The states shown above in the transport state machine diagram are encoded
 * using arrays of class patterns and a corresponding action function.  These
 * arrays are then passed to fmd_xprt_transition() to change transport states.
 */

const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
{ "resource.fm.xprt.syn", fmd_xprt_event_syn },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
{ "resource.fm.xprt.ack", fmd_xprt_event_ack },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.run", fmd_xprt_event_run },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
{ "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
{ "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ NULL, NULL }
};

/*
 * Template for per-transport statistics installed by fmd on behalf of each
 * transport.  These are used to initialize the per-transport xi_stats.  For
 * each statistic, the name is prepended with "fmd.xprt.%u", where %u is the
 * transport ID (xi_id); the statistics are then inserted into the per-module
 * stats hash.  The values in this array must match fmd_xprt_stat_t from
 * <fmd_xprt.h>.
 */
static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
{
{ "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
{ "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
{ "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
{ "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
{ "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
{ "wtime", FMD_TYPE_TIME, "total wait time on queue" },
{ "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
{ "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
{ "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
{ "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
},
{ "module", FMD_TYPE_STRING, "module that owns this transport" },
{ "authority", FMD_TYPE_STRING, "authority associated with this transport" },
{ "state", FMD_TYPE_STRING, "current transport state" },
{ "received", FMD_TYPE_UINT64, "events received by transport" },
{ "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
{ "retried", FMD_TYPE_UINT64, "retries requested of transport" },
{ "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
{ "lost", FMD_TYPE_UINT64, "events lost by transport" },
{ "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
{ "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
};
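/*
 * For example (using an arbitrary illustrative ID), a transport assigned
 * xi_id 7 would export the first statistic above as "fmd.xprt.7.dispatched".
 */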

static void
fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
{
	uint_t hashlen = fmd.d_str_buckets;

	xch->xch_queue = eq;
	xch->xch_hashlen = hashlen;
	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
}

static void
fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
{
	fmd_eventq_t *eq = xch->xch_queue;
	fmd_xprt_class_t *xcp, *ncp;
	uint_t i;

	for (i = 0; i < xch->xch_hashlen; i++) {
		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
			ncp = xcp->xc_next;

			if (eq != NULL)
				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);

			fmd_strfree(xcp->xc_class);
			fmd_free(xcp, sizeof (fmd_xprt_class_t));
		}
	}

	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
}

/*
 * Insert the specified class into the specified class hash, and return the
 * reference count.  A return value of one indicates this is the first insert.
 * If an eventq is associated with the hash, insert a dispq subscription for it.
 */
static uint_t
fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
	fmd_xprt_class_t *xcp;

	ASSERT(MUTEX_HELD(&xip->xi_lock));

	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
		if (strcmp(class, xcp->xc_class) == 0)
			return (++xcp->xc_refs);
	}

	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
	xcp->xc_next = xch->xch_hash[h];
	xcp->xc_refs = 1;
	xch->xch_hash[h] = xcp;

	if (xch->xch_queue != NULL)
		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);

	return (xcp->xc_refs);
}

/*
 * Delete the specified class from the specified class hash, and return the
 * reference count.  A return value of zero indicates the class was deleted.
 * If an eventq is associated with the hash, delete the dispq subscription.
 */
static uint_t
fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
	fmd_xprt_class_t *xcp, **pp;

	ASSERT(MUTEX_HELD(&xip->xi_lock));
	pp = &xch->xch_hash[h];

	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
		if (strcmp(class, xcp->xc_class) == 0)
			break;
		else
			pp = &xcp->xc_next;
	}

	if (xcp == NULL)
		return (-1U); /* explicitly permit an invalid delete */

	if (--xcp->xc_refs != 0)
		return (xcp->xc_refs);

	ASSERT(xcp->xc_refs == 0);
	*pp = xcp->xc_next;

	fmd_strfree(xcp->xc_class);
	fmd_free(xcp, sizeof (fmd_xprt_class_t));

	if (xch->xch_queue != NULL)
		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);

	return (0);
}
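/*
 * The return values above drive the proxying decisions made later by
 * fmd_xprt_subscribe() and fmd_xprt_unsubscribe(), along these lines
 * (illustrative sketch of the pattern, not additional code):
 *
 *	if (fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class) == 1)
 *		... first reference: ask our peer to add the subscription ...
 *
 *	if (fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class) == 0)
 *		... last reference gone: ask our peer to remove it ...
 */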

/*
 * Queue subscribe events for the specified transport corresponding to all of
 * the active module subscriptions.  This is an extremely heavyweight operation
 * that we expect to take place rarely (i.e. when loading a transport module
 * or when it establishes a connection).  We lock all of the known modules to
 * prevent them from adding or deleting subscriptions, then snapshot their
 * subscriptions, and then unlock all of the modules.  We hold the modhash
 * lock for the duration of this operation to prevent new modules from loading.
 */
static void
fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
{
	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
	const fmd_conf_path_t *pap;
	fmd_module_t *mp;
	uint_t i, j;

	(void) pthread_rwlock_rdlock(&mhp->mh_lock);

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
			fmd_module_lock(mp);
	}

	(void) pthread_mutex_lock(&xip->xi_lock);
	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
	(void) pthread_mutex_unlock(&xip->xi_lock);

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
			(void) fmd_conf_getprop(mp->mod_conf,
			    FMD_PROP_SUBSCRIPTIONS, &pap);
			for (j = 0; j < pap->cpa_argc; j++)
				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
		}
	}

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
			fmd_module_unlock(mp);
	}

	(void) pthread_rwlock_unlock(&mhp->mh_lock);
}

static void
fmd_xprt_transition(fmd_xprt_impl_t *xip,
    const fmd_xprt_rule_t *state, const char *tag)
{
	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));

	xip->xi_state = state;
	s = fmd_strdup(tag, FMD_SLEEP);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
	xip->xi_stats->xs_state.fmds_value.str = s;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	/*
	 * If we've reached the SUB state, take out the big hammer and snapshot
	 * all of the subscriptions of all of the loaded modules.  Then queue a
	 * run event for our remote peer indicating that it can enter RUN.
	 */
	if (state == _fmd_xprt_state_sub) {
		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);

		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
		    "resource.fm.xprt.run", xip->xi_version);

		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
		fmd_eventq_insert_at_time(xip->xi_queue, e);
	}
}

static void
fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
{
	char *s = fmd_fmri_auth2str(xip->xi_auth);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
	xip->xi_stats->xs_authority.fmds_value.str = s;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
}

static int
fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
{
	uint8_t rversion;

	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
		return (1);
	}

	if (rversion > xip->xi_version) {
		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
		    xip->xi_id, rversion, xip->xi_version);

		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
		return (1);
	}

	if (rversionp != NULL)
		*rversionp = rversion;

	return (0);
}

void
fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_event_t *e;
	uint_t vers;
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, &vers))
		return; /* transitioned to error state */

	/*
	 * If the transport module didn't specify an authority, extract the
	 * one that is passed along with the xprt.syn event and use that.
	 */
	if (xip->xi_auth == NULL &&
	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
		fmd_xprt_authupdate(xip);
	}

	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.ack", xip->xi_version);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
	fmd_eventq_insert_at_time(xip->xi_queue, e);

	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}

void
fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	uint_t vers;

	if (fmd_xprt_vmismatch(xip, nvl, &vers))
		return; /* transitioned to error state */

	/*
	 * If the transport module didn't specify an authority, extract the
	 * one that is passed along with the xprt.ack event and use that.
	 */
	if (xip->xi_auth == NULL &&
	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
		fmd_xprt_authupdate(xip);
	}

	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}
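/*
 * To illustrate the negotiation with hypothetical numbers: if our
 * FM_RSRC_XPRT_VERSION is 1 and a peer's "syn" carries version 0, we reply
 * with an "ack" and both ends settle on MIN(1, 0) = 0 for subsequent events.
 * A "syn" carrying a version greater than ours is instead rejected by
 * fmd_xprt_vmismatch() and the transport transitions to ERR.
 */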

/*
 * Upon transition to RUN, we take every solved case and resend a list.suspect
 * event for it to our remote peer.  If a case transitions from solved to a
 * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
 * the case hash, we will get it as part of examining the resource cache, next.
 */
static void
fmd_xprt_send_case(fmd_case_t *cp, void *arg)
{
	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
	fmd_xprt_impl_t *xip = arg;

	fmd_event_t *e;
	nvlist_t *nvl;
	char *class;

	if (cip->ci_state != FMD_CASE_SOLVED)
		return; /* unsolved, or we'll get it during the ASRU pass */

	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);

	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

/*
 * Upon transition to RUN, we take every ASRU which is in the degraded state
 * and resend a fault.* event for it to our remote peer, in case the peer is
 * running a fault manager that knows how to disable this resource.  If any
 * new resources are added to the cache during our iteration, this is no
 * problem because our subscriptions are already proxied and so any new cases
 * will result in a list.suspect event being transported if that is needed.
 */
static void
fmd_xprt_send_asru(fmd_asru_t *ap, void *arg)
{
	fmd_xprt_impl_t *xip = arg;
	nvlist_t *nvl = NULL;
	fmd_event_t *e;
	char *class;

	(void) pthread_mutex_lock(&ap->asru_lock);

	if ((ap->asru_flags & (FMD_ASRU_INTERNAL | FMD_ASRU_STATE)) ==
	    FMD_ASRU_FAULTY && fmd_case_orphaned(ap->asru_case))
		(void) nvlist_xdup(ap->asru_event, &nvl, &fmd.d_nva);

	(void) pthread_mutex_unlock(&ap->asru_lock);

	if (nvl == NULL)
		return; /* asru is internal, unusable, or not faulty */

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
	    class, ap->asru_name, xip->xi_id);

	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

void
fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
		fmd_asru_hash_apply(fmd.d_asrus, fmd_xprt_send_asru, xip);
	}
}

void
fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
}

void
fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_event_t *e;
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.unsuback", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

void
fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_case_t *cp;
	char *uuid;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
		fmd_case_rele(cp);
	}
}

void
fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class = "<unknown>";

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_discarded.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));

	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
}

void
fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class = "<unknown>";

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_discarded.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
}

fmd_xprt_t *
fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
{
	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
	fmd_stat_t *statv;
	uint_t i, statc;

	char buf[PATH_MAX];
	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	(void) pthread_mutex_init(&xip->xi_lock, NULL);
	(void) pthread_cond_init(&xip->xi_cv, NULL);
	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);

	xip->xi_auth = auth;
	xip->xi_data = data;
	xip->xi_version = FM_RSRC_XPRT_VERSION;
	xip->xi_flags = flags;

	/*
	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
	 * ensure that this transport will not run until fmd_xprt_resume_all().
	 */
	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);

	if (fmd.d_xprt_suspend != 0)
		xip->xi_flags |= FMD_XPRT_DSUSPENDED;

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);

	/*
	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
	 * bit so that fmdo_send() is not called until _fmd_init() completes.
	 */
	if (!(mp->mod_flags & FMD_MOD_INIT))
		xip->xi_flags |= FMD_XPRT_ISUSPENDED;

	/*
	 * Initialize the transport statistics that we keep on behalf of fmd.
	 * These are set up using a template defined at the top of this file.
	 * We rename each statistic with a prefix ensuring its uniqueness.
	 */
	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));

	for (i = 0; i < statc; i++) {
		(void) snprintf(statv[i].fmds_name,
		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
	}

	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);

	if (xip->xi_stats == NULL)
		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);

	xip->xi_stats->xs_module.fmds_value.str =
	    fmd_strdup(mp->mod_name, FMD_SLEEP);

	if (xip->xi_auth != NULL)
		fmd_xprt_authupdate(xip);

	/*
	 * Create the outbound eventq for this transport and link to its stats.
	 * If any suspend bits were set above, suspend the eventq immediately.
	 */
	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);

	if (xip->xi_flags & FMD_XPRT_SMASK)
		fmd_eventq_suspend(xip->xi_queue);

	/*
	 * Create our subscription hashes: local subscriptions go to xi_queue,
	 * remote subscriptions are tracked only for protocol requests, and
	 * pending unsubscriptions are associated with the /dev/null eventq.
	 */
	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);

	/*
	 * Determine our initial state based upon the creation flags.  If we're
	 * read-only, go directly to RUN.  If we're accepting a new connection,
	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
	else if (flags & FMD_XPRT_ACCEPT)
		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
	else
		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");

	/*
	 * If client.xprtlog is set to TRUE, create a debugging log for the
	 * events received by the transport in var/fm/fmd/xprt/.
	 */
	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);

	if (i) {
		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
	}

	ASSERT(fmd_module_locked(mp));
	fmd_list_append(&mp->mod_transports, xip);

	(void) pthread_mutex_lock(&mp->mod_stats_lock);
	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
	(void) pthread_mutex_unlock(&mp->mod_stats_lock);

	/*
	 * If this is a read-only transport, return without creating a send
	 * queue thread and setting up any connection events in our queue.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		goto out;

	/*
	 * Once the transport is fully initialized, create a send queue thread
	 * and start any connect events flowing to complete our initialization.
	 */
	if ((xip->xi_thread = fmd_thread_create(mp,
	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {

		fmd_error(EFMD_XPRT_THR,
		    "failed to create thread for transport %u", xip->xi_id);

		fmd_xprt_destroy((fmd_xprt_t *)xip);
		(void) fmd_set_errno(EFMD_XPRT_THR);
		return (NULL);
	}

	/*
	 * If the transport is not being opened to accept an inbound connect,
	 * start an outbound connection by enqueuing a SYN event for our peer.
	 */
	if (!(flags & FMD_XPRT_ACCEPT)) {
		nvl = fmd_protocol_xprt_ctl(mp,
		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);

		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
		fmd_eventq_insert_at_time(xip->xi_queue, e);
	}
out:
	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
	return ((fmd_xprt_t *)xip);
}

void
fmd_xprt_destroy(fmd_xprt_t *xp)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	fmd_module_t *mp = xip->xi_queue->eq_mod;
	uint_t id = xip->xi_id;

	fmd_case_impl_t *cip, *nip;
	fmd_stat_t *sp;
	uint_t i, n;

	ASSERT(fmd_module_locked(mp));
	fmd_list_delete(&mp->mod_transports, xip);

	(void) pthread_mutex_lock(&mp->mod_stats_lock);
	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
	(void) pthread_mutex_unlock(&mp->mod_stats_lock);

	(void) pthread_mutex_lock(&xip->xi_lock);

	while (xip->xi_busy != 0)
		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

	/*
	 * Remove the transport from global visibility, cancel its send-side
	 * thread, join with it, and then remove the transport from module
	 * visibility.  Once all this is done, destroy and free the transport.
	 */
	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);

	if (xip->xi_thread != NULL) {
		fmd_eventq_abort(xip->xi_queue);
		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
	}

	if (xip->xi_log != NULL)
		fmd_log_rele(xip->xi_log);

	/*
	 * Release every case handle in the module that was cached by this
	 * transport.  This will result in these cases disappearing from the
	 * local case hash so that fmd_case_uuclose() can no longer be used.
	 */
	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
		nip = fmd_list_next(cip);
		if (cip->ci_xprt == xp)
			fmd_case_discard((fmd_case_t *)cip);
	}

	/*
	 * Destroy every class in the various subscription hashes and remove
	 * any corresponding subscriptions from the event dispatch queue.
	 */
	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
	fmd_xprt_class_hash_destroy(&xip->xi_usub);

	/*
	 * Since our statistics are created by hand, after deleting them from
	 * the ustat hash we must manually free them and any embedded strings.
	 */
	fmd_ustat_delete(mp->mod_ustat, sizeof (_fmd_xprt_stat_tmpl) /
	    sizeof (fmd_stat_t), (fmd_stat_t *)&_fmd_xprt_stat_tmpl);

	sp = (fmd_stat_t *)xip->xi_stats;
	n = sizeof (fmd_xprt_stat_t) / sizeof (fmd_stat_t);

	for (i = 0; i < n; i++, sp++) {
		if (sp->fmds_type == FMD_TYPE_STRING)
			fmd_strfree(sp->fmds_value.str);
	}

	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
	fmd_eventq_destroy(xip->xi_queue);
	nvlist_free(xip->xi_auth);
	fmd_free(xip, sizeof (fmd_xprt_impl_t));

	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
}

void
fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	uint_t oflags;

	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
	(void) pthread_mutex_lock(&xip->xi_lock);

	oflags = xip->xi_flags;
	xip->xi_flags |= flags;

	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
		fmd_eventq_suspend(xip->xi_queue);

	(void) pthread_cond_broadcast(&xip->xi_cv);

	while (xip->xi_busy != 0)
		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	uint_t oflags;

	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
	(void) pthread_mutex_lock(&xip->xi_lock);

	oflags = xip->xi_flags;
	xip->xi_flags &= ~flags;

	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
		fmd_eventq_resume(xip->xi_queue);

	(void) pthread_cond_broadcast(&xip->xi_cv);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_send(fmd_xprt_t *xp)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	fmd_module_t *mp = xip->xi_queue->eq_mod;
	fmd_event_t *ep;
	int err;

	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
		if (FMD_EVENT_TTL(ep) == 0) {
			fmd_event_rele(ep);
			continue;
		}

		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));

		err = mp->mod_ops->mop_transport(mp, xp, ep);
		fmd_eventq_done(xip->xi_queue);

		if (err == FMD_SEND_RETRY) {
			fmd_eventq_insert_at_time(xip->xi_queue, ep);
			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_retried.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
		}

		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_lost.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
		}

		fmd_event_rele(ep);
	}
}
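/*
 * The mop_transport operation above ultimately lands in the transport
 * module's fmdo_send() entry point.  A minimal sketch of such a callback is
 * shown below; xt_write() is a hypothetical link-layer helper, while the
 * FMD_SEND_* codes are the real contract with the loop above: RETRY requeues
 * the event and bumps xs_retried, and any other failure bumps xs_lost.
 *
 *	static int
 *	xt_send(fmd_hdl_t *hdl, fmd_xprt_t *xp, fmd_event_t *ep, nvlist_t *nvl)
 *	{
 *		if (xt_write(fmd_hdl_getspecific(hdl), nvl) != 0) {
 *			if (errno == EAGAIN)
 *				return (FMD_SEND_RETRY);
 *			return (FMD_SEND_FAILED);
 *		}
 *		return (FMD_SEND_SUCCESS);
 *	}
 */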

void
fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	const fmd_xprt_rule_t *xrp;
	fmd_t *dp = &fmd;

	fmd_event_t *e;
	char *class, *uuid, *code;
	int isproto;

	uint64_t *tod;
	uint8_t ttl;
	uint_t n;

	/*
	 * Grab the transport lock and set the busy flag to indicate we are
	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
	 * resumes the transport before continuing on with the receive.
	 */
	(void) pthread_mutex_lock(&xip->xi_lock);

	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {

		if (fmd.d_signal != 0) {
			(void) pthread_mutex_unlock(&xip->xi_lock);
			return; /* fmd_destroy() is in progress */
		}

		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
	}

	xip->xi_busy++;
	ASSERT(xip->xi_busy != 0);

	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_received.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);

		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		nvlist_free(nvl);
		goto done;
	}

	fmd_dprintf(FMD_DBG_XPRT, "xprt %u posting %s\n", xip->xi_id, class);

	/*
	 * If a time-to-live value is present in the event and is zero, drop
	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
	 */
	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
		if (ttl == 0) {
			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
			    "timeout: event received with ttl=0\n",
			    xip->xi_id, (void *)nvl, class);

			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);

			nvlist_free(nvl);
			goto done;
		}
		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
	}

	/*
	 * If we are using the native system clock, the underlying transport
	 * code can provide a tighter event time bound by telling us when the
	 * event was enqueued.  If we're using simulated clocks, this time
	 * has no meaning to us, so just reset the value to use HRT_NOW.
	 */
	if (dp->d_clockops != &fmd_timeops_native)
		hrt = FMD_HRT_NOW;

	/*
	 * If an event's class is in the FMD_CTL_CLASS family, then create a
	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
	 * event using this time.  Otherwise create a protocol event using hrt.
	 */
	if ((isproto = strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN)) == 0)
		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
	else {
		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
		    NULL, nvl, class, NULL, 0, 0);
	}

	/*
	 * If the debug log is enabled, create a temporary event, log it to the
	 * debug log, and then reset the underlying state of the event.
	 */
	if (xip->xi_log != NULL) {
		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;

		fmd_log_append(xip->xi_log, e, NULL);

		ep->ev_flags |= FMD_EVF_VOLATILE;
		ep->ev_off = 0;
		ep->ev_len = 0;

		if (ep->ev_log != NULL) {
			fmd_log_rele(ep->ev_log);
			ep->ev_log = NULL;
		}
	}

	/*
	 * Iterate over the rules for the current state trying to match the
	 * event class to one of our special rules.  If a rule is matched, the
	 * event is consumed and not dispatched to other modules.  If the rule
	 * set ends without matching an event, we fall through to dispatching.
	 */
	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
			fmd_event_hold(e);
			xrp->xr_func(xip, nvl);
			fmd_event_rele(e);
			goto done;
		}
	}

	/*
	 * Record the event in the errlog if it is an ereport.  This code will
	 * be replaced later with a per-transport intent log instead.
	 */
	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_EREPORT_CLASS ".*")) {
		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
		fmd_log_append(dp->d_errlog, e, NULL);
		(void) pthread_rwlock_unlock(&dp->d_log_lock);
	}

	/*
	 * If a list.suspect event is received, create a case for the specified
	 * UUID in the case hash, with the transport module as its owner.  If
	 * the UUID is already known, fmd_case_recreate() will return NULL and
	 * we simply proceed to our normal event handling regardless.
	 */
	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS) &&
	    nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
	    nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) == 0) {
		fmd_module_lock(xip->xi_queue->eq_mod);
		(void) fmd_case_recreate(xip->xi_queue->eq_mod,
		    xp, FMD_CASE_SOLVED, uuid, code);
		fmd_module_unlock(xip->xi_queue->eq_mod);
	}

	if (isproto)
		fmd_dispq_dispatch(dp->d_disp, e, class);
	else
		fmd_modhash_dispatch(dp->d_mod_hash, e);
done:
	(void) pthread_mutex_lock(&xip->xi_lock);

	ASSERT(xip->xi_busy != 0);
	xip->xi_busy--;

	(void) pthread_cond_broadcast(&xip->xi_cv);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u closing case %s\n", xip->xi_id, uuid);

	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

/*
 * Insert the specified class into our remote subscription hash.  If the class
 * is already present, bump the reference count; otherwise add it to the hash
 * and then enqueue an event for our remote peer to proxy our subscription.
 */
void
fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	uint_t refs;
	nvlist_t *nvl;
	fmd_event_t *e;
	char *s;

	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		return; /* read-only transports do not proxy subscriptions */

	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
		return; /* transport is not yet an active subscriber */

	(void) pthread_mutex_lock(&xip->xi_lock);
	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	if (refs > 1)
		return; /* we've already asked our peer for this subscription */

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u subscribing to %s\n", xip->xi_id, class);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.subscribe", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

/*
 * Delete the specified class from the remote subscription hash.  If the
 * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
 */
void
fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	uint_t refs;
	nvlist_t *nvl;
	fmd_event_t *e;
	char *s;

	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		return; /* read-only transports do not proxy subscriptions */

	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
		return; /* transport is not yet an active subscriber */

	/*
	 * If the subscription reference count drops to zero in xi_rsub, insert
	 * an entry into the xi_usub hash indicating we await an unsuback event.
	 */
	(void) pthread_mutex_lock(&xip->xi_lock);

	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);

	(void) pthread_mutex_unlock(&xip->xi_lock);

	if (refs != 0)
		return; /* other subscriptions for this class still active */

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

static void
fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_subscribe(xp, class);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_subscribe_all(const char *class)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
}

static void
fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_unsubscribe(xp, class);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_unsubscribe_all(const char *class)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
}

/*ARGSUSED*/
static void
fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_suspend_all(void)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	(void) pthread_mutex_lock(&fmd.d_xprt_lock);

	if (fmd.d_xprt_suspend++ != 0) {
		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
		return; /* already suspended */
	}

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}

/*ARGSUSED*/
static void
fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_resume_all(void)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	(void) pthread_mutex_lock(&fmd.d_xprt_lock);

	if (fmd.d_xprt_suspend == 0)
		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");

	if (--fmd.d_xprt_suspend != 0) {
		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
		return; /* not ready to be resumed */
	}

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}