/*-
 * Copyright (c) 2011 NetApp, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY NETAPP, INC ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL NETAPP, INC OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD$
 */

/*
 * Micro event library for FreeBSD, designed for a single i/o thread
 * using kqueue, and having events be persistent by default.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/event.h>
#include <sys/time.h>

#include <pthread.h>
#include <pthread_np.h>

#include "mevent.h"

#define	MEVENT_MAX	64

#define	MEV_ENABLE	1
#define	MEV_DISABLE	2
#define	MEV_DEL_PENDING	3

extern char *vmname;

static pthread_t mevent_tid;
static int mevent_timid = 43;
static int mevent_pipefd[2];
static pthread_mutex_t mevent_lmutex = PTHREAD_MUTEX_INITIALIZER;

struct mevent {
	void	(*me_func)(int, enum ev_type, void *);
#define	me_msecs me_fd		/* EVF_TIMER events keep their period (ms) here */
	int	me_fd;
	int	me_timid;
	enum ev_type me_type;
	void	*me_param;
	int	me_cq;
	int	me_state;
	int	me_closefd;
	LIST_ENTRY(mevent) me_list;
};
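
/*
 * All active events live on one of two lists: global_head holds events
 * with no pending change, while change_head holds events whose pending
 * add, enable, disable or delete still has to be pushed to the kqueue.
 * me_cq is set while an event sits on change_head; mevent_build() drains
 * change_head into a kevent changelist.
 */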
static LIST_HEAD(listhead, mevent) global_head, change_head;

static void
mevent_qlock(void)
{
	pthread_mutex_lock(&mevent_lmutex);
}

static void
mevent_qunlock(void)
{
	pthread_mutex_unlock(&mevent_lmutex);
}

static void
mevent_pipe_read(int fd, enum ev_type type, void *param)
{
	char buf[MEVENT_MAX];
	int status;

	/*
	 * Drain the pipe read side. The fd is non-blocking so this is
	 * safe to do.
	 */
	do {
		status = read(fd, buf, sizeof(buf));
	} while (status == MEVENT_MAX);
}

static void
mevent_notify(void)
{
	char c;

	/*
	 * If calling from outside the i/o thread, write a byte on the
	 * pipe to force the i/o thread to exit the blocking kevent call.
	 */
	if (mevent_pipefd[1] != 0 && pthread_self() != mevent_tid) {
		write(mevent_pipefd[1], &c, 1);
	}
}

static int
mevent_kq_filter(struct mevent *mevp)
{
	int retval;

	retval = 0;

	if (mevp->me_type == EVF_READ)
		retval = EVFILT_READ;

	if (mevp->me_type == EVF_WRITE)
		retval = EVFILT_WRITE;

	if (mevp->me_type == EVF_TIMER)
		retval = EVFILT_TIMER;

	if (mevp->me_type == EVF_SIGNAL)
		retval = EVFILT_SIGNAL;

	return (retval);
}

static int
mevent_kq_flags(struct mevent *mevp)
{
	int ret;

	switch (mevp->me_state) {
	case MEV_ENABLE:
		ret = EV_ADD;
		if (mevp->me_type == EVF_TIMER)
			ret |= EV_ENABLE;
		break;
	case MEV_DISABLE:
		ret = EV_DISABLE;
		break;
	case MEV_DEL_PENDING:
		ret = EV_DELETE;
		break;
	}

	return (ret);
}

static int
mevent_kq_fflags(struct mevent *mevp)
{
	/* XXX nothing yet, perhaps EV_EOF for reads ? */
	return (0);
}

static int
mevent_build(int mfd, struct kevent *kev)
{
	struct mevent *mevp, *tmpp;
	int i;

	i = 0;

	mevent_qlock();

	LIST_FOREACH_SAFE(mevp, &change_head, me_list, tmpp) {
		if (mevp->me_closefd) {
			/*
			 * A close of the file descriptor will remove the
			 * event
			 */
			close(mevp->me_fd);
		} else {
			if (mevp->me_type == EVF_TIMER) {
				kev[i].ident = mevp->me_timid;
				kev[i].data = mevp->me_msecs;
			} else {
				kev[i].ident = mevp->me_fd;
				kev[i].data = 0;
			}
			kev[i].filter = mevent_kq_filter(mevp);
			kev[i].flags = mevent_kq_flags(mevp);
			kev[i].fflags = mevent_kq_fflags(mevp);
			kev[i].udata = mevp;
			i++;
		}

		mevp->me_cq = 0;
		LIST_REMOVE(mevp, me_list);

		if (mevp->me_state == MEV_DEL_PENDING) {
			free(mevp);
		} else {
			LIST_INSERT_HEAD(&global_head, mevp, me_list);
		}

		assert(i < MEVENT_MAX);
	}

	mevent_qunlock();

	return (i);
}

static void
mevent_handle(struct kevent *kev, int numev)
{
	struct mevent *mevp;
	int i;

	for (i = 0; i < numev; i++) {
		mevp = kev[i].udata;

		/* XXX check for EV_ERROR ? */

		(*mevp->me_func)(mevp->me_fd, mevp->me_type, mevp->me_param);
	}
}
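
/*
 * Register a new event.  For EVF_READ, EVF_WRITE and EVF_SIGNAL events,
 * tfd is the file descriptor or signal number; for EVF_TIMER events it is
 * the timer period in milliseconds.  Returns a handle for use with
 * mevent_enable/disable/delete, or NULL if the arguments are invalid,
 * allocation fails, or a non-timer event with the same fd/type pair is
 * already registered.
 */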
struct mevent *
mevent_add(int tfd, enum ev_type type,
	   void (*func)(int, enum ev_type, void *), void *param)
{
	struct mevent *lp, *mevp;

	if (tfd < 0 || func == NULL) {
		return (NULL);
	}

	mevp = NULL;

	mevent_qlock();

	/*
	 * Verify that the fd/type tuple is not present in any list
	 */
	LIST_FOREACH(lp, &global_head, me_list) {
		if (type != EVF_TIMER && lp->me_fd == tfd &&
		    lp->me_type == type) {
			goto exit;
		}
	}

	LIST_FOREACH(lp, &change_head, me_list) {
		if (type != EVF_TIMER && lp->me_fd == tfd &&
		    lp->me_type == type) {
			goto exit;
		}
	}

	/*
	 * Allocate an entry, populate it, and add it to the change list.
	 */
	mevp = malloc(sizeof(struct mevent));
	if (mevp == NULL) {
		goto exit;
	}

	memset(mevp, 0, sizeof(struct mevent));
	if (type == EVF_TIMER) {
		mevp->me_msecs = tfd;
		mevp->me_timid = mevent_timid++;
	} else
		mevp->me_fd = tfd;
	mevp->me_type = type;
	mevp->me_func = func;
	mevp->me_param = param;

	LIST_INSERT_HEAD(&change_head, mevp, me_list);
	mevp->me_cq = 1;
	mevp->me_state = MEV_ENABLE;
	mevent_notify();

exit:
	mevent_qunlock();

	return (mevp);
}

static int
mevent_update(struct mevent *evp, int newstate)
{
	/*
	 * It's not possible to enable/disable a deleted event
	 */
	if (evp->me_state == MEV_DEL_PENDING)
		return (EINVAL);

	/*
	 * No update needed if state isn't changing
	 */
	if (evp->me_state == newstate)
		return (0);

	mevent_qlock();

	evp->me_state = newstate;

	/*
	 * Place the entry onto the changed list if not already there.
	 */
	if (evp->me_cq == 0) {
		evp->me_cq = 1;
		LIST_REMOVE(evp, me_list);
		LIST_INSERT_HEAD(&change_head, evp, me_list);
		mevent_notify();
	}

	mevent_qunlock();

	return (0);
}

int
mevent_enable(struct mevent *evp)
{

	return (mevent_update(evp, MEV_ENABLE));
}

int
mevent_disable(struct mevent *evp)
{

	return (mevent_update(evp, MEV_DISABLE));
}

static int
mevent_delete_event(struct mevent *evp, int closefd)
{
	mevent_qlock();

	/*
	 * Place the entry onto the changed list if not already there, and
	 * mark as to be deleted.
	 */
	if (evp->me_cq == 0) {
		evp->me_cq = 1;
		LIST_REMOVE(evp, me_list);
		LIST_INSERT_HEAD(&change_head, evp, me_list);
		mevent_notify();
	}
	evp->me_state = MEV_DEL_PENDING;

	if (closefd)
		evp->me_closefd = 1;

	mevent_qunlock();

	return (0);
}

int
mevent_delete(struct mevent *evp)
{

	return (mevent_delete_event(evp, 0));
}

int
mevent_delete_close(struct mevent *evp)
{

	return (mevent_delete_event(evp, 1));
}

static void
mevent_set_name(void)
{

	pthread_set_name_np(mevent_tid, "mevent");
}

void
mevent_dispatch(void)
{
	struct kevent changelist[MEVENT_MAX];
	struct kevent eventlist[MEVENT_MAX];
	struct mevent *pipev;
	int mfd;
	int numev;
	int ret;

	mevent_tid = pthread_self();
	mevent_set_name();

	mfd = kqueue();
	assert(mfd > 0);

	/*
	 * Open the pipe that will be used for other threads to force
	 * the blocking kqueue call to exit by writing to it. Set the
	 * descriptor to non-blocking.
	 */
	ret = pipe(mevent_pipefd);
	if (ret < 0) {
		perror("pipe");
		exit(0);
	}

	ret = fcntl(mevent_pipefd[0], F_SETFL, O_NONBLOCK);
	assert(ret == 0);

	/*
	 * Add internal event handler for the pipe read fd
	 */
	pipev = mevent_add(mevent_pipefd[0], EVF_READ, mevent_pipe_read, NULL);
	assert(pipev != NULL);

	for (;;) {
		/*
		 * Build changelist if required.
		 * XXX the changelist can be put into the blocking call
		 * to eliminate the extra syscall. Currently better for
		 * debug.
		 */
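		/*
		 * Illustrative sketch of that combined form (not enabled
		 * here): a single call such as
		 *
		 *	ret = kevent(mfd, changelist, numev,
		 *	    eventlist, MEVENT_MAX, NULL);
		 *
		 * would submit the pending changes and block for events in
		 * one syscall; the split calls below make it easier to tell
		 * which step an error came from.
		 */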
		numev = mevent_build(mfd, changelist);
		if (numev) {
			ret = kevent(mfd, changelist, numev, NULL, 0, NULL);
			if (ret == -1) {
				perror("Error return from kevent change");
			}
		}

		/*
		 * Block awaiting events
		 */
		ret = kevent(mfd, NULL, 0, eventlist, MEVENT_MAX, NULL);
		if (ret == -1 && errno != EINTR) {
			perror("Error return from kevent monitor");
		}

		/*
		 * Handle reported events
		 */
		mevent_handle(eventlist, ret);
	}
}
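
/*
 * Example usage (an illustrative sketch, not part of this library): a
 * consumer registers its handlers with mevent_add() and then turns the
 * calling thread into the single i/o thread by calling mevent_dispatch(),
 * which never returns.  The conn_ready() callback and listen_fd below are
 * hypothetical placeholders.
 *
 *	static void
 *	conn_ready(int fd, enum ev_type type, void *param)
 *	{
 *		// accept the connection and mevent_add() its descriptor
 *	}
 *
 *	struct mevent *mev;
 *
 *	mev = mevent_add(listen_fd, EVF_READ, conn_ready, NULL);
 *	assert(mev != NULL);
 *	mevent_dispatch();
 */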