/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

/*
 * fmd_eventq.c -- per-module event queues for the fault management daemon.
 *
 * Each module owns an event queue onto which dispatched events are placed
 * and from which the module's single consumer thread removes them.  The
 * queue keeps kstat-style wait-queue/run-queue statistics (see
 * kstat_waitq_enter(9F) and friends) under the caller-supplied stats lock,
 * and supports suspend/resume and abort of the consumer via eq_flags and
 * the eq_cv condition variable.
 */

#include <fmd_alloc.h>
#include <fmd_eventq.h>
#include <fmd_module.h>
#include <fmd_dispq.h>
#include <fmd_subr.h>

#include <fmd.h>

/*
 * Allocate and initialize a new event queue for module 'mp'.  The caller
 * provides the statistics block and the lock that protects it, along with
 * 'limit', the maximum queue depth for protocol events.  A limit of zero
 * makes the queue act as /dev/null (see the insert functions below).  The
 * queue is also registered with the global dispatch queue to obtain its
 * subscription group id (eq_sgid).
 */
fmd_eventq_t *
fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats,
    pthread_mutex_t *stats_lock, uint_t limit)
{
	fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP);

	(void) pthread_mutex_init(&eq->eq_lock, NULL);
	(void) pthread_cond_init(&eq->eq_cv, NULL);

	eq->eq_mod = mp;
	eq->eq_stats = stats;
	eq->eq_stats_lock = stats_lock;
	eq->eq_limit = limit;
	eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq);

	return (eq);
}

/*
 * Tear down an event queue: release any events still on the list, return
 * the subscription group id to the dispatch queue, and free the queue
 * itself.  The caller must ensure no consumer thread is still using 'eq'.
 */
void
fmd_eventq_destroy(fmd_eventq_t *eq)
{
	fmd_eventqelem_t *eqe;

	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
		fmd_list_delete(&eq->eq_list, eqe);
		fmd_event_rele(eqe->eqe_event);
		fmd_free(eqe, sizeof (fmd_eventqelem_t));
	}

	fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid);
	fmd_free(eq, sizeof (fmd_eventq_t));
}

/*
 * Discard a queue element that could not be inserted (queue at its limit):
 * bump the eqs_dropped statistic, release our hold on the event, and free
 * the element.  Called without eq_lock held.
 */
static void
fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe)
{
	(void) pthread_mutex_lock(eq->eq_stats_lock);
	eq->eq_stats->eqs_dropped.fmds_value.ui64++;
	(void) pthread_mutex_unlock(eq->eq_stats_lock);

	fmd_event_rele(eqe->eqe_event);
	fmd_free(eqe, sizeof (fmd_eventqelem_t));
}

/*
 * Update statistics when an event is dispatched and placed on a module's event
 * queue.  This is essentially the same code as kstat_waitq_enter(9F): we
 * accumulate time-weighted wait-queue length (eqs_wlentime) and busy time
 * (eqs_wtime) over the interval since the last update, then bump the
 * wait count and the total dispatched count.
 */
static void
fmd_eventqstat_dispatch(fmd_eventq_t *eq)
{
	fmd_eventqstat_t *eqs = eq->eq_stats;
	hrtime_t new, delta;
	uint32_t wcnt;

	(void) pthread_mutex_lock(eq->eq_stats_lock);

	new = gethrtime();
	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
	wcnt = eqs->eqs_wcnt.fmds_value.ui32++;

	if (wcnt != 0) {
		eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
		eqs->eqs_wtime.fmds_value.ui64 += delta;
	}

	eqs->eqs_dispatched.fmds_value.ui64++;
	(void) pthread_mutex_unlock(eq->eq_stats_lock);
}

/*
 * Insert an event at the head of the queue so it is the next one dequeued.
 * Only protocol events are subject to the eq_limit depth check; control and
 * other internal events are always inserted.  If the insert fails because
 * the queue is full, the element is dropped (outside eq_lock) and the
 * eqs_dropped statistic is updated.
 */
