1 /*- 2 * Copyright (c) 2011 NetApp, Inc. 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY NETAPP, INC ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL NETAPP, INC OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD$ 27 */ 28 29 /* 30 * Micro event library for FreeBSD, designed for a single i/o thread 31 * using kqueue, and having events be persistent by default. 32 */ 33 34 #include <sys/cdefs.h> 35 __FBSDID("$FreeBSD$"); 36 37 #include <assert.h> 38 #include <errno.h> 39 #include <stdlib.h> 40 #include <stdio.h> 41 #include <string.h> 42 #include <unistd.h> 43 44 #include <sys/types.h> 45 #include <sys/event.h> 46 #include <sys/time.h> 47 48 #include <pthread.h> 49 #include <pthread_np.h> 50 51 #include "mevent.h" 52 53 #define MEVENT_MAX 64 54 55 #define MEV_ENABLE 1 56 #define MEV_DISABLE 2 57 #define MEV_DEL_PENDING 3 58 59 extern char *vmname; 60 61 static pthread_t mevent_tid; 62 static int mevent_pipefd[2]; 63 static pthread_mutex_t mevent_lmutex = PTHREAD_MUTEX_INITIALIZER; 64 65 struct mevent { 66 void (*me_func)(int, enum ev_type, void *); 67 int me_fd; 68 enum ev_type me_type; 69 void *me_param; 70 int me_cq; 71 int me_state; 72 int me_closefd; 73 LIST_ENTRY(mevent) me_list; 74 }; 75 76 static LIST_HEAD(listhead, mevent) global_head, change_head; 77 78 static void 79 mevent_qlock(void) 80 { 81 pthread_mutex_lock(&mevent_lmutex); 82 } 83 84 static void 85 mevent_qunlock(void) 86 { 87 pthread_mutex_unlock(&mevent_lmutex); 88 } 89 90 static void 91 mevent_pipe_read(int fd, enum ev_type type, void *param) 92 { 93 char buf[MEVENT_MAX]; 94 int status; 95 96 /* 97 * Drain the pipe read side. The fd is non-blocking so this is 98 * safe to do. 99 */ 100 do { 101 status = read(fd, buf, sizeof(buf)); 102 } while (status == MEVENT_MAX); 103 } 104 105 static void 106 mevent_notify(void) 107 { 108 char c; 109 110 /* 111 * If calling from outside the i/o thread, write a byte on the 112 * pipe to force the i/o thread to exit the blocking kevent call. 113 */ 114 if (mevent_pipefd[1] != 0 && pthread_self() != mevent_tid) { 115 write(mevent_pipefd[1], &c, 1); 116 } 117 } 118 119 static int 120 mevent_kq_filter(struct mevent *mevp) 121 { 122 int retval; 123 124 retval = 0; 125 126 if (mevp->me_type == EVF_READ) 127 retval = EVFILT_READ; 128 129 if (mevp->me_type == EVF_WRITE) 130 retval = EVFILT_WRITE; 131 132 return (retval); 133 } 134 135 static int 136 mevent_kq_flags(struct mevent *mevp) 137 { 138 int ret; 139 140 switch (mevp->me_state) { 141 case MEV_ENABLE: 142 ret = EV_ADD; 143 break; 144 case MEV_DISABLE: 145 ret = EV_DISABLE; 146 break; 147 case MEV_DEL_PENDING: 148 ret = EV_DELETE; 149 break; 150 } 151 152 return (ret); 153 } 154 155 static int 156 mevent_kq_fflags(struct mevent *mevp) 157 { 158 /* XXX nothing yet, perhaps EV_EOF for reads ? */ 159 return (0); 160 } 161 162 static int 163 mevent_build(int mfd, struct kevent *kev) 164 { 165 struct mevent *mevp, *tmpp; 166 int i; 167 168 i = 0; 169 170 mevent_qlock(); 171 172 LIST_FOREACH_SAFE(mevp, &change_head, me_list, tmpp) { 173 if (mevp->me_closefd) { 174 /* 175 * A close of the file descriptor will remove the 176 * event 177 */ 178 close(mevp->me_fd); 179 } else { 180 kev[i].ident = mevp->me_fd; 181 kev[i].filter = mevent_kq_filter(mevp); 182 kev[i].flags = mevent_kq_flags(mevp); 183 kev[i].fflags = mevent_kq_fflags(mevp); 184 kev[i].data = 0; 185 kev[i].udata = mevp; 186 i++; 187 } 188 189 mevp->me_cq = 0; 190 LIST_REMOVE(mevp, me_list); 191 192 if (mevp->me_state == MEV_DEL_PENDING) { 193 free(mevp); 194 } else { 195 LIST_INSERT_HEAD(&global_head, mevp, me_list); 196 } 197 198 assert(i < MEVENT_MAX); 199 } 200 201 mevent_qunlock(); 202 203 return (i); 204 } 205 206 static void 207 mevent_handle(struct kevent *kev, int numev) 208 { 209 struct mevent *mevp; 210 int i; 211 212 for (i = 0; i < numev; i++) { 213 mevp = kev[i].udata; 214 215 /* XXX check for EV_ERROR ? */ 216 217 (*mevp->me_func)(mevp->me_fd, mevp->me_type, mevp->me_param); 218 } 219 } 220 221 struct mevent * 222 mevent_add(int fd, enum ev_type type, 223 void (*func)(int, enum ev_type, void *), void *param) 224 { 225 struct mevent *lp, *mevp; 226 227 if (fd < 0 || func == NULL) { 228 return (NULL); 229 } 230 231 mevp = NULL; 232 233 mevent_qlock(); 234 235 /* 236 * Verify that the fd/type tuple is not present in any list 237 */ 238 LIST_FOREACH(lp, &global_head, me_list) { 239 if (lp->me_fd == fd && lp->me_type == type) { 240 goto exit; 241 } 242 } 243 244 LIST_FOREACH(lp, &change_head, me_list) { 245 if (lp->me_fd == fd && lp->me_type == type) { 246 goto exit; 247 } 248 } 249 250 /* 251 * Allocate an entry, populate it, and add it to the change list. 252 */ 253 mevp = malloc(sizeof(struct mevent)); 254 if (mevp == NULL) { 255 goto exit; 256 } 257 258 memset(mevp, 0, sizeof(struct mevent)); 259 mevp->me_fd = fd; 260 mevp->me_type = type; 261 mevp->me_func = func; 262 mevp->me_param = param; 263 264 LIST_INSERT_HEAD(&change_head, mevp, me_list); 265 mevp->me_cq = 1; 266 mevp->me_state = MEV_ENABLE; 267 mevent_notify(); 268 269 exit: 270 mevent_qunlock(); 271 272 return (mevp); 273 } 274 275 static int 276 mevent_update(struct mevent *evp, int newstate) 277 { 278 /* 279 * It's not possible to enable/disable a deleted event 280 */ 281 if (evp->me_state == MEV_DEL_PENDING) 282 return (EINVAL); 283 284 /* 285 * No update needed if state isn't changing 286 */ 287 if (evp->me_state == newstate) 288 return (0); 289 290 mevent_qlock(); 291 292 evp->me_state = newstate; 293 294 /* 295 * Place the entry onto the changed list if not already there. 296 */ 297 if (evp->me_cq == 0) { 298 evp->me_cq = 1; 299 LIST_REMOVE(evp, me_list); 300 LIST_INSERT_HEAD(&change_head, evp, me_list); 301 mevent_notify(); 302 } 303 304 mevent_qunlock(); 305 306 return (0); 307 } 308 309 int 310 mevent_enable(struct mevent *evp) 311 { 312 313 return (mevent_update(evp, MEV_ENABLE)); 314 } 315 316 int 317 mevent_disable(struct mevent *evp) 318 { 319 320 return (mevent_update(evp, MEV_DISABLE)); 321 } 322 323 static int 324 mevent_delete_event(struct mevent *evp, int closefd) 325 { 326 mevent_qlock(); 327 328 /* 329 * Place the entry onto the changed list if not already there, and 330 * mark as to be deleted. 331 */ 332 if (evp->me_cq == 0) { 333 evp->me_cq = 1; 334 LIST_REMOVE(evp, me_list); 335 LIST_INSERT_HEAD(&change_head, evp, me_list); 336 mevent_notify(); 337 } 338 evp->me_state = MEV_DEL_PENDING; 339 340 if (closefd) 341 evp->me_closefd = 1; 342 343 mevent_qunlock(); 344 345 return (0); 346 } 347 348 int 349 mevent_delete(struct mevent *evp) 350 { 351 352 return (mevent_delete_event(evp, 0)); 353 } 354 355 int 356 mevent_delete_close(struct mevent *evp) 357 { 358 359 return (mevent_delete_event(evp, 1)); 360 } 361 362 static void 363 mevent_set_name(void) 364 { 365 char tname[MAXCOMLEN + 1]; 366 367 snprintf(tname, sizeof(tname), "%s mevent", vmname); 368 pthread_set_name_np(mevent_tid, tname); 369 } 370 371 void 372 mevent_dispatch(void) 373 { 374 struct kevent changelist[MEVENT_MAX]; 375 struct kevent eventlist[MEVENT_MAX]; 376 struct mevent *pipev; 377 int mfd; 378 int numev; 379 int ret; 380 381 mevent_tid = pthread_self(); 382 mevent_set_name(); 383 384 mfd = kqueue(); 385 assert(mfd > 0); 386 387 /* 388 * Open the pipe that will be used for other threads to force 389 * the blocking kqueue call to exit by writing to it. Set the 390 * descriptor to non-blocking. 391 */ 392 ret = pipe(mevent_pipefd); 393 if (ret < 0) { 394 perror("pipe"); 395 exit(0); 396 } 397 398 /* 399 * Add internal event handler for the pipe write fd 400 */ 401 pipev = mevent_add(mevent_pipefd[0], EVF_READ, mevent_pipe_read, NULL); 402 assert(pipev != NULL); 403 404 for (;;) { 405 /* 406 * Build changelist if required. 407 * XXX the changelist can be put into the blocking call 408 * to eliminate the extra syscall. Currently better for 409 * debug. 410 */ 411 numev = mevent_build(mfd, changelist); 412 if (numev) { 413 ret = kevent(mfd, changelist, numev, NULL, 0, NULL); 414 if (ret == -1) { 415 perror("Error return from kevent change"); 416 } 417 } 418 419 /* 420 * Block awaiting events 421 */ 422 ret = kevent(mfd, NULL, 0, eventlist, MEVENT_MAX, NULL); 423 if (ret == -1) { 424 perror("Error return from kevent monitor"); 425 } 426 427 /* 428 * Handle reported events 429 */ 430 mevent_handle(eventlist, ret); 431 } 432 } 433