/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * FMD Transport Subsystem
 *
 * A transport module uses some underlying mechanism to transport events.
 * This mechanism may use any underlying link-layer protocol and may support
 * additional link-layer packets unrelated to FMA.  Some appropriate link-
 * layer mechanism is expected to be used to create the underlying connection
 * prior to calling fmd_xprt_open() itself.  Alternatively, a transport may be
 * created in the suspended state by specifying the FMD_XPRT_SUSPENDED flag as
 * part of the call to fmd_xprt_open(), and then may be resumed later.  The
 * underlying transport mechanism is *required* to provide ordering: that is,
 * the sequences of bytes written across the transport must be read by the
 * remote peer in the order that they are written, even across separate calls
 * to fmdo_send().  As an example, the Internet TCP protocol would be a valid
 * transport as it guarantees ordering, whereas the Internet UDP protocol
 * would not, because UDP datagrams may be delivered in any order as a result
 * of delays introduced when datagrams pass through routers.
 *
 * Similar to sending events, a transport module receives events from its
 * remote peer endpoint using some transport-specific mechanism that is
 * unknown to FMD.  As each event is received, the transport module is
 * responsible for constructing a valid nvlist_t object from the data and then
 * calling fmd_xprt_post() to post the event to the containing FMD's dispatch
 * queue, making it available to all local (non-transport) modules that have
 * subscribed to the event.
 *
 * The following state machine is used for each transport.  The initial state
 * is either SYN, ACK, or RUN, depending on the flags specified to xprt_create.
 *
 *      FMD_XPRT_ACCEPT   !FMD_XPRT_ACCEPT
 *             |                 |
 * waiting  +--v--+           +--v--+  waiting
 * for syn  | SYN |--+     +--| ACK |  for ack
 * event    +-----+   \   /   +-----+  event
 *             |       \ /       |
 * drop all +--v--+     X     +--v--+  send subscriptions,
 * events   | ERR |<---+ +--->| SUB |  recv subscriptions,
 *          +-----+           +-----+  wait for run event
 *             ^                 |
 *             |     +-----+     |
 *             +-----| RUN |<----+
 *                   +--^--+
 *                      |
 *               FMD_XPRT_RDONLY
 *
 * When fmd_xprt_open() is called without FMD_XPRT_ACCEPT, the Common Transport
 * Layer enqueues a "syn" event for the module in its event queue and sets the
 * state to ACK.  In state ACK, we are waiting for the transport to get an
 * "ack" event and call fmd_xprt_post() on this event.  Other events will be
 * discarded.  If an "ack" is received, we transition to state SUB.  If a
 * configurable timeout occurs or if the "ack" is invalid (e.g. invalid version
 * exchange), we transition to state ERR.  Once in state ERR, no further
 * operations are valid except fmd_xprt_close(), and fmd_xprt_error() will
 * return a non-zero value to the caller indicating the transport has failed.
 *
 * When fmd_xprt_open() is called with FMD_XPRT_ACCEPT, the Common Transport
 * Layer assumes this transport is being used to accept a virtual connection
 * from a remote peer that is sending a "syn", and sets the initial state to
 * SYN.  In this state, the transport waits for a "syn" event, validates it,
 * and then transitions to state SUB if it is valid or state ERR if it is not.
 *
 * Once in state SUB, the transport module is expected to receive a sequence of
 * zero or more "subscribe" events from the remote peer, followed by a "run"
 * event.  Once in state RUN, the transport is active and any events can be
 * sent or received.  The transport module is free to call fmd_xprt_close()
 * from any state.  The fmd_xprt_error() function will return zero if the
 * transport is not in the ERR state, or non-zero if it is in the ERR state.
 *
 * Once the state machine reaches RUN, other FMA protocol events can be sent
 * and received across the transport in addition to the various control events.
 *
 * Table of Common Transport Layer Control Events
 * ==============================================
 *
 * FMA Class                     Payload
 * ---------                     -------
 * resource.fm.xprt.uuclose      string (uuid of case)
 * resource.fm.xprt.subscribe    string (class pattern)
 * resource.fm.xprt.unsubscribe  string (class pattern)
 * resource.fm.xprt.unsuback     string (class pattern)
 * resource.fm.xprt.syn          version information
 * resource.fm.xprt.ack          version information
 * resource.fm.xprt.run          version information
 *
 * Control events are used to add and delete proxy subscriptions on the remote
 * transport peer module, and to set up connections.  When a "syn" event is
 * sent, FMD will include in the payload the highest version of the FMA event
 * protocol that is supported by the sender.  When a "syn" event is received,
 * the receiving FMD will use the minimum of this version and its version of
 * the protocol, and reply with this new minimum version in the "ack" event.
 * The receiver will then use this new minimum for subsequent event semantics.
 */
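
/*
 * Example (an illustrative sketch only, not part of this file): a minimal
 * transport module might drive the state machine above as shown below.  The
 * names my_conn_t, my_conn_read(), and my_event_to_nvlist() are hypothetical
 * placeholders for the module's link-layer code; only fmd_xprt_open(),
 * fmd_xprt_post(), and fmd_xprt_close() are fmd API calls.
 *
 *	void
 *	my_recv_thread(fmd_hdl_t *hdl, my_conn_t *conn)
 *	{
 *		fmd_xprt_t *xp = fmd_xprt_open(hdl, FMD_XPRT_RDWR, NULL, conn);
 *		nvlist_t *nvl;
 *		void *buf;
 *		size_t len;
 *
 *		while (my_conn_read(conn, &buf, &len) == 0) {
 *			if ((nvl = my_event_to_nvlist(buf, len)) != NULL)
 *				fmd_xprt_post(hdl, xp, nvl, 0);
 *		}
 *
 *		fmd_xprt_close(hdl, xp);
 *	}
 *
 * The "syn", "ack", and "run" control events described above arrive through
 * the same fmd_xprt_post() path and are consumed by the Common Transport
 * Layer rather than being dispatched to other modules.
 */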

#include <sys/fm/protocol.h>
#include <strings.h>
#include <limits.h>

#include <fmd_alloc.h>
#include <fmd_error.h>
#include <fmd_conf.h>
#include <fmd_subr.h>
#include <fmd_string.h>
#include <fmd_protocol.h>
#include <fmd_thread.h>
#include <fmd_eventq.h>
#include <fmd_dispq.h>
#include <fmd_ctl.h>
#include <fmd_log.h>
#include <fmd_ustat.h>
#include <fmd_case.h>
#include <fmd_api.h>
#include <fmd_fmri.h>
#include <fmd_asru.h>
#include <fmd_xprt.h>

#include <fmd.h>

/*
 * The states shown above in the transport state machine diagram are encoded
 * using arrays of class patterns and a corresponding action function.  These
 * arrays are then passed to fmd_xprt_transition() to change transport states.
 */

const fmd_xprt_rule_t _fmd_xprt_state_syn[] = {
{ "resource.fm.xprt.syn", fmd_xprt_event_syn },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_ack[] = {
{ "resource.fm.xprt.ack", fmd_xprt_event_ack },
{ "*", fmd_xprt_event_error },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_err[] = {
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_sub[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.run", fmd_xprt_event_run },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ "*", fmd_xprt_event_drop },
{ NULL, NULL }
};

const fmd_xprt_rule_t _fmd_xprt_state_run[] = {
{ "resource.fm.xprt.subscribe", fmd_xprt_event_sub },
{ "resource.fm.xprt.unsubscribe", fmd_xprt_event_unsub },
{ "resource.fm.xprt.unsuback", fmd_xprt_event_unsuback },
{ "resource.fm.xprt.uuclose", fmd_xprt_event_uuclose },
{ "resource.fm.xprt.*", fmd_xprt_event_error },
{ NULL, NULL }
};
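
/*
 * Rules are evaluated in order with the first match winning; scanning stops
 * at the NULL terminator, after which a protocol event falls through to the
 * normal dispatch path.  A minimal sketch of the scan, assuming "e" and
 * "nvl" refer to the event being received (the real loop appears in
 * fmd_xprt_recv(), below):
 *
 *	const fmd_xprt_rule_t *xrp;
 *
 *	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
 *		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
 *			xrp->xr_func(xip, nvl);
 *			break;
 *		}
 *	}
 */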

/*
 * Template for per-transport statistics installed by fmd on behalf of each
 * transport.  These are used to initialize the per-transport xi_stats.  For
 * each statistic, the name is prefixed with "fmd.xprt.%u", where %u is the
 * transport ID (xi_id), and the statistics are then inserted into the
 * per-module stats hash.  The values in this array must match
 * fmd_xprt_stat_t from <fmd_xprt.h>.
 */
static const fmd_xprt_stat_t _fmd_xprt_stat_tmpl = {
{
{ "dispatched", FMD_TYPE_UINT64, "total events dispatched to transport" },
{ "dequeued", FMD_TYPE_UINT64, "total events dequeued by transport" },
{ "prdequeued", FMD_TYPE_UINT64, "protocol events dequeued by transport" },
{ "dropped", FMD_TYPE_UINT64, "total events dropped on queue overflow" },
{ "wcnt", FMD_TYPE_UINT32, "count of events waiting on queue" },
{ "wtime", FMD_TYPE_TIME, "total wait time on queue" },
{ "wlentime", FMD_TYPE_TIME, "total wait length * time product" },
{ "wlastupdate", FMD_TYPE_TIME, "hrtime of last wait queue update" },
{ "dtime", FMD_TYPE_TIME, "total processing time after dequeue" },
{ "dlastupdate", FMD_TYPE_TIME, "hrtime of last event dequeue completion" },
},
{ "module", FMD_TYPE_STRING, "module that owns this transport" },
{ "authority", FMD_TYPE_STRING, "authority associated with this transport" },
{ "state", FMD_TYPE_STRING, "current transport state" },
{ "received", FMD_TYPE_UINT64, "events received by transport" },
{ "discarded", FMD_TYPE_UINT64, "bad events discarded by transport" },
{ "retried", FMD_TYPE_UINT64, "retries requested of transport" },
{ "replayed", FMD_TYPE_UINT64, "events replayed by transport" },
{ "lost", FMD_TYPE_UINT64, "events lost by transport" },
{ "timeouts", FMD_TYPE_UINT64, "events received by transport with ttl=0" },
{ "subscriptions", FMD_TYPE_UINT64, "subscriptions registered to transport" },
};

static void
fmd_xprt_class_hash_create(fmd_xprt_class_hash_t *xch, fmd_eventq_t *eq)
{
	uint_t hashlen = fmd.d_str_buckets;

	xch->xch_queue = eq;
	xch->xch_hashlen = hashlen;
	xch->xch_hash = fmd_zalloc(sizeof (void *) * hashlen, FMD_SLEEP);
}

static void
fmd_xprt_class_hash_destroy(fmd_xprt_class_hash_t *xch)
{
	fmd_eventq_t *eq = xch->xch_queue;
	fmd_xprt_class_t *xcp, *ncp;
	uint_t i;

	for (i = 0; i < xch->xch_hashlen; i++) {
		for (xcp = xch->xch_hash[i]; xcp != NULL; xcp = ncp) {
			ncp = xcp->xc_next;

			if (eq != NULL)
				fmd_dispq_delete(fmd.d_disp, eq, xcp->xc_class);

			fmd_strfree(xcp->xc_class);
			fmd_free(xcp, sizeof (fmd_xprt_class_t));
		}
	}

	fmd_free(xch->xch_hash, sizeof (void *) * xch->xch_hashlen);
}

/*
 * Insert the specified class into the specified class hash, and return the
 * reference count.  A return value of one indicates this is the first insert.
 * If an eventq is associated with the hash, insert a dispq subscription for it.
 */
static uint_t
fmd_xprt_class_hash_insert(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
	fmd_xprt_class_t *xcp;

	ASSERT(MUTEX_HELD(&xip->xi_lock));

	for (xcp = xch->xch_hash[h]; xcp != NULL; xcp = xcp->xc_next) {
		if (strcmp(class, xcp->xc_class) == 0)
			return (++xcp->xc_refs);
	}

	xcp = fmd_alloc(sizeof (fmd_xprt_class_t), FMD_SLEEP);
	xcp->xc_class = fmd_strdup(class, FMD_SLEEP);
	xcp->xc_next = xch->xch_hash[h];
	xcp->xc_refs = 1;
	xch->xch_hash[h] = xcp;

	if (xch->xch_queue != NULL)
		fmd_dispq_insert(fmd.d_disp, xch->xch_queue, class);

	return (xcp->xc_refs);
}

/*
 * Delete the specified class from the specified class hash, and return the
 * reference count.  A return value of zero indicates the class was deleted.
 * If an eventq is associated with the hash, delete the dispq subscription.
 */
static uint_t
fmd_xprt_class_hash_delete(fmd_xprt_impl_t *xip,
    fmd_xprt_class_hash_t *xch, const char *class)
{
	uint_t h = fmd_strhash(class) % xch->xch_hashlen;
	fmd_xprt_class_t *xcp, **pp;

	ASSERT(MUTEX_HELD(&xip->xi_lock));
	pp = &xch->xch_hash[h];

	for (xcp = *pp; xcp != NULL; xcp = xcp->xc_next) {
		if (strcmp(class, xcp->xc_class) == 0)
			break;
		else
			pp = &xcp->xc_next;
	}

	if (xcp == NULL)
		return (-1U); /* explicitly permit an invalid delete */

	if (--xcp->xc_refs != 0)
		return (xcp->xc_refs);

	ASSERT(xcp->xc_refs == 0);
	*pp = xcp->xc_next;

	fmd_strfree(xcp->xc_class);
	fmd_free(xcp, sizeof (fmd_xprt_class_t));

	if (xch->xch_queue != NULL)
		fmd_dispq_delete(fmd.d_disp, xch->xch_queue, class);

	return (0);
}
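
/*
 * A short worked example of the reference counting above (the class name is
 * hypothetical; xi_lock must be held as the ASSERTs require):
 *
 *	fmd_xprt_class_hash_insert(xip, xch, "ereport.io.*")   returns 1
 *	fmd_xprt_class_hash_insert(xip, xch, "ereport.io.*")   returns 2
 *	fmd_xprt_class_hash_delete(xip, xch, "ereport.io.*")   returns 1
 *	fmd_xprt_class_hash_delete(xip, xch, "ereport.io.*")   returns 0
 *
 * Only the first insert adds a dispq subscription and only the final delete
 * removes it, so the dispatch queue sees each class at most once no matter
 * how many subscribers reference it.
 */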

/*
 * Queue subscribe events for the specified transport corresponding to all of
 * the active module subscriptions.  This is an extremely heavyweight operation
 * that we expect to take place rarely (i.e. when loading a transport module
 * or when it establishes a connection).  We lock all of the known modules to
 * prevent them from adding or deleting subscriptions, then snapshot their
 * subscriptions, and then unlock all of the modules.  We hold the modhash
 * lock for the duration of this operation to prevent new modules from loading.
 */
static void
fmd_xprt_subscribe_modhash(fmd_xprt_impl_t *xip, fmd_modhash_t *mhp)
{
	fmd_xprt_t *xp = (fmd_xprt_t *)xip;
	const fmd_conf_path_t *pap;
	fmd_module_t *mp;
	uint_t i, j;

	(void) pthread_rwlock_rdlock(&mhp->mh_lock);

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
			fmd_module_lock(mp);
	}

	(void) pthread_mutex_lock(&xip->xi_lock);
	ASSERT(!(xip->xi_flags & FMD_XPRT_SUBSCRIBER));
	xip->xi_flags |= FMD_XPRT_SUBSCRIBER;
	(void) pthread_mutex_unlock(&xip->xi_lock);

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next) {
			(void) fmd_conf_getprop(mp->mod_conf,
			    FMD_PROP_SUBSCRIPTIONS, &pap);
			for (j = 0; j < pap->cpa_argc; j++)
				fmd_xprt_subscribe(xp, pap->cpa_argv[j]);
		}
	}

	for (i = 0; i < mhp->mh_hashlen; i++) {
		for (mp = mhp->mh_hash[i]; mp != NULL; mp = mp->mod_next)
			fmd_module_unlock(mp);
	}

	(void) pthread_rwlock_unlock(&mhp->mh_lock);
}

static void
fmd_xprt_transition(fmd_xprt_impl_t *xip,
    const fmd_xprt_rule_t *state, const char *tag)
{
	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	TRACE((FMD_DBG_XPRT, "xprt %u -> %s\n", xip->xi_id, tag));

	xip->xi_state = state;
	s = fmd_strdup(tag, FMD_SLEEP);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	fmd_strfree(xip->xi_stats->xs_state.fmds_value.str);
	xip->xi_stats->xs_state.fmds_value.str = s;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	/*
	 * If we've reached the SUB state, take out the big hammer and snapshot
	 * all of the subscriptions of all of the loaded modules.  Then queue a
	 * run event for our remote peer indicating that it can enter RUN.
	 */
	if (state == _fmd_xprt_state_sub) {
		fmd_xprt_subscribe_modhash(xip, fmd.d_mod_hash);

		nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
		    "resource.fm.xprt.run", xip->xi_version);

		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
		fmd_eventq_insert_at_time(xip->xi_queue, e);
	}
}

static void
fmd_xprt_authupdate(fmd_xprt_impl_t *xip)
{
	char *s = fmd_fmri_auth2str(xip->xi_auth);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	fmd_strfree(xip->xi_stats->xs_authority.fmds_value.str);
	xip->xi_stats->xs_authority.fmds_value.str = s;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
}

static int
fmd_xprt_vmismatch(fmd_xprt_impl_t *xip, nvlist_t *nvl, uint_t *rversionp)
{
	uint8_t rversion;

	if (nvlist_lookup_uint8(nvl, FM_VERSION, &rversion) != 0) {
		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
		return (1);
	}

	if (rversion > xip->xi_version) {
		fmd_dprintf(FMD_DBG_XPRT, "xprt %u protocol mismatch: %u>%u\n",
		    xip->xi_id, rversion, xip->xi_version);

		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
		return (1);
	}

	if (rversionp != NULL)
		*rversionp = rversion;

	return (0);
}

void
fmd_xprt_event_syn(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_event_t *e;
	uint_t vers;
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, &vers))
		return; /* transitioned to error state */

	/*
	 * If the transport module didn't specify an authority, extract the
	 * one that is passed along with the xprt.syn event and use that.
	 */
	if (xip->xi_auth == NULL &&
	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
		fmd_xprt_authupdate(xip);
	}

	nvl = fmd_protocol_xprt_ctl(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.ack", xip->xi_version);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
	fmd_eventq_insert_at_time(xip->xi_queue, e);

	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}

void
fmd_xprt_event_ack(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	uint_t vers;

	if (fmd_xprt_vmismatch(xip, nvl, &vers))
		return; /* transitioned to error state */

	/*
	 * If the transport module didn't specify an authority, extract the
	 * one that is passed along with the xprt.ack event and use that.
	 */
	if (xip->xi_auth == NULL &&
	    nvlist_lookup_nvlist(nvl, FM_RSRC_RESOURCE, &nvl) == 0 &&
	    nvlist_lookup_nvlist(nvl, FM_FMRI_AUTHORITY, &nvl) == 0) {
		(void) nvlist_xdup(nvl, &xip->xi_auth, &fmd.d_nva);
		fmd_xprt_authupdate(xip);
	}

	xip->xi_version = MIN(FM_RSRC_XPRT_VERSION, vers);
	fmd_xprt_transition(xip, _fmd_xprt_state_sub, "SUB");
}
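
/*
 * A short worked example of the negotiation above (version numbers are
 * illustrative).  If the local FM_RSRC_XPRT_VERSION is 1 and the peer's
 * "syn" carries version 0, fmd_xprt_vmismatch() passes (0 <= 1) and we
 * adopt xi_version = MIN(1, 0) = 0, echoing 0 back in our "ack"; both
 * endpoints then speak version 0.  If instead the peer's "syn" carried a
 * version greater than ours, fmd_xprt_vmismatch() would transition this
 * transport to ERR, since we cannot interpret a future protocol version.
 */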

/*
 * Upon transition to RUN, we take every solved case and resend a list.suspect
 * event for it to our remote peer.  If a case transitions from solved to a
 * future state (CLOSE_WAIT, CLOSED, or REPAIRED) while we are iterating over
 * the case hash, we will get it as part of examining the resource cache, next.
 */
static void
fmd_xprt_send_case(fmd_case_t *cp, void *arg)
{
	fmd_case_impl_t *cip = (fmd_case_impl_t *)cp;
	fmd_xprt_impl_t *xip = arg;

	fmd_event_t *e;
	nvlist_t *nvl;
	char *class;

	if (cip->ci_state != FMD_CASE_SOLVED)
		return; /* unsolved, or we'll get it during the ASRU pass */

	nvl = fmd_case_mkevent(cp, FM_LIST_SUSPECT_CLASS);
	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
	    FM_LIST_SUSPECT_CLASS, cip->ci_uuid, xip->xi_id);

	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

/*
 * Upon transition to RUN, we take every ASRU which is in the faulty state
 * and resend a fault.* event for it to our remote peer, in case the peer is
 * running the fault manager that knows how to disable this resource.  If
 * any new resources are added to the cache during our iteration, this is no
 * problem because our subscriptions are already proxied and so any new cases
 * will result in a list.suspect event being transported if that is needed.
 */
static void
fmd_xprt_send_asru(fmd_asru_t *ap, void *arg)
{
	fmd_xprt_impl_t *xip = arg;
	nvlist_t *nvl = NULL;
	fmd_event_t *e;
	char *class;

	(void) pthread_mutex_lock(&ap->asru_lock);

	if ((ap->asru_flags & (FMD_ASRU_INTERNAL | FMD_ASRU_STATE)) ==
	    FMD_ASRU_FAULTY && fmd_case_orphaned(ap->asru_case))
		(void) nvlist_xdup(ap->asru_event, &nvl, &fmd.d_nva);

	(void) pthread_mutex_unlock(&ap->asru_lock);

	if (nvl == NULL)
		return; /* asru is internal, unusable, or not faulty */

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);

	fmd_dprintf(FMD_DBG_XPRT, "re-send %s for %s to transport %u\n",
	    class, ap->asru_name, xip->xi_id);

	fmd_dispq_dispatch_gid(fmd.d_disp, e, class, xip->xi_queue->eq_sgid);
}

void
fmd_xprt_event_run(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	if (!fmd_xprt_vmismatch(xip, nvl, NULL)) {
		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
		fmd_case_hash_apply(fmd.d_cases, fmd_xprt_send_case, xip);
		fmd_asru_hash_apply(fmd.d_asrus, fmd_xprt_send_asru, xip);
	}
}

void
fmd_xprt_event_sub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_insert(xip, &xip->xi_lsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_subscriptions.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);
}

void
fmd_xprt_event_unsub(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_event_t *e;
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_lsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_subscriptions.fmds_value.ui64--;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.unsuback", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, class);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

void
fmd_xprt_event_unsuback(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_SUBCLASS, &class) != 0)
		return; /* malformed protocol event */

	(void) pthread_mutex_lock(&xip->xi_lock);
	(void) fmd_xprt_class_hash_delete(xip, &xip->xi_usub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_event_uuclose(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	fmd_case_t *cp;
	char *uuid;

	if (fmd_xprt_vmismatch(xip, nvl, NULL))
		return; /* transitioned to error state */

	if (nvlist_lookup_string(nvl, FM_RSRC_XPRT_UUID, &uuid) == 0 &&
	    (cp = fmd_case_hash_lookup(fmd.d_cases, uuid)) != NULL) {
		fmd_case_transition(cp, FMD_CASE_CLOSE_WAIT, FMD_CF_ISOLATED);
		fmd_case_rele(cp);
	}
}

void
fmd_xprt_event_error(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class = "<unknown>";

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_discarded.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	TRACE((FMD_DBG_XPRT, "xprt %u bad event %s\n", xip->xi_id, class));

	fmd_xprt_transition(xip, _fmd_xprt_state_err, "ERR");
}

void
fmd_xprt_event_drop(fmd_xprt_impl_t *xip, nvlist_t *nvl)
{
	char *class = "<unknown>";

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_discarded.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &class);
	TRACE((FMD_DBG_XPRT, "xprt %u drop event %s\n", xip->xi_id, class));
}

fmd_xprt_t *
fmd_xprt_create(fmd_module_t *mp, uint_t flags, nvlist_t *auth, void *data)
{
	fmd_xprt_impl_t *xip = fmd_zalloc(sizeof (fmd_xprt_impl_t), FMD_SLEEP);
	fmd_stat_t *statv;
	uint_t i, statc;

	char buf[PATH_MAX];
	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	(void) pthread_mutex_init(&xip->xi_lock, NULL);
	(void) pthread_cond_init(&xip->xi_cv, NULL);
	(void) pthread_mutex_init(&xip->xi_stats_lock, NULL);

	xip->xi_auth = auth;
	xip->xi_data = data;
	xip->xi_version = FM_RSRC_XPRT_VERSION;
	xip->xi_flags = flags;

	/*
	 * Grab fmd.d_xprt_lock to block fmd_xprt_suspend_all() and then create
	 * a transport ID and make it visible in fmd.d_xprt_ids.  If transports
	 * were previously suspended, set the FMD_XPRT_DSUSPENDED flag on us to
	 * ensure that this transport will not run until fmd_xprt_resume_all().
	 */
	(void) pthread_mutex_lock(&fmd.d_xprt_lock);
	xip->xi_id = fmd_idspace_alloc(fmd.d_xprt_ids, xip);

	if (fmd.d_xprt_suspend != 0)
		xip->xi_flags |= FMD_XPRT_DSUSPENDED;

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);

	/*
	 * If the module has not yet finished _fmd_init(), set the ISUSPENDED
	 * bit so that fmdo_send() is not called until _fmd_init() completes.
	 */
	if (!(mp->mod_flags & FMD_MOD_INIT))
		xip->xi_flags |= FMD_XPRT_ISUSPENDED;

	/*
	 * Initialize the transport statistics that we keep on behalf of fmd.
	 * These are set up using a template defined at the top of this file.
	 * We rename each statistic with a prefix ensuring its uniqueness.
	 */
	statc = sizeof (_fmd_xprt_stat_tmpl) / sizeof (fmd_stat_t);
	statv = fmd_alloc(sizeof (_fmd_xprt_stat_tmpl), FMD_SLEEP);
	bcopy(&_fmd_xprt_stat_tmpl, statv, sizeof (_fmd_xprt_stat_tmpl));

	for (i = 0; i < statc; i++) {
		(void) snprintf(statv[i].fmds_name,
		    sizeof (statv[i].fmds_name), "fmd.xprt.%u.%s", xip->xi_id,
		    ((fmd_stat_t *)&_fmd_xprt_stat_tmpl + i)->fmds_name);
	}

	xip->xi_stats = (fmd_xprt_stat_t *)fmd_ustat_insert(
	    mp->mod_ustat, FMD_USTAT_NOALLOC, statc, statv, NULL);

	if (xip->xi_stats == NULL)
		fmd_panic("failed to create xi_stats (%p)\n", (void *)statv);

	xip->xi_stats->xs_module.fmds_value.str =
	    fmd_strdup(mp->mod_name, FMD_SLEEP);

	if (xip->xi_auth != NULL)
		fmd_xprt_authupdate(xip);

	/*
	 * Create the outbound eventq for this transport and link to its stats.
	 * If any suspend bits were set above, suspend the eventq immediately.
	 */
	xip->xi_queue = fmd_eventq_create(mp, &xip->xi_stats->xs_evqstat,
	    &xip->xi_stats_lock, mp->mod_stats->ms_xprtqlimit.fmds_value.ui32);

	if (xip->xi_flags & FMD_XPRT_SMASK)
		fmd_eventq_suspend(xip->xi_queue);

	/*
	 * Create our subscription hashes: local subscriptions go to xi_queue,
	 * remote subscriptions are tracked only for protocol requests, and
	 * pending unsubscriptions are associated with the /dev/null eventq.
	 */
	fmd_xprt_class_hash_create(&xip->xi_lsub, xip->xi_queue);
	fmd_xprt_class_hash_create(&xip->xi_rsub, NULL);
	fmd_xprt_class_hash_create(&xip->xi_usub, fmd.d_rmod->mod_queue);

	/*
	 * Determine our initial state based upon the creation flags.  If we're
	 * read-only, go directly to RUN.  If we're accepting a new connection,
	 * wait for a SYN.  Otherwise send a SYN and wait for an ACK.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		fmd_xprt_transition(xip, _fmd_xprt_state_run, "RUN");
	else if (flags & FMD_XPRT_ACCEPT)
		fmd_xprt_transition(xip, _fmd_xprt_state_syn, "SYN");
	else
		fmd_xprt_transition(xip, _fmd_xprt_state_ack, "ACK");

	/*
	 * If client.xprtlog is set to TRUE, create a debugging log for the
	 * events received by the transport in var/fm/fmd/xprt/.
	 */
	(void) fmd_conf_getprop(fmd.d_conf, "client.xprtlog", &i);
	(void) fmd_conf_getprop(fmd.d_conf, "log.xprt", &s);

	if (i) {
		(void) snprintf(buf, sizeof (buf), "%s/%u.log", s, xip->xi_id);
		xip->xi_log = fmd_log_open(fmd.d_rootdir, buf, FMD_LOG_XPRT);
	}

	ASSERT(fmd_module_locked(mp));
	fmd_list_append(&mp->mod_transports, xip);

	(void) pthread_mutex_lock(&mp->mod_stats_lock);
	mp->mod_stats->ms_xprtopen.fmds_value.ui32++;
	(void) pthread_mutex_unlock(&mp->mod_stats_lock);

	/*
	 * If this is a read-only transport, return without creating a send
	 * queue thread and setting up any connection events in our queue.
	 */
	if ((flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		goto out;

	/*
	 * Once the transport is fully initialized, create a send queue thread
	 * and start any connect events flowing to complete our initialization.
	 */
	if ((xip->xi_thread = fmd_thread_create(mp,
	    (fmd_thread_f *)fmd_xprt_send, xip)) == NULL) {

		fmd_error(EFMD_XPRT_THR,
		    "failed to create thread for transport %u", xip->xi_id);

		fmd_xprt_destroy((fmd_xprt_t *)xip);
		(void) fmd_set_errno(EFMD_XPRT_THR);
		return (NULL);
	}

	/*
	 * If the transport is not being opened to accept an inbound connect,
	 * start an outbound connection by enqueuing a SYN event for our peer.
	 */
	if (!(flags & FMD_XPRT_ACCEPT)) {
		nvl = fmd_protocol_xprt_ctl(mp,
		    "resource.fm.xprt.syn", FM_RSRC_XPRT_VERSION);

		(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
		e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
		fmd_eventq_insert_at_time(xip->xi_queue, e);
	}
out:
	fmd_dprintf(FMD_DBG_XPRT, "opened transport %u\n", xip->xi_id);
	return ((fmd_xprt_t *)xip);
}

void
fmd_xprt_destroy(fmd_xprt_t *xp)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	fmd_module_t *mp = xip->xi_queue->eq_mod;
	uint_t id = xip->xi_id;

	fmd_case_impl_t *cip, *nip;
	fmd_stat_t *sp;
	uint_t i, n;

	ASSERT(fmd_module_locked(mp));
	fmd_list_delete(&mp->mod_transports, xip);

	(void) pthread_mutex_lock(&mp->mod_stats_lock);
	mp->mod_stats->ms_xprtopen.fmds_value.ui32--;
	(void) pthread_mutex_unlock(&mp->mod_stats_lock);

	(void) pthread_mutex_lock(&xip->xi_lock);

	while (xip->xi_busy != 0)
		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

	/*
	 * Remove the transport from global visibility, cancel its send-side
	 * thread, join with it, and then remove the transport from module
	 * visibility.  Once all this is done, destroy and free the transport.
	 */
	(void) fmd_idspace_free(fmd.d_xprt_ids, xip->xi_id);

	if (xip->xi_thread != NULL) {
		fmd_eventq_abort(xip->xi_queue);
		fmd_thread_destroy(xip->xi_thread, FMD_THREAD_JOIN);
	}

	if (xip->xi_log != NULL)
		fmd_log_rele(xip->xi_log);

	/*
	 * Release every case handle in the module that was cached by this
	 * transport.  This will result in these cases disappearing from the
	 * local case hash so that fmd_case_uuclose() can no longer be used.
	 */
	for (cip = fmd_list_next(&mp->mod_cases); cip != NULL; cip = nip) {
		nip = fmd_list_next(cip);
		if (cip->ci_xprt == xp)
			fmd_case_discard((fmd_case_t *)cip);
	}

	/*
	 * Destroy every class in the various subscription hashes and remove
	 * any corresponding subscriptions from the event dispatch queue.
	 */
	fmd_xprt_class_hash_destroy(&xip->xi_lsub);
	fmd_xprt_class_hash_destroy(&xip->xi_rsub);
	fmd_xprt_class_hash_destroy(&xip->xi_usub);

	/*
	 * Since our statistics are created by hand, after deleting them from
	 * the ustat hash we must manually free them and any embedded strings.
	 */
	fmd_ustat_delete(mp->mod_ustat, sizeof (_fmd_xprt_stat_tmpl) /
	    sizeof (fmd_stat_t), (fmd_stat_t *)&_fmd_xprt_stat_tmpl);

	sp = (fmd_stat_t *)xip->xi_stats;
	n = sizeof (fmd_xprt_stat_t) / sizeof (fmd_stat_t);

	for (i = 0; i < n; i++, sp++) {
		if (sp->fmds_type == FMD_TYPE_STRING)
			fmd_strfree(sp->fmds_value.str);
	}

	fmd_free(xip->xi_stats, sizeof (fmd_xprt_stat_t));
	fmd_eventq_destroy(xip->xi_queue);
	nvlist_free(xip->xi_auth);
	fmd_free(xip, sizeof (fmd_xprt_impl_t));

	fmd_dprintf(FMD_DBG_XPRT, "closed transport %u\n", id);
}

void
fmd_xprt_xsuspend(fmd_xprt_t *xp, uint_t flags)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	uint_t oflags;

	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
	(void) pthread_mutex_lock(&xip->xi_lock);

	oflags = xip->xi_flags;
	xip->xi_flags |= flags;

	if (!(oflags & FMD_XPRT_SMASK) && (xip->xi_flags & FMD_XPRT_SMASK) != 0)
		fmd_eventq_suspend(xip->xi_queue);

	(void) pthread_cond_broadcast(&xip->xi_cv);

	while (xip->xi_busy != 0)
		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_xresume(fmd_xprt_t *xp, uint_t flags)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	uint_t oflags;

	ASSERT((flags & ~FMD_XPRT_SMASK) == 0);
	(void) pthread_mutex_lock(&xip->xi_lock);

	oflags = xip->xi_flags;
	xip->xi_flags &= ~flags;

	if ((oflags & FMD_XPRT_SMASK) != 0 && !(xip->xi_flags & FMD_XPRT_SMASK))
		fmd_eventq_resume(xip->xi_queue);

	(void) pthread_cond_broadcast(&xip->xi_cv);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_send(fmd_xprt_t *xp)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	fmd_module_t *mp = xip->xi_queue->eq_mod;
	fmd_event_t *ep;
	int err;

	while ((ep = fmd_eventq_delete(xip->xi_queue)) != NULL) {
		if (FMD_EVENT_TTL(ep) == 0) {
			fmd_event_rele(ep);
			continue;
		}

		fmd_dprintf(FMD_DBG_XPRT, "xprt %u sending %s\n",
		    xip->xi_id, (char *)FMD_EVENT_DATA(ep));

		err = mp->mod_ops->mop_transport(mp, xp, ep);
		fmd_eventq_done(xip->xi_queue);

		if (err == FMD_SEND_RETRY) {
			fmd_eventq_insert_at_time(xip->xi_queue, ep);
			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_retried.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
		}

		if (err != FMD_SEND_SUCCESS && err != FMD_SEND_RETRY) {
			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_lost.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);
		}

		fmd_event_rele(ep);
	}
}
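
/*
 * For reference, the mop_transport() call above ultimately invokes the
 * owning module's fmdo_send() entry point.  A sketch of how a hypothetical
 * module might implement it; my_conn_t, my_nvlist_to_buf(), and
 * my_conn_write() are placeholders for module-specific code:
 *
 *	static int
 *	my_send(fmd_hdl_t *hdl, fmd_xprt_t *xp, fmd_event_t *ep, nvlist_t *nvl)
 *	{
 *		my_conn_t *conn = fmd_xprt_getspecific(hdl, xp);
 *		void *buf;
 *		size_t len;
 *
 *		my_nvlist_to_buf(nvl, &buf, &len);
 *
 *		if (my_conn_write(conn, buf, len) != 0)
 *			return (FMD_SEND_RETRY);
 *
 *		return (FMD_SEND_SUCCESS);
 *	}
 *
 * A FMD_SEND_RETRY return causes fmd_xprt_send() to requeue the event and
 * bump xs_retried; any other failure return counts the event as lost.
 */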

void
fmd_xprt_recv(fmd_xprt_t *xp, nvlist_t *nvl, hrtime_t hrt)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;
	const fmd_xprt_rule_t *xrp;
	fmd_t *dp = &fmd;

	fmd_event_t *e;
	char *class, *uuid, *code;
	int isproto;

	uint64_t *tod;
	uint8_t ttl;
	uint_t n;

	/*
	 * Grab the transport lock and set the busy flag to indicate we are
	 * busy receiving an event.  If [DI]SUSPEND is pending, wait until fmd
	 * resumes the transport before continuing on with the receive.
	 */
	(void) pthread_mutex_lock(&xip->xi_lock);

	while (xip->xi_flags & (FMD_XPRT_DSUSPENDED | FMD_XPRT_ISUSPENDED))
		(void) pthread_cond_wait(&xip->xi_cv, &xip->xi_lock);

	xip->xi_busy++;
	ASSERT(xip->xi_busy != 0);

	(void) pthread_mutex_unlock(&xip->xi_lock);

	(void) pthread_mutex_lock(&xip->xi_stats_lock);
	xip->xi_stats->xs_received.fmds_value.ui64++;
	(void) pthread_mutex_unlock(&xip->xi_stats_lock);

	if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0) {
		fmd_error(EFMD_XPRT_PAYLOAD, "discarding nvlist %p: missing "
		    "required \"%s\" payload element", (void *)nvl, FM_CLASS);

		(void) pthread_mutex_lock(&xip->xi_stats_lock);
		xip->xi_stats->xs_discarded.fmds_value.ui64++;
		(void) pthread_mutex_unlock(&xip->xi_stats_lock);

		nvlist_free(nvl);
		goto done;
	}

	fmd_dprintf(FMD_DBG_XPRT, "xprt %u posting %s\n", xip->xi_id, class);

	/*
	 * If a time-to-live value is present in the event and is zero, drop
	 * the event and bump xs_timeouts.  Otherwise decrement the TTL value.
	 */
	if (nvlist_lookup_uint8(nvl, FMD_EVN_TTL, &ttl) == 0) {
		if (ttl == 0) {
			fmd_dprintf(FMD_DBG_XPRT, "xprt %u nvlist %p (%s) "
			    "timeout: event received with ttl=0\n",
			    xip->xi_id, (void *)nvl, class);

			(void) pthread_mutex_lock(&xip->xi_stats_lock);
			xip->xi_stats->xs_timeouts.fmds_value.ui64++;
			(void) pthread_mutex_unlock(&xip->xi_stats_lock);

			nvlist_free(nvl);
			goto done;
		}
		(void) nvlist_remove(nvl, FMD_EVN_TTL, DATA_TYPE_UINT8);
		(void) nvlist_add_uint8(nvl, FMD_EVN_TTL, ttl - 1);
	}

	/*
	 * If we are using the native system clock, the underlying transport
	 * code can provide a tighter event time bound by telling us when the
	 * event was enqueued.  If we're using simulated clocks, this time
	 * has no meaning to us, so just reset the value to use FMD_HRT_NOW.
	 */
	if (dp->d_clockops != &fmd_timeops_native)
		hrt = FMD_HRT_NOW;

	/*
	 * If an event's class is in the FMD_CTL_CLASS family, then create a
	 * control event.  If a FMD_EVN_TOD member is found, create a protocol
	 * event using this time.  Otherwise create a protocol event using hrt.
	 */
	if ((isproto = strncmp(class, FMD_CTL_CLASS, FMD_CTL_CLASS_LEN)) == 0)
		e = fmd_event_create(FMD_EVT_CTL, hrt, nvl, fmd_ctl_init(nvl));
	else if (nvlist_lookup_uint64_array(nvl, FMD_EVN_TOD, &tod, &n) != 0)
		e = fmd_event_create(FMD_EVT_PROTOCOL, hrt, nvl, class);
	else {
		e = fmd_event_recreate(FMD_EVT_PROTOCOL,
		    (fmd_timeval_t *)tod, nvl, class, NULL, 0, 0);
	}

	/*
	 * If the debug log is enabled, create a temporary event, log it to the
	 * debug log, and then reset the underlying state of the event.
	 */
	if (xip->xi_log != NULL) {
		fmd_event_impl_t *ep = (fmd_event_impl_t *)e;

		fmd_log_append(xip->xi_log, e, NULL);

		ep->ev_flags |= FMD_EVF_VOLATILE;
		ep->ev_off = 0;
		ep->ev_len = 0;

		if (ep->ev_log != NULL) {
			fmd_log_rele(ep->ev_log);
			ep->ev_log = NULL;
		}
	}

	/*
	 * Iterate over the rules for the current state trying to match the
	 * event class to one of our special rules.  If a rule is matched, the
	 * event is consumed and not dispatched to other modules.  If the rule
	 * set ends without matching an event, we fall through to dispatching.
	 */
	for (xrp = xip->xi_state; xrp->xr_class != NULL; xrp++) {
		if (fmd_event_match(e, FMD_EVT_PROTOCOL, xrp->xr_class)) {
			fmd_event_hold(e);
			xrp->xr_func(xip, nvl);
			fmd_event_rele(e);
			goto done;
		}
	}

	/*
	 * Record the event in the errlog if it is an ereport.  This code will
	 * be replaced later with a per-transport intent log instead.
	 */
	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_EREPORT_CLASS ".*")) {
		(void) pthread_rwlock_rdlock(&dp->d_log_lock);
		fmd_log_append(dp->d_errlog, e, NULL);
		(void) pthread_rwlock_unlock(&dp->d_log_lock);
	}

	/*
	 * If a list.suspect event is received, create a case for the specified
	 * UUID in the case hash, with the transport module as its owner.  If
	 * the UUID is already known, fmd_case_recreate() will return NULL and
	 * we simply proceed to our normal event handling regardless.
	 */
	if (fmd_event_match(e, FMD_EVT_PROTOCOL, FM_LIST_SUSPECT_CLASS) &&
	    nvlist_lookup_string(nvl, FM_SUSPECT_UUID, &uuid) == 0 &&
	    nvlist_lookup_string(nvl, FM_SUSPECT_DIAG_CODE, &code) == 0) {
		fmd_module_lock(xip->xi_queue->eq_mod);
		(void) fmd_case_recreate(xip->xi_queue->eq_mod,
		    xp, FMD_CASE_SOLVED, uuid, code);
		fmd_module_unlock(xip->xi_queue->eq_mod);
	}

	if (isproto)
		fmd_dispq_dispatch(dp->d_disp, e, class);
	else
		fmd_modhash_dispatch(dp->d_mod_hash, e);
done:
	(void) pthread_mutex_lock(&xip->xi_lock);

	ASSERT(xip->xi_busy != 0);
	xip->xi_busy--;

	(void) pthread_cond_broadcast(&xip->xi_cv);
	(void) pthread_mutex_unlock(&xip->xi_lock);
}

void
fmd_xprt_uuclose(fmd_xprt_t *xp, const char *uuid)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	fmd_event_t *e;
	nvlist_t *nvl;
	char *s;

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u closing case %s\n", xip->xi_id, uuid);

	nvl = fmd_protocol_xprt_uuclose(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.uuclose", xip->xi_version, uuid);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}
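
/*
 * End-to-end sketch of the uuclose flow (the endpoint labels are
 * illustrative): when the proxy side closes a case that was recreated via
 * fmd_case_recreate() above, fmd_xprt_uuclose() enqueues a
 * "resource.fm.xprt.uuclose" event carrying the case UUID.  On the peer,
 * fmd_xprt_recv() matches that class in _fmd_xprt_state_run and invokes
 * fmd_xprt_event_uuclose(), which transitions the original case:
 *
 *	proxy fmd                                diagnosing fmd
 *	fmd_xprt_uuclose(xp, uuid)  --uuclose-->  fmd_xprt_event_uuclose()
 *	                                          fmd_case_transition(cp,
 *	                                              FMD_CASE_CLOSE_WAIT,
 *	                                              FMD_CF_ISOLATED)
 */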

/*
 * Insert the specified class into our remote subscription hash.  If the class
 * is already present, bump the reference count; otherwise add it to the hash
 * and then enqueue an event for our remote peer to proxy our subscription.
 */
void
fmd_xprt_subscribe(fmd_xprt_t *xp, const char *class)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	uint_t refs;
	nvlist_t *nvl;
	fmd_event_t *e;
	char *s;

	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		return; /* read-only transports do not proxy subscriptions */

	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
		return; /* transport is not yet an active subscriber */

	(void) pthread_mutex_lock(&xip->xi_lock);
	refs = fmd_xprt_class_hash_insert(xip, &xip->xi_rsub, class);
	(void) pthread_mutex_unlock(&xip->xi_lock);

	if (refs > 1)
		return; /* we've already asked our peer for this subscription */

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u subscribing to %s\n", xip->xi_id, class);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.subscribe", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}
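
/*
 * Walkthrough of subscription proxying (the class name is illustrative).
 * Subscriptions that exist before a transport connects are snapshotted by
 * fmd_xprt_subscribe_modhash() on the transition to SUB; subscriptions
 * added afterward are propagated through fmd_xprt_subscribe_all(), defined
 * below.  Either way, each transport sends a "resource.fm.xprt.subscribe"
 * control event for a pattern such as "ereport.io.pci.*"; the peer's
 * fmd_xprt_event_sub() inserts the pattern into its xi_lsub hash, which
 * also adds a dispq subscription routing matching events into that
 * transport's outbound queue.  From then on, any matching event seen by
 * the peer is sent back across the transport and dispatched to the local
 * subscriber.
 */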

/*
 * Delete the specified class from the remote subscription hash.  If the
 * reference count drops to zero, ask our remote peer to unsubscribe by proxy.
 */
void
fmd_xprt_unsubscribe(fmd_xprt_t *xp, const char *class)
{
	fmd_xprt_impl_t *xip = (fmd_xprt_impl_t *)xp;

	uint_t refs;
	nvlist_t *nvl;
	fmd_event_t *e;
	char *s;

	if ((xip->xi_flags & FMD_XPRT_RDWR) == FMD_XPRT_RDONLY)
		return; /* read-only transports do not proxy subscriptions */

	if (!(xip->xi_flags & FMD_XPRT_SUBSCRIBER))
		return; /* transport is not yet an active subscriber */

	/*
	 * If the subscription reference count drops to zero in xi_rsub, insert
	 * an entry into the xi_usub hash indicating we await an unsuback event.
	 */
	(void) pthread_mutex_lock(&xip->xi_lock);

	if ((refs = fmd_xprt_class_hash_delete(xip, &xip->xi_rsub, class)) == 0)
		(void) fmd_xprt_class_hash_insert(xip, &xip->xi_usub, class);

	(void) pthread_mutex_unlock(&xip->xi_lock);

	if (refs != 0)
		return; /* other subscriptions for this class still active */

	fmd_dprintf(FMD_DBG_XPRT,
	    "xprt %u unsubscribing from %s\n", xip->xi_id, class);

	nvl = fmd_protocol_xprt_sub(xip->xi_queue->eq_mod,
	    "resource.fm.xprt.unsubscribe", xip->xi_version, class);

	(void) nvlist_lookup_string(nvl, FM_CLASS, &s);
	e = fmd_event_create(FMD_EVT_PROTOCOL, FMD_HRT_NOW, nvl, s);
	fmd_eventq_insert_at_time(xip->xi_queue, e);
}

static void
fmd_xprt_subscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_subscribe(xp, class);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_subscribe_all(const char *class)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_subscribe_xid, (void *)class);
}

static void
fmd_xprt_unsubscribe_xid(fmd_idspace_t *ids, id_t id, void *class)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_unsubscribe(xp, class);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_unsubscribe_all(const char *class)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_unsubscribe_xid, (void *)class);
}

/*ARGSUSED*/
static void
fmd_xprt_suspend_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_xsuspend(xp, FMD_XPRT_DSUSPENDED);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_suspend_all(void)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	(void) pthread_mutex_lock(&fmd.d_xprt_lock);

	if (fmd.d_xprt_suspend++ != 0) {
		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
		return; /* already suspended */
	}

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_suspend_xid, NULL);

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}

/*ARGSUSED*/
static void
fmd_xprt_resume_xid(fmd_idspace_t *ids, id_t id, void *arg)
{
	fmd_xprt_t *xp;

	if ((xp = fmd_idspace_hold(ids, id)) != NULL) {
		fmd_xprt_xresume(xp, FMD_XPRT_DSUSPENDED);
		fmd_idspace_rele(ids, id);
	}
}

void
fmd_xprt_resume_all(void)
{
	fmd_idspace_t *ids = fmd.d_xprt_ids;

	(void) pthread_mutex_lock(&fmd.d_xprt_lock);

	if (fmd.d_xprt_suspend == 0)
		fmd_panic("fmd_xprt_suspend/resume_all mismatch\n");

	if (--fmd.d_xprt_suspend != 0) {
		(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
		return; /* not ready to be resumed */
	}

	if (ids->ids_count != 0)
		fmd_idspace_apply(ids, fmd_xprt_resume_xid, NULL);

	(void) pthread_mutex_unlock(&fmd.d_xprt_lock);
}
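
/*
 * Note on nesting (a short illustrative sequence): fmd.d_xprt_suspend acts
 * as a recursive hold, so suspend and resume calls must balance before any
 * transport runs again.
 *
 *	fmd_xprt_suspend_all();    d_xprt_suspend 0 -> 1, transports suspend
 *	fmd_xprt_suspend_all();    d_xprt_suspend 1 -> 2, no further effect
 *	fmd_xprt_resume_all();     d_xprt_suspend 2 -> 1, still suspended
 *	fmd_xprt_resume_all();     d_xprt_suspend 1 -> 0, transports resume
 *
 * Any transport created while d_xprt_suspend is non-zero starts with
 * FMD_XPRT_DSUSPENDED set (see fmd_xprt_create()), so it participates in
 * the final resume as well.
 */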