1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
24 */
25
26 #include <fmd_alloc.h>
27 #include <fmd_eventq.h>
28 #include <fmd_module.h>
29 #include <fmd_dispq.h>
30 #include <fmd_subr.h>
31
32 #include <fmd.h>
33
34 fmd_eventq_t *
fmd_eventq_create(fmd_module_t * mp,fmd_eventqstat_t * stats,pthread_mutex_t * stats_lock,uint_t limit)35 fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats,
36 pthread_mutex_t *stats_lock, uint_t limit)
37 {
38 fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP);
39
40 (void) pthread_mutex_init(&eq->eq_lock, NULL);
41 (void) pthread_cond_init(&eq->eq_cv, NULL);
42
43 eq->eq_mod = mp;
44 eq->eq_stats = stats;
45 eq->eq_stats_lock = stats_lock;
46 eq->eq_limit = limit;
47 eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq);
48
49 return (eq);
50 }
51
52 void
fmd_eventq_destroy(fmd_eventq_t * eq)53 fmd_eventq_destroy(fmd_eventq_t *eq)
54 {
55 fmd_eventqelem_t *eqe;
56
57 while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
58 fmd_list_delete(&eq->eq_list, eqe);
59 fmd_event_rele(eqe->eqe_event);
60 fmd_free(eqe, sizeof (fmd_eventqelem_t));
61 }
62
63 fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid);
64 fmd_free(eq, sizeof (fmd_eventq_t));
65 }
66
67 static void
fmd_eventq_drop(fmd_eventq_t * eq,fmd_eventqelem_t * eqe)68 fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe)
69 {
70 (void) pthread_mutex_lock(eq->eq_stats_lock);
71 eq->eq_stats->eqs_dropped.fmds_value.ui64++;
72 (void) pthread_mutex_unlock(eq->eq_stats_lock);
73
74 fmd_event_rele(eqe->eqe_event);
75 fmd_free(eqe, sizeof (fmd_eventqelem_t));
76 }
77
78 void
fmd_eventq_drop_topo(fmd_eventq_t * eq)79 fmd_eventq_drop_topo(fmd_eventq_t *eq)
80 {
81 fmd_eventqelem_t *eqe, *tmp;
82 boolean_t got_fm_events = B_FALSE;
83
84 /*
85 * Here we iterate through the per-module event queue in order to remove
86 * redundant FMD_EVT_TOPO events. The trick is to not drop a given
87 * topo event if there are any FM protocol events in the queue after
88 * it, as those events need to be processed with the correct topology.
89 */
90 (void) pthread_mutex_lock(&eq->eq_lock);
91 eqe = fmd_list_prev(&eq->eq_list);
92 while (eqe) {
93 if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_TOPO) {
94 if (!got_fm_events) {
95 tmp = eqe;
96 eqe = fmd_list_prev(eqe);
97 fmd_list_delete(&eq->eq_list, tmp);
98 eq->eq_size--;
99 fmd_eventq_drop(eq, tmp);
100 } else {
101 got_fm_events = B_FALSE;
102 eqe = fmd_list_prev(eqe);
103 }
104 } else if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_PROTOCOL) {
105 got_fm_events = B_TRUE;
106 eqe = fmd_list_prev(eqe);
107 } else
108 eqe = fmd_list_prev(eqe);
109 }
110 (void) pthread_mutex_unlock(&eq->eq_lock);
111 }
112
113 /*
114 * Update statistics when an event is dispatched and placed on a module's event
115 * queue. This is essentially the same code as kstat_waitq_enter(9F).
116 */
117 static void
fmd_eventqstat_dispatch(fmd_eventq_t * eq)118 fmd_eventqstat_dispatch(fmd_eventq_t *eq)
119 {
120 fmd_eventqstat_t *eqs = eq->eq_stats;
121 hrtime_t new, delta;
122 uint32_t wcnt;
123
124 (void) pthread_mutex_lock(eq->eq_stats_lock);
125
126 new = gethrtime();
127 delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
128 eqs->eqs_wlastupdate.fmds_value.ui64 = new;
129 wcnt = eqs->eqs_wcnt.fmds_value.ui32++;
130
131 if (wcnt != 0) {
132 eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
133 eqs->eqs_wtime.fmds_value.ui64 += delta;
134 }
135
136 eqs->eqs_dispatched.fmds_value.ui64++;
137 (void) pthread_mutex_unlock(eq->eq_stats_lock);
138 }
139
140 void
fmd_eventq_insert_at_head(fmd_eventq_t * eq,fmd_event_t * ep)141 fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep)
142 {
143 uint_t evt = FMD_EVENT_TYPE(ep);
144 fmd_eventqelem_t *eqe;
145 int ok;
146
147 /*
148 * If this event queue is acting as /dev/null, bounce the reference
149 * count to free an unreferenced event and just return immediately.
150 */
151 if (eq->eq_limit == 0) {
152 fmd_event_hold(ep);
153 fmd_event_rele(ep);
154 return;
155 }
156
157 eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
158 fmd_event_hold(ep);
159 eqe->eqe_event = ep;
160
161 (void) pthread_mutex_lock(&eq->eq_lock);
162
163 if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
164 if (evt != FMD_EVT_CTL)
165 fmd_eventqstat_dispatch(eq);
166
167 fmd_list_prepend(&eq->eq_list, eqe);
168 eq->eq_size++;
169 }
170
171 (void) pthread_cond_broadcast(&eq->eq_cv);
172 (void) pthread_mutex_unlock(&eq->eq_lock);
173
174 if (!ok)
175 fmd_eventq_drop(eq, eqe);
176 }
177
178 void
fmd_eventq_insert_at_time(fmd_eventq_t * eq,fmd_event_t * ep)179 fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep)
180 {
181 uint_t evt = FMD_EVENT_TYPE(ep);
182 hrtime_t hrt = fmd_event_hrtime(ep);
183 fmd_eventqelem_t *eqe, *oqe;
184 int ok;
185
186 /*
187 * If this event queue is acting as /dev/null, bounce the reference
188 * count to free an unreferenced event and just return immediately.
189 */
190 if (eq->eq_limit == 0) {
191 fmd_event_hold(ep);
192 fmd_event_rele(ep);
193 return;
194 }
195
196 eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
197 fmd_event_hold(ep);
198 eqe->eqe_event = ep;
199
200 (void) pthread_mutex_lock(&eq->eq_lock);
201
202 /*
203 * fmd makes no guarantees that events will be delivered in time order
204 * because its transport can make no such guarantees. Instead we make
205 * a looser guarantee that an enqueued event will be dequeued before
206 * any newer *pending* events according to event time. This permits us
207 * to state, for example, that a timer expiry event will be delivered
208 * prior to any enqueued event whose time is after the timer expired.
209 * We use a simple insertion sort for this task, as queue lengths are
210 * typically short and events do *tend* to be received chronologically.
211 */
212 for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) {
213 if (hrt >= fmd_event_hrtime(oqe->eqe_event))
214 break; /* 'ep' is newer than the event in 'oqe' */
215 }
216
217 if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
218 if (evt != FMD_EVT_CTL)
219 fmd_eventqstat_dispatch(eq);
220
221 if (oqe == NULL)
222 fmd_list_prepend(&eq->eq_list, eqe);
223 else
224 fmd_list_insert_after(&eq->eq_list, oqe, eqe);
225 eq->eq_size++;
226 }
227
228 (void) pthread_cond_broadcast(&eq->eq_cv);
229 (void) pthread_mutex_unlock(&eq->eq_lock);
230
231 if (!ok)
232 fmd_eventq_drop(eq, eqe);
233 }
234
235 fmd_event_t *
fmd_eventq_delete(fmd_eventq_t * eq)236 fmd_eventq_delete(fmd_eventq_t *eq)
237 {
238 fmd_eventqstat_t *eqs = eq->eq_stats;
239 hrtime_t new, delta;
240 uint32_t wcnt;
241
242 fmd_eventqelem_t *eqe;
243 fmd_event_t *ep;
244 top:
245 (void) pthread_mutex_lock(&eq->eq_lock);
246
247 while (!(eq->eq_flags & FMD_EVENTQ_ABORT) &&
248 (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND)))
249 (void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock);
250
251 if (eq->eq_flags & FMD_EVENTQ_ABORT) {
252 (void) pthread_mutex_unlock(&eq->eq_lock);
253 return (NULL);
254 }
255
256 eqe = fmd_list_next(&eq->eq_list);
257 fmd_list_delete(&eq->eq_list, eqe);
258 eq->eq_size--;
259
260 (void) pthread_mutex_unlock(&eq->eq_lock);
261
262 ep = eqe->eqe_event;
263 fmd_free(eqe, sizeof (fmd_eventqelem_t));
264
265 /*
266 * If we dequeued a control event, release it and go back to sleep.
267 * fmd_event_rele() on the event will block as described in fmd_ctl.c.
268 * This effectively renders control events invisible to our callers
269 * as well as to statistics and observability tools (e.g. fmstat(1M)).
270 */
271 if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) {
272 fmd_event_rele(ep);
273 goto top;
274 }
275
276 /*
277 * Before returning, update our statistics. This code is essentially
278 * kstat_waitq_to_runq(9F), except simplified because our queues are
279 * always consumed by a single thread (i.e. runq len == 1).
280 */
281 (void) pthread_mutex_lock(eq->eq_stats_lock);
282
283 new = gethrtime();
284 delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
285
286 eqs->eqs_wlastupdate.fmds_value.ui64 = new;
287 eqs->eqs_dlastupdate.fmds_value.ui64 = new;
288
289 ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0);
290 wcnt = eqs->eqs_wcnt.fmds_value.ui32--;
291
292 eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
293 eqs->eqs_wtime.fmds_value.ui64 += delta;
294
295 if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL)
296 eqs->eqs_prdequeued.fmds_value.ui64++;
297
298 eqs->eqs_dequeued.fmds_value.ui64++;
299 (void) pthread_mutex_unlock(eq->eq_stats_lock);
300
301 return (ep);
302 }
303
304 /*
305 * Update statistics when an event is done being processed by the eventq's
306 * consumer thread. This is essentially kstat_runq_exit(9F) simplified for
307 * our principle that a single thread consumes the queue (i.e. runq len == 1).
308 */
309 void
fmd_eventq_done(fmd_eventq_t * eq)310 fmd_eventq_done(fmd_eventq_t *eq)
311 {
312 fmd_eventqstat_t *eqs = eq->eq_stats;
313 hrtime_t new, delta;
314
315 (void) pthread_mutex_lock(eq->eq_stats_lock);
316
317 new = gethrtime();
318 delta = new - eqs->eqs_dlastupdate.fmds_value.ui64;
319
320 eqs->eqs_dlastupdate.fmds_value.ui64 = new;
321 eqs->eqs_dtime.fmds_value.ui64 += delta;
322
323 (void) pthread_mutex_unlock(eq->eq_stats_lock);
324 }
325
326 void
fmd_eventq_cancel(fmd_eventq_t * eq,uint_t type,void * data)327 fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data)
328 {
329 fmd_eventqelem_t *eqe, *nqe;
330
331 (void) pthread_mutex_lock(&eq->eq_lock);
332
333 for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) {
334 nqe = fmd_list_next(eqe);
335
336 if (fmd_event_match(eqe->eqe_event, type, data)) {
337 fmd_list_delete(&eq->eq_list, eqe);
338 eq->eq_size--;
339 fmd_event_rele(eqe->eqe_event);
340 fmd_free(eqe, sizeof (fmd_eventqelem_t));
341 }
342 }
343
344 (void) pthread_mutex_unlock(&eq->eq_lock);
345 }
346
347 void
fmd_eventq_suspend(fmd_eventq_t * eq)348 fmd_eventq_suspend(fmd_eventq_t *eq)
349 {
350 (void) pthread_mutex_lock(&eq->eq_lock);
351 eq->eq_flags |= FMD_EVENTQ_SUSPEND;
352 (void) pthread_mutex_unlock(&eq->eq_lock);
353 }
354
355 void
fmd_eventq_resume(fmd_eventq_t * eq)356 fmd_eventq_resume(fmd_eventq_t *eq)
357 {
358 (void) pthread_mutex_lock(&eq->eq_lock);
359 eq->eq_flags &= ~FMD_EVENTQ_SUSPEND;
360 (void) pthread_cond_broadcast(&eq->eq_cv);
361 (void) pthread_mutex_unlock(&eq->eq_lock);
362 }
363
364 void
fmd_eventq_abort(fmd_eventq_t * eq)365 fmd_eventq_abort(fmd_eventq_t *eq)
366 {
367 fmd_eventqelem_t *eqe;
368
369 (void) pthread_mutex_lock(&eq->eq_lock);
370
371 while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
372 fmd_list_delete(&eq->eq_list, eqe);
373 fmd_event_rele(eqe->eqe_event);
374 fmd_free(eqe, sizeof (fmd_eventqelem_t));
375 }
376
377 eq->eq_flags |= FMD_EVENTQ_ABORT;
378 (void) pthread_cond_broadcast(&eq->eq_cv);
379 (void) pthread_mutex_unlock(&eq->eq_lock);
380 }
381