/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * FMD Transport Subsystem
 *
 * A transport module uses some underlying mechanism to transport events.
 * This mechanism may use any underlying link-layer protocol and may support
 * additional link-layer packets unrelated to FMA.  The transport module is
 * expected to invoke some appropriate link-layer mechanism to create the
 * underlying connection before calling fmd_xprt_open() itself.
 * Alternatively, a transport may be created in the suspended state by
 * specifying the FMD_XPRT_SUSPENDED flag as part of the call to
 * fmd_xprt_open(), and then may be resumed later.  The underlying transport
 * mechanism is *required* to provide ordering: that is, the sequences of
 * bytes written across the transport must be read by the remote peer in the
 * order that they are written, even across separate calls to fmdo_send().
 * As an example, the Internet TCP protocol would be a valid transport as it
 * guarantees ordering, whereas the Internet UDP protocol would not because
 * UDP datagrams may be delivered in any order as a result of delays
 * introduced when datagrams pass through routers.
 *
 * Similar to sending events, a transport module receives events from its
 * remote peer endpoint using some transport-specific mechanism that is
 * unknown to FMD.  As each event is received, the transport module is
 * responsible for constructing a valid nvlist_t object from the data and then
 * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
 * queue, making it available to all local non-transport modules that have
 * subscribed to the event.
 *
 * The following state machine is used for each transport.  The initial state
 * is either SYN, ACK, or RUN, depending on the flags specified to
 * fmd_xprt_create().
 *
 *        FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
 *              |                  |
 * waiting   +--v--+            +--v--+   waiting
 * for syn   | SYN |--+      --+| ACK |   for ack
 * event     +-----+   \    /   +-----+   event
 *              |       \  /       |
 * drop all  +--v--+     \/     +--v--+   send subscriptions,
 * events    | ERR |<---+  +--->| SUB |   recv subscriptions,
 *           +-----+            +-----+   wait for run event
 *              ^                  |
 *              |     +-----+      |
 *              +-----| RUN |<-----+
 *                    +--^--+
 *                       |
 *                FMD_XPRT_RDONLY
 *
 * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
 * Layer enqueues a "syn" event for the module in its event queue and sets the
 * state to ACK.  In state ACK, we are waiting for the transport to get an
 * "ack" event and call fmd_xprt_post() on this event.  Other events will be
 * discarded.  If an "ack" is received, we transition to state SUB.  If a
 * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
 * exchange), we transition to state ERR.  Once in state ERR, no further
 * operations are valid except fmd_xprt_close(), and fmd_xprt_error() will
 * return a non-zero value to the caller indicating the transport has failed.
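 *
 * As an informal illustration (not part of this file's implementation), a
 * transport module typically drives both ends of this handshake; the names
 * xport_accept() and xport_connect() below are hypothetical link-layer calls:
 *
 *    xp = fmd_xprt_open(hdl, FMD_XPRT_ACCEPT, NULL, xport_accept(...));
 *            (start in SYN, wait for the peer's "syn" event)
 *    xp = fmd_xprt_open(hdl, 0, NULL, xport_connect(...));
 *            (enqueue a "syn" event, start in ACK, wait for the "ack")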
 *
 * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
 * Layer assumes this transport is being used to accept a virtual connection
 * from a remote peer that is sending a "syn", and sets the initial state to
 * SYN.  In this state, the transport waits for a "syn" event, validates it,
 * and then transitions to state SUB if it is valid or state ERR if it is not.
 *
 * Once in state SUB, the transport module is expected to receive a sequence of
 * zero or more "subscribe" events from the remote peer, followed by a "run"
 * event.  Once in state RUN, the transport is active and any events can be
 * sent or received.  The transport module is free to call fmd_xprt_close()
 * from any state.  The fmd_xprt_error() function will return zero if the
 * transport is not in the ERR state, or non-zero if it is in the ERR state.
 *
 * Once the state machine reaches RUN, other FMA protocol events can be sent
 * and received across the transport in addition to the various control events.
 *
 * Table of Common Transport Layer Control Events
 * ==============================================
 *
 * FMA Class                      Payload
 * ---------                      -------
 * resource.fm.xprt.uuclose       string (uuid of case)
 * resource.fm.xprt.subscribe     string (class pattern)
 * resource.fm.xprt.unsubscribe   string (class pattern)
 * resource.fm.xprt.unsuback      string (class pattern)
 * resource.fm.xprt.syn           version information
 * resource.fm.xprt.ack           version information
 * resource.fm.xprt.run           version information
 *
 * Control events are used to add and delete proxy subscriptions on the remote
 * transport peer module, and to set up connections.  When a "syn" event is
 * sent, FMD will include in the payload the highest version of the FMA event
 * protocol that is supported by the sender.  When a "syn" event is received,
 * the receiving FMD will use the minimum of this version and its version of
 * the protocol, and reply with this new minimum version in the "ack" event.
 * The receiver will then use this new minimum for subsequent event semantics.
 */

#include <sys/fm/protocol.h>
#include <strings.h>
#include <limits.h>

#include <fmd_alloc.h>
#include <fmd_error.h>
#include <fmd_conf.h>
#include <fmd_subr.h>
#include <fmd_string.h>
#include <fmd_protocol.h>
#include <fmd_thread.h>
#include <fmd_eventq.h>
#include <fmd_dispq.h>
#include <fmd_ctl.h>
#include <fmd_log.h>
#include <fmd_ustat.h>
#include <fmd_case.h>
#include <fmd_api.h>
#include <fmd_fmri.h>
#include <fmd_asru.h>
#include <fmd_xprt.h>

#include <fmd.h>

/*
 * The states shown above in the transport state machine diagram are encoded
 * using arrays of class patterns and a corresponding action function.  These
 * arrays are then passed to fmd_xprt_transition() to change transport states.
 */
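
/*
 * Informal sketch (for illustration only): fmd_xprt_recv(), below, scans the
 * current state's rule array in order and invokes the first matching entry,
 * so more specific patterns must precede the "*" catch-alls:
 *
 *    for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
 *        if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
 *            xrp->xr_func(xip, nvl);
 *            break;
 *        }
 *    }
 *
 * The real loop in fmd_xprt_recv() also holds and releases the event around
 * the callback; this sketch shows only the first-match-wins ordering.
 */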

const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
{ "resource.fm.xprt.syn", fmd_xprt_event_syn },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
{ "resource.fm.xprt.ack", fmd_xprt_event_ack },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.run", fmd_xprt_event_run },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
{ "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
{ "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ NULL, NULL }
};

/*
 * Template for per-transport statistics installed by fmd on behalf of each
 * transport.  These are used to initialize the per-transport xi_stats.  For
 * each statistic, the name is prefixed with "fmd.xprt.%u", where %u is the
 * transport ID (xi_id); the statistics are then inserted into the per-module
 * stats hash.  The values in this array must match fmd_xprt_stat_t from
 * <fmd_xprt.h>.
 */
static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
{
{ "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
{ "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
{ "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
{ "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
{ "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
{ "wtime", FMD_TYPE_TIME, "total wait time on queue" },
{ "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
{ "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
{ "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
{ "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
},
{ "module", FMD_TYPE_STRING, "module that owns this transport" },
{ "authority", FMD_TYPE_STRING, "authority associated with this transport" },
{ "state", FMD_TYPE_STRING, "current transport state" },
{ "received", FMD_TYPE_UINT64, "events received by transport" },
{ "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
{ "retried", FMD_TYPE_UINT64, "retries requested of transport" },
{ "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
{ "lost", FMD_TYPE_UINT64, "events lost by transport" },
{ "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
{ "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
};

static void
fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
{
        uint_t hashlen = fmd.d_str_buckets;

        xch->xch_queue = eq;
        xch->xch_hashlen = hashlen;
        xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
}
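
/*
 * Illustrative note (sketch, not compiled): the class hashes below are
 * reference-counted, so repeated inserts of one pattern share a single entry
 * and a single dispq subscription.  For a hypothetical hash xch:
 *
 *    fmd_xprt_class_hash_insert(xip, &xch, "ereport.io.*");   returns 1
 *    fmd_xprt_class_hash_insert(xip, &xch, "ereport.io.*");   returns 2
 *    fmd_xprt_class_hash_delete(xip, &xch, "ereport.io.*");   returns 1
 *
 * Only the first insert adds a dispq subscription, and only the delete that
 * returns zero removes it, as implemented below.
 */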

static void
fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
{
        fmd_eventq_t *eq = xch->xch_queue;
        fmd_xprt_class_t *xcp, *ncp;
        uint_t i;

        for (i = 0; i < xch->xch_hashlen; i++) {
                for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
                        ncp = xcp->xc_next;

                        if (eq != NULL)
                                fmd_dispq_delete(fmd.d_disp, eq,
                                    xcp->xc_class);

                        fmd_strfree(xcp->xc_class);
                        fmd_free(xcp, sizeof (fmd_xprt_class_t));
                }
        }

        fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
}

/*
 * Insert the specified class into the specified class hash, and return the
 * reference count.  A return value of one indicates this is the first insert.
 * If an eventq is associated with the hash, insert a dispq subscription for it.
 */
static uint_t
fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
        uint_t h = fmd_strhash(class) % xch->xch_hashlen;
        fmd_xprt_class_t *xcp;

        ASSERT(MUTEX_HELD(&xip->xi_lock));

        for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
                if (strcmp(class, xcp->xc_class) == 0)
                        return (++xcp->xc_refs);
        }

        xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
        xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
        xcp->xc_next = xch->xch_hash[h];
        xcp->xc_refs = 1;
        xch->xch_hash[h] = xcp;

        if (xch->xch_queue != NULL)
                fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);

        return (xcp->xc_refs);
}

/*
 * Delete the specified class from the specified class hash, and return the
 * reference count.  A return value of zero indicates the class was deleted.
 * If an eventq is associated with the hash, delete the dispq subscription.
 */
static uint_t
fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
        uint_t h = fmd_strhash(class) % xch->xch_hashlen;
        fmd_xprt_class_t *xcp, **pp;

        ASSERT(MUTEX_HELD(&xip->xi_lock));
        pp = &xch->xch_hash[h];

        for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
                if (strcmp(class, xcp->xc_class) == 0)
                        break;
                else
                        pp = &xcp->xc_next;
        }

        if (xcp == NULL)
                return (-1U); /* explicitly permit an invalid delete */

        if (--xcp->xc_refs != 0)
                return (xcp->xc_refs);

        ASSERT(xcp->xc_refs == 0);
        *pp = xcp->xc_next;

        fmd_strfree(xcp->xc_class);
        fmd_free(xcp, sizeof (fmd_xprt_class_t));

        if (xch->xch_queue != NULL)
                fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);

        return (0);
}
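
/*
 * For illustration (informal, based on the control-event table above): each
 * proxied subscription that fmd_xprt_subscribe() forwards to the peer is an
 * nvlist of roughly this shape, built by fmd_protocol_xprt_sub():
 *
 *    FM_CLASS                  = "resource.fm.xprt.subscribe"
 *    FM_VERSION                = <negotiated xi_version>
 *    FM_RSRC_XPRT_SUBCLASS     = "ereport.cpu.*"    (example pattern)
 *
 * The peer's fmd_xprt_event_sub() then inserts the pattern into its local
 * subscription hash (xi_lsub) so matching protocol events flow back.
 */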

/*
 * Queue subscribe events for the specified transport corresponding to all of
 * the active module subscriptions.  This is an extremely heavyweight operation
 * that we expect to take place rarely (i.e. when loading a transport module
 * or when it establishes a connection).  We lock all of the known modules to
 * prevent them from adding or deleting subscriptions, then snapshot their
 * subscriptions, and then unlock all of the modules.  We hold the modhash
 * lock for the duration of this operation to prevent new modules from loading.
 */
static void
fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
{
        fmd_xprt_t *xp = (fmd_xprt_t *)xip;
        const fmd_conf_path_t *pap;
        fmd_module_t *mp;
        uint_t i, j;

        (void) pthread_rwlock_rdlock(&mhp->mh_lock);

        for (i = 0; i < mhp->mh_hashlen; i++) {
                for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
                        fmd_module_lock(mp);
        }

        (void) pthread_mutex_lock(&xip->xi_lock);
        ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
        xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
        (void) pthread_mutex_unlock(&xip->xi_lock);

        for (i = 0; i < mhp->mh_hashlen; i++) {
                for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
                        (void) fmd_conf_getprop(mp->mod_conf,
                            FMD_PROP_SUBSCRIPTIONS, &pap);
                        for (j = 0; j < pap->cpa_argc; j++)
                                fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
                }
        }

        for (i = 0; i < mhp->mh_hashlen; i++) {
                for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
                        fmd_module_unlock(mp);
        }

        (void) pthread_rwlock_unlock(&mhp->mh_lock);
}
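
/*
 * Informal sketch: fmd_xprt_transition(), below, is the only way a transport
 * changes state.  Callers pass one of the rule arrays defined above plus a
 * tag used for the "state" statistic and debug traces, e.g.:
 *
 *    fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
 *
 * The transition into SUB additionally snapshots all local subscriptions and
 * enqueues a "run" event for the peer, as implemented below.
 */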

static void
fmd_xprt_transition(fmd_xprt_impl_t *xip,
    const fmd_xprt_rule_t *state, const char *tag)
{
        fmd_event_t *e;
        nvlist_t *nvl;
        char *s;

        TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));

        xip->xi_state = state;
        s = fmd_strdup(tag, FMD_SLEEP);

        (void) pthread_mutex_lock(&xip->xi_stats_lock);
        fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
        xip->xi_stats->xs_state.fmds_value.str = s;
        (void) pthread_mutex_unlock(&xip->xi_stats_lock);

        /*
         * If we've reached the SUB state, take out the big hammer and
         * snapshot all of the subscriptions of all of the loaded modules.
         * Then queue a run event for our remote peer indicating that it can
         * enter RUN.
         */
        if (state == _fmd_xprt_state_sub) {
                fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);

                nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
                    "resource.fm.xprt.run", xip->xi_version);

                (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
                e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
                fmd_eventq_insert_at_time(xip->xi_queue, e);
        }
}

static void
fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
{
        char *s = fmd_fmri_auth2str(xip->xi_auth);

        (void) pthread_mutex_lock(&xip->xi_stats_lock);
        fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
        xip->xi_stats->xs_authority.fmds_value.str = s;
        (void) pthread_mutex_unlock(&xip->xi_stats_lock);
}

static int
fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
{
        uint8_t rversion;

        if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
                (void) pthread_mutex_lock(&xip->xi_stats_lock);
                xip->xi_stats->xs_discarded.fmds_value.ui64++;
                (void) pthread_mutex_unlock(&xip->xi_stats_lock);

                fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
                return (1);
        }

        if (rversion > xip->xi_version) {
                fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
                    xip->xi_id, rversion, xip->xi_version);

                (void) pthread_mutex_lock(&xip->xi_stats_lock);
                xip->xi_stats->xs_discarded.fmds_value.ui64++;
                (void) pthread_mutex_unlock(&xip->xi_stats_lock);

                fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
                return (1);
        }

        if (rversionp != NULL)
                *rversionp = rversion;

        return (0);
}

void
fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        fmd_event_t *e;
        uint_t vers;
        char *class;

        if (fmd_xprt_vmismatch(xip, nvl, &vers))
                return; /* transitioned to error state */

        /*
         * If the transport module didn't specify an authority, extract the
         * one that is passed along with the xprt.syn event and use that.
         */
        if (xip->xi_auth == NULL &&
            nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
            nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
                (void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
                fmd_xprt_authupdate(xip);
        }

        nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
            "resource.fm.xprt.ack", xip->xi_version);

        (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
        e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
        fmd_eventq_insert_at_time(xip->xi_queue, e);

        xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
        fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}

void
fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        uint_t vers;

        if (fmd_xprt_vmismatch(xip, nvl, &vers))
                return; /* transitioned to error state */

        /*
         * If the transport module didn't specify an authority, extract the
         * one that is passed along with the xprt.ack event and use that.
         */
        if (xip->xi_auth == NULL &&
            nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
            nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
                (void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
                fmd_xprt_authupdate(xip);
        }

        xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
        fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}
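
/*
 * Worked example of the version negotiation above (informal): if our peer's
 * "syn" carries FM_VERSION 0 while we support FM_RSRC_XPRT_VERSION 1, the
 * receive path replies with an "ack" and we adopt MIN(1, 0) = 0 as our
 * xi_version for subsequent events.  If the peer's version *exceeds* ours,
 * fmd_xprt_vmismatch() rejects the event and drives the transport to ERR.
 */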

/*
 * Upon transition to RUN, we take every solved case and resend a list.suspect
 * event for it to our remote peer.  If a case transitions from solved to a
 * later state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
 * the case hash, we will get it as part of examining the resource cache, next.
 */
static void
fmd_xprt_send_case(fmd_case_t *cp, void *arg)
{
        fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
        fmd_xprt_impl_t *xip = arg;

        fmd_event_t *e;
        nvlist_t *nvl;
        char *class;

        if (cip->ci_state != FMD_CASE_SOLVED)
                return; /* unsolved, or we'll get it during the ASRU pass */

        nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
        (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
        e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

        fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
            FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);

        fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

/*
 * Upon transition to RUN, we take every ASRU that is currently faulty and
 * resend a fault.* event for it to our remote peer, in case the peer is
 * running a fault manager that knows how to disable this resource.  If any
 * new resources are added to the cache during our iteration, this is no
 * problem because our subscriptions are already proxied and so any new cases
 * will result in a list.suspect event being transported if that is needed.
 */
static void
fmd_xprt_send_asru(fmd_asru_t *ap, void *arg)
{
        fmd_xprt_impl_t *xip = arg;
        nvlist_t *nvl = NULL;
        fmd_event_t *e;
        char *class;

        (void) pthread_mutex_lock(&ap->asru_lock);

        if ((ap->asru_flags & (FMD_ASRU_INTERNAL | FMD_ASRU_STATE)) ==
            FMD_ASRU_FAULTY && fmd_case_orphaned(ap->asru_case))
                (void) nvlist_xdup(ap->asru_event, &nvl, &fmd.d_nva);

        (void) pthread_mutex_unlock(&ap->asru_lock);

        if (nvl == NULL)
                return; /* asru is internal, unusable, or not faulty */

        (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
        e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

        fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
            class, ap->asru_name, xip->xi_id);

        fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

void
fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
                fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
                fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
                fmd_asru_hash_apply(fmd.d_asrus, fmd_xprt_send_asru, xip);
        }
}

void
fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        char *class;

        if (fmd_xprt_vmismatch(xip, nvl, NULL))
                return; /* transitioned to error state */

        if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
                return; /* malformed protocol event */

        (void) pthread_mutex_lock(&xip->xi_lock);
        (void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
        (void) pthread_mutex_unlock(&xip->xi_lock);

        (void) pthread_mutex_lock(&xip->xi_stats_lock);
        xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
        (void) pthread_mutex_unlock(&xip->xi_stats_lock);
}
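
/*
 * Informal sequence sketch of the unsubscribe handshake handled below (and by
 * fmd_xprt_unsubscribe() at the bottom of this file):
 *
 *    local fmd                               remote peer
 *    ---------                               -----------
 *    resource.fm.xprt.unsubscribe  ------->  delete proxy subscription
 *    remove class from xi_usub     <-------  resource.fm.xprt.unsuback
 *
 * The xi_usub hash tracks patterns for which we are still awaiting the
 * peer's "unsuback" acknowledgement.
 */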

void
fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        fmd_event_t *e;
        char *class;

        if (fmd_xprt_vmismatch(xip, nvl, NULL))
                return; /* transitioned to error state */

        if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
                return; /* malformed protocol event */

        (void) pthread_mutex_lock(&xip->xi_lock);
        (void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
        (void) pthread_mutex_unlock(&xip->xi_lock);

        (void) pthread_mutex_lock(&xip->xi_stats_lock);
        xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
        (void) pthread_mutex_unlock(&xip->xi_stats_lock);

        nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
            "resource.fm.xprt.unsuback", xip->xi_version, class);

        (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
        e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
        fmd_eventq_insert_at_time(xip->xi_queue, e);
}

void
fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        char *class;

        if (fmd_xprt_vmismatch(xip, nvl, NULL))
                return; /* transitioned to error state */

        if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
                return; /* malformed protocol event */

        (void) pthread_mutex_lock(&xip->xi_lock);
        (void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
        (void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        fmd_case_t *cp;
        char *uuid;

        if (fmd_xprt_vmismatch(xip, nvl, NULL))
                return; /* transitioned to error state */

        if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
            (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
                fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
                fmd_case_rele(cp);
        }
}

void
fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        char *class = "<unknown>";

        (void) pthread_mutex_lock(&xip->xi_stats_lock);
        xip->xi_stats->xs_discarded.fmds_value.ui64++;
        (void) pthread_mutex_unlock(&xip->xi_stats_lock);

        (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
        TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));

        fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
}

void
fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
        char *class = "<unknown>";

        (void) pthread_mutex_lock(&xip->xi_stats_lock);
        xip->xi_stats->xs_discarded.fmds_value.ui64++;
        (void) pthread_mutex_unlock(&xip->xi_stats_lock);

        (void) nvlist_lookup_string(nvl, FM_CLASS, &class);
        TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
}
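
/*
 * Usage sketch (assumption for illustration; not part of this file): once a
 * transport reaches ERR, a transport module is expected to notice via
 * fmd_xprt_error() and tear the connection down, e.g. from its send path,
 * where xport_teardown() is a hypothetical link-layer cleanup routine:
 *
 *    if (fmd_xprt_error(hdl, xp)) {
 *        fmd_xprt_close(hdl, xp);
 *        xport_teardown(...);
 *    }
 */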

fmd_xprt_t *
fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
{
        fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t),
            FMD_SLEEP);
        fmd_stat_t *statv;
        uint_t i, statc;

        char buf[PATH_MAX];
        fmd_event_t *e;
        nvlist_t *nvl;
        char *s;

        (void) pthread_mutex_init(&xip->xi_lock, NULL);
        (void) pthread_cond_init(&xip->xi_cv, NULL);
        (void) pthread_mutex_init(&xip->xi_stats_lock, NULL);

        xip->xi_auth = auth;
        xip->xi_data = data;
        xip->xi_version = FM_RSRC_XPRT_VERSION;
        xip->xi_flags = flags;

        /*
         * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then
         * create a transport ID and make it visible in fmd.d_xprt_ids.  If
         * transports were previously suspended, set the FMD_XPRT_DSUSPENDED
         * flag on us to ensure that this transport will not run until
         * fmd_xprt_resume_all().
         */
        (void) pthread_mutex_lock(&fmd.d_xprt_lock);
        xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);

        if (fmd.d_xprt_suspend != 0)
                xip->xi_flags |= FMD_XPRT_DSUSPENDED;

        (void) pthread_mutex_unlock(&fmd.d_xprt_lock);

        /*
         * If the module has not yet finished _fmd_init(), set the ISUSPENDED
         * bit so that fmdo_send() is not called until _fmd_init() completes.
         */
        if (!(mp->mod_flags & FMD_MOD_INIT))
                xip->xi_flags |= FMD_XPRT_ISUSPENDED;

        /*
         * Initialize the transport statistics that we keep on behalf of fmd.
         * These are set up using a template defined at the top of this file.
         * We rename each statistic with a prefix ensuring its uniqueness.
         */
        statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
        statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
        bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));

        for (i = 0; i < statc; i++) {
                (void) snprintf(statv[i].fmds_name,
                    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
                    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
        }

        xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
            mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);

        if (xip->xi_stats == NULL)
                fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);

        xip->xi_stats->xs_module.fmds_value.str =
            fmd_strdup(mp->mod_name, FMD_SLEEP);

        if (xip->xi_auth != NULL)
                fmd_xprt_authupdate(xip);

        /*
         * Create the outbound eventq for this transport and link to its
         * stats.  If any suspend bits were set above, suspend the eventq
         * immediately.
         */
        xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
            &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);

        if (xip->xi_flags & FMD_XPRT_SMASK)
                fmd_eventq_suspend(xip->xi_queue);

        /*
         * Create our subscription hashes: local subscriptions go to
         * xi_queue, remote subscriptions are tracked only for protocol
         * requests, and pending unsubscriptions are associated with the
         * /dev/null eventq.
         */
        fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
        fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
        fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);

        /*
         * Determine our initial state based upon the creation flags.  If
         * we're read-only, go directly to RUN.  If we're accepting a new
         * connection, wait for a SYN.  Otherwise send a SYN and wait for an
         * ACK.
         */
        if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
                fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
        else if (flags & FMD_XPRT_ACCEPT)
                fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
        else
                fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");
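
        /*
         * Informal summary of the mapping implemented above:
         *
         *    creation flags                     initial state
         *    --------------                     -------------
         *    FMD_XPRT_RDONLY                    RUN (no handshake)
         *    FMD_XPRT_RDWR | FMD_XPRT_ACCEPT    SYN (accepting side)
         *    FMD_XPRT_RDWR                      ACK (connecting side)
         */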

        /*
         * If client.xprtlog is set to TRUE, create a debugging log for the
         * events received by the transport in var/fm/fmd/xprt/.
         */
        (void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
        (void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);

        if (i) {
                (void) snprintf(buf, sizeof (buf), "%s/%u.log", s,
                    xip->xi_id);
                xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
        }

        ASSERT(fmd_module_locked(mp));
        fmd_list_append(&mp->mod_transports, xip);

        (void) pthread_mutex_lock(&mp->mod_stats_lock);
        mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
        (void) pthread_mutex_unlock(&mp->mod_stats_lock);

        /*
         * If this is a read-only transport, return without creating a send
         * queue thread and setting up any connection events in our queue.
         */
        if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
                goto out;

        /*
         * Once the transport is fully initialized, create a send queue
         * thread and start any connect events flowing to complete our
         * initialization.
         */
        if ((xip->xi_thread = fmd_thread_create(mp,
            (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {

                fmd_error(EFMD_XPRT_THR,
                    "failed to create thread for transport %u", xip->xi_id);

                fmd_xprt_destroy((fmd_xprt_t *)xip);
                (void) fmd_set_errno(EFMD_XPRT_THR);
                return (NULL);
        }

        /*
         * If the transport is not being opened to accept an inbound connect,
         * start an outbound connection by enqueuing a SYN event for our peer.
         */
        if (!(flags & FMD_XPRT_ACCEPT)) {
                nvl = fmd_protocol_xprt_ctl(mp,
                    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);

                (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
                e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
                fmd_eventq_insert_at_time(xip->xi_queue, e);
        }
out:
        fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
        return ((fmd_xprt_t *)xip);
}

void
fmd_xprt_destroy(fmd_xprt_t *xp)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
        fmd_module_t *mp = xip->xi_queue->eq_mod;
        uint_t id = xip->xi_id;

        fmd_case_impl_t *cip, *nip;
        fmd_stat_t *sp;
        uint_t i, n;

        ASSERT(fmd_module_locked(mp));
        fmd_list_delete(&mp->mod_transports, xip);

        (void) pthread_mutex_lock(&mp->mod_stats_lock);
        mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
        (void) pthread_mutex_unlock(&mp->mod_stats_lock);

        (void) pthread_mutex_lock(&xip->xi_lock);

        while (xip->xi_busy != 0)
                (void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

        /*
         * Remove the transport from global visibility, cancel its send-side
         * thread, join with it, and then remove the transport from module
         * visibility.  Once all this is done, destroy and free the transport.
         */
        (void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);

        if (xip->xi_thread != NULL) {
                fmd_eventq_abort(xip->xi_queue);
                fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
        }

        if (xip->xi_log != NULL)
                fmd_log_rele(xip->xi_log);

        /*
         * Release every case handle in the module that was cached by this
         * transport.  This will result in these cases disappearing from the
         * local case hash so that fmd_case_uuclose() can no longer be used.
         */
        for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
                nip = fmd_list_next(cip);
                if (cip->ci_xprt == xp)
                        fmd_case_discard((fmd_case_t *)cip);
        }

        /*
         * Destroy every class in the various subscription hashes and remove
         * any corresponding subscriptions from the event dispatch queue.
         */
        fmd_xprt_class_hash_destroy(&xip->xi_lsub);
        fmd_xprt_class_hash_destroy(&xip->xi_rsub);
        fmd_xprt_class_hash_destroy(&xip->xi_usub);

        /*
         * Uniquify the stat names exactly as was done in fmd_xprt_create()
         * before calling fmd_ustat_insert(), otherwise fmd_ustat_delete()
         * won't find the entries in the hash table.
         */
        n = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
        sp = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
        bcopy(&_fmd_xprt_stat_tmpl, sp, sizeof (_fmd_xprt_stat_tmpl));

        for (i = 0; i < n; i++) {
                (void) snprintf(sp[i].fmds_name,
                    sizeof (sp[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
                    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
        }

        fmd_ustat_delete(mp->mod_ustat, n, sp);
        fmd_free(sp, sizeof (_fmd_xprt_stat_tmpl));

        fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
        fmd_eventq_destroy(xip->xi_queue);
        nvlist_free(xip->xi_auth);
        fmd_free(xip, sizeof (fmd_xprt_impl_t));

        fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
}

void
fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
        uint_t oflags;

        ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
        (void) pthread_mutex_lock(&xip->xi_lock);

        oflags = xip->xi_flags;
        xip->xi_flags |= flags;

        if (!(oflags & FMD_XPRT_SMASK) &&
            (xip->xi_flags & FMD_XPRT_SMASK) != 0)
                fmd_eventq_suspend(xip->xi_queue);

        (void) pthread_cond_broadcast(&xip->xi_cv);

        while (xip->xi_busy != 0)
                (void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

        (void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
        uint_t oflags;

        ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
        (void) pthread_mutex_lock(&xip->xi_lock);

        oflags = xip->xi_flags;
        xip->xi_flags &= ~flags;

        if ((oflags & FMD_XPRT_SMASK) != 0 &&
            !(xip->xi_flags & FMD_XPRT_SMASK))
                fmd_eventq_resume(xip->xi_queue);

        (void) pthread_cond_broadcast(&xip->xi_cv);
        (void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_send(fmd_xprt_t *xp)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
        fmd_module_t *mp = xip->xi_queue->eq_mod;
        fmd_event_t *ep;
        int err;

        while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
                if (FMD_EVENT_TTL(ep) == 0) {
                        fmd_event_rele(ep);
                        continue;
                }

                fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
                    xip->xi_id, (char *)FMD_EVENT_DATA(ep));

                err = mp->mod_ops->mop_transport(mp, xp, ep);
                fmd_eventq_done(xip->xi_queue);

                if (err == FMD_SEND_RETRY) {
                        fmd_eventq_insert_at_time(xip->xi_queue, ep);
                        (void) pthread_mutex_lock(&xip->xi_stats_lock);
                        xip->xi_stats->xs_retried.fmds_value.ui64++;
                        (void) pthread_mutex_unlock(&xip->xi_stats_lock);
                }

                if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
                        (void) pthread_mutex_lock(&xip->xi_stats_lock);
                        xip->xi_stats->xs_lost.fmds_value.ui64++;
                        (void) pthread_mutex_unlock(&xip->xi_stats_lock);
                }

                fmd_event_rele(ep);
        }
}
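
/*
 * Usage sketch (assumption for illustration; not part of this file): the
 * mop_transport hook above ultimately invokes the module's fmdo_send entry
 * point, whose return value drives the retry/lost accounting in
 * fmd_xprt_send().  A minimal fmdo_send might look like this, where
 * xport_write() is a hypothetical link-layer call:
 *
 *    static int
 *    xport_send(fmd_hdl_t *hdl, fmd_xprt_t *xp, fmd_event_t *ep,
 *        nvlist_t *nvl)
 *    {
 *            if (xport_write(fmd_xprt_getspecific(hdl, xp), nvl) != 0)
 *                    return (FMD_SEND_RETRY);  (requeued and retried)
 *            return (FMD_SEND_SUCCESS);
 *    }
 */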

void
fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
        const fmd_xprt_rule_t *xrp;
        fmd_t *dp = &fmd;

        fmd_event_t *e;
        char *class, *uuid, *code;
        int isproto;

        uint64_t *tod;
        uint8_t ttl;
        uint_t n;

        /*
         * Grab the transport lock and set the busy flag to indicate we are
         * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
         * resumes the transport before continuing on with the receive.
         */
        (void) pthread_mutex_lock(&xip->xi_lock);

        while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED)) {

                if (fmd.d_signal != 0) {
                        (void) pthread_mutex_unlock(&xip->xi_lock);
                        return; /* fmd_destroy() is in progress */
                }

                (void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);
        }

        xip->xi_busy++;
        ASSERT(xip->xi_busy != 0);

        (void) pthread_mutex_unlock(&xip->xi_lock);

        (void) pthread_mutex_lock(&xip->xi_stats_lock);
        xip->xi_stats->xs_received.fmds_value.ui64++;
        (void) pthread_mutex_unlock(&xip->xi_stats_lock);

        if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
                fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
                    "required \"%s\" payload element", (void *)nvl, FM_CLASS);

                (void) pthread_mutex_lock(&xip->xi_stats_lock);
                xip->xi_stats->xs_discarded.fmds_value.ui64++;
                (void) pthread_mutex_unlock(&xip->xi_stats_lock);

                nvlist_free(nvl);
                goto done;
        }

        fmd_dprintf(FMD_DBG_XPRT, "xprt %u posting %s\n", xip->xi_id, class);

        /*
         * If a time-to-live value is present in the event and is zero, drop
         * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
         */
        if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
                if (ttl == 0) {
                        fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
                            "timeout: event received with ttl=0\n",
                            xip->xi_id, (void *)nvl, class);

                        (void) pthread_mutex_lock(&xip->xi_stats_lock);
                        xip->xi_stats->xs_timeouts.fmds_value.ui64++;
                        (void) pthread_mutex_unlock(&xip->xi_stats_lock);

                        nvlist_free(nvl);
                        goto done;
                }
                (void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
                (void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
        }

        /*
         * If we are using the native system clock, the underlying transport
         * code can provide a tighter event time bound by telling us when the
         * event was enqueued.  If we're using simulated clocks, this time
         * has no meaning to us, so just reset the value to use HRT_NOW.
         */
        if (dp->d_clockops != &fmd_timeops_native)
                hrt = FMD_HRT_NOW;

        /*
         * If an event's class is in the FMD_CTL_CLASS family, then create a
         * control event.  If a FMD_EVN_TOD member is found, create a protocol
         * event using this time.  Otherwise create a protocol event using hrt.
         */
        if ((isproto = strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN)) == 0)
                e = fmd_event_create(FMD_EVT_CTL, hrt, nvl,
                    fmd_ctl_init(nvl));
        else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
                e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
        else {
                e = fmd_event_recreate(FMD_EVT_PROTOCOL,
                    NULL, nvl, class, NULL, 0, 0);
        }
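
        /*
         * Informal TTL example for the logic above: in a chain of fault
         * managers A -> B -> C -> D, an event posted at A with
         * FMD_EVN_TTL = 2 arrives at B with ttl 2 and is forwarded with
         * ttl 1; C receives ttl 1 and forwards with ttl 0; D then receives
         * ttl 0, bumps xs_timeouts, and drops the event, bounding any
         * forwarding loops in the transport topology.
         */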

        /*
         * If the debug log is enabled, create a temporary event, log it to
         * the debug log, and then reset the underlying state of the event.
         */
        if (xip->xi_log != NULL) {
                fmd_event_impl_t *ep = (fmd_event_impl_t *)e;

                fmd_log_append(xip->xi_log, e, NULL);

                ep->ev_flags |= FMD_EVF_VOLATILE;
                ep->ev_off = 0;
                ep->ev_len = 0;

                if (ep->ev_log != NULL) {
                        fmd_log_rele(ep->ev_log);
                        ep->ev_log = NULL;
                }
        }

        /*
         * Iterate over the rules for the current state, trying to match the
         * event class to one of our special rules.  If a rule is matched, the
         * event is consumed and not dispatched to other modules.  If the rule
         * set ends without matching the event, we fall through to dispatching.
         */
        for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
                if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
                        fmd_event_hold(e);
                        xrp->xr_func(xip, nvl);
                        fmd_event_rele(e);
                        goto done;
                }
        }

        /*
         * Record the event in the errlog if it is an ereport.  This code will
         * be replaced later with a per-transport intent log instead.
         */
        if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_EREPORT_CLASS ".*")) {
                (void) pthread_rwlock_rdlock(&dp->d_log_lock);
                fmd_log_append(dp->d_errlog, e, NULL);
                (void) pthread_rwlock_unlock(&dp->d_log_lock);
        }

        /*
         * If a list.suspect event is received, create a case for the
         * specified UUID in the case hash, with the transport module as its
         * owner.  If the UUID is already known, fmd_case_recreate() will
         * return NULL and we simply proceed to our normal event handling
         * regardless.
         */
        if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS) &&
            nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
            nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) == 0) {
                fmd_module_lock(xip->xi_queue->eq_mod);
                (void) fmd_case_recreate(xip->xi_queue->eq_mod,
                    xp, FMD_CASE_SOLVED, uuid, code);
                fmd_module_unlock(xip->xi_queue->eq_mod);
        }

        if (isproto)
                fmd_dispq_dispatch(dp->d_disp, e, class);
        else
                fmd_modhash_dispatch(dp->d_mod_hash, e);
done:
        (void) pthread_mutex_lock(&xip->xi_lock);

        ASSERT(xip->xi_busy != 0);
        xip->xi_busy--;

        (void) pthread_cond_broadcast(&xip->xi_cv);
        (void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

        fmd_event_t *e;
        nvlist_t *nvl;
        char *s;

        fmd_dprintf(FMD_DBG_XPRT,
            "xprt %u closing case %s\n", xip->xi_id, uuid);

        nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
            "resource.fm.xprt.uuclose", xip->xi_version, uuid);

        (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
        e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
        fmd_eventq_insert_at_time(xip->xi_queue, e);
}
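
/*
 * Informal sequence sketch for case closure proxying: when the local fmd
 * isolates a suspect for a case that was recreated here on behalf of a
 * remote peer, fmd_xprt_uuclose() above enqueues a "uuclose" control event;
 * the peer's fmd_xprt_event_uuclose() then looks up the case by UUID and
 * moves it to CLOSE_WAIT with FMD_CF_ISOLATED, mirroring the effect of a
 * local fmd_case_uuclose().
 */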

/*
 * Insert the specified class into our remote subscription hash.  If the class
 * is already present, bump the reference count; otherwise add it to the hash
 * and then enqueue an event for our remote peer to proxy our subscription.
 */
void
fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

        uint_t refs;
        nvlist_t *nvl;
        fmd_event_t *e;
        char *s;

        if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
                return; /* read-only transports do not proxy subscriptions */

        if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
                return; /* transport is not yet an active subscriber */

        (void) pthread_mutex_lock(&xip->xi_lock);
        refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
        (void) pthread_mutex_unlock(&xip->xi_lock);

        if (refs > 1)
                return; /* we've already asked our peer for this class */

        fmd_dprintf(FMD_DBG_XPRT,
            "xprt %u subscribing to %s\n", xip->xi_id, class);

        nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
            "resource.fm.xprt.subscribe", xip->xi_version, class);

        (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
        e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
        fmd_eventq_insert_at_time(xip->xi_queue, e);
}
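
/*
 * Usage sketch (assumption for illustration): when any client module
 * subscribes, e.g. via fmd_hdl_subscribe(hdl, "ereport.io.*"), fmd is
 * expected to fan the pattern out to every open transport through
 * fmd_xprt_subscribe_all(), defined below, so each peer installs the
 * matching proxy subscription exactly once (the xi_rsub reference count
 * collapses duplicate requests).
 */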

/*
 * Delete the specified class from the remote subscription hash.  If the
 * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
 */
void
fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
{
        fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

        uint_t refs;
        nvlist_t *nvl;
        fmd_event_t *e;
        char *s;

        if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
                return; /* read-only transports do not proxy subscriptions */

        if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
                return; /* transport is not yet an active subscriber */

        /*
         * If the subscription reference count drops to zero in xi_rsub,
         * insert an entry into the xi_usub hash indicating that we await an
         * unsuback event.
         */
        (void) pthread_mutex_lock(&xip->xi_lock);

        if ((refs = fmd_xprt_class_hash_delete(xip,
            &xip->xi_rsub, class)) == 0)
                (void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);

        (void) pthread_mutex_unlock(&xip->xi_lock);

        if (refs != 0)
                return; /* other subscriptions for this class still active */

        fmd_dprintf(FMD_DBG_XPRT,
            "xprt %u unsubscribing from %s\n", xip->xi_id, class);

        nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
            "resource.fm.xprt.unsubscribe", xip->xi_version, class);

        (void) nvlist_lookup_string(nvl, FM_CLASS, &s);
        e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
        fmd_eventq_insert_at_time(xip->xi_queue, e);
}

static void
fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
        fmd_xprt_t *xp;

        if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
                fmd_xprt_subscribe(xp, class);
                fmd_idspace_rele(ids, id);
        }
}

void
fmd_xprt_subscribe_all(const char *class)
{
        fmd_idspace_t *ids = fmd.d_xprt_ids;

        if (ids->ids_count != 0)
                fmd_idspace_apply(ids, fmd_xprt_subscribe_xid,
                    (void *)class);
}

static void
fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
        fmd_xprt_t *xp;

        if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
                fmd_xprt_unsubscribe(xp, class);
                fmd_idspace_rele(ids, id);
        }
}

void
fmd_xprt_unsubscribe_all(const char *class)
{
        fmd_idspace_t *ids = fmd.d_xprt_ids;

        if (ids->ids_count != 0)
                fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid,
                    (void *)class);
}

/*ARGSUSED*/
static void
fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
        fmd_xprt_t *xp;

        if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
                fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
                fmd_idspace_rele(ids, id);
        }
}

void
fmd_xprt_suspend_all(void)
{
        fmd_idspace_t *ids = fmd.d_xprt_ids;

        (void) pthread_mutex_lock(&fmd.d_xprt_lock);

        if (fmd.d_xprt_suspend++ != 0) {
                (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
                return; /* already suspended */
        }

        if (ids->ids_count != 0)
                fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);

        (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}

/*ARGSUSED*/
static void
fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
        fmd_xprt_t *xp;

        if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
                fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
                fmd_idspace_rele(ids, id);
        }
}

void
fmd_xprt_resume_all(void)
{
        fmd_idspace_t *ids = fmd.d_xprt_ids;

        (void) pthread_mutex_lock(&fmd.d_xprt_lock);

        if (fmd.d_xprt_suspend == 0)
                fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");

        if (--fmd.d_xprt_suspend != 0) {
                (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
                return; /* not ready to be resumed */
        }

        if (ids->ids_count != 0)
                fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);

        (void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}
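
/*
 * Usage note (informal): fmd_xprt_suspend_all() and fmd_xprt_resume_all()
 * nest by way of the d_xprt_suspend counter, so calls must balance:
 *
 *    fmd_xprt_suspend_all();    d_xprt_suspend 0 -> 1, suspend all
 *    fmd_xprt_suspend_all();    d_xprt_suspend 1 -> 2, no-op
 *    fmd_xprt_resume_all();     d_xprt_suspend 2 -> 1, still suspended
 *    fmd_xprt_resume_all();     d_xprt_suspend 1 -> 0, resume all
 *
 * An unbalanced resume trips the fmd_panic() above.
 */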