1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include <fmd_alloc.h> 30 #include <fmd_eventq.h> 31 #include <fmd_module.h> 32 #include <fmd_dispq.h> 33 #include <fmd_subr.h> 34 35 #include <fmd.h> 36 37 fmd_eventq_t * 38 fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats, 39 pthread_mutex_t *stats_lock, uint_t limit) 40 { 41 fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP); 42 43 (void) pthread_mutex_init(&eq->eq_lock, NULL); 44 (void) pthread_cond_init(&eq->eq_cv, NULL); 45 46 eq->eq_mod = mp; 47 eq->eq_stats = stats; 48 eq->eq_stats_lock = stats_lock; 49 eq->eq_limit = limit; 50 eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq); 51 52 return (eq); 53 } 54 55 void 56 fmd_eventq_destroy(fmd_eventq_t *eq) 57 { 58 fmd_eventqelem_t *eqe; 59 60 while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) { 61 fmd_list_delete(&eq->eq_list, eqe); 62 fmd_event_rele(eqe->eqe_event); 63 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 64 } 65 66 fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid); 67 fmd_free(eq, sizeof (fmd_eventq_t)); 68 } 69 70 static void 71 fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe) 72 { 73 (void) pthread_mutex_lock(eq->eq_stats_lock); 74 eq->eq_stats->eqs_dropped.fmds_value.ui64++; 75 (void) pthread_mutex_unlock(eq->eq_stats_lock); 76 77 fmd_event_rele(eqe->eqe_event); 78 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 79 } 80 81 /* 82 * Update statistics when an event is dispatched and placed on a module's event 83 * queue. This is essentially the same code as kstat_waitq_enter(9F). 84 */ 85 static void 86 fmd_eventqstat_dispatch(fmd_eventq_t *eq) 87 { 88 fmd_eventqstat_t *eqs = eq->eq_stats; 89 hrtime_t new, delta; 90 uint32_t wcnt; 91 92 (void) pthread_mutex_lock(eq->eq_stats_lock); 93 94 new = gethrtime(); 95 delta = new - eqs->eqs_wlastupdate.fmds_value.ui64; 96 eqs->eqs_wlastupdate.fmds_value.ui64 = new; 97 wcnt = eqs->eqs_wcnt.fmds_value.ui32++; 98 99 if (wcnt != 0) { 100 eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt; 101 eqs->eqs_wtime.fmds_value.ui64 += delta; 102 } 103 104 eqs->eqs_dispatched.fmds_value.ui64++; 105 (void) pthread_mutex_unlock(eq->eq_stats_lock); 106 } 107 108 void 109 fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep) 110 { 111 uint_t evt = FMD_EVENT_TYPE(ep); 112 fmd_eventqelem_t *eqe; 113 int ok; 114 115 /* 116 * If this event queue is acting as /dev/null, bounce the reference 117 * count to free an unreferenced event and just return immediately. 118 */ 119 if (eq->eq_limit == 0) { 120 fmd_event_hold(ep); 121 fmd_event_rele(ep); 122 return; 123 } 124 125 eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP); 126 fmd_event_hold(ep); 127 eqe->eqe_event = ep; 128 129 (void) pthread_mutex_lock(&eq->eq_lock); 130 131 if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) { 132 if (evt != FMD_EVT_CTL) 133 fmd_eventqstat_dispatch(eq); 134 135 fmd_list_prepend(&eq->eq_list, eqe); 136 eq->eq_size++; 137 } 138 139 (void) pthread_cond_broadcast(&eq->eq_cv); 140 (void) pthread_mutex_unlock(&eq->eq_lock); 141 142 if (!ok) 143 fmd_eventq_drop(eq, eqe); 144 } 145 146 void 147 fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep) 148 { 149 uint_t evt = FMD_EVENT_TYPE(ep); 150 hrtime_t hrt = fmd_event_hrtime(ep); 151 fmd_eventqelem_t *eqe, *oqe; 152 int ok; 153 154 /* 155 * If this event queue is acting as /dev/null, bounce the reference 156 * count to free an unreferenced event and just return immediately. 157 */ 158 if (eq->eq_limit == 0) { 159 fmd_event_hold(ep); 160 fmd_event_rele(ep); 161 return; 162 } 163 164 eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP); 165 fmd_event_hold(ep); 166 eqe->eqe_event = ep; 167 168 (void) pthread_mutex_lock(&eq->eq_lock); 169 170 /* 171 * fmd makes no guarantees that events will be delivered in time order 172 * because its transport can make no such guarantees. Instead we make 173 * a looser guarantee that an enqueued event will be dequeued before 174 * any newer *pending* events according to event time. This permits us 175 * to state, for example, that a timer expiry event will be delivered 176 * prior to any enqueued event whose time is after the timer expired. 177 * We use a simple insertion sort for this task, as queue lengths are 178 * typically short and events do *tend* to be received chronologically. 179 */ 180 for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) { 181 if (hrt >= fmd_event_hrtime(oqe->eqe_event)) 182 break; /* 'ep' is newer than the event in 'oqe' */ 183 } 184 185 if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) { 186 if (evt != FMD_EVT_CTL) 187 fmd_eventqstat_dispatch(eq); 188 189 if (oqe == NULL) 190 fmd_list_prepend(&eq->eq_list, eqe); 191 else 192 fmd_list_insert_after(&eq->eq_list, oqe, eqe); 193 eq->eq_size++; 194 } 195 196 (void) pthread_cond_broadcast(&eq->eq_cv); 197 (void) pthread_mutex_unlock(&eq->eq_lock); 198 199 if (!ok) 200 fmd_eventq_drop(eq, eqe); 201 } 202 203 fmd_event_t * 204 fmd_eventq_delete(fmd_eventq_t *eq) 205 { 206 fmd_eventqstat_t *eqs = eq->eq_stats; 207 hrtime_t new, delta; 208 uint32_t wcnt; 209 210 fmd_eventqelem_t *eqe; 211 fmd_event_t *ep; 212 top: 213 (void) pthread_mutex_lock(&eq->eq_lock); 214 215 while (!(eq->eq_flags & FMD_EVENTQ_ABORT) && 216 (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND))) 217 (void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock); 218 219 if (eq->eq_flags & FMD_EVENTQ_ABORT) { 220 (void) pthread_mutex_unlock(&eq->eq_lock); 221 return (NULL); 222 } 223 224 eqe = fmd_list_next(&eq->eq_list); 225 fmd_list_delete(&eq->eq_list, eqe); 226 eq->eq_size--; 227 228 (void) pthread_mutex_unlock(&eq->eq_lock); 229 230 ep = eqe->eqe_event; 231 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 232 233 /* 234 * If we dequeued a control event, release it and go back to sleep. 235 * fmd_event_rele() on the event will block as described in fmd_ctl.c. 236 * This effectively renders control events invisible to our callers 237 * as well as to statistics and observability tools (e.g. fmstat(1M)). 238 */ 239 if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) { 240 fmd_event_rele(ep); 241 goto top; 242 } 243 244 /* 245 * Before returning, update our statistics. This code is essentially 246 * kstat_waitq_to_runq(9F), except simplified because our queues are 247 * always consumed by a single thread (i.e. runq len == 1). 248 */ 249 (void) pthread_mutex_lock(eq->eq_stats_lock); 250 251 new = gethrtime(); 252 delta = new - eqs->eqs_wlastupdate.fmds_value.ui64; 253 254 eqs->eqs_wlastupdate.fmds_value.ui64 = new; 255 eqs->eqs_dlastupdate.fmds_value.ui64 = new; 256 257 ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0); 258 wcnt = eqs->eqs_wcnt.fmds_value.ui32--; 259 260 eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt; 261 eqs->eqs_wtime.fmds_value.ui64 += delta; 262 263 if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL) 264 eqs->eqs_prdequeued.fmds_value.ui64++; 265 266 eqs->eqs_dequeued.fmds_value.ui64++; 267 (void) pthread_mutex_unlock(eq->eq_stats_lock); 268 269 return (ep); 270 } 271 272 /* 273 * Update statistics when an event is done being processed by the eventq's 274 * consumer thread. This is essentially kstat_runq_exit(9F) simplified for 275 * our principle that a single thread consumes the queue (i.e. runq len == 1). 276 */ 277 void 278 fmd_eventq_done(fmd_eventq_t *eq) 279 { 280 fmd_eventqstat_t *eqs = eq->eq_stats; 281 hrtime_t new, delta; 282 283 (void) pthread_mutex_lock(eq->eq_stats_lock); 284 285 new = gethrtime(); 286 delta = new - eqs->eqs_dlastupdate.fmds_value.ui64; 287 288 eqs->eqs_dlastupdate.fmds_value.ui64 = new; 289 eqs->eqs_dtime.fmds_value.ui64 += delta; 290 291 (void) pthread_mutex_unlock(eq->eq_stats_lock); 292 } 293 294 void 295 fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data) 296 { 297 fmd_eventqelem_t *eqe, *nqe; 298 299 (void) pthread_mutex_lock(&eq->eq_lock); 300 301 for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) { 302 nqe = fmd_list_next(eqe); 303 304 if (fmd_event_match(eqe->eqe_event, type, data)) { 305 fmd_list_delete(&eq->eq_list, eqe); 306 eq->eq_size--; 307 fmd_event_rele(eqe->eqe_event); 308 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 309 } 310 } 311 312 (void) pthread_mutex_unlock(&eq->eq_lock); 313 } 314 315 void 316 fmd_eventq_suspend(fmd_eventq_t *eq) 317 { 318 (void) pthread_mutex_lock(&eq->eq_lock); 319 eq->eq_flags |= FMD_EVENTQ_SUSPEND; 320 (void) pthread_mutex_unlock(&eq->eq_lock); 321 } 322 323 void 324 fmd_eventq_resume(fmd_eventq_t *eq) 325 { 326 (void) pthread_mutex_lock(&eq->eq_lock); 327 eq->eq_flags &= ~FMD_EVENTQ_SUSPEND; 328 (void) pthread_cond_broadcast(&eq->eq_cv); 329 (void) pthread_mutex_unlock(&eq->eq_lock); 330 } 331 332 void 333 fmd_eventq_abort(fmd_eventq_t *eq) 334 { 335 fmd_eventqelem_t *eqe; 336 337 (void) pthread_mutex_lock(&eq->eq_lock); 338 339 while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) { 340 fmd_list_delete(&eq->eq_list, eqe); 341 fmd_event_rele(eqe->eqe_event); 342 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 343 } 344 345 eq->eq_flags |= FMD_EVENTQ_ABORT; 346 (void) pthread_cond_broadcast(&eq->eq_cv); 347 (void) pthread_mutex_unlock(&eq->eq_lock); 348 } 349