1c43e99fdSEd Maste /*
2c43e99fdSEd Maste * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
3c43e99fdSEd Maste *
4c43e99fdSEd Maste * Redistribution and use in source and binary forms, with or without
5c43e99fdSEd Maste * modification, are permitted provided that the following conditions
6c43e99fdSEd Maste * are met:
7c43e99fdSEd Maste * 1. Redistributions of source code must retain the above copyright
8c43e99fdSEd Maste * notice, this list of conditions and the following disclaimer.
9c43e99fdSEd Maste * 2. Redistributions in binary form must reproduce the above copyright
10c43e99fdSEd Maste * notice, this list of conditions and the following disclaimer in the
11c43e99fdSEd Maste * documentation and/or other materials provided with the distribution.
12c43e99fdSEd Maste * 3. The name of the author may not be used to endorse or promote products
13c43e99fdSEd Maste * derived from this software without specific prior written permission.
14c43e99fdSEd Maste *
15c43e99fdSEd Maste * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16c43e99fdSEd Maste * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17c43e99fdSEd Maste * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18c43e99fdSEd Maste * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19c43e99fdSEd Maste * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20c43e99fdSEd Maste * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21c43e99fdSEd Maste * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22c43e99fdSEd Maste * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23c43e99fdSEd Maste * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24c43e99fdSEd Maste * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25c43e99fdSEd Maste */
26c43e99fdSEd Maste #include "event2/event-config.h"
27c43e99fdSEd Maste #include "evconfig-private.h"
28c43e99fdSEd Maste
29c43e99fdSEd Maste #include <sys/types.h>
30c43e99fdSEd Maste
31c43e99fdSEd Maste #ifdef _WIN32
32c43e99fdSEd Maste #include <winsock2.h>
33c43e99fdSEd Maste #endif
34c43e99fdSEd Maste
35c43e99fdSEd Maste #include "event2/util.h"
36c43e99fdSEd Maste #include "event2/buffer.h"
37c43e99fdSEd Maste #include "event2/bufferevent.h"
38c43e99fdSEd Maste #include "event2/bufferevent_struct.h"
39c43e99fdSEd Maste #include "event2/event.h"
40c43e99fdSEd Maste #include "defer-internal.h"
41c43e99fdSEd Maste #include "bufferevent-internal.h"
42c43e99fdSEd Maste #include "mm-internal.h"
43c43e99fdSEd Maste #include "util-internal.h"
44c43e99fdSEd Maste
45c43e99fdSEd Maste struct bufferevent_pair {
46c43e99fdSEd Maste struct bufferevent_private bev;
47c43e99fdSEd Maste struct bufferevent_pair *partner;
48c43e99fdSEd Maste /* For ->destruct() lock checking */
49c43e99fdSEd Maste struct bufferevent_pair *unlinked_partner;
50c43e99fdSEd Maste };
51c43e99fdSEd Maste
52c43e99fdSEd Maste
53c43e99fdSEd Maste /* Given a bufferevent that's really a bev part of a bufferevent_pair,
54c43e99fdSEd Maste * return that bufferevent_filtered. Returns NULL otherwise.*/
55c43e99fdSEd Maste static inline struct bufferevent_pair *
upcast(struct bufferevent * bev)56c43e99fdSEd Maste upcast(struct bufferevent *bev)
57c43e99fdSEd Maste {
58c43e99fdSEd Maste struct bufferevent_pair *bev_p;
59*b50261e2SCy Schubert if (!BEV_IS_PAIR(bev))
60c43e99fdSEd Maste return NULL;
61c43e99fdSEd Maste bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev);
62*b50261e2SCy Schubert EVUTIL_ASSERT(BEV_IS_PAIR(&bev_p->bev.bev));
63c43e99fdSEd Maste return bev_p;
64c43e99fdSEd Maste }
65c43e99fdSEd Maste
66c43e99fdSEd Maste #define downcast(bev_pair) (&(bev_pair)->bev.bev)
67c43e99fdSEd Maste
68c43e99fdSEd Maste static inline void
incref_and_lock(struct bufferevent * b)69c43e99fdSEd Maste incref_and_lock(struct bufferevent *b)
70c43e99fdSEd Maste {
71c43e99fdSEd Maste struct bufferevent_pair *bevp;
72c43e99fdSEd Maste bufferevent_incref_and_lock_(b);
73c43e99fdSEd Maste bevp = upcast(b);
74c43e99fdSEd Maste if (bevp->partner)
75c43e99fdSEd Maste bufferevent_incref_and_lock_(downcast(bevp->partner));
76c43e99fdSEd Maste }
77c43e99fdSEd Maste
78c43e99fdSEd Maste static inline void
decref_and_unlock(struct bufferevent * b)79c43e99fdSEd Maste decref_and_unlock(struct bufferevent *b)
80c43e99fdSEd Maste {
81c43e99fdSEd Maste struct bufferevent_pair *bevp = upcast(b);
82c43e99fdSEd Maste if (bevp->partner)
83c43e99fdSEd Maste bufferevent_decref_and_unlock_(downcast(bevp->partner));
84c43e99fdSEd Maste bufferevent_decref_and_unlock_(b);
85c43e99fdSEd Maste }
86c43e99fdSEd Maste
87c43e99fdSEd Maste /* XXX Handle close */
88c43e99fdSEd Maste
89c43e99fdSEd Maste static void be_pair_outbuf_cb(struct evbuffer *,
90c43e99fdSEd Maste const struct evbuffer_cb_info *, void *);
91c43e99fdSEd Maste
92c43e99fdSEd Maste static struct bufferevent_pair *
bufferevent_pair_elt_new(struct event_base * base,int options)93c43e99fdSEd Maste bufferevent_pair_elt_new(struct event_base *base,
94c43e99fdSEd Maste int options)
95c43e99fdSEd Maste {
96c43e99fdSEd Maste struct bufferevent_pair *bufev;
97c43e99fdSEd Maste if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
98c43e99fdSEd Maste return NULL;
99c43e99fdSEd Maste if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair,
100c43e99fdSEd Maste options)) {
101c43e99fdSEd Maste mm_free(bufev);
102c43e99fdSEd Maste return NULL;
103c43e99fdSEd Maste }
104c43e99fdSEd Maste if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
105c43e99fdSEd Maste bufferevent_free(downcast(bufev));
106c43e99fdSEd Maste return NULL;
107c43e99fdSEd Maste }
108c43e99fdSEd Maste
109c43e99fdSEd Maste bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev);
110c43e99fdSEd Maste
111c43e99fdSEd Maste return bufev;
112c43e99fdSEd Maste }
113c43e99fdSEd Maste
114c43e99fdSEd Maste int
bufferevent_pair_new(struct event_base * base,int options,struct bufferevent * pair[2])115c43e99fdSEd Maste bufferevent_pair_new(struct event_base *base, int options,
116c43e99fdSEd Maste struct bufferevent *pair[2])
117c43e99fdSEd Maste {
118c43e99fdSEd Maste struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
119c43e99fdSEd Maste int tmp_options;
120c43e99fdSEd Maste
121c43e99fdSEd Maste options |= BEV_OPT_DEFER_CALLBACKS;
122c43e99fdSEd Maste tmp_options = options & ~BEV_OPT_THREADSAFE;
123c43e99fdSEd Maste
124c43e99fdSEd Maste bufev1 = bufferevent_pair_elt_new(base, options);
125c43e99fdSEd Maste if (!bufev1)
126c43e99fdSEd Maste return -1;
127c43e99fdSEd Maste bufev2 = bufferevent_pair_elt_new(base, tmp_options);
128c43e99fdSEd Maste if (!bufev2) {
129c43e99fdSEd Maste bufferevent_free(downcast(bufev1));
130c43e99fdSEd Maste return -1;
131c43e99fdSEd Maste }
132c43e99fdSEd Maste
133c43e99fdSEd Maste if (options & BEV_OPT_THREADSAFE) {
134c43e99fdSEd Maste /*XXXX check return */
135c43e99fdSEd Maste bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock);
136c43e99fdSEd Maste }
137c43e99fdSEd Maste
138c43e99fdSEd Maste bufev1->partner = bufev2;
139c43e99fdSEd Maste bufev2->partner = bufev1;
140c43e99fdSEd Maste
141c43e99fdSEd Maste evbuffer_freeze(downcast(bufev1)->input, 0);
142c43e99fdSEd Maste evbuffer_freeze(downcast(bufev1)->output, 1);
143c43e99fdSEd Maste evbuffer_freeze(downcast(bufev2)->input, 0);
144c43e99fdSEd Maste evbuffer_freeze(downcast(bufev2)->output, 1);
145c43e99fdSEd Maste
146c43e99fdSEd Maste pair[0] = downcast(bufev1);
147c43e99fdSEd Maste pair[1] = downcast(bufev2);
148c43e99fdSEd Maste
149c43e99fdSEd Maste return 0;
150c43e99fdSEd Maste }
151c43e99fdSEd Maste
152c43e99fdSEd Maste static void
be_pair_transfer(struct bufferevent * src,struct bufferevent * dst,int ignore_wm)153c43e99fdSEd Maste be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
154c43e99fdSEd Maste int ignore_wm)
155c43e99fdSEd Maste {
156c43e99fdSEd Maste size_t dst_size;
157c43e99fdSEd Maste size_t n;
158c43e99fdSEd Maste
159c43e99fdSEd Maste evbuffer_unfreeze(src->output, 1);
160c43e99fdSEd Maste evbuffer_unfreeze(dst->input, 0);
161c43e99fdSEd Maste
162c43e99fdSEd Maste if (dst->wm_read.high) {
163c43e99fdSEd Maste dst_size = evbuffer_get_length(dst->input);
164c43e99fdSEd Maste if (dst_size < dst->wm_read.high) {
165c43e99fdSEd Maste n = dst->wm_read.high - dst_size;
166c43e99fdSEd Maste evbuffer_remove_buffer(src->output, dst->input, n);
167c43e99fdSEd Maste } else {
168c43e99fdSEd Maste if (!ignore_wm)
169c43e99fdSEd Maste goto done;
170c43e99fdSEd Maste n = evbuffer_get_length(src->output);
171c43e99fdSEd Maste evbuffer_add_buffer(dst->input, src->output);
172c43e99fdSEd Maste }
173c43e99fdSEd Maste } else {
174c43e99fdSEd Maste n = evbuffer_get_length(src->output);
175c43e99fdSEd Maste evbuffer_add_buffer(dst->input, src->output);
176c43e99fdSEd Maste }
177c43e99fdSEd Maste
178c43e99fdSEd Maste if (n) {
179c43e99fdSEd Maste BEV_RESET_GENERIC_READ_TIMEOUT(dst);
180c43e99fdSEd Maste
181c43e99fdSEd Maste if (evbuffer_get_length(dst->output))
182c43e99fdSEd Maste BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
183c43e99fdSEd Maste else
184c43e99fdSEd Maste BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
185c43e99fdSEd Maste }
186c43e99fdSEd Maste
187c43e99fdSEd Maste bufferevent_trigger_nolock_(dst, EV_READ, 0);
188c43e99fdSEd Maste bufferevent_trigger_nolock_(src, EV_WRITE, 0);
189c43e99fdSEd Maste done:
190c43e99fdSEd Maste evbuffer_freeze(src->output, 1);
191c43e99fdSEd Maste evbuffer_freeze(dst->input, 0);
192c43e99fdSEd Maste }
193c43e99fdSEd Maste
194c43e99fdSEd Maste static inline int
be_pair_wants_to_talk(struct bufferevent_pair * src,struct bufferevent_pair * dst)195c43e99fdSEd Maste be_pair_wants_to_talk(struct bufferevent_pair *src,
196c43e99fdSEd Maste struct bufferevent_pair *dst)
197c43e99fdSEd Maste {
198c43e99fdSEd Maste return (downcast(src)->enabled & EV_WRITE) &&
199c43e99fdSEd Maste (downcast(dst)->enabled & EV_READ) &&
200c43e99fdSEd Maste !dst->bev.read_suspended &&
201c43e99fdSEd Maste evbuffer_get_length(downcast(src)->output);
202c43e99fdSEd Maste }
203c43e99fdSEd Maste
204c43e99fdSEd Maste static void
be_pair_outbuf_cb(struct evbuffer * outbuf,const struct evbuffer_cb_info * info,void * arg)205c43e99fdSEd Maste be_pair_outbuf_cb(struct evbuffer *outbuf,
206c43e99fdSEd Maste const struct evbuffer_cb_info *info, void *arg)
207c43e99fdSEd Maste {
208c43e99fdSEd Maste struct bufferevent_pair *bev_pair = arg;
209c43e99fdSEd Maste struct bufferevent_pair *partner = bev_pair->partner;
210c43e99fdSEd Maste
211c43e99fdSEd Maste incref_and_lock(downcast(bev_pair));
212c43e99fdSEd Maste
213c43e99fdSEd Maste if (info->n_added > info->n_deleted && partner) {
214c43e99fdSEd Maste /* We got more data. If the other side's reading, then
215c43e99fdSEd Maste hand it over. */
216c43e99fdSEd Maste if (be_pair_wants_to_talk(bev_pair, partner)) {
217c43e99fdSEd Maste be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
218c43e99fdSEd Maste }
219c43e99fdSEd Maste }
220c43e99fdSEd Maste
221c43e99fdSEd Maste decref_and_unlock(downcast(bev_pair));
222c43e99fdSEd Maste }
223c43e99fdSEd Maste
224c43e99fdSEd Maste static int
be_pair_enable(struct bufferevent * bufev,short events)225c43e99fdSEd Maste be_pair_enable(struct bufferevent *bufev, short events)
226c43e99fdSEd Maste {
227c43e99fdSEd Maste struct bufferevent_pair *bev_p = upcast(bufev);
228c43e99fdSEd Maste struct bufferevent_pair *partner = bev_p->partner;
229c43e99fdSEd Maste
230c43e99fdSEd Maste incref_and_lock(bufev);
231c43e99fdSEd Maste
232c43e99fdSEd Maste if (events & EV_READ) {
233c43e99fdSEd Maste BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
234c43e99fdSEd Maste }
235c43e99fdSEd Maste if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
236c43e99fdSEd Maste BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
237c43e99fdSEd Maste
238c43e99fdSEd Maste /* We're starting to read! Does the other side have anything to write?*/
239c43e99fdSEd Maste if ((events & EV_READ) && partner &&
240c43e99fdSEd Maste be_pair_wants_to_talk(partner, bev_p)) {
241c43e99fdSEd Maste be_pair_transfer(downcast(partner), bufev, 0);
242c43e99fdSEd Maste }
243c43e99fdSEd Maste /* We're starting to write! Does the other side want to read? */
244c43e99fdSEd Maste if ((events & EV_WRITE) && partner &&
245c43e99fdSEd Maste be_pair_wants_to_talk(bev_p, partner)) {
246c43e99fdSEd Maste be_pair_transfer(bufev, downcast(partner), 0);
247c43e99fdSEd Maste }
248c43e99fdSEd Maste decref_and_unlock(bufev);
249c43e99fdSEd Maste return 0;
250c43e99fdSEd Maste }
251c43e99fdSEd Maste
252c43e99fdSEd Maste static int
be_pair_disable(struct bufferevent * bev,short events)253c43e99fdSEd Maste be_pair_disable(struct bufferevent *bev, short events)
254c43e99fdSEd Maste {
255c43e99fdSEd Maste if (events & EV_READ) {
256c43e99fdSEd Maste BEV_DEL_GENERIC_READ_TIMEOUT(bev);
257c43e99fdSEd Maste }
258c43e99fdSEd Maste if (events & EV_WRITE) {
259c43e99fdSEd Maste BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
260c43e99fdSEd Maste }
261c43e99fdSEd Maste return 0;
262c43e99fdSEd Maste }
263c43e99fdSEd Maste
264c43e99fdSEd Maste static void
be_pair_unlink(struct bufferevent * bev)265c43e99fdSEd Maste be_pair_unlink(struct bufferevent *bev)
266c43e99fdSEd Maste {
267c43e99fdSEd Maste struct bufferevent_pair *bev_p = upcast(bev);
268c43e99fdSEd Maste
269c43e99fdSEd Maste if (bev_p->partner) {
270c43e99fdSEd Maste bev_p->unlinked_partner = bev_p->partner;
271c43e99fdSEd Maste bev_p->partner->partner = NULL;
272c43e99fdSEd Maste bev_p->partner = NULL;
273c43e99fdSEd Maste }
274c43e99fdSEd Maste }
275c43e99fdSEd Maste
276c43e99fdSEd Maste /* Free *shared* lock in the latest be (since we share it between two of them). */
277c43e99fdSEd Maste static void
be_pair_destruct(struct bufferevent * bev)278c43e99fdSEd Maste be_pair_destruct(struct bufferevent *bev)
279c43e99fdSEd Maste {
280c43e99fdSEd Maste struct bufferevent_pair *bev_p = upcast(bev);
281c43e99fdSEd Maste
282c43e99fdSEd Maste /* Transfer ownership of the lock into partner, otherwise we will use
283c43e99fdSEd Maste * already free'd lock during freeing second bev, see next example:
284c43e99fdSEd Maste *
285c43e99fdSEd Maste * bev1->own_lock = 1
286c43e99fdSEd Maste * bev2->own_lock = 0
287c43e99fdSEd Maste * bev2->lock = bev1->lock
288c43e99fdSEd Maste *
289c43e99fdSEd Maste * bufferevent_free(bev1) # refcnt == 0 -> unlink
290c43e99fdSEd Maste * bufferevent_free(bev2) # refcnt == 0 -> unlink
291c43e99fdSEd Maste *
292c43e99fdSEd Maste * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock)
293c43e99fdSEd Maste * -> BEV_LOCK(bev2->lock) <-- already freed
294c43e99fdSEd Maste *
295c43e99fdSEd Maste * Where bev1 == pair[0], bev2 == pair[1].
296c43e99fdSEd Maste */
297c43e99fdSEd Maste if (bev_p->unlinked_partner && bev_p->bev.own_lock) {
298c43e99fdSEd Maste bev_p->unlinked_partner->bev.own_lock = 1;
299c43e99fdSEd Maste bev_p->bev.own_lock = 0;
300c43e99fdSEd Maste }
301c43e99fdSEd Maste bev_p->unlinked_partner = NULL;
302c43e99fdSEd Maste }
303c43e99fdSEd Maste
304c43e99fdSEd Maste static int
be_pair_flush(struct bufferevent * bev,short iotype,enum bufferevent_flush_mode mode)305c43e99fdSEd Maste be_pair_flush(struct bufferevent *bev, short iotype,
306c43e99fdSEd Maste enum bufferevent_flush_mode mode)
307c43e99fdSEd Maste {
308c43e99fdSEd Maste struct bufferevent_pair *bev_p = upcast(bev);
309c43e99fdSEd Maste struct bufferevent *partner;
310c43e99fdSEd Maste
311c43e99fdSEd Maste if (!bev_p->partner)
312c43e99fdSEd Maste return -1;
313c43e99fdSEd Maste
314c43e99fdSEd Maste if (mode == BEV_NORMAL)
315c43e99fdSEd Maste return 0;
316c43e99fdSEd Maste
317c43e99fdSEd Maste incref_and_lock(bev);
318c43e99fdSEd Maste
319c43e99fdSEd Maste partner = downcast(bev_p->partner);
320c43e99fdSEd Maste
321c43e99fdSEd Maste if ((iotype & EV_READ) != 0)
322c43e99fdSEd Maste be_pair_transfer(partner, bev, 1);
323c43e99fdSEd Maste
324c43e99fdSEd Maste if ((iotype & EV_WRITE) != 0)
325c43e99fdSEd Maste be_pair_transfer(bev, partner, 1);
326c43e99fdSEd Maste
327c43e99fdSEd Maste if (mode == BEV_FINISHED) {
328c43e99fdSEd Maste short what = BEV_EVENT_EOF;
329c43e99fdSEd Maste if (iotype & EV_READ)
330c43e99fdSEd Maste what |= BEV_EVENT_WRITING;
331c43e99fdSEd Maste if (iotype & EV_WRITE)
332c43e99fdSEd Maste what |= BEV_EVENT_READING;
333c43e99fdSEd Maste bufferevent_run_eventcb_(partner, what, 0);
334c43e99fdSEd Maste }
335c43e99fdSEd Maste decref_and_unlock(bev);
336c43e99fdSEd Maste return 0;
337c43e99fdSEd Maste }
338c43e99fdSEd Maste
339c43e99fdSEd Maste struct bufferevent *
bufferevent_pair_get_partner(struct bufferevent * bev)340c43e99fdSEd Maste bufferevent_pair_get_partner(struct bufferevent *bev)
341c43e99fdSEd Maste {
342c43e99fdSEd Maste struct bufferevent_pair *bev_p;
343c43e99fdSEd Maste struct bufferevent *partner = NULL;
344c43e99fdSEd Maste bev_p = upcast(bev);
345c43e99fdSEd Maste if (! bev_p)
346c43e99fdSEd Maste return NULL;
347c43e99fdSEd Maste
348c43e99fdSEd Maste incref_and_lock(bev);
349c43e99fdSEd Maste if (bev_p->partner)
350c43e99fdSEd Maste partner = downcast(bev_p->partner);
351c43e99fdSEd Maste decref_and_unlock(bev);
352c43e99fdSEd Maste return partner;
353c43e99fdSEd Maste }
354c43e99fdSEd Maste
355c43e99fdSEd Maste const struct bufferevent_ops bufferevent_ops_pair = {
356c43e99fdSEd Maste "pair_elt",
357c43e99fdSEd Maste evutil_offsetof(struct bufferevent_pair, bev.bev),
358c43e99fdSEd Maste be_pair_enable,
359c43e99fdSEd Maste be_pair_disable,
360c43e99fdSEd Maste be_pair_unlink,
361c43e99fdSEd Maste be_pair_destruct,
362c43e99fdSEd Maste bufferevent_generic_adj_timeouts_,
363c43e99fdSEd Maste be_pair_flush,
364c43e99fdSEd Maste NULL, /* ctrl */
365c43e99fdSEd Maste };
366