1 /* 2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include <sys/types.h> 29 30 #ifdef HAVE_CONFIG_H 31 #include "config.h" 32 #endif 33 34 #ifdef HAVE_SYS_TIME_H 35 #include <sys/time.h> 36 #endif 37 38 #include <errno.h> 39 #include <stdio.h> 40 #include <stdlib.h> 41 #include <string.h> 42 #ifdef HAVE_STDARG_H 43 #include <stdarg.h> 44 #endif 45 46 #include "event.h" 47 48 /* prototypes */ 49 50 void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t); 51 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); 52 53 static int 54 bufferevent_add(struct event *ev, int timeout) 55 { 56 struct timeval tv, *ptv = NULL; 57 58 if (timeout) { 59 timerclear(&tv); 60 tv.tv_sec = timeout; 61 ptv = &tv; 62 } 63 64 return (event_add(ev, ptv)); 65 } 66 67 /* 68 * This callback is executed when the size of the input buffer changes. 69 * We use it to apply back pressure on the reading side. 70 */ 71 72 void 73 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, 74 void *arg) { 75 struct bufferevent *bufev = arg; 76 /* 77 * If we are below the watermark then reschedule reading if it's 78 * still enabled. 79 */ 80 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { 81 evbuffer_setcb(buf, NULL, NULL); 82 83 if (bufev->enabled & EV_READ) 84 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 85 } 86 } 87 88 static void 89 bufferevent_readcb(int fd, short event, void *arg) 90 { 91 struct bufferevent *bufev = arg; 92 int res = 0; 93 short what = EVBUFFER_READ; 94 size_t len; 95 int howmuch = -1; 96 97 if (event == EV_TIMEOUT) { 98 what |= EVBUFFER_TIMEOUT; 99 goto error; 100 } 101 102 /* 103 * If we have a high watermark configured then we don't want to 104 * read more data than would make us reach the watermark. 105 */ 106 if (bufev->wm_read.high != 0) 107 howmuch = bufev->wm_read.high; 108 109 res = evbuffer_read(bufev->input, fd, howmuch); 110 if (res == -1) { 111 if (errno == EAGAIN || errno == EINTR) 112 goto reschedule; 113 /* error case */ 114 what |= EVBUFFER_ERROR; 115 } else if (res == 0) { 116 /* eof case */ 117 what |= EVBUFFER_EOF; 118 } 119 120 if (res <= 0) 121 goto error; 122 123 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 124 125 /* See if this callbacks meets the water marks */ 126 len = EVBUFFER_LENGTH(bufev->input); 127 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 128 return; 129 if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) { 130 struct evbuffer *buf = bufev->input; 131 event_del(&bufev->ev_read); 132 133 /* Now schedule a callback for us */ 134 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 135 return; 136 } 137 138 /* Invoke the user callback - must always be called last */ 139 if (bufev->readcb != NULL) 140 (*bufev->readcb)(bufev, bufev->cbarg); 141 return; 142 143 reschedule: 144 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 145 return; 146 147 error: 148 (*bufev->errorcb)(bufev, what, bufev->cbarg); 149 } 150 151 static void 152 bufferevent_writecb(int fd, short event, void *arg) 153 { 154 struct bufferevent *bufev = arg; 155 int res = 0; 156 short what = EVBUFFER_WRITE; 157 158 if (event == EV_TIMEOUT) { 159 what |= EVBUFFER_TIMEOUT; 160 goto error; 161 } 162 163 if (EVBUFFER_LENGTH(bufev->output)) { 164 res = evbuffer_write(bufev->output, fd); 165 if (res == -1) { 166 #ifndef WIN32 167 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not 168 *set errno. thus this error checking is not portable*/ 169 if (errno == EAGAIN || 170 errno == EINTR || 171 errno == EINPROGRESS) 172 goto reschedule; 173 /* error case */ 174 what |= EVBUFFER_ERROR; 175 176 #else 177 goto reschedule; 178 #endif 179 180 } else if (res == 0) { 181 /* eof case */ 182 what |= EVBUFFER_EOF; 183 } 184 if (res <= 0) 185 goto error; 186 } 187 188 if (EVBUFFER_LENGTH(bufev->output) != 0) 189 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 190 191 /* 192 * Invoke the user callback if our buffer is drained or below the 193 * low watermark. 194 */ 195 if (bufev->writecb != NULL && 196 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 197 (*bufev->writecb)(bufev, bufev->cbarg); 198 199 return; 200 201 reschedule: 202 if (EVBUFFER_LENGTH(bufev->output) != 0) 203 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 204 return; 205 206 error: 207 (*bufev->errorcb)(bufev, what, bufev->cbarg); 208 } 209 210 /* 211 * Create a new buffered event object. 212 * 213 * The read callback is invoked whenever we read new data. 214 * The write callback is invoked whenever the output buffer is drained. 215 * The error callback is invoked on a write/read error or on EOF. 216 * 217 * Both read and write callbacks maybe NULL. The error callback is not 218 * allowed to be NULL and have to be provided always. 219 */ 220 221 struct bufferevent * 222 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 223 everrorcb errorcb, void *cbarg) 224 { 225 struct bufferevent *bufev; 226 227 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 228 return (NULL); 229 230 if ((bufev->input = evbuffer_new()) == NULL) { 231 free(bufev); 232 return (NULL); 233 } 234 235 if ((bufev->output = evbuffer_new()) == NULL) { 236 evbuffer_free(bufev->input); 237 free(bufev); 238 return (NULL); 239 } 240 241 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 242 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 243 244 bufev->readcb = readcb; 245 bufev->writecb = writecb; 246 bufev->errorcb = errorcb; 247 248 bufev->cbarg = cbarg; 249 250 /* 251 * Set to EV_WRITE so that using bufferevent_write is going to 252 * trigger a callback. Reading needs to be explicitly enabled 253 * because otherwise no data will be available. 254 */ 255 bufev->enabled = EV_WRITE; 256 257 return (bufev); 258 } 259 260 int 261 bufferevent_priority_set(struct bufferevent *bufev, int priority) 262 { 263 if (event_priority_set(&bufev->ev_read, priority) == -1) 264 return (-1); 265 if (event_priority_set(&bufev->ev_write, priority) == -1) 266 return (-1); 267 268 return (0); 269 } 270 271 /* Closing the file descriptor is the responsibility of the caller */ 272 273 void 274 bufferevent_free(struct bufferevent *bufev) 275 { 276 event_del(&bufev->ev_read); 277 event_del(&bufev->ev_write); 278 279 evbuffer_free(bufev->input); 280 evbuffer_free(bufev->output); 281 282 free(bufev); 283 } 284 285 /* 286 * Returns 0 on success; 287 * -1 on failure. 288 */ 289 290 int 291 bufferevent_write(struct bufferevent *bufev, void *data, size_t size) 292 { 293 int res; 294 295 res = evbuffer_add(bufev->output, data, size); 296 297 if (res == -1) 298 return (res); 299 300 /* If everything is okay, we need to schedule a write */ 301 if (size > 0 && (bufev->enabled & EV_WRITE)) 302 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 303 304 return (res); 305 } 306 307 int 308 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 309 { 310 int res; 311 312 res = bufferevent_write(bufev, buf->buffer, buf->off); 313 if (res != -1) 314 evbuffer_drain(buf, buf->off); 315 316 return (res); 317 } 318 319 size_t 320 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 321 { 322 struct evbuffer *buf = bufev->input; 323 324 if (buf->off < size) 325 size = buf->off; 326 327 /* Copy the available data to the user buffer */ 328 memcpy(data, buf->buffer, size); 329 330 if (size) 331 evbuffer_drain(buf, size); 332 333 return (size); 334 } 335 336 int 337 bufferevent_enable(struct bufferevent *bufev, short event) 338 { 339 if (event & EV_READ) { 340 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) 341 return (-1); 342 } 343 if (event & EV_WRITE) { 344 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) 345 return (-1); 346 } 347 348 bufev->enabled |= event; 349 return (0); 350 } 351 352 int 353 bufferevent_disable(struct bufferevent *bufev, short event) 354 { 355 if (event & EV_READ) { 356 if (event_del(&bufev->ev_read) == -1) 357 return (-1); 358 } 359 if (event & EV_WRITE) { 360 if (event_del(&bufev->ev_write) == -1) 361 return (-1); 362 } 363 364 bufev->enabled &= ~event; 365 return (0); 366 } 367 368 /* 369 * Sets the read and write timeout for a buffered event. 370 */ 371 372 void 373 bufferevent_settimeout(struct bufferevent *bufev, 374 int timeout_read, int timeout_write) { 375 bufev->timeout_read = timeout_read; 376 bufev->timeout_write = timeout_write; 377 } 378 379 /* 380 * Sets the water marks 381 */ 382 383 void 384 bufferevent_setwatermark(struct bufferevent *bufev, short events, 385 size_t lowmark, size_t highmark) 386 { 387 if (events & EV_READ) { 388 bufev->wm_read.low = lowmark; 389 bufev->wm_read.high = highmark; 390 } 391 392 if (events & EV_WRITE) { 393 bufev->wm_write.low = lowmark; 394 bufev->wm_write.high = highmark; 395 } 396 397 /* If the watermarks changed then see if we should call read again */ 398 bufferevent_read_pressure_cb(bufev->input, 399 0, EVBUFFER_LENGTH(bufev->input), bufev); 400 } 401 402 int 403 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 404 { 405 int res; 406 407 res = event_base_set(base, &bufev->ev_read); 408 if (res == -1) 409 return (res); 410 411 res = event_base_set(base, &bufev->ev_write); 412 return (res); 413 } 414