void
fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep)
{
	uint_t evt = FMD_EVENT_TYPE(ep);
	fmd_eventqelem_t *eqe;
	int ok;

	/*
	 * If this event queue is acting as /dev/null, bounce the reference
	 * count to free an unreferenced event and just return immediately.
	 */
	if (eq->eq_limit == 0) {
		fmd_event_hold(ep);
		fmd_event_rele(ep);
		return;
	}

	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
	fmd_event_hold(ep);
	eqe->eqe_event = ep;

	(void) pthread_mutex_lock(&eq->eq_lock);

	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
		if (evt != FMD_EVT_CTL)
			fmd_eventqstat_dispatch(eq);

		fmd_list_prepend(&eq->eq_list, eqe);
		eq->eq_size++;
	}

	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);

	if (!ok)
		fmd_eventq_drop(eq, eqe);
}

/*
 * Insert an event into the queue ordered by its event time.  As with
 * insert_at_head, only protocol events are bounded by eq_limit, and a
 * failed insert is dropped after eq_lock is released.
 */
void
fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep)
{
	uint_t evt = FMD_EVENT_TYPE(ep);
	hrtime_t hrt = fmd_event_hrtime(ep);
	fmd_eventqelem_t *eqe, *oqe;
	int ok;

	/*
	 * If this event queue is acting as /dev/null, bounce the reference
	 * count to free an unreferenced event and just return immediately.
	 */
	if (eq->eq_limit == 0) {
		fmd_event_hold(ep);
		fmd_event_rele(ep);
		return;
	}

	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
	fmd_event_hold(ep);
	eqe->eqe_event = ep;

	(void) pthread_mutex_lock(&eq->eq_lock);

	/*
	 * fmd makes no guarantees that events will be delivered in time order
	 * because its transport can make no such guarantees.  Instead we make
	 * a looser guarantee that an enqueued event will be dequeued before
	 * any newer *pending* events according to event time.  This permits us
	 * to state, for example, that a timer expiry event will be delivered
	 * prior to any enqueued event whose time is after the timer expired.
	 * We use a simple insertion sort for this task, as queue lengths are
	 * typically short and events do *tend* to be received chronologically.
	 */
	for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) {
		if (hrt >= fmd_event_hrtime(oqe->eqe_event))
			break; /* 'ep' is newer than the event in 'oqe' */
	}

	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
		if (evt != FMD_EVT_CTL)
			fmd_eventqstat_dispatch(eq);

		fmd_list_insert_after(&eq->eq_list, oqe, eqe);
		eq->eq_size++;
	}

	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);

	if (!ok)
		fmd_eventq_drop(eq, eqe);
}

/*
 * Remove and return the next event from the queue, blocking on eq_cv while
 * the queue is empty or suspended.  Returns NULL if FMD_EVENTQ_ABORT has
 * been set, signalling the consumer thread to exit.  Control events are
 * released and skipped (see comment below) rather than returned.
 */
fmd_event_t *
fmd_eventq_delete(fmd_eventq_t *eq)
{
	fmd_eventqstat_t *eqs = eq->eq_stats;
	hrtime_t new, delta;
	uint32_t wcnt;

	fmd_eventqelem_t *eqe;
	fmd_event_t *ep;
top:
	(void) pthread_mutex_lock(&eq->eq_lock);

	while (!(eq->eq_flags & FMD_EVENTQ_ABORT) &&
	    (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND)))
		(void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock);

	if (eq->eq_flags & FMD_EVENTQ_ABORT) {
		(void) pthread_mutex_unlock(&eq->eq_lock);
		return (NULL);
	}

	eqe = fmd_list_next(&eq->eq_list);
	fmd_list_delete(&eq->eq_list, eqe);
	eq->eq_size--;

	(void) pthread_mutex_unlock(&eq->eq_lock);

	ep = eqe->eqe_event;
	fmd_free(eqe, sizeof (fmd_eventqelem_t));

	/*
	 * If we dequeued a control event, release it and go back to sleep.
	 * fmd_event_rele() on the event will block as described in fmd_ctl.c.
	 * This effectively renders control events invisible to our callers
	 * as well as to statistics and observability tools (e.g. fmstat(1M)).
	 */
	if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) {
		fmd_event_rele(ep);
		goto top;
	}

	/*
	 * Before returning, update our statistics.  This code is essentially
	 * kstat_waitq_to_runq(9F), except simplified because our queues are
	 * always consumed by a single thread (i.e. runq len == 1).
	 */
	(void) pthread_mutex_lock(eq->eq_stats_lock);

	new = gethrtime();
	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;

	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
	eqs->eqs_dlastupdate.fmds_value.ui64 = new;

	ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0);
	wcnt = eqs->eqs_wcnt.fmds_value.ui32--;

	eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
	eqs->eqs_wtime.fmds_value.ui64 += delta;

	if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL)
		eqs->eqs_prdequeued.fmds_value.ui64++;

	eqs->eqs_dequeued.fmds_value.ui64++;
	(void) pthread_mutex_unlock(eq->eq_stats_lock);

	return (ep);
}

