xref: /titanic_51/usr/src/cmd/fm/fmd/common/fmd_eventq.c (revision 381a2a9a387f449fab7d0c7e97c4184c26963abf)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 
23 /*
24  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
25  * Use is subject to license terms.
26  */
27 
28 #pragma ident	"%Z%%M%	%I%	%E% SMI"
29 
30 #include <fmd_alloc.h>
31 #include <fmd_eventq.h>
32 #include <fmd_module.h>
33 #include <fmd_dispq.h>
34 #include <fmd_subr.h>
35 
36 #include <fmd.h>
37 
38 fmd_eventq_t *
39 fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats,
40     pthread_mutex_t *stats_lock, uint_t limit)
41 {
42 	fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP);
43 
44 	(void) pthread_mutex_init(&eq->eq_lock, NULL);
45 	(void) pthread_cond_init(&eq->eq_cv, NULL);
46 
47 	eq->eq_mod = mp;
48 	eq->eq_stats = stats;
49 	eq->eq_stats_lock = stats_lock;
50 	eq->eq_limit = limit;
51 	eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq);
52 
53 	return (eq);
54 }
55 
56 void
57 fmd_eventq_destroy(fmd_eventq_t *eq)
58 {
59 	fmd_eventqelem_t *eqe;
60 
61 	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
62 		fmd_list_delete(&eq->eq_list, eqe);
63 		fmd_event_rele(eqe->eqe_event);
64 		fmd_free(eqe, sizeof (fmd_eventqelem_t));
65 	}
66 
67 	fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid);
68 	fmd_free(eq, sizeof (fmd_eventq_t));
69 }
70 
71 static void
72 fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe)
73 {
74 	(void) pthread_mutex_lock(eq->eq_stats_lock);
75 	eq->eq_stats->eqs_dropped.fmds_value.ui64++;
76 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
77 
78 	fmd_event_rele(eqe->eqe_event);
79 	fmd_free(eqe, sizeof (fmd_eventqelem_t));
80 }
81 
82 /*
83  * Update statistics when an event is dispatched and placed on a module's event
84  * queue.  This is essentially the same code as kstat_waitq_enter(9F).
85  */
86 static void
87 fmd_eventqstat_dispatch(fmd_eventq_t *eq)
88 {
89 	fmd_eventqstat_t *eqs = eq->eq_stats;
90 	hrtime_t new, delta;
91 	uint32_t wcnt;
92 
93 	(void) pthread_mutex_lock(eq->eq_stats_lock);
94 
95 	new = gethrtime();
96 	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
97 	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
98 	wcnt = eqs->eqs_wcnt.fmds_value.ui32++;
99 
100 	if (wcnt != 0) {
101 		eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
102 		eqs->eqs_wtime.fmds_value.ui64 += delta;
103 	}
104 
105 	eqs->eqs_dispatched.fmds_value.ui64++;
106 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
107 }
108 
109 void
110 fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep)
111 {
112 	uint_t evt = FMD_EVENT_TYPE(ep);
113 	fmd_eventqelem_t *eqe;
114 	int ok;
115 
116 	/*
117 	 * If this event queue is acting as /dev/null, bounce the reference
118 	 * count to free an unreferenced event and just return immediately.
119 	 */
120 	if (eq->eq_limit == 0) {
121 		fmd_event_hold(ep);
122 		fmd_event_rele(ep);
123 		return;
124 	}
125 
126 	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
127 	fmd_event_hold(ep);
128 	eqe->eqe_event = ep;
129 
130 	(void) pthread_mutex_lock(&eq->eq_lock);
131 
132 	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
133 		if (evt != FMD_EVT_CTL)
134 			fmd_eventqstat_dispatch(eq);
135 
136 		fmd_list_prepend(&eq->eq_list, eqe);
137 		eq->eq_size++;
138 	}
139 
140 	(void) pthread_cond_broadcast(&eq->eq_cv);
141 	(void) pthread_mutex_unlock(&eq->eq_lock);
142 
143 	if (!ok)
144 		fmd_eventq_drop(eq, eqe);
145 }
146 
147 void
148 fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep)
149 {
150 	uint_t evt = FMD_EVENT_TYPE(ep);
151 	hrtime_t hrt = fmd_event_hrtime(ep);
152 	fmd_eventqelem_t *eqe, *oqe;
153 	int ok;
154 
155 	/*
156 	 * If this event queue is acting as /dev/null, bounce the reference
157 	 * count to free an unreferenced event and just return immediately.
158 	 */
159 	if (eq->eq_limit == 0) {
160 		fmd_event_hold(ep);
161 		fmd_event_rele(ep);
162 		return;
163 	}
164 
165 	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
166 	fmd_event_hold(ep);
167 	eqe->eqe_event = ep;
168 
169 	(void) pthread_mutex_lock(&eq->eq_lock);
170 
171 	/*
172 	 * fmd makes no guarantees that events will be delivered in time order
173 	 * because its transport can make no such guarantees.  Instead we make
174 	 * a looser guarantee that an enqueued event will be dequeued before
175 	 * any newer *pending* events according to event time.  This permits us
176 	 * to state, for example, that a timer expiry event will be delivered
177 	 * prior to any enqueued event whose time is after the timer expired.
178 	 * We use a simple insertion sort for this task, as queue lengths are
179 	 * typically short and events do *tend* to be received chronologically.
180 	 */
181 	for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) {
182 		if (hrt >= fmd_event_hrtime(oqe->eqe_event))
183 			break; /* 'ep' is newer than the event in 'oqe' */
184 	}
185 
186 	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
187 		if (evt != FMD_EVT_CTL)
188 			fmd_eventqstat_dispatch(eq);
189 
190 		fmd_list_insert_after(&eq->eq_list, oqe, eqe);
191 		eq->eq_size++;
192 	}
193 
194 	(void) pthread_cond_broadcast(&eq->eq_cv);
195 	(void) pthread_mutex_unlock(&eq->eq_lock);
196 
197 	if (!ok)
198 		fmd_eventq_drop(eq, eqe);
199 }
200 
201 fmd_event_t *
202 fmd_eventq_delete(fmd_eventq_t *eq)
203 {
204 	fmd_eventqstat_t *eqs = eq->eq_stats;
205 	hrtime_t new, delta;
206 	uint32_t wcnt;
207 
208 	fmd_eventqelem_t *eqe;
209 	fmd_event_t *ep;
210 top:
211 	(void) pthread_mutex_lock(&eq->eq_lock);
212 
213 	while (!(eq->eq_flags & FMD_EVENTQ_ABORT) &&
214 	    (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND)))
215 		(void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock);
216 
217 	if (eq->eq_flags & FMD_EVENTQ_ABORT) {
218 		(void) pthread_mutex_unlock(&eq->eq_lock);
219 		return (NULL);
220 	}
221 
222 	eqe = fmd_list_next(&eq->eq_list);
223 	fmd_list_delete(&eq->eq_list, eqe);
224 	eq->eq_size--;
225 
226 	(void) pthread_mutex_unlock(&eq->eq_lock);
227 
228 	ep = eqe->eqe_event;
229 	fmd_free(eqe, sizeof (fmd_eventqelem_t));
230 
231 	/*
232 	 * If we dequeued a control event, release it and go back to sleep.
233 	 * fmd_event_rele() on the event will block as described in fmd_ctl.c.
234 	 * This effectively renders control events invisible to our callers
235 	 * as well as to statistics and observability tools (e.g. fmstat(1M)).
236 	 */
237 	if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) {
238 		fmd_event_rele(ep);
239 		goto top;
240 	}
241 
242 	/*
243 	 * Before returning, update our statistics.  This code is essentially
244 	 * kstat_waitq_to_runq(9F), except simplified because our queues are
245 	 * always consumed by a single thread (i.e. runq len == 1).
246 	 */
247 	(void) pthread_mutex_lock(eq->eq_stats_lock);
248 
249 	new = gethrtime();
250 	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
251 
252 	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
253 	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
254 
255 	ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0);
256 	wcnt = eqs->eqs_wcnt.fmds_value.ui32--;
257 
258 	eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
259 	eqs->eqs_wtime.fmds_value.ui64 += delta;
260 
261 	if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL)
262 		eqs->eqs_prdequeued.fmds_value.ui64++;
263 
264 	eqs->eqs_dequeued.fmds_value.ui64++;
265 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
266 
267 	return (ep);
268 }
269 
270 /*
271  * Update statistics when an event is done being processed by the eventq's
272  * consumer thread.  This is essentially kstat_runq_exit(9F) simplified for
273  * our principle that a single thread consumes the queue (i.e. runq len == 1).
274  */
275 void
276 fmd_eventq_done(fmd_eventq_t *eq)
277 {
278 	fmd_eventqstat_t *eqs = eq->eq_stats;
279 	hrtime_t new, delta;
280 
281 	(void) pthread_mutex_lock(eq->eq_stats_lock);
282 
283 	new = gethrtime();
284 	delta = new - eqs->eqs_dlastupdate.fmds_value.ui64;
285 
286 	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
287 	eqs->eqs_dtime.fmds_value.ui64 += delta;
288 
289 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
290 }
291 
292 void
293 fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data)
294 {
295 	fmd_eventqelem_t *eqe, *nqe;
296 
297 	(void) pthread_mutex_lock(&eq->eq_lock);
298 
299 	for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) {
300 		nqe = fmd_list_next(eqe);
301 
302 		if (fmd_event_match(eqe->eqe_event, type, data)) {
303 			fmd_list_delete(&eq->eq_list, eqe);
304 			eq->eq_size--;
305 			fmd_event_rele(eqe->eqe_event);
306 			fmd_free(eqe, sizeof (fmd_eventqelem_t));
307 		}
308 	}
309 
310 	(void) pthread_mutex_unlock(&eq->eq_lock);
311 }
312 
313 void
314 fmd_eventq_suspend(fmd_eventq_t *eq)
315 {
316 	(void) pthread_mutex_lock(&eq->eq_lock);
317 	eq->eq_flags |= FMD_EVENTQ_SUSPEND;
318 	(void) pthread_mutex_unlock(&eq->eq_lock);
319 }
320 
321 void
322 fmd_eventq_resume(fmd_eventq_t *eq)
323 {
324 	(void) pthread_mutex_lock(&eq->eq_lock);
325 	eq->eq_flags &= ~FMD_EVENTQ_SUSPEND;
326 	(void) pthread_cond_broadcast(&eq->eq_cv);
327 	(void) pthread_mutex_unlock(&eq->eq_lock);
328 }
329 
330 void
331 fmd_eventq_abort(fmd_eventq_t *eq)
332 {
333 	fmd_eventqelem_t *eqe;
334 
335 	(void) pthread_mutex_lock(&eq->eq_lock);
336 
337 	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
338 		fmd_list_delete(&eq->eq_list, eqe);
339 		fmd_event_rele(eqe->eqe_event);
340 		fmd_free(eqe, sizeof (fmd_eventqelem_t));
341 	}
342 
343 	eq->eq_flags |= FMD_EVENTQ_ABORT;
344 	(void) pthread_cond_broadcast(&eq->eq_cv);
345 	(void) pthread_mutex_unlock(&eq->eq_lock);
346 }
347