1 /*- 2 * Copyright (c) 2011 NetApp, Inc. 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY NETAPP, INC ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL NETAPP, INC OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD$ 27 */ 28 29 /* 30 * Micro event library for FreeBSD, designed for a single i/o thread 31 * using kqueue, and having events be persistent by default. 32 */ 33 34 #include <sys/cdefs.h> 35 __FBSDID("$FreeBSD$"); 36 37 #include <assert.h> 38 #include <errno.h> 39 #include <stdlib.h> 40 #include <stdio.h> 41 #include <string.h> 42 #include <unistd.h> 43 44 #include <sys/types.h> 45 #include <sys/event.h> 46 #include <sys/time.h> 47 48 #include <pthread.h> 49 #include <pthread_np.h> 50 51 #include "mevent.h" 52 53 #define MEVENT_MAX 64 54 55 #define MEV_ADD 1 56 #define MEV_ENABLE 2 57 #define MEV_DISABLE 3 58 #define MEV_DEL_PENDING 4 59 60 extern char *vmname; 61 62 static pthread_t mevent_tid; 63 static int mevent_timid = 43; 64 static int mevent_pipefd[2]; 65 static pthread_mutex_t mevent_lmutex = PTHREAD_MUTEX_INITIALIZER; 66 67 struct mevent { 68 void (*me_func)(int, enum ev_type, void *); 69 #define me_msecs me_fd 70 int me_fd; 71 int me_timid; 72 enum ev_type me_type; 73 void *me_param; 74 int me_cq; 75 int me_state; 76 int me_closefd; 77 LIST_ENTRY(mevent) me_list; 78 }; 79 80 static LIST_HEAD(listhead, mevent) global_head, change_head; 81 82 static void 83 mevent_qlock(void) 84 { 85 pthread_mutex_lock(&mevent_lmutex); 86 } 87 88 static void 89 mevent_qunlock(void) 90 { 91 pthread_mutex_unlock(&mevent_lmutex); 92 } 93 94 static void 95 mevent_pipe_read(int fd, enum ev_type type, void *param) 96 { 97 char buf[MEVENT_MAX]; 98 int status; 99 100 /* 101 * Drain the pipe read side. The fd is non-blocking so this is 102 * safe to do. 103 */ 104 do { 105 status = read(fd, buf, sizeof(buf)); 106 } while (status == MEVENT_MAX); 107 } 108 109 static void 110 mevent_notify(void) 111 { 112 char c; 113 114 /* 115 * If calling from outside the i/o thread, write a byte on the 116 * pipe to force the i/o thread to exit the blocking kevent call. 117 */ 118 if (mevent_pipefd[1] != 0 && pthread_self() != mevent_tid) { 119 write(mevent_pipefd[1], &c, 1); 120 } 121 } 122 123 static int 124 mevent_kq_filter(struct mevent *mevp) 125 { 126 int retval; 127 128 retval = 0; 129 130 if (mevp->me_type == EVF_READ) 131 retval = EVFILT_READ; 132 133 if (mevp->me_type == EVF_WRITE) 134 retval = EVFILT_WRITE; 135 136 if (mevp->me_type == EVF_TIMER) 137 retval = EVFILT_TIMER; 138 139 if (mevp->me_type == EVF_SIGNAL) 140 retval = EVFILT_SIGNAL; 141 142 return (retval); 143 } 144 145 static int 146 mevent_kq_flags(struct mevent *mevp) 147 { 148 int ret; 149 150 switch (mevp->me_state) { 151 case MEV_ADD: 152 ret = EV_ADD; /* implicitly enabled */ 153 break; 154 case MEV_ENABLE: 155 ret = EV_ENABLE; 156 break; 157 case MEV_DISABLE: 158 ret = EV_DISABLE; 159 break; 160 case MEV_DEL_PENDING: 161 ret = EV_DELETE; 162 break; 163 default: 164 assert(0); 165 break; 166 } 167 168 return (ret); 169 } 170 171 static int 172 mevent_kq_fflags(struct mevent *mevp) 173 { 174 /* XXX nothing yet, perhaps EV_EOF for reads ? */ 175 return (0); 176 } 177 178 static int 179 mevent_build(int mfd, struct kevent *kev) 180 { 181 struct mevent *mevp, *tmpp; 182 int i; 183 184 i = 0; 185 186 mevent_qlock(); 187 188 LIST_FOREACH_SAFE(mevp, &change_head, me_list, tmpp) { 189 if (mevp->me_closefd) { 190 /* 191 * A close of the file descriptor will remove the 192 * event 193 */ 194 close(mevp->me_fd); 195 } else { 196 if (mevp->me_type == EVF_TIMER) { 197 kev[i].ident = mevp->me_timid; 198 kev[i].data = mevp->me_msecs; 199 } else { 200 kev[i].ident = mevp->me_fd; 201 kev[i].data = 0; 202 } 203 kev[i].filter = mevent_kq_filter(mevp); 204 kev[i].flags = mevent_kq_flags(mevp); 205 kev[i].fflags = mevent_kq_fflags(mevp); 206 kev[i].udata = mevp; 207 i++; 208 } 209 210 mevp->me_cq = 0; 211 LIST_REMOVE(mevp, me_list); 212 213 if (mevp->me_state == MEV_DEL_PENDING) { 214 free(mevp); 215 } else { 216 LIST_INSERT_HEAD(&global_head, mevp, me_list); 217 } 218 219 assert(i < MEVENT_MAX); 220 } 221 222 mevent_qunlock(); 223 224 return (i); 225 } 226 227 static void 228 mevent_handle(struct kevent *kev, int numev) 229 { 230 struct mevent *mevp; 231 int i; 232 233 for (i = 0; i < numev; i++) { 234 mevp = kev[i].udata; 235 236 /* XXX check for EV_ERROR ? */ 237 238 (*mevp->me_func)(mevp->me_fd, mevp->me_type, mevp->me_param); 239 } 240 } 241 242 struct mevent * 243 mevent_add(int tfd, enum ev_type type, 244 void (*func)(int, enum ev_type, void *), void *param) 245 { 246 struct mevent *lp, *mevp; 247 248 if (tfd < 0 || func == NULL) { 249 return (NULL); 250 } 251 252 mevp = NULL; 253 254 mevent_qlock(); 255 256 /* 257 * Verify that the fd/type tuple is not present in any list 258 */ 259 LIST_FOREACH(lp, &global_head, me_list) { 260 if (type != EVF_TIMER && lp->me_fd == tfd && 261 lp->me_type == type) { 262 goto exit; 263 } 264 } 265 266 LIST_FOREACH(lp, &change_head, me_list) { 267 if (type != EVF_TIMER && lp->me_fd == tfd && 268 lp->me_type == type) { 269 goto exit; 270 } 271 } 272 273 /* 274 * Allocate an entry, populate it, and add it to the change list. 275 */ 276 mevp = calloc(1, sizeof(struct mevent)); 277 if (mevp == NULL) { 278 goto exit; 279 } 280 281 if (type == EVF_TIMER) { 282 mevp->me_msecs = tfd; 283 mevp->me_timid = mevent_timid++; 284 } else 285 mevp->me_fd = tfd; 286 mevp->me_type = type; 287 mevp->me_func = func; 288 mevp->me_param = param; 289 290 LIST_INSERT_HEAD(&change_head, mevp, me_list); 291 mevp->me_cq = 1; 292 mevp->me_state = MEV_ADD; 293 mevent_notify(); 294 295 exit: 296 mevent_qunlock(); 297 298 return (mevp); 299 } 300 301 static int 302 mevent_update(struct mevent *evp, int newstate) 303 { 304 /* 305 * It's not possible to enable/disable a deleted event 306 */ 307 if (evp->me_state == MEV_DEL_PENDING) 308 return (EINVAL); 309 310 /* 311 * No update needed if state isn't changing 312 */ 313 if (evp->me_state == newstate) 314 return (0); 315 316 mevent_qlock(); 317 318 evp->me_state = newstate; 319 320 /* 321 * Place the entry onto the changed list if not already there. 322 */ 323 if (evp->me_cq == 0) { 324 evp->me_cq = 1; 325 LIST_REMOVE(evp, me_list); 326 LIST_INSERT_HEAD(&change_head, evp, me_list); 327 mevent_notify(); 328 } 329 330 mevent_qunlock(); 331 332 return (0); 333 } 334 335 int 336 mevent_enable(struct mevent *evp) 337 { 338 339 return (mevent_update(evp, MEV_ENABLE)); 340 } 341 342 int 343 mevent_disable(struct mevent *evp) 344 { 345 346 return (mevent_update(evp, MEV_DISABLE)); 347 } 348 349 static int 350 mevent_delete_event(struct mevent *evp, int closefd) 351 { 352 mevent_qlock(); 353 354 /* 355 * Place the entry onto the changed list if not already there, and 356 * mark as to be deleted. 357 */ 358 if (evp->me_cq == 0) { 359 evp->me_cq = 1; 360 LIST_REMOVE(evp, me_list); 361 LIST_INSERT_HEAD(&change_head, evp, me_list); 362 mevent_notify(); 363 } 364 evp->me_state = MEV_DEL_PENDING; 365 366 if (closefd) 367 evp->me_closefd = 1; 368 369 mevent_qunlock(); 370 371 return (0); 372 } 373 374 int 375 mevent_delete(struct mevent *evp) 376 { 377 378 return (mevent_delete_event(evp, 0)); 379 } 380 381 int 382 mevent_delete_close(struct mevent *evp) 383 { 384 385 return (mevent_delete_event(evp, 1)); 386 } 387 388 static void 389 mevent_set_name(void) 390 { 391 392 pthread_set_name_np(mevent_tid, "mevent"); 393 } 394 395 void 396 mevent_dispatch(void) 397 { 398 struct kevent changelist[MEVENT_MAX]; 399 struct kevent eventlist[MEVENT_MAX]; 400 struct mevent *pipev; 401 int mfd; 402 int numev; 403 int ret; 404 405 mevent_tid = pthread_self(); 406 mevent_set_name(); 407 408 mfd = kqueue(); 409 assert(mfd > 0); 410 411 /* 412 * Open the pipe that will be used for other threads to force 413 * the blocking kqueue call to exit by writing to it. Set the 414 * descriptor to non-blocking. 415 */ 416 ret = pipe(mevent_pipefd); 417 if (ret < 0) { 418 perror("pipe"); 419 exit(0); 420 } 421 422 /* 423 * Add internal event handler for the pipe write fd 424 */ 425 pipev = mevent_add(mevent_pipefd[0], EVF_READ, mevent_pipe_read, NULL); 426 assert(pipev != NULL); 427 428 for (;;) { 429 /* 430 * Build changelist if required. 431 * XXX the changelist can be put into the blocking call 432 * to eliminate the extra syscall. Currently better for 433 * debug. 434 */ 435 numev = mevent_build(mfd, changelist); 436 if (numev) { 437 ret = kevent(mfd, changelist, numev, NULL, 0, NULL); 438 if (ret == -1) { 439 perror("Error return from kevent change"); 440 } 441 } 442 443 /* 444 * Block awaiting events 445 */ 446 ret = kevent(mfd, NULL, 0, eventlist, MEVENT_MAX, NULL); 447 if (ret == -1 && errno != EINTR) { 448 perror("Error return from kevent monitor"); 449 } 450 451 /* 452 * Handle reported events 453 */ 454 mevent_handle(eventlist, ret); 455 } 456 } 457