1 /* 2 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 3. The name of the author may not be used to endorse or promote products 13 * derived from this software without specific prior written permission. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 */ 26 #include "event2/event-config.h" 27 #include "evconfig-private.h" 28 29 #include <sys/types.h> 30 31 #ifdef _WIN32 32 #include <winsock2.h> 33 #endif 34 35 #include "event2/util.h" 36 #include "event2/buffer.h" 37 #include "event2/bufferevent.h" 38 #include "event2/bufferevent_struct.h" 39 #include "event2/event.h" 40 #include "defer-internal.h" 41 #include "bufferevent-internal.h" 42 #include "mm-internal.h" 43 #include "util-internal.h" 44 45 struct bufferevent_pair { 46 struct bufferevent_private bev; 47 struct bufferevent_pair *partner; 48 /* For ->destruct() lock checking */ 49 struct bufferevent_pair *unlinked_partner; 50 }; 51 52 53 /* Given a bufferevent that's really a bev part of a bufferevent_pair, 54 * return that bufferevent_filtered. Returns NULL otherwise.*/ 55 static inline struct bufferevent_pair * 56 upcast(struct bufferevent *bev) 57 { 58 struct bufferevent_pair *bev_p; 59 if (!BEV_IS_PAIR(bev)) 60 return NULL; 61 bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); 62 EVUTIL_ASSERT(BEV_IS_PAIR(&bev_p->bev.bev)); 63 return bev_p; 64 } 65 66 #define downcast(bev_pair) (&(bev_pair)->bev.bev) 67 68 static inline void 69 incref_and_lock(struct bufferevent *b) 70 { 71 struct bufferevent_pair *bevp; 72 bufferevent_incref_and_lock_(b); 73 bevp = upcast(b); 74 if (bevp->partner) 75 bufferevent_incref_and_lock_(downcast(bevp->partner)); 76 } 77 78 static inline void 79 decref_and_unlock(struct bufferevent *b) 80 { 81 struct bufferevent_pair *bevp = upcast(b); 82 if (bevp->partner) 83 bufferevent_decref_and_unlock_(downcast(bevp->partner)); 84 bufferevent_decref_and_unlock_(b); 85 } 86 87 /* XXX Handle close */ 88 89 static void be_pair_outbuf_cb(struct evbuffer *, 90 const struct evbuffer_cb_info *, void *); 91 92 static struct bufferevent_pair * 93 bufferevent_pair_elt_new(struct event_base *base, 94 int options) 95 { 96 struct bufferevent_pair *bufev; 97 if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) 98 return NULL; 99 if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair, 100 options)) { 101 mm_free(bufev); 102 return NULL; 103 } 104 if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { 105 bufferevent_free(downcast(bufev)); 106 return NULL; 107 } 108 109 bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev); 110 111 return bufev; 112 } 113 114 int 115 bufferevent_pair_new(struct event_base *base, int options, 116 struct bufferevent *pair[2]) 117 { 118 struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; 119 int tmp_options; 120 121 options |= BEV_OPT_DEFER_CALLBACKS; 122 tmp_options = options & ~BEV_OPT_THREADSAFE; 123 124 bufev1 = bufferevent_pair_elt_new(base, options); 125 if (!bufev1) 126 return -1; 127 bufev2 = bufferevent_pair_elt_new(base, tmp_options); 128 if (!bufev2) { 129 bufferevent_free(downcast(bufev1)); 130 return -1; 131 } 132 133 if (options & BEV_OPT_THREADSAFE) { 134 /*XXXX check return */ 135 bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock); 136 } 137 138 bufev1->partner = bufev2; 139 bufev2->partner = bufev1; 140 141 evbuffer_freeze(downcast(bufev1)->input, 0); 142 evbuffer_freeze(downcast(bufev1)->output, 1); 143 evbuffer_freeze(downcast(bufev2)->input, 0); 144 evbuffer_freeze(downcast(bufev2)->output, 1); 145 146 pair[0] = downcast(bufev1); 147 pair[1] = downcast(bufev2); 148 149 return 0; 150 } 151 152 static void 153 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, 154 int ignore_wm) 155 { 156 size_t dst_size; 157 size_t n; 158 159 evbuffer_unfreeze(src->output, 1); 160 evbuffer_unfreeze(dst->input, 0); 161 162 if (dst->wm_read.high) { 163 dst_size = evbuffer_get_length(dst->input); 164 if (dst_size < dst->wm_read.high) { 165 n = dst->wm_read.high - dst_size; 166 evbuffer_remove_buffer(src->output, dst->input, n); 167 } else { 168 if (!ignore_wm) 169 goto done; 170 n = evbuffer_get_length(src->output); 171 evbuffer_add_buffer(dst->input, src->output); 172 } 173 } else { 174 n = evbuffer_get_length(src->output); 175 evbuffer_add_buffer(dst->input, src->output); 176 } 177 178 if (n) { 179 BEV_RESET_GENERIC_READ_TIMEOUT(dst); 180 181 if (evbuffer_get_length(dst->output)) 182 BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); 183 else 184 BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); 185 } 186 187 bufferevent_trigger_nolock_(dst, EV_READ, 0); 188 bufferevent_trigger_nolock_(src, EV_WRITE, 0); 189 done: 190 evbuffer_freeze(src->output, 1); 191 evbuffer_freeze(dst->input, 0); 192 } 193 194 static inline int 195 be_pair_wants_to_talk(struct bufferevent_pair *src, 196 struct bufferevent_pair *dst) 197 { 198 return (downcast(src)->enabled & EV_WRITE) && 199 (downcast(dst)->enabled & EV_READ) && 200 !dst->bev.read_suspended && 201 evbuffer_get_length(downcast(src)->output); 202 } 203 204 static void 205 be_pair_outbuf_cb(struct evbuffer *outbuf, 206 const struct evbuffer_cb_info *info, void *arg) 207 { 208 struct bufferevent_pair *bev_pair = arg; 209 struct bufferevent_pair *partner = bev_pair->partner; 210 211 incref_and_lock(downcast(bev_pair)); 212 213 if (info->n_added > info->n_deleted && partner) { 214 /* We got more data. If the other side's reading, then 215 hand it over. */ 216 if (be_pair_wants_to_talk(bev_pair, partner)) { 217 be_pair_transfer(downcast(bev_pair), downcast(partner), 0); 218 } 219 } 220 221 decref_and_unlock(downcast(bev_pair)); 222 } 223 224 static int 225 be_pair_enable(struct bufferevent *bufev, short events) 226 { 227 struct bufferevent_pair *bev_p = upcast(bufev); 228 struct bufferevent_pair *partner = bev_p->partner; 229 230 incref_and_lock(bufev); 231 232 if (events & EV_READ) { 233 BEV_RESET_GENERIC_READ_TIMEOUT(bufev); 234 } 235 if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) 236 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 237 238 /* We're starting to read! Does the other side have anything to write?*/ 239 if ((events & EV_READ) && partner && 240 be_pair_wants_to_talk(partner, bev_p)) { 241 be_pair_transfer(downcast(partner), bufev, 0); 242 } 243 /* We're starting to write! Does the other side want to read? */ 244 if ((events & EV_WRITE) && partner && 245 be_pair_wants_to_talk(bev_p, partner)) { 246 be_pair_transfer(bufev, downcast(partner), 0); 247 } 248 decref_and_unlock(bufev); 249 return 0; 250 } 251 252 static int 253 be_pair_disable(struct bufferevent *bev, short events) 254 { 255 if (events & EV_READ) { 256 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 257 } 258 if (events & EV_WRITE) { 259 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 260 } 261 return 0; 262 } 263 264 static void 265 be_pair_unlink(struct bufferevent *bev) 266 { 267 struct bufferevent_pair *bev_p = upcast(bev); 268 269 if (bev_p->partner) { 270 bev_p->unlinked_partner = bev_p->partner; 271 bev_p->partner->partner = NULL; 272 bev_p->partner = NULL; 273 } 274 } 275 276 /* Free *shared* lock in the latest be (since we share it between two of them). */ 277 static void 278 be_pair_destruct(struct bufferevent *bev) 279 { 280 struct bufferevent_pair *bev_p = upcast(bev); 281 282 /* Transfer ownership of the lock into partner, otherwise we will use 283 * already free'd lock during freeing second bev, see next example: 284 * 285 * bev1->own_lock = 1 286 * bev2->own_lock = 0 287 * bev2->lock = bev1->lock 288 * 289 * bufferevent_free(bev1) # refcnt == 0 -> unlink 290 * bufferevent_free(bev2) # refcnt == 0 -> unlink 291 * 292 * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock) 293 * -> BEV_LOCK(bev2->lock) <-- already freed 294 * 295 * Where bev1 == pair[0], bev2 == pair[1]. 296 */ 297 if (bev_p->unlinked_partner && bev_p->bev.own_lock) { 298 bev_p->unlinked_partner->bev.own_lock = 1; 299 bev_p->bev.own_lock = 0; 300 } 301 bev_p->unlinked_partner = NULL; 302 } 303 304 static int 305 be_pair_flush(struct bufferevent *bev, short iotype, 306 enum bufferevent_flush_mode mode) 307 { 308 struct bufferevent_pair *bev_p = upcast(bev); 309 struct bufferevent *partner; 310 311 if (!bev_p->partner) 312 return -1; 313 314 if (mode == BEV_NORMAL) 315 return 0; 316 317 incref_and_lock(bev); 318 319 partner = downcast(bev_p->partner); 320 321 if ((iotype & EV_READ) != 0) 322 be_pair_transfer(partner, bev, 1); 323 324 if ((iotype & EV_WRITE) != 0) 325 be_pair_transfer(bev, partner, 1); 326 327 if (mode == BEV_FINISHED) { 328 short what = BEV_EVENT_EOF; 329 if (iotype & EV_READ) 330 what |= BEV_EVENT_WRITING; 331 if (iotype & EV_WRITE) 332 what |= BEV_EVENT_READING; 333 bufferevent_run_eventcb_(partner, what, 0); 334 } 335 decref_and_unlock(bev); 336 return 0; 337 } 338 339 struct bufferevent * 340 bufferevent_pair_get_partner(struct bufferevent *bev) 341 { 342 struct bufferevent_pair *bev_p; 343 struct bufferevent *partner = NULL; 344 bev_p = upcast(bev); 345 if (! bev_p) 346 return NULL; 347 348 incref_and_lock(bev); 349 if (bev_p->partner) 350 partner = downcast(bev_p->partner); 351 decref_and_unlock(bev); 352 return partner; 353 } 354 355 const struct bufferevent_ops bufferevent_ops_pair = { 356 "pair_elt", 357 evutil_offsetof(struct bufferevent_pair, bev.bev), 358 be_pair_enable, 359 be_pair_disable, 360 be_pair_unlink, 361 be_pair_destruct, 362 bufferevent_generic_adj_timeouts_, 363 be_pair_flush, 364 NULL, /* ctrl */ 365 }; 366