1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include <fmd_alloc.h> 30 #include <fmd_eventq.h> 31 #include <fmd_module.h> 32 33 fmd_eventq_t * 34 fmd_eventq_create(fmd_module_t *mp, uint_t limit) 35 { 36 fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP); 37 38 (void) pthread_mutex_init(&eq->eq_lock, NULL); 39 (void) pthread_cond_init(&eq->eq_cv, NULL); 40 41 eq->eq_mod = mp; 42 eq->eq_limit = limit; 43 44 return (eq); 45 } 46 47 void 48 fmd_eventq_destroy(fmd_eventq_t *eq) 49 { 50 fmd_eventqelem_t *eqe; 51 52 while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) { 53 fmd_list_delete(&eq->eq_list, eqe); 54 fmd_event_rele(eqe->eqe_event); 55 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 56 } 57 58 fmd_free(eq, sizeof (fmd_eventq_t)); 59 } 60 61 static void 62 fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe) 63 { 64 (void) pthread_mutex_lock(&eq->eq_mod->mod_stats_lock); 65 eq->eq_mod->mod_stats->ms_dropped.fmds_value.ui64++; 66 (void) pthread_mutex_unlock(&eq->eq_mod->mod_stats_lock); 67 68 fmd_event_rele(eqe->eqe_event); 69 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 70 } 71 72 void 73 fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep) 74 { 75 fmd_eventqelem_t *eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP); 76 uint_t evt = ((fmd_event_impl_t *)ep)->ev_type; 77 int ok; 78 79 fmd_event_hold(ep); 80 eqe->eqe_event = ep; 81 82 (void) pthread_mutex_lock(&eq->eq_lock); 83 84 if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) { 85 if (evt != FMD_EVT_CTL) 86 fmd_modstat_eventq_dispatch(eq->eq_mod); 87 88 fmd_list_prepend(&eq->eq_list, eqe); 89 eq->eq_size++; 90 } 91 92 (void) pthread_mutex_unlock(&eq->eq_lock); 93 (void) pthread_cond_broadcast(&eq->eq_cv); 94 95 if (!ok) 96 fmd_eventq_drop(eq, eqe); 97 } 98 99 void 100 fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep) 101 { 102 fmd_eventqelem_t *eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP); 103 uint_t evt = ((fmd_event_impl_t *)ep)->ev_type; 104 hrtime_t hrt = fmd_event_hrtime(ep); 105 106 fmd_eventqelem_t *oqe; 107 int ok; 108 109 fmd_event_hold(ep); 110 eqe->eqe_event = ep; 111 112 (void) pthread_mutex_lock(&eq->eq_lock); 113 114 /* 115 * fmd makes no guarantees that events will be delivered in time order 116 * because its transport can make no such guarantees. Instead we make 117 * a looser guarantee that an enqueued event will be dequeued before 118 * any newer *pending* events according to event time. This permits us 119 * to state, for example, that a timer expiry event will be delivered 120 * prior to any enqueued event whose time is after the timer expired. 121 * We use a simple insertion sort for this task, as queue lengths are 122 * typically short and events do *tend* to be received chronologically. 123 */ 124 for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) { 125 if (hrt >= fmd_event_hrtime(oqe->eqe_event)) 126 break; /* 'ep' is newer than the event in 'oqe' */ 127 } 128 129 if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) { 130 if (evt != FMD_EVT_CTL) 131 fmd_modstat_eventq_dispatch(eq->eq_mod); 132 133 fmd_list_insert_after(&eq->eq_list, oqe, eqe); 134 eq->eq_size++; 135 } 136 137 (void) pthread_mutex_unlock(&eq->eq_lock); 138 (void) pthread_cond_broadcast(&eq->eq_cv); 139 140 if (!ok) 141 fmd_eventq_drop(eq, eqe); 142 } 143 144 fmd_event_t * 145 fmd_eventq_delete(fmd_eventq_t *eq) 146 { 147 fmd_eventqelem_t *eqe; 148 fmd_event_t *ep; 149 150 (void) pthread_mutex_lock(&eq->eq_lock); 151 152 while (eq->eq_size == 0 && eq->eq_abort == 0) 153 (void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock); 154 155 if (eq->eq_abort) { 156 (void) pthread_mutex_unlock(&eq->eq_lock); 157 return (NULL); 158 } 159 160 eqe = fmd_list_next(&eq->eq_list); 161 fmd_list_delete(&eq->eq_list, eqe); 162 eq->eq_size--; 163 164 (void) pthread_mutex_unlock(&eq->eq_lock); 165 166 ep = eqe->eqe_event; 167 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 168 169 return (ep); 170 } 171 172 void 173 fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data) 174 { 175 fmd_eventqelem_t *eqe, *nqe; 176 177 (void) pthread_mutex_lock(&eq->eq_lock); 178 179 for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) { 180 nqe = fmd_list_next(eqe); 181 182 if (fmd_event_match(eqe->eqe_event, type, data)) { 183 fmd_list_delete(&eq->eq_list, eqe); 184 eq->eq_size--; 185 fmd_event_rele(eqe->eqe_event); 186 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 187 } 188 } 189 190 (void) pthread_mutex_unlock(&eq->eq_lock); 191 } 192 193 void 194 fmd_eventq_abort(fmd_eventq_t *eq) 195 { 196 fmd_eventqelem_t *eqe; 197 198 (void) pthread_mutex_lock(&eq->eq_lock); 199 200 while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) { 201 fmd_list_delete(&eq->eq_list, eqe); 202 fmd_event_rele(eqe->eqe_event); 203 fmd_free(eqe, sizeof (fmd_eventqelem_t)); 204 } 205 206 eq->eq_abort++; /* signal fmd_eventq_delete() to abort */ 207 208 (void) pthread_mutex_unlock(&eq->eq_lock); 209 (void) pthread_cond_broadcast(&eq->eq_cv); 210 } 211