/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
 */

#include <fmd_alloc.h>
#include <fmd_eventq.h>
#include <fmd_module.h>
#include <fmd_dispq.h>
#include <fmd_subr.h>

#include <fmd.h>

fmd_eventq_t *
fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats,
    pthread_mutex_t *stats_lock, uint_t limit)
{
        fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP);

        (void) pthread_mutex_init(&eq->eq_lock, NULL);
        (void) pthread_cond_init(&eq->eq_cv, NULL);

        eq->eq_mod = mp;
        eq->eq_stats = stats;
        eq->eq_stats_lock = stats_lock;
        eq->eq_limit = limit;
        eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq);

        return (eq);
}

void
fmd_eventq_destroy(fmd_eventq_t *eq)
{
        fmd_eventqelem_t *eqe;

        while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
                fmd_list_delete(&eq->eq_list, eqe);
                fmd_event_rele(eqe->eqe_event);
                fmd_free(eqe, sizeof (fmd_eventqelem_t));
        }

        fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid);
        fmd_free(eq, sizeof (fmd_eventq_t));
}

static void
fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe)
{
        (void) pthread_mutex_lock(eq->eq_stats_lock);
        eq->eq_stats->eqs_dropped.fmds_value.ui64++;
        (void) pthread_mutex_unlock(eq->eq_stats_lock);

        fmd_event_rele(eqe->eqe_event);
        fmd_free(eqe, sizeof (fmd_eventqelem_t));
}

void
fmd_eventq_drop_topo(fmd_eventq_t *eq)
{
        fmd_eventqelem_t *eqe, *tmp;
        boolean_t got_fm_events = B_FALSE;

        /*
         * Here we iterate through the per-module event queue in order to
         * remove redundant FMD_EVT_TOPO events.  The trick is to not drop
         * a given topo event if there are any FM protocol events in the
         * queue after it, as those events need to be processed with the
         * correct topology.
         */
        (void) pthread_mutex_lock(&eq->eq_lock);
        eqe = fmd_list_prev(&eq->eq_list);
        while (eqe) {
                if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_TOPO) {
                        if (!got_fm_events) {
                                tmp = eqe;
                                eqe = fmd_list_prev(eqe);
                                fmd_list_delete(&eq->eq_list, tmp);
                                eq->eq_size--;
                                fmd_eventq_drop(eq, tmp);
                        } else {
                                got_fm_events = B_FALSE;
                                eqe = fmd_list_prev(eqe);
                        }
                } else if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_PROTOCOL) {
                        got_fm_events = B_TRUE;
                        eqe = fmd_list_prev(eqe);
                } else
                        eqe = fmd_list_prev(eqe);
        }
        (void) pthread_mutex_unlock(&eq->eq_lock);
}
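/*
 * Illustrative note (editor's addition, not part of the original source):
 * given a queue that reads, head to tail, [TOPO1, PROTO1, TOPO2, TOPO3],
 * the backward walk above visits TOPO3 first and drops it (no protocol
 * event follows it), then drops TOPO2 for the same reason, sets
 * got_fm_events at PROTO1, and therefore keeps TOPO1, which PROTO1 still
 * needs for correct processing.  The queue becomes [TOPO1, PROTO1].
 */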
/*
 * Update statistics when an event is dispatched and placed on a module's
 * event queue.  This is essentially the same code as kstat_waitq_enter(9F).
 */
static void
fmd_eventqstat_dispatch(fmd_eventq_t *eq)
{
        fmd_eventqstat_t *eqs = eq->eq_stats;
        hrtime_t new, delta;
        uint32_t wcnt;

        (void) pthread_mutex_lock(eq->eq_stats_lock);

        new = gethrtime();
        delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
        eqs->eqs_wlastupdate.fmds_value.ui64 = new;
        wcnt = eqs->eqs_wcnt.fmds_value.ui32++;

        if (wcnt != 0) {
                eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
                eqs->eqs_wtime.fmds_value.ui64 += delta;
        }

        eqs->eqs_dispatched.fmds_value.ui64++;
        (void) pthread_mutex_unlock(eq->eq_stats_lock);
}

void
fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep)
{
        uint_t evt = FMD_EVENT_TYPE(ep);
        fmd_eventqelem_t *eqe;
        int ok;

        /*
         * If this event queue is acting as /dev/null, bounce the reference
         * count to free an unreferenced event and just return immediately.
         */
        if (eq->eq_limit == 0) {
                fmd_event_hold(ep);
                fmd_event_rele(ep);
                return;
        }

        eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
        fmd_event_hold(ep);
        eqe->eqe_event = ep;

        (void) pthread_mutex_lock(&eq->eq_lock);

        if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
                if (evt != FMD_EVT_CTL)
                        fmd_eventqstat_dispatch(eq);

                fmd_list_prepend(&eq->eq_list, eqe);
                eq->eq_size++;
        }

        (void) pthread_cond_broadcast(&eq->eq_cv);
        (void) pthread_mutex_unlock(&eq->eq_lock);

        if (!ok)
                fmd_eventq_drop(eq, eqe);
}
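/*
 * Illustrative note (editor's addition): in the 'ok' test above (repeated
 * in fmd_eventq_insert_at_time() below), only FMD_EVT_PROTOCOL events are
 * subject to eq_limit.  For example, with eq_limit == 8 and eq_size == 8,
 * an incoming protocol event fails the test and is handed to
 * fmd_eventq_drop(), which bumps eqs_dropped and releases the hold taken
 * above; a control or topo event is still enqueued at the same point.
 */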
void
fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep)
{
        uint_t evt = FMD_EVENT_TYPE(ep);
        hrtime_t hrt = fmd_event_hrtime(ep);
        fmd_eventqelem_t *eqe, *oqe;
        int ok;

        /*
         * If this event queue is acting as /dev/null, bounce the reference
         * count to free an unreferenced event and just return immediately.
         */
        if (eq->eq_limit == 0) {
                fmd_event_hold(ep);
                fmd_event_rele(ep);
                return;
        }

        eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
        fmd_event_hold(ep);
        eqe->eqe_event = ep;

        (void) pthread_mutex_lock(&eq->eq_lock);

        /*
         * fmd makes no guarantees that events will be delivered in time
         * order because its transport can make no such guarantees.  Instead
         * we make a looser guarantee that an enqueued event will be dequeued
         * before any newer *pending* events according to event time.  This
         * permits us to state, for example, that a timer expiry event will
         * be delivered prior to any enqueued event whose time is after the
         * timer expired.  We use a simple insertion sort for this task, as
         * queue lengths are typically short and events do *tend* to be
         * received chronologically.
         */
        for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) {
                if (hrt >= fmd_event_hrtime(oqe->eqe_event))
                        break; /* 'ep' is newer than the event in 'oqe' */
        }

        if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
                if (evt != FMD_EVT_CTL)
                        fmd_eventqstat_dispatch(eq);

                if (oqe == NULL)
                        fmd_list_prepend(&eq->eq_list, eqe);
                else
                        fmd_list_insert_after(&eq->eq_list, oqe, eqe);
                eq->eq_size++;
        }

        (void) pthread_cond_broadcast(&eq->eq_cv);
        (void) pthread_mutex_unlock(&eq->eq_lock);

        if (!ok)
                fmd_eventq_drop(eq, eqe);
}

fmd_event_t *
fmd_eventq_delete(fmd_eventq_t *eq)
{
        fmd_eventqstat_t *eqs = eq->eq_stats;
        hrtime_t new, delta;
        uint32_t wcnt;

        fmd_eventqelem_t *eqe;
        fmd_event_t *ep;
top:
        (void) pthread_mutex_lock(&eq->eq_lock);

        while (!(eq->eq_flags & FMD_EVENTQ_ABORT) &&
            (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND)))
                (void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock);

        if (eq->eq_flags & FMD_EVENTQ_ABORT) {
                (void) pthread_mutex_unlock(&eq->eq_lock);
                return (NULL);
        }

        eqe = fmd_list_next(&eq->eq_list);
        fmd_list_delete(&eq->eq_list, eqe);
        eq->eq_size--;

        (void) pthread_mutex_unlock(&eq->eq_lock);

        ep = eqe->eqe_event;
        fmd_free(eqe, sizeof (fmd_eventqelem_t));

        /*
         * If we dequeued a control event, release it and go back to sleep.
         * fmd_event_rele() on the event will block as described in fmd_ctl.c.
         * This effectively renders control events invisible to our callers
         * as well as to statistics and observability tools (e.g. fmstat(8)).
         */
        if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) {
                fmd_event_rele(ep);
                goto top;
        }

        /*
         * Before returning, update our statistics.  This code is essentially
         * kstat_waitq_to_runq(9F), except simplified because our queues are
         * always consumed by a single thread (i.e. runq len == 1).
         */
        (void) pthread_mutex_lock(eq->eq_stats_lock);

        new = gethrtime();
        delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;

        eqs->eqs_wlastupdate.fmds_value.ui64 = new;
        eqs->eqs_dlastupdate.fmds_value.ui64 = new;

        ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0);
        wcnt = eqs->eqs_wcnt.fmds_value.ui32--;

        eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
        eqs->eqs_wtime.fmds_value.ui64 += delta;

        if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL)
                eqs->eqs_prdequeued.fmds_value.ui64++;

        eqs->eqs_dequeued.fmds_value.ui64++;
        (void) pthread_mutex_unlock(eq->eq_stats_lock);

        return (ep);
}
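/*
 * Illustrative note (editor's addition): the arithmetic above mirrors
 * kstat_waitq_to_runq(9F).  If two events have been waiting for 10ms since
 * the last stats update (wcnt == 2, delta == 10ms in nanoseconds), this
 * dequeue adds 2 * 10ms to eqs_wlentime and 10ms to eqs_wtime, so a tool
 * such as fmstat(8) can estimate the average number of waiting events as
 * wlentime / wtime (length-weighted wait time over total non-empty time).
 */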
/*
 * Update statistics when an event is done being processed by the eventq's
 * consumer thread.  This is essentially kstat_runq_exit(9F) simplified for
 * our principle that a single thread consumes the queue (i.e. runq len == 1).
 */
void
fmd_eventq_done(fmd_eventq_t *eq)
{
        fmd_eventqstat_t *eqs = eq->eq_stats;
        hrtime_t new, delta;

        (void) pthread_mutex_lock(eq->eq_stats_lock);

        new = gethrtime();
        delta = new - eqs->eqs_dlastupdate.fmds_value.ui64;

        eqs->eqs_dlastupdate.fmds_value.ui64 = new;
        eqs->eqs_dtime.fmds_value.ui64 += delta;

        (void) pthread_mutex_unlock(eq->eq_stats_lock);
}

void
fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data)
{
        fmd_eventqelem_t *eqe, *nqe;

        (void) pthread_mutex_lock(&eq->eq_lock);

        for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) {
                nqe = fmd_list_next(eqe);

                if (fmd_event_match(eqe->eqe_event, type, data)) {
                        fmd_list_delete(&eq->eq_list, eqe);
                        eq->eq_size--;
                        fmd_event_rele(eqe->eqe_event);
                        fmd_free(eqe, sizeof (fmd_eventqelem_t));
                }
        }

        (void) pthread_mutex_unlock(&eq->eq_lock);
}

void
fmd_eventq_suspend(fmd_eventq_t *eq)
{
        (void) pthread_mutex_lock(&eq->eq_lock);
        eq->eq_flags |= FMD_EVENTQ_SUSPEND;
        (void) pthread_mutex_unlock(&eq->eq_lock);
}

void
fmd_eventq_resume(fmd_eventq_t *eq)
{
        (void) pthread_mutex_lock(&eq->eq_lock);
        eq->eq_flags &= ~FMD_EVENTQ_SUSPEND;
        (void) pthread_cond_broadcast(&eq->eq_cv);
        (void) pthread_mutex_unlock(&eq->eq_lock);
}

void
fmd_eventq_abort(fmd_eventq_t *eq)
{
        fmd_eventqelem_t *eqe;

        (void) pthread_mutex_lock(&eq->eq_lock);

        while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
                fmd_list_delete(&eq->eq_list, eqe);
                fmd_event_rele(eqe->eqe_event);
                fmd_free(eqe, sizeof (fmd_eventqelem_t));
        }

        eq->eq_flags |= FMD_EVENTQ_ABORT;
        (void) pthread_cond_broadcast(&eq->eq_cv);
        (void) pthread_mutex_unlock(&eq->eq_lock);
}
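/*
 * Editor's sketch (assumption, not part of the original source): a module's
 * consumer thread is expected to drive this API by pairing each successful
 * fmd_eventq_delete() with a call to fmd_eventq_done() once processing
 * completes, so the wait-queue and run-queue statistics stay balanced, and
 * by releasing the hold that was taken when the event was enqueued.  The
 * dispatch step below is a hypothetical placeholder for the caller's real
 * logic (the actual consumer loop lives in fmd_module.c):
 *
 *	fmd_event_t *ep;
 *
 *	while ((ep = fmd_eventq_delete(eq)) != NULL) {
 *		process_event(ep);	(hypothetical consumer hook)
 *		fmd_eventq_done(eq);
 *		fmd_event_rele(ep);	(drop the queue's hold)
 *	}
 */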