/*
 * Update statistics when an event is done being processed by the eventq's
 * consumer thread.  This is essentially kstat_runq_exit(9F) simplified for
 * our principle that a single thread consumes the queue (i.e. runq len == 1).
 */
void
fmd_eventq_done(fmd_eventq_t *eq)
{
	fmd_eventqstat_t *eqs = eq->eq_stats;
	hrtime_t new, delta;

	(void) pthread_mutex_lock(eq->eq_stats_lock);

	new = gethrtime();
	delta = new - eqs->eqs_dlastupdate.fmds_value.ui64;

	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
	eqs->eqs_dtime.fmds_value.ui64 += delta;

	(void) pthread_mutex_unlock(eq->eq_stats_lock);
}

/*
 * Remove from the queue every pending event matching (type, data) according
 * to fmd_event_match(), releasing each matched event.  Used to cancel
 * events (e.g. pending timer expiries) that are no longer relevant.
 */
void
fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data)
{
	fmd_eventqelem_t *eqe, *nqe;

	(void) pthread_mutex_lock(&eq->eq_lock);

	for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) {
		nqe = fmd_list_next(eqe);

		if (fmd_event_match(eqe->eqe_event, type, data)) {
			fmd_list_delete(&eq->eq_list, eqe);
			eq->eq_size--;
			fmd_event_rele(eqe->eqe_event);
			fmd_free(eqe, sizeof (fmd_eventqelem_t));
		}
	}

	(void) pthread_mutex_unlock(&eq->eq_lock);
}

/*
 * Suspend the queue's consumer: once set, FMD_EVENTQ_SUSPEND causes
 * fmd_eventq_delete() to block even if events are pending.
 */
void
fmd_eventq_suspend(fmd_eventq_t *eq)
{
	(void) pthread_mutex_lock(&eq->eq_lock);
	eq->eq_flags |= FMD_EVENTQ_SUSPEND;
	(void) pthread_mutex_unlock(&eq->eq_lock);
}

/*
 * Resume a suspended queue and wake any consumer blocked in
 * fmd_eventq_delete().
 */
void
fmd_eventq_resume(fmd_eventq_t *eq)
{
	(void) pthread_mutex_lock(&eq->eq_lock);
	eq->eq_flags &= ~FMD_EVENTQ_SUSPEND;
	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);
}

/*
 * Abort the queue: discard all pending events, set FMD_EVENTQ_ABORT so that
 * fmd_eventq_delete() returns NULL, and wake any blocked consumer.  Note
 * that the abort flag is never cleared, so the queue is permanently dead
 * once aborted.
 */
void
fmd_eventq_abort(fmd_eventq_t *eq)
{
	fmd_eventqelem_t *eqe;

	(void) pthread_mutex_lock(&eq->eq_lock);

	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
		fmd_list_delete(&eq->eq_list, eqe);
		fmd_event_rele(eqe->eqe_event);
		fmd_free(eqe, sizeof (fmd_eventqelem_t));
	}

	eq->eq_flags |= FMD_EVENTQ_ABORT;
	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);
}