/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * FMD Transport Subsystem
 *
 * A transport module uses some underlying mechanism to transport events.
 * This mechanism may use any underlying link-layer protocol and may support
 * additional link-layer packets unrelated to FMA.  Some appropriate link-
 * layer mechanism to create the underlying connection is expected to be
 * called prior to calling fmd_xprt_open() itself.  Alternatively, a transport
 * may be created in the suspended state by specifying the FMD_XPRT_SUSPENDED
 * flag as part of the call to fmd_xprt_open(), and then may be resumed later.
 * The underlying transport mechanism is *required* to provide ordering: that
 * is, the sequences of bytes written across the transport must be read by
 * the remote peer in the order that they are written, even across separate
 * calls to fmdo_send().  As an example, the Internet TCP protocol would be
 * a valid transport as it guarantees ordering, whereas the Internet UDP
 * protocol would not because UDP datagrams may be delivered in any order
 * as a result of delays introduced when datagrams pass through routers.
 *
 * Similar to sending events, a transport module receives events that are from
 * its peer remote endpoint using some transport-specific mechanism that is
 * unknown to FMD.  As each event is received, the transport module is
 * responsible for constructing a valid nvlist_t object from the data and then
 * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
 * queue, making it available to all local modules (other than transport
 * modules) that have subscribed to the event.
 *
 * The following state machine is used for each transport.  The initial state
 * is either SYN, ACK, or RUN, depending on the flags specified to
 * fmd_xprt_create().
 *
 *       FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
 *              |                 |
 * waiting  +--v--+           +--v--+  waiting
 * for syn  | SYN |--+     +--| ACK |  for ack
 * event    +-----+   \   /   +-----+  event
 *             |       \ /       |
 * drop all +--v--+     X     +--v--+  send subscriptions,
 * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
 *          +-----+           +-----+  wait for run event
 *             ^                 |
 *             |     +-----+     |
 *             +-----| RUN |<----+
 *                   +--^--+
 *                      |
 *               FMD_XPRT_RDONLY
 *
 * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
 * Layer enqueues a "syn" event for the module in its event queue and sets the
 * state to ACK.  In state ACK, we are waiting for the transport to get an
 * "ack" event and call fmd_xprt_post() on this event.  Other events will be
 * discarded.  If an "ack" is received, we transition to state SUB.  If a
 * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
 * exchange), we transition to state ERR.  Once in state ERR, no further
 * operations are valid except fmd_xprt_close(), and fmd_xprt_error() will
 * return a non-zero value to the caller indicating the transport has failed.
 *
 * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
 * Layer assumes this transport is being used to accept a virtual connection
 * from a remote peer that is sending a "syn", and sets the initial state to
 * SYN.  In this state, the transport waits for a "syn" event, validates it,
 * and then transitions to state SUB if it is valid or state ERR if it is not.
 *
 * Once in state SUB, the transport module is expected to receive a sequence of
 * zero or more "subscribe" events from the remote peer, followed by a "run"
 * event.  Once in state RUN, the transport is active and any events can be
 * sent or received.  The transport module is free to call fmd_xprt_close()
 * from any state.  The fmd_xprt_error() function will return zero if the
 * transport is not in the ERR state, or non-zero if it is.
 *
 * Once the state machine reaches RUN, other FMA protocol events can be sent
 * and received across the transport in addition to the various control events.
 *
 * Table of Common Transport Layer Control Events
 * ==============================================
 *
 * FMA Class                     Payload
 * ---------                     -------
 * resource.fm.xprt.uuclose      string (uuid of case)
 * resource.fm.xprt.subscribe    string (class pattern)
 * resource.fm.xprt.unsubscribe  string (class pattern)
 * resource.fm.xprt.unsuback     string (class pattern)
 * resource.fm.xprt.syn          version information
 * resource.fm.xprt.ack          version information
 * resource.fm.xprt.run          version information
 *
 * Control events are used to add and delete proxy subscriptions on the remote
 * transport peer module, and to set up connections.  When a "syn" event is
 * sent, FMD will include in the payload the highest version of the FMA event
 * protocol that is supported by the sender.  When a "syn" event is received,
 * the receiving FMD will use the minimum of this version and its version of
 * the protocol, and reply with this new minimum version in the "ack" event.
 * The receiver will then use this new minimum for subsequent event semantics.
 */
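/*
 * For illustration only (not part of fmd): the receive half of a transport
 * module is a loop of the general shape sketched below.  The xpm_* names,
 * the xpm_conn_t type, and the xpm_read() helper are hypothetical; the
 * fmd_xprt_post() call and the gethrtime() timestamp are the real interfaces
 * a module would use.  Each nvlist the module reconstructs from the wire is
 * handed to fmd_xprt_post(), which drives the state machine above and, once
 * in state RUN, dispatches the event to subscribed modules.
 *
 *	static void
 *	xpm_recv_thread(void *arg)
 *	{
 *		xpm_conn_t *cp = arg;
 *		nvlist_t *nvl;
 *
 *		while (xpm_read(cp, &nvl) == 0)
 *			fmd_xprt_post(cp->c_hdl, cp->c_xprt, nvl, gethrtime());
 *	}
 */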
#include <sys/fm/protocol.h>
#include <strings.h>
#include <limits.h>

#include <fmd_alloc.h>
#include <fmd_error.h>
#include <fmd_conf.h>
#include <fmd_subr.h>
#include <fmd_string.h>
#include <fmd_protocol.h>
#include <fmd_thread.h>
#include <fmd_eventq.h>
#include <fmd_dispq.h>
#include <fmd_ctl.h>
#include <fmd_log.h>
#include <fmd_ustat.h>
#include <fmd_case.h>
#include <fmd_api.h>
#include <fmd_fmri.h>
#include <fmd_asru.h>
#include <fmd_xprt.h>

#include <fmd.h>

/*
 * The states shown above in the transport state machine diagram are encoded
 * using arrays of class patterns and a corresponding action function.  These
 * arrays are then passed to fmd_xprt_transition() to change transport states.
 */
const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
{ "resource.fm.xprt.syn", fmd_xprt_event_syn },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
{ "resource.fm.xprt.ack", fmd_xprt_event_ack },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.run", fmd_xprt_event_run },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
{ "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
{ "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ NULL, NULL }
};

/*
 * Template for per-transport statistics installed by fmd on behalf of each
 * transport.  These are used to initialize the per-transport xi_stats.  For
 * each statistic, the name is prefixed with "fmd.xprt.%u", where %u is the
 * transport ID (xi_id), and the statistics are then inserted into the
 * per-module stats hash.  The values in this array must match fmd_xprt_stat_t
 * from <fmd_xprt.h>.
 */
static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
{
{ "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
{ "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
{ "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
{ "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
{ "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
{ "wtime", FMD_TYPE_TIME, "total wait time on queue" },
{ "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
{ "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
{ "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
{ "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
},
{ "module", FMD_TYPE_STRING, "module that owns this transport" },
{ "authority", FMD_TYPE_STRING, "authority associated with this transport" },
{ "state", FMD_TYPE_STRING, "current transport state" },
{ "received", FMD_TYPE_UINT64, "events received by transport" },
{ "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
{ "retried", FMD_TYPE_UINT64, "retries requested of transport" },
{ "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
{ "lost", FMD_TYPE_UINT64, "events lost by transport" },
{ "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
{ "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
};

static void
fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
{
	uint_t hashlen = fmd.d_str_buckets;

	xch->xch_queue = eq;
	xch->xch_hashlen = hashlen;
	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
}

static void
fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
{
	fmd_eventq_t *eq = xch->xch_queue;
	fmd_xprt_class_t *xcp, *ncp;
	uint_t i;

	for (i = 0; i < xch->xch_hashlen; i++) {
		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
			ncp = xcp->xc_next;

			if (eq != NULL)
				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);

			fmd_strfree(xcp->xc_class);
			fmd_free(xcp, sizeof (fmd_xprt_class_t));
		}
	}

	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
}
/*
 * Insert the specified class into the specified class hash, and return the
 * reference count.  A return value of one indicates this is the first insert.
 * If an eventq is associated with the hash, insert a dispq subscription for it.
 */
static uint_t
fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
	fmd_xprt_class_t *xcp;

	ASSERT(MUTEX_HELD(&xip->xi_lock));

	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
		if (strcmp(class, xcp->xc_class) == 0)
			return (++xcp->xc_refs);
	}

	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
	xcp->xc_next = xch->xch_hash[h];
	xcp->xc_refs = 1;
	xch->xch_hash[h] = xcp;

	if (xch->xch_queue != NULL)
		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);

	return (xcp->xc_refs);
}

/*
 * Delete the specified class from the specified class hash, and return the
 * reference count.  A return value of zero indicates the class was deleted.
 * If an eventq is associated with the hash, delete the dispq subscription.
 */
static uint_t
fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
	fmd_xprt_class_t *xcp, **pp;

	ASSERT(MUTEX_HELD(&xip->xi_lock));
	pp = &xch->xch_hash[h];

	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
		if (strcmp(class, xcp->xc_class) == 0)
			break;
		else
			pp = &xcp->xc_next;
	}

	if (xcp == NULL)
		return (-1U); /* explicitly permit an invalid delete */

	if (--xcp->xc_refs != 0)
		return (xcp->xc_refs);

	ASSERT(xcp->xc_refs == 0);
	*pp = xcp->xc_next;

	fmd_strfree(xcp->xc_class);
	fmd_free(xcp, sizeof (fmd_xprt_class_t));

	if (xch->xch_queue != NULL)
		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);

	return (0);
}
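/*
 * Worked example of the reference counting implemented above (class name
 * hypothetical, xi_lock assumed held): only the first insert of a class adds
 * a dispq subscription, and only the delete that drops the count to zero
 * removes it.
 *
 *	n = fmd_xprt_class_hash_insert(xip, xch, "fault.io.disk");  (n == 1)
 *	n = fmd_xprt_class_hash_insert(xip, xch, "fault.io.disk");  (n == 2)
 *	n = fmd_xprt_class_hash_delete(xip, xch, "fault.io.disk");  (n == 1)
 *	n = fmd_xprt_class_hash_delete(xip, xch, "fault.io.disk");  (n == 0)
 *
 * Deleting a class that was never inserted returns -1U, which is explicitly
 * permitted.
 */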
/*
 * Queue subscribe events for the specified transport corresponding to all of
 * the active module subscriptions.  This is an extremely heavyweight operation
 * that we expect to take place rarely (i.e. when loading a transport module
 * or when it establishes a connection).  We lock all of the known modules to
 * prevent them from adding or deleting subscriptions, then snapshot their
 * subscriptions, and then unlock all of the modules.  We hold the modhash
 * lock for the duration of this operation to prevent new modules from loading.
 */
static void
fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
{
	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
	const fmd_conf_path_t *pap;
	fmd_module_t *mp;
	uint_t i, j;

	(void) pthread_rwlock_rdlock(&mhp->mh_lock);

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
			fmd_module_lock(mp);
	}

	(void) pthread_mutex_lock(&xip->xi_lock);
	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
	(void) pthread_mutex_unlock(&xip->xi_lock);

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
			(void) fmd_conf_getprop(mp->mod_conf,
			    FMD_PROP_SUBSCRIPTIONS, &pap);
			for (j = 0; j < pap->cpa_argc; j++)
				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
		}
	}

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
			fmd_module_unlock(mp);
	}

	(void) pthread_rwlock_unlock(&mhp->mh_lock);
}
static void
fmd_xprt_transition(fmd_xprt_impl_t *xip,
    const fmd_xprt_rule_t *state, const char *tag)
{
	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));

	xip->xi_state = state;
	s = fmd_strdup(tag, FMD_SLEEP);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
	xip->xi_stats->xs_state.fmds_value.str = s;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	/*
	 * If we've reached the SUB state, take out the big hammer and snapshot
	 * all of the subscriptions of all of the loaded modules.  Then queue a
	 * run event for our remote peer indicating that it can enter RUN.
	 */
	if (state == _fmd_xprt_state_sub) {
		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);

		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
		    "resource.fm.xprt.run", xip->xi_version);

		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
		fmd_eventq_insert_at_time(xip->xi_queue, e);
	}
}

static void
fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
{
	char *s = fmd_fmri_auth2str(xip->xi_auth);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
	xip->xi_stats->xs_authority.fmds_value.str = s;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
}

static int
fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
{
	uint8_t rversion;

	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
		return (1);
	}

	if (rversion > xip->xi_version) {
		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
		    xip->xi_id, rversion, xip->xi_version);

		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
		return (1);
	}

	if (rversionp != NULL)
		*rversionp = rversion;

	return (0);
}

void
fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_event_t *e;
	uint_t vers;
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, &vers))
		return; /* transitioned to error state */

	/*
	 * If the transport module didn't specify an authority, extract the
	 * one that is passed along with the xprt.syn event and use that.
	 */
	if (xip->xi_auth == NULL &&
	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
		fmd_xprt_authupdate(xip);
	}

	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.ack", xip->xi_version);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
	fmd_eventq_insert_at_time(xip->xi_queue, e);

	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}

void
fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	uint_t vers;

	if (fmd_xprt_vmismatch(xip, nvl, &vers))
		return; /* transitioned to error state */

	/*
	 * If the transport module didn't specify an authority, extract the
	 * one that is passed along with the xprt.ack event and use that.
	 */
	if (xip->xi_auth == NULL &&
	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
		fmd_xprt_authupdate(xip);
	}

	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}
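/*
 * Worked example of the version negotiation implemented by the syn/ack
 * handlers above (version numbers hypothetical): if the initiator supports
 * protocol version 2 and the acceptor supports only version 1, the "syn"
 * carries 2, the acceptor computes MIN(1, 2) == 1 and replies with an "ack"
 * carrying 1, and both peers then use version 1 for all subsequent events.
 */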
/*
 * Upon transition to RUN, we take every solved case and resend a list.suspect
 * event for it to our remote peer.  If a case transitions from solved to a
 * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
 * the case hash, we will get it as part of examining the resource cache, next.
 */
static void
fmd_xprt_send_case(fmd_case_t *cp, void *arg)
{
	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
	fmd_xprt_impl_t *xip = arg;

	fmd_event_t *e;
	nvlist_t *nvl;
	char *class;

	if (cip->ci_state != FMD_CASE_SOLVED)
		return; /* unsolved, or we'll get it during the ASRU pass */

	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);

	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

/*
 * Upon transition to RUN, we take every ASRU which is in the degraded state
 * and resend a fault.* event for it to our remote peer, in case the peer is
 * running a fault manager that knows how to disable this resource.  If
 * any new resources are added to the cache during our iteration, this is no
 * problem because our subscriptions are already proxied and so any new cases
 * will result in a list.suspect event being transported if that is needed.
 */
static void
fmd_xprt_send_asru(fmd_asru_t *ap, void *arg)
{
	fmd_xprt_impl_t *xip = arg;
	nvlist_t *nvl = NULL;
	fmd_event_t *e;
	char *class;

	(void) pthread_mutex_lock(&ap->asru_lock);

	if ((ap->asru_flags & (FMD_ASRU_INTERNAL | FMD_ASRU_STATE)) ==
	    FMD_ASRU_FAULTY && fmd_case_orphaned(ap->asru_case))
		(void) nvlist_xdup(ap->asru_event, &nvl, &fmd.d_nva);

	(void) pthread_mutex_unlock(&ap->asru_lock);

	if (nvl == NULL)
		return; /* asru is internal, unusable, or not faulty */

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
	    class, ap->asru_name, xip->xi_id);

	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

void
fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
		fmd_asru_hash_apply(fmd.d_asrus, fmd_xprt_send_asru, xip);
	}
}

void
fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
}
void
fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_event_t *e;
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.unsuback", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

void
fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_case_t *cp;
	char *uuid;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
		fmd_case_rele(cp);
	}
}

void
fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class = "<unknown>";

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_discarded.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));

	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
}

void
fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class = "<unknown>";

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_discarded.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
}
fmd_xprt_t *
fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
{
	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
	fmd_stat_t *statv;
	uint_t i, statc;

	char buf[PATH_MAX];
	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	(void) pthread_mutex_init(&xip->xi_lock, NULL);
	(void) pthread_cond_init(&xip->xi_cv, NULL);
	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);

	xip->xi_auth = auth;
	xip->xi_data = data;
	xip->xi_version = FM_RSRC_XPRT_VERSION;
	xip->xi_flags = flags;

	/*
	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
	 * ensure that this transport will not run until fmd_xprt_resume_all().
	 */
	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);

	if (fmd.d_xprt_suspend != 0)
		xip->xi_flags |= FMD_XPRT_DSUSPENDED;

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);

	/*
	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
	 * bit so that fmdo_send() is not called until _fmd_init() completes.
	 */
	if (!(mp->mod_flags & FMD_MOD_INIT))
		xip->xi_flags |= FMD_XPRT_ISUSPENDED;

	/*
	 * Initialize the transport statistics that we keep on behalf of fmd.
	 * These are set up using a template defined at the top of this file.
	 * We rename each statistic with a prefix ensuring its uniqueness.
	 */
	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));

	for (i = 0; i < statc; i++) {
		(void) snprintf(statv[i].fmds_name,
		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
	}

	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);

	if (xip->xi_stats == NULL)
		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);

	xip->xi_stats->xs_module.fmds_value.str =
	    fmd_strdup(mp->mod_name, FMD_SLEEP);

	if (xip->xi_auth != NULL)
		fmd_xprt_authupdate(xip);

	/*
	 * Create the outbound eventq for this transport and link to its stats.
	 * If any suspend bits were set above, suspend the eventq immediately.
	 */
	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);

	if (xip->xi_flags & FMD_XPRT_SMASK)
		fmd_eventq_suspend(xip->xi_queue);

	/*
	 * Create our subscription hashes: local subscriptions go to xi_queue,
	 * remote subscriptions are tracked only for protocol requests, and
	 * pending unsubscriptions are associated with the /dev/null eventq.
	 */
	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);

	/*
	 * Determine our initial state based upon the creation flags.  If we're
	 * read-only, go directly to RUN.  If we're accepting a new connection,
	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
	else if (flags & FMD_XPRT_ACCEPT)
		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
	else
		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");

	/*
	 * If client.xprtlog is set to TRUE, create a debugging log for the
	 * events received by the transport in var/fm/fmd/xprt/.
	 */
	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);

	if (i) {
		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
	}

	ASSERT(fmd_module_locked(mp));
	fmd_list_append(&mp->mod_transports, xip);

	(void) pthread_mutex_lock(&mp->mod_stats_lock);
	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
	(void) pthread_mutex_unlock(&mp->mod_stats_lock);

	/*
	 * If this is a read-only transport, return without creating a send
	 * queue thread and setting up any connection events in our queue.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		goto out;

	/*
	 * Once the transport is fully initialized, create a send queue thread
	 * and start any connect events flowing to complete our initialization.
	 */
	if ((xip->xi_thread = fmd_thread_create(mp,
	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {

		fmd_error(EFMD_XPRT_THR,
		    "failed to create thread for transport %u", xip->xi_id);

		fmd_xprt_destroy((fmd_xprt_t *)xip);
		(void) fmd_set_errno(EFMD_XPRT_THR);
		return (NULL);
	}

	/*
	 * If the transport is not being opened to accept an inbound connect,
	 * start an outbound connection by enqueuing a SYN event for our peer.
	 */
	if (!(flags & FMD_XPRT_ACCEPT)) {
		nvl = fmd_protocol_xprt_ctl(mp,
		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);

		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
		fmd_eventq_insert_at_time(xip->xi_queue, e);
	}
out:
	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
	return ((fmd_xprt_t *)xip);
}
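/*
 * For illustration only: how a transport module's creation flags select the
 * initial states chosen above, via the fmd_xprt_open() module API that wraps
 * this function.  The hdl, auth, and conn variables are hypothetical.
 *
 *	xp = fmd_xprt_open(hdl, FMD_XPRT_RDWR, auth, conn);
 *		(initiate: enqueue a syn and wait in ACK for the peer's ack)
 *
 *	xp = fmd_xprt_open(hdl, FMD_XPRT_RDWR | FMD_XPRT_ACCEPT, auth, conn);
 *		(accept: wait in SYN for the peer's syn)
 *
 *	xp = fmd_xprt_open(hdl, FMD_XPRT_RDONLY, auth, conn);
 *		(read-only: no handshake, go directly to RUN)
 */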
void
fmd_xprt_destroy(fmd_xprt_t *xp)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	fmd_module_t *mp = xip->xi_queue->eq_mod;
	uint_t id = xip->xi_id;

	fmd_case_impl_t *cip, *nip;
	fmd_stat_t *sp;
	uint_t i, n;

	ASSERT(fmd_module_locked(mp));
	fmd_list_delete(&mp->mod_transports, xip);

	(void) pthread_mutex_lock(&mp->mod_stats_lock);
	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
	(void) pthread_mutex_unlock(&mp->mod_stats_lock);

	(void) pthread_mutex_lock(&xip->xi_lock);

	while (xip->xi_busy != 0)
		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

	/*
	 * Remove the transport from global visibility, cancel its send-side
	 * thread, join with it, and then remove the transport from module
	 * visibility.  Once all this is done, destroy and free the transport.
	 */
	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);

	if (xip->xi_thread != NULL) {
		fmd_eventq_abort(xip->xi_queue);
		fmd_module_unlock(mp);
		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
		fmd_module_lock(mp);
	}

	if (xip->xi_log != NULL)
		fmd_log_rele(xip->xi_log);

	/*
	 * Release every case handle in the module that was cached by this
	 * transport.  This will result in these cases disappearing from the
	 * local case hash so that fmd_case_uuclose() can no longer be used.
	 */
	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
		nip = fmd_list_next(cip);
		if (cip->ci_xprt == xp)
			fmd_case_discard((fmd_case_t *)cip);
	}

	/*
	 * Destroy every class in the various subscription hashes and remove
	 * any corresponding subscriptions from the event dispatch queue.
	 */
	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
	fmd_xprt_class_hash_destroy(&xip->xi_usub);

	/*
	 * Uniquify the stat names exactly as was done in fmd_xprt_create()
	 * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
	 * won't find the entries in the hash table.
	 */
	n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
	sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
	bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));
	for (i = 0; i < n; i++) {
		(void) snprintf(sp[i].fmds_name,
		    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
	}
	fmd_ustat_delete(mp->mod_ustat, n, sp);
	fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));

	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
	fmd_eventq_destroy(xip->xi_queue);
	nvlist_free(xip->xi_auth);
	fmd_free(xip, sizeof (fmd_xprt_impl_t));

	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
}

void
fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	uint_t oflags;

	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
	(void) pthread_mutex_lock(&xip->xi_lock);

	oflags = xip->xi_flags;
	xip->xi_flags |= flags;

	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
		fmd_eventq_suspend(xip->xi_queue);

	(void) pthread_cond_broadcast(&xip->xi_cv);

	while (xip->xi_busy != 0)
		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	uint_t oflags;

	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
	(void) pthread_mutex_lock(&xip->xi_lock);

	oflags = xip->xi_flags;
	xip->xi_flags &= ~flags;

	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
		fmd_eventq_resume(xip->xi_queue);

	(void) pthread_cond_broadcast(&xip->xi_cv);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_send(fmd_xprt_t *xp)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	fmd_module_t *mp = xip->xi_queue->eq_mod;
	fmd_event_t *ep;
	int err;

	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
		if (FMD_EVENT_TTL(ep) == 0) {
			fmd_event_rele(ep);
			continue;
		}

		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));

		err = mp->mod_ops->mop_transport(mp, xp, ep);
		fmd_eventq_done(xip->xi_queue);

		if (err == FMD_SEND_RETRY) {
			fmd_eventq_insert_at_time(xip->xi_queue, ep);
			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_retried.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
		}

		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_lost.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
		}

		fmd_event_rele(ep);
	}
}
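/*
 * For illustration only (not part of fmd): the mop_transport() call above
 * ultimately invokes the module's fmdo_send() entry point, which can apply
 * backpressure by returning FMD_SEND_RETRY; the loop then requeues the event
 * and bumps xs_retried.  Any other non-success return is counted as lost.
 * The xpm_* names and the xpm_write() helper are hypothetical:
 *
 *	static int
 *	xpm_send(fmd_hdl_t *hdl, fmd_xprt_t *xp, fmd_event_t *ep, nvlist_t *nvl)
 *	{
 *		xpm_conn_t *cp = fmd_hdl_getspecific(hdl);
 *
 *		if (xpm_write(cp, nvl) != 0)
 *			return (FMD_SEND_RETRY);   (requeued by fmd_xprt_send)
 *		return (FMD_SEND_SUCCESS);
 *	}
 */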
void
fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt, boolean_t logonly)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	const fmd_xprt_rule_t *xrp;
	fmd_t *dp = &fmd;

	fmd_event_t *e;
	char *class, *uuid, *code;
	boolean_t isproto, isereport;

	uint64_t *tod;
	uint8_t ttl;
	uint_t n;

	/*
	 * Grab the transport lock and set the busy flag to indicate we are
	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
	 * resumes the transport before continuing on with the receive.
	 */
	(void) pthread_mutex_lock(&xip->xi_lock);

	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {

		if (fmd.d_signal != 0) {
			(void) pthread_mutex_unlock(&xip->xi_lock);
			return; /* fmd_destroy() is in progress */
		}

		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
	}

	xip->xi_busy++;
	ASSERT(xip->xi_busy != 0);

	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_received.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);

		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		nvlist_free(nvl);
		goto done;
	}

	fmd_dprintf(FMD_DBG_XPRT, "xprt %u %s %s\n", xip->xi_id,
	    ((logonly == FMD_B_TRUE) ? "logging" : "posting"), class);

	isereport = (strncmp(class, FM_EREPORT_CLASS,
	    sizeof (FM_EREPORT_CLASS) - 1) == 0) ? FMD_B_TRUE : FMD_B_FALSE;

	/*
	 * The logonly flag should only be set for ereports.
	 */
	if ((logonly == FMD_B_TRUE) && (isereport == FMD_B_FALSE)) {
		fmd_error(EFMD_XPRT_INVAL, "discarding nvlist %p: "
		    "logonly flag is not valid for class %s",
		    (void *)nvl, class);

		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		nvlist_free(nvl);
		goto done;
	}

	/*
	 * If a time-to-live value is present in the event and is zero, drop
	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
	 */
	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
		if (ttl == 0) {
			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
			    "timeout: event received with ttl=0\n",
			    xip->xi_id, (void *)nvl, class);

			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);

			nvlist_free(nvl);
			goto done;
		}
		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
	}

	/*
	 * If we are using the native system clock, the underlying transport
	 * code can provide a tighter event time bound by telling us when the
	 * event was enqueued.  If we're using simulated clocks, this time
	 * has no meaning to us, so just reset the value to use HRT_NOW.
	 */
	if (dp->d_clockops != &fmd_timeops_native)
		hrt = FMD_HRT_NOW;

	/*
	 * If an event's class is in the FMD_CTL_CLASS family, then create a
	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
	 * event using this time.  Otherwise create a protocol event using hrt.
	 */
	isproto = (strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN) == 0) ?
	    FMD_B_FALSE : FMD_B_TRUE;
	if (isproto == FMD_B_FALSE)
		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
	else {
		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
		    NULL, nvl, class, NULL, 0, 0);
	}

	/*
	 * If the debug log is enabled, create a temporary event, log it to the
	 * debug log, and then reset the underlying state of the event.
	 */
	if (xip->xi_log != NULL) {
		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;

		fmd_log_append(xip->xi_log, e, NULL);

		ep->ev_flags |= FMD_EVF_VOLATILE;
		ep->ev_off = 0;
		ep->ev_len = 0;

		if (ep->ev_log != NULL) {
			fmd_log_rele(ep->ev_log);
			ep->ev_log = NULL;
		}
	}

	/*
	 * Iterate over the rules for the current state trying to match the
	 * event class to one of our special rules.  If a rule is matched, the
	 * event is consumed and not dispatched to other modules.  If the rule
	 * set ends without matching an event, we fall through to dispatching.
	 */
	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
			fmd_event_hold(e);
			xrp->xr_func(xip, nvl);
			fmd_event_rele(e);
			goto done;
		}
	}

	/*
	 * Record the event in the errlog if it is an ereport.  This code will
	 * be replaced later with a per-transport intent log instead.
	 */
	if (isereport == FMD_B_TRUE) {
		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
		fmd_log_append(dp->d_errlog, e, NULL);
		(void) pthread_rwlock_unlock(&dp->d_log_lock);
	}

	/*
	 * If a list.suspect event is received, create a case for the specified
	 * UUID in the case hash, with the transport module as its owner.  If
	 * the UUID is already known, fmd_case_recreate() will return NULL and
	 * we simply proceed to our normal event handling regardless.
	 */
	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS) &&
	    nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
	    nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) == 0) {
		fmd_module_lock(xip->xi_queue->eq_mod);
		(void) fmd_case_recreate(xip->xi_queue->eq_mod,
		    xp, FMD_CASE_SOLVED, uuid, code);
		fmd_module_unlock(xip->xi_queue->eq_mod);
	}

	if (logonly == FMD_B_TRUE) {
		fmd_event_hold(e);
		fmd_event_rele(e);
	} else if (isproto == FMD_B_TRUE)
		fmd_dispq_dispatch(dp->d_disp, e, class);
	else
		fmd_modhash_dispatch(dp->d_mod_hash, e);
done:
	(void) pthread_mutex_lock(&xip->xi_lock);

	ASSERT(xip->xi_busy != 0);
	xip->xi_busy--;

	(void) pthread_cond_broadcast(&xip->xi_cv);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u closing case %s\n", xip->xi_id, uuid);

	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

/*
 * Insert the specified class into our remote subscription hash.  If the class
 * is already present, bump the reference count; otherwise add it to the hash
 * and then enqueue an event for our remote peer to proxy our subscription.
 */
void
fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	uint_t refs;
	nvlist_t *nvl;
	fmd_event_t *e;
	char *s;

	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		return; /* read-only transports do not proxy subscriptions */

	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
		return; /* transport is not yet an active subscriber */

	(void) pthread_mutex_lock(&xip->xi_lock);
	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	if (refs > 1)
		return; /* we've already asked our peer for this subscription */

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u subscribing to %s\n", xip->xi_id, class);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.subscribe", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}
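/*
 * Worked example of the proxied subscription flow implemented by
 * fmd_xprt_subscribe() above and fmd_xprt_unsubscribe() below (class name
 * hypothetical): when a local module first subscribes to "fault.io.*", this
 * end records the class in xi_rsub and enqueues resource.fm.xprt.subscribe;
 * the peer's fmd_xprt_event_sub() adds the class to its xi_lsub so matching
 * events are dispatched back across the link.  When the last local reference
 * is dropped, this end moves the class to xi_usub and enqueues
 * resource.fm.xprt.unsubscribe; the peer deletes its xi_lsub entry and
 * answers with resource.fm.xprt.unsuback, which fmd_xprt_event_unsuback()
 * uses to retire the pending xi_usub entry.
 */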
/*
 * Delete the specified class from the remote subscription hash.  If the
 * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
 */
void
fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	uint_t refs;
	nvlist_t *nvl;
	fmd_event_t *e;
	char *s;

	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		return; /* read-only transports do not proxy subscriptions */

	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
		return; /* transport is not yet an active subscriber */

	/*
	 * If the subscription reference count drops to zero in xi_rsub, insert
	 * an entry into the xi_usub hash indicating we await an unsuback event.
	 */
	(void) pthread_mutex_lock(&xip->xi_lock);

	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);

	(void) pthread_mutex_unlock(&xip->xi_lock);

	if (refs != 0)
		return; /* other subscriptions for this class still active */

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

static void
fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_subscribe(xp, class);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_subscribe_all(const char *class)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
}

static void
fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_unsubscribe(xp, class);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_unsubscribe_all(const char *class)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
}

/*ARGSUSED*/
static void
fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_suspend_all(void)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	(void) pthread_mutex_lock(&fmd.d_xprt_lock);

	if (fmd.d_xprt_suspend++ != 0) {
		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
		return; /* already suspended */
	}

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}
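/*
 * Worked example of the suspend nesting implemented by fmd_xprt_suspend_all()
 * above and fmd_xprt_resume_all() below (call sequence hypothetical): two
 * nested calls to fmd_xprt_suspend_all() leave fmd.d_xprt_suspend at 2, but
 * only the first call actually suspends the transports.  The first
 * fmd_xprt_resume_all() drops the count to 1 and returns; only the second,
 * which drops the count to 0, resumes the transports.  An unmatched resume
 * triggers fmd_panic().
 */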
/*ARGSUSED*/
static void
fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_resume_all(void)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	(void) pthread_mutex_lock(&fmd.d_xprt_lock);

	if (fmd.d_xprt_suspend == 0)
		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");

	if (--fmd.d_xprt_suspend != 0) {
		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
		return; /* not ready to be resumed */
	}

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}