1eac58b9emaste/*
2eac58b9emaste * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
3eac58b9emaste *
4eac58b9emaste * All rights reserved.
5eac58b9emaste *
6eac58b9emaste * Redistribution and use in source and binary forms, with or without
7eac58b9emaste * modification, are permitted provided that the following conditions
8eac58b9emaste * are met:
9eac58b9emaste * 1. Redistributions of source code must retain the above copyright
10eac58b9emaste *    notice, this list of conditions and the following disclaimer.
11eac58b9emaste * 2. Redistributions in binary form must reproduce the above copyright
12eac58b9emaste *    notice, this list of conditions and the following disclaimer in the
13eac58b9emaste *    documentation and/or other materials provided with the distribution.
14eac58b9emaste * 3. The name of the author may not be used to endorse or promote products
15eac58b9emaste *    derived from this software without specific prior written permission.
16eac58b9emaste *
17eac58b9emaste * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18eac58b9emaste * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19eac58b9emaste * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20eac58b9emaste * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21eac58b9emaste * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22eac58b9emaste * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23eac58b9emaste * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24eac58b9emaste * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25eac58b9emaste * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26eac58b9emaste * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27eac58b9emaste */
28eac58b9emaste
29eac58b9emaste#include "event2/event-config.h"
30eac58b9emaste#include "evconfig-private.h"
31eac58b9emaste
32eac58b9emaste#ifdef EVENT__HAVE_SYS_TIME_H
33eac58b9emaste#include <sys/time.h>
34eac58b9emaste#endif
35eac58b9emaste
36eac58b9emaste#include <errno.h>
37eac58b9emaste#include <stdio.h>
38eac58b9emaste#include <stdlib.h>
39eac58b9emaste#include <string.h>
40eac58b9emaste#ifdef EVENT__HAVE_STDARG_H
41eac58b9emaste#include <stdarg.h>
42eac58b9emaste#endif
43eac58b9emaste#ifdef EVENT__HAVE_UNISTD_H
44eac58b9emaste#include <unistd.h>
45eac58b9emaste#endif
46eac58b9emaste
47eac58b9emaste#ifdef _WIN32
48eac58b9emaste#include <winsock2.h>
49eac58b9emaste#include <ws2tcpip.h>
50eac58b9emaste#endif
51eac58b9emaste
52eac58b9emaste#include <sys/queue.h>
53eac58b9emaste
54eac58b9emaste#include "event2/util.h"
55eac58b9emaste#include "event2/bufferevent.h"
56eac58b9emaste#include "event2/buffer.h"
57eac58b9emaste#include "event2/bufferevent_struct.h"
58eac58b9emaste#include "event2/event.h"
59eac58b9emaste#include "event2/util.h"
60eac58b9emaste#include "event-internal.h"
61eac58b9emaste#include "log-internal.h"
62eac58b9emaste#include "mm-internal.h"
63eac58b9emaste#include "bufferevent-internal.h"
64eac58b9emaste#include "util-internal.h"
65eac58b9emaste#include "iocp-internal.h"
66eac58b9emaste
67eac58b9emaste#ifndef SO_UPDATE_CONNECT_CONTEXT
68eac58b9emaste/* Mingw is sometimes missing this */
69eac58b9emaste#define SO_UPDATE_CONNECT_CONTEXT 0x7010
70eac58b9emaste#endif
71eac58b9emaste
72eac58b9emaste/* prototypes */
73eac58b9emastestatic int be_async_enable(struct bufferevent *, short);
74eac58b9emastestatic int be_async_disable(struct bufferevent *, short);
75eac58b9emastestatic void be_async_destruct(struct bufferevent *);
76eac58b9emastestatic int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
77eac58b9emastestatic int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
78eac58b9emaste
79eac58b9emastestruct bufferevent_async {
80eac58b9emaste	struct bufferevent_private bev;
81eac58b9emaste	struct event_overlapped connect_overlapped;
82eac58b9emaste	struct event_overlapped read_overlapped;
83eac58b9emaste	struct event_overlapped write_overlapped;
84eac58b9emaste	size_t read_in_progress;
85eac58b9emaste	size_t write_in_progress;
86eac58b9emaste	unsigned ok : 1;
87eac58b9emaste	unsigned read_added : 1;
88eac58b9emaste	unsigned write_added : 1;
89eac58b9emaste};
90eac58b9emaste
91eac58b9emasteconst struct bufferevent_ops bufferevent_ops_async = {
92eac58b9emaste	"socket_async",
93eac58b9emaste	evutil_offsetof(struct bufferevent_async, bev.bev),
94eac58b9emaste	be_async_enable,
95eac58b9emaste	be_async_disable,
96eac58b9emaste	NULL, /* Unlink */
97eac58b9emaste	be_async_destruct,
98eac58b9emaste	bufferevent_generic_adj_timeouts_,
99eac58b9emaste	be_async_flush,
100eac58b9emaste	be_async_ctrl,
101eac58b9emaste};
102eac58b9emaste
103eac58b9emastestatic inline struct bufferevent_async *
104eac58b9emasteupcast(struct bufferevent *bev)
105eac58b9emaste{
106eac58b9emaste	struct bufferevent_async *bev_a;
107eac58b9emaste	if (bev->be_ops != &bufferevent_ops_async)
108eac58b9emaste		return NULL;
109eac58b9emaste	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
110eac58b9emaste	return bev_a;
111eac58b9emaste}
112eac58b9emaste
113eac58b9emastestatic inline struct bufferevent_async *
114eac58b9emasteupcast_connect(struct event_overlapped *eo)
115eac58b9emaste{
116eac58b9emaste	struct bufferevent_async *bev_a;
117eac58b9emaste	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
118eac58b9emaste	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
119eac58b9emaste	return bev_a;
120eac58b9emaste}
121eac58b9emaste
122eac58b9emastestatic inline struct bufferevent_async *
123eac58b9emasteupcast_read(struct event_overlapped *eo)
124eac58b9emaste{
125eac58b9emaste	struct bufferevent_async *bev_a;
126eac58b9emaste	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
127eac58b9emaste	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
128eac58b9emaste	return bev_a;
129eac58b9emaste}
130eac58b9emaste
131eac58b9emastestatic inline struct bufferevent_async *
132eac58b9emasteupcast_write(struct event_overlapped *eo)
133eac58b9emaste{
134eac58b9emaste	struct bufferevent_async *bev_a;
135eac58b9emaste	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
136eac58b9emaste	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
137eac58b9emaste	return bev_a;
138eac58b9emaste}
139eac58b9emaste
140eac58b9emastestatic void
141eac58b9emastebev_async_del_write(struct bufferevent_async *beva)
142eac58b9emaste{
143eac58b9emaste	struct bufferevent *bev = &beva->bev.bev;
144eac58b9emaste
145eac58b9emaste	if (beva->write_added) {
146eac58b9emaste		beva->write_added = 0;
147eac58b9emaste		event_base_del_virtual_(bev->ev_base);
148eac58b9emaste	}
149eac58b9emaste}
150eac58b9emaste
151eac58b9emastestatic void
152eac58b9emastebev_async_del_read(struct bufferevent_async *beva)
153eac58b9emaste{
154eac58b9emaste	struct bufferevent *bev = &beva->bev.bev;
155eac58b9emaste
156eac58b9emaste	if (beva->read_added) {
157eac58b9emaste		beva->read_added = 0;
158eac58b9emaste		event_base_del_virtual_(bev->ev_base);
159eac58b9emaste	}
160eac58b9emaste}
161eac58b9emaste
162eac58b9emastestatic void
163eac58b9emastebev_async_add_write(struct bufferevent_async *beva)
164eac58b9emaste{
165eac58b9emaste	struct bufferevent *bev = &beva->bev.bev;
166eac58b9emaste
167eac58b9emaste	if (!beva->write_added) {
168eac58b9emaste		beva->write_added = 1;
169eac58b9emaste		event_base_add_virtual_(bev->ev_base);
170eac58b9emaste	}
171eac58b9emaste}
172eac58b9emaste
173eac58b9emastestatic void
174eac58b9emastebev_async_add_read(struct bufferevent_async *beva)
175eac58b9emaste{
176eac58b9emaste	struct bufferevent *bev = &beva->bev.bev;
177eac58b9emaste
178eac58b9emaste	if (!beva->read_added) {
179eac58b9emaste		beva->read_added = 1;
180eac58b9emaste		event_base_add_virtual_(bev->ev_base);
181eac58b9emaste	}
182eac58b9emaste}
183eac58b9emaste
184eac58b9emastestatic void
185eac58b9emastebev_async_consider_writing(struct bufferevent_async *beva)
186eac58b9emaste{
187eac58b9emaste	size_t at_most;
188eac58b9emaste	int limit;
189eac58b9emaste	struct bufferevent *bev = &beva->bev.bev;
190eac58b9emaste
191eac58b9emaste	/* Don't write if there's a write in progress, or we do not
192eac58b9emaste	 * want to write, or when there's nothing left to write. */
193eac58b9emaste	if (beva->write_in_progress || beva->bev.connecting)
194eac58b9emaste		return;
195eac58b9emaste	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
196eac58b9emaste	    !evbuffer_get_length(bev->output)) {
197eac58b9emaste		bev_async_del_write(beva);
198eac58b9emaste		return;
199eac58b9emaste	}
200eac58b9emaste
201eac58b9emaste	at_most = evbuffer_get_length(bev->output);
202eac58b9emaste
203eac58b9emaste	/* This is safe so long as bufferevent_get_write_max never returns
204eac58b9emaste	 * more than INT_MAX.  That's true for now. XXXX */
205eac58b9emaste	limit = (int)bufferevent_get_write_max_(&beva->bev);
206eac58b9emaste	if (at_most >= (size_t)limit && limit >= 0)
207eac58b9emaste		at_most = limit;
208eac58b9emaste
209eac58b9emaste	if (beva->bev.write_suspended) {
210eac58b9emaste		bev_async_del_write(beva);
211eac58b9emaste		return;
212eac58b9emaste	}
213eac58b9emaste
214eac58b9emaste	/*  XXXX doesn't respect low-water mark very well. */
215eac58b9emaste	bufferevent_incref_(bev);
216eac58b9emaste	if (evbuffer_launch_write_(bev->output, at_most,
217eac58b9emaste	    &beva->write_overlapped)) {
218eac58b9emaste		bufferevent_decref_(bev);
219eac58b9emaste		beva->ok = 0;
220eac58b9emaste		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
221eac58b9emaste	} else {
222eac58b9emaste		beva->write_in_progress = at_most;
223eac58b9emaste		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
224eac58b9emaste		bev_async_add_write(beva);
225eac58b9emaste	}
226eac58b9emaste}
227eac58b9emaste
228eac58b9emastestatic void
229eac58b9emastebev_async_consider_reading(struct bufferevent_async *beva)
230eac58b9emaste{
231eac58b9emaste	size_t cur_size;
232eac58b9emaste	size_t read_high;
233eac58b9emaste	size_t at_most;
234eac58b9emaste	int limit;
235eac58b9emaste	struct bufferevent *bev = &beva->bev.bev;
236eac58b9emaste
237eac58b9emaste	/* Don't read if there is a read in progress, or we do not
238eac58b9emaste	 * want to read. */
239eac58b9emaste	if (beva->read_in_progress || beva->bev.connecting)
240eac58b9emaste		return;
241eac58b9emaste	if (!beva->ok || !(bev->enabled&EV_READ)) {
242eac58b9emaste		bev_async_del_read(beva);
243eac58b9emaste		return;
244eac58b9emaste	}
245eac58b9emaste
246eac58b9emaste	/* Don't read if we're full */
247eac58b9emaste	cur_size = evbuffer_get_length(bev->input);
248eac58b9emaste	read_high = bev->wm_read.high;
249eac58b9emaste	if (read_high) {
250eac58b9emaste		if (cur_size >= read_high) {
251eac58b9emaste			bev_async_del_read(beva);
252eac58b9emaste			return;
253eac58b9emaste		}
254eac58b9emaste		at_most = read_high - cur_size;
255eac58b9emaste	} else {
256eac58b9emaste		at_most = 16384; /* FIXME totally magic. */
257eac58b9emaste	}
258eac58b9emaste
259eac58b9emaste	/* XXXX This over-commits. */
260eac58b9emaste	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
261eac58b9emaste	limit = (int)bufferevent_get_read_max_(&beva->bev);
262eac58b9emaste	if (at_most >= (size_t)limit && limit >= 0)
263eac58b9emaste		at_most = limit;
264eac58b9emaste
265eac58b9emaste	if (beva->bev.read_suspended) {
266eac58b9emaste		bev_async_del_read(beva);
267eac58b9emaste		return;
268eac58b9emaste	}
269eac58b9emaste
270eac58b9emaste	bufferevent_incref_(bev);
271eac58b9emaste	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
272eac58b9emaste		beva->ok = 0;
273eac58b9emaste		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
274eac58b9emaste		bufferevent_decref_(bev);
275eac58b9emaste	} else {
276eac58b9emaste		beva->read_in_progress = at_most;
277eac58b9emaste		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
278eac58b9emaste		bev_async_add_read(beva);
279eac58b9emaste	}
280eac58b9emaste
281eac58b9emaste	return;
282eac58b9emaste}
283eac58b9emaste
284eac58b9emastestatic void
285eac58b9emastebe_async_outbuf_callback(struct evbuffer *buf,
286eac58b9emaste    const struct evbuffer_cb_info *cbinfo,
287eac58b9emaste    void *arg)
288eac58b9emaste{
289eac58b9emaste	struct bufferevent *bev = arg;
290eac58b9emaste	struct bufferevent_async *bev_async = upcast(bev);
291eac58b9emaste
292eac58b9emaste	/* If we added data to the outbuf and were not writing before,
293eac58b9emaste	 * we may want to write now. */
294eac58b9emaste
295eac58b9emaste	bufferevent_incref_and_lock_(bev);
296eac58b9emaste
297eac58b9emaste	if (cbinfo->n_added)
298eac58b9emaste		bev_async_consider_writing(bev_async);
299eac58b9emaste
300eac58b9emaste	bufferevent_decref_and_unlock_(bev);
301eac58b9emaste}
302eac58b9emaste
303eac58b9emastestatic void
304eac58b9emastebe_async_inbuf_callback(struct evbuffer *buf,
305eac58b9emaste    const struct evbuffer_cb_info *cbinfo,
306eac58b9emaste    void *arg)
307eac58b9emaste{
308eac58b9emaste	struct bufferevent *bev = arg;
309eac58b9emaste	struct bufferevent_async *bev_async = upcast(bev);
310eac58b9emaste
311eac58b9emaste	/* If we drained data from the inbuf and were not reading before,
312eac58b9emaste	 * we may want to read now */
313eac58b9emaste
314eac58b9emaste	bufferevent_incref_and_lock_(bev);
315eac58b9emaste
316eac58b9emaste	if (cbinfo->n_deleted)
317eac58b9emaste		bev_async_consider_reading(bev_async);
318eac58b9emaste
319eac58b9emaste	bufferevent_decref_and_unlock_(bev);
320eac58b9emaste}
321eac58b9emaste
322eac58b9emastestatic int
323eac58b9emastebe_async_enable(struct bufferevent *buf, short what)
324eac58b9emaste{
325eac58b9emaste	struct bufferevent_async *bev_async = upcast(buf);
326eac58b9emaste
327eac58b9emaste	if (!bev_async->ok)
328eac58b9emaste		return -1;
329eac58b9emaste
330eac58b9emaste	if (bev_async->bev.connecting) {
331eac58b9emaste		/* Don't launch anything during connection attempts. */
332eac58b9emaste		return 0;
333eac58b9emaste	}
334eac58b9emaste
335eac58b9emaste	if (what & EV_READ)
336eac58b9emaste		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
337eac58b9emaste	if (what & EV_WRITE)
338eac58b9emaste		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
339eac58b9emaste
340eac58b9emaste	/* If we newly enable reading or writing, and we aren't reading or
341eac58b9emaste	   writing already, consider launching a new read or write. */
342eac58b9emaste
343eac58b9emaste	if (what & EV_READ)
344eac58b9emaste		bev_async_consider_reading(bev_async);
345eac58b9emaste	if (what & EV_WRITE)
346eac58b9emaste		bev_async_consider_writing(bev_async);
347eac58b9emaste	return 0;
348eac58b9emaste}
349eac58b9emaste
350eac58b9emastestatic int
351eac58b9emastebe_async_disable(struct bufferevent *bev, short what)
352eac58b9emaste{
353eac58b9emaste	struct bufferevent_async *bev_async = upcast(bev);
354eac58b9emaste	/* XXXX If we disable reading or writing, we may want to consider
355eac58b9emaste	 * canceling any in-progress read or write operation, though it might
356eac58b9emaste	 * not work. */
357eac58b9emaste
358eac58b9emaste	if (what & EV_READ) {
359eac58b9emaste		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
360eac58b9emaste		bev_async_del_read(bev_async);
361eac58b9emaste	}
362eac58b9emaste	if (what & EV_WRITE) {
363eac58b9emaste		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
364eac58b9emaste		bev_async_del_write(bev_async);
365eac58b9emaste	}
366eac58b9emaste
367eac58b9emaste	return 0;
368eac58b9emaste}
369eac58b9emaste
370eac58b9emastestatic void
371eac58b9emastebe_async_destruct(struct bufferevent *bev)
372eac58b9emaste{
373eac58b9emaste	struct bufferevent_async *bev_async = upcast(bev);
374eac58b9emaste	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
375eac58b9emaste	evutil_socket_t fd;
376eac58b9emaste
377eac58b9emaste	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
378eac58b9emaste			!upcast(bev)->read_in_progress);
379eac58b9emaste
380eac58b9emaste	bev_async_del_read(bev_async);
381eac58b9emaste	bev_async_del_write(bev_async);
382eac58b9emaste
383eac58b9emaste	fd = evbuffer_overlapped_get_fd_(bev->input);
384eac58b9emaste	if (fd != (evutil_socket_t)INVALID_SOCKET &&
385eac58b9emaste		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
386eac58b9emaste		evutil_closesocket(fd);
387eac58b9emaste		evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
388eac58b9emaste	}
389eac58b9emaste}
390eac58b9emaste
391eac58b9emaste/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
392eac58b9emaste * we use WSAGetOverlappedResult to translate. */
393eac58b9emastestatic void
394eac58b9emastebev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
395eac58b9emaste{
396eac58b9emaste	DWORD bytes, flags;
397eac58b9emaste	evutil_socket_t fd;
398eac58b9emaste
399eac58b9emaste	fd = evbuffer_overlapped_get_fd_(bev->input);
400eac58b9emaste	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
401eac58b9emaste}
402eac58b9emaste
403eac58b9emastestatic int
404eac58b9emastebe_async_flush(struct bufferevent *bev, short what,
405eac58b9emaste    enum bufferevent_flush_mode mode)
406eac58b9emaste{
407eac58b9emaste	return 0;
408eac58b9emaste}
409eac58b9emaste
410eac58b9emastestatic void
411eac58b9emasteconnect_complete(struct event_overlapped *eo, ev_uintptr_t key,
412eac58b9emaste    ev_ssize_t nbytes, int ok)
413eac58b9emaste{
414eac58b9emaste	struct bufferevent_async *bev_a = upcast_connect(eo);
415eac58b9emaste	struct bufferevent *bev = &bev_a->bev.bev;
416eac58b9emaste	evutil_socket_t sock;
417eac58b9emaste
418eac58b9emaste	BEV_LOCK(bev);
419eac58b9emaste
420eac58b9emaste	EVUTIL_ASSERT(bev_a->bev.connecting);
421eac58b9emaste	bev_a->bev.connecting = 0;
422eac58b9emaste	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
423eac58b9emaste	/* XXXX Handle error? */
424eac58b9emaste	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
425eac58b9emaste
426eac58b9emaste	if (ok)
427eac58b9emaste		bufferevent_async_set_connected_(bev);
428eac58b9emaste	else
429eac58b9emaste		bev_async_set_wsa_error(bev, eo);
430eac58b9emaste
431eac58b9emaste	bufferevent_run_eventcb_(bev,
432eac58b9emaste			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
433eac58b9emaste
434eac58b9emaste	event_base_del_virtual_(bev->ev_base);
435eac58b9emaste
436eac58b9emaste	bufferevent_decref_and_unlock_(bev);
437eac58b9emaste}
438eac58b9emaste
439eac58b9emastestatic void
440eac58b9emasteread_complete(struct event_overlapped *eo, ev_uintptr_t key,
441eac58b9emaste    ev_ssize_t nbytes, int ok)
442eac58b9emaste{
443eac58b9emaste	struct bufferevent_async *bev_a = upcast_read(eo);
444eac58b9emaste	struct bufferevent *bev = &bev_a->bev.bev;
445eac58b9emaste	short what = BEV_EVENT_READING;
446eac58b9emaste	ev_ssize_t amount_unread;
447eac58b9emaste	BEV_LOCK(bev);
448eac58b9emaste	EVUTIL_ASSERT(bev_a->read_in_progress);
449eac58b9emaste
450eac58b9emaste	amount_unread = bev_a->read_in_progress - nbytes;
451eac58b9emaste	evbuffer_commit_read_(bev->input, nbytes);
452eac58b9emaste	bev_a->read_in_progress = 0;
453eac58b9emaste	if (amount_unread)
454eac58b9emaste		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
455eac58b9emaste
456eac58b9emaste	if (!ok)
457eac58b9emaste		bev_async_set_wsa_error(bev, eo);
458eac58b9emaste
459eac58b9emaste	if (bev_a->ok) {
460eac58b9emaste		if (ok && nbytes) {
461eac58b9emaste			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
462eac58b9emaste			bufferevent_trigger_nolock_(bev, EV_READ, 0);
463eac58b9emaste			bev_async_consider_reading(bev_a);
464eac58b9emaste		} else if (!ok) {
465eac58b9emaste			what |= BEV_EVENT_ERROR;
466eac58b9emaste			bev_a->ok = 0;
467eac58b9emaste			bufferevent_run_eventcb_(bev, what, 0);
468eac58b9emaste		} else if (!nbytes) {
469eac58b9emaste			what |= BEV_EVENT_EOF;
470eac58b9emaste			bev_a->ok = 0;
471eac58b9emaste			bufferevent_run_eventcb_(bev, what, 0);
472eac58b9emaste		}
473eac58b9emaste	}
474eac58b9emaste
475eac58b9emaste	bufferevent_decref_and_unlock_(bev);
476eac58b9emaste}
477eac58b9emaste
478eac58b9emastestatic void
479eac58b9emastewrite_complete(struct event_overlapped *eo, ev_uintptr_t key,
480eac58b9emaste    ev_ssize_t nbytes, int ok)
481eac58b9emaste{
482eac58b9emaste	struct bufferevent_async *bev_a = upcast_write(eo);
483eac58b9emaste	struct bufferevent *bev = &bev_a->bev.bev;
484eac58b9emaste	short what = BEV_EVENT_WRITING;
485eac58b9emaste	ev_ssize_t amount_unwritten;
486eac58b9emaste
487eac58b9emaste	BEV_LOCK(bev);
488eac58b9emaste	EVUTIL_ASSERT(bev_a->write_in_progress);
489eac58b9emaste
490eac58b9emaste	amount_unwritten = bev_a->write_in_progress - nbytes;
491eac58b9emaste	evbuffer_commit_write_(bev->output, nbytes);
492eac58b9emaste	bev_a->write_in_progress = 0;
493eac58b9emaste
494eac58b9emaste	if (amount_unwritten)
495eac58b9emaste		bufferevent_decrement_write_buckets_(&bev_a->bev,
496eac58b9emaste		                                     -amount_unwritten);
497eac58b9emaste
498eac58b9emaste
499eac58b9emaste	if (!ok)
500eac58b9emaste		bev_async_set_wsa_error(bev, eo);
501eac58b9emaste
502eac58b9emaste	if (bev_a->ok) {
503eac58b9emaste		if (ok && nbytes) {
504eac58b9emaste			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
505eac58b9emaste			bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
506eac58b9emaste			bev_async_consider_writing(bev_a);
507eac58b9emaste		} else if (!ok) {
508eac58b9emaste			what |= BEV_EVENT_ERROR;
509eac58b9emaste			bev_a->ok = 0;
510eac58b9emaste			bufferevent_run_eventcb_(bev, what, 0);
511eac58b9emaste		} else if (!nbytes) {
512eac58b9emaste			what |= BEV_EVENT_EOF;
513eac58b9emaste			bev_a->ok = 0;
514eac58b9emaste			bufferevent_run_eventcb_(bev, what, 0);
515eac58b9emaste		}
516eac58b9emaste	}
517eac58b9emaste
518eac58b9emaste	bufferevent_decref_and_unlock_(bev);
519eac58b9emaste}
520eac58b9emaste
521eac58b9emastestruct bufferevent *
522eac58b9emastebufferevent_async_new_(struct event_base *base,
523eac58b9emaste    evutil_socket_t fd, int options)
524eac58b9emaste{
525eac58b9emaste	struct bufferevent_async *bev_a;
526eac58b9emaste	struct bufferevent *bev;
527eac58b9emaste	struct event_iocp_port *iocp;
528eac58b9emaste
529eac58b9emaste	options |= BEV_OPT_THREADSAFE;
530eac58b9emaste
531eac58b9emaste	if (!(iocp = event_base_get_iocp_(base)))
532eac58b9emaste		return NULL;
533eac58b9emaste
534eac58b9emaste	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
535eac58b9emaste		int err = GetLastError();
536eac58b9emaste		/* We may have alrady associated this fd with a port.
537eac58b9emaste		 * Let's hope it's this port, and that the error code
538eac58b9emaste		 * for doing this neer changes. */
539eac58b9emaste		if (err != ERROR_INVALID_PARAMETER)
540eac58b9emaste			return NULL;
541eac58b9emaste	}
542eac58b9emaste
543eac58b9emaste	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
544eac58b9emaste		return NULL;
545eac58b9emaste
546eac58b9emaste	bev = &bev_a->bev.bev;
547eac58b9emaste	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
548eac58b9emaste		mm_free(bev_a);
549eac58b9emaste		return NULL;
550eac58b9emaste	}
551eac58b9emaste	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
552eac58b9emaste		evbuffer_free(bev->input);
553eac58b9emaste		mm_free(bev_a);
554eac58b9emaste		return NULL;
555eac58b9emaste	}
556eac58b9emaste
557eac58b9emaste	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
558eac58b9emaste		options)<0)
559eac58b9emaste		goto err;
560eac58b9emaste
561eac58b9emaste	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
562eac58b9emaste	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
563eac58b9emaste
564eac58b9emaste	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
565eac58b9emaste	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
566eac58b9emaste	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
567eac58b9emaste
568eac58b9emaste	bufferevent_init_generic_timeout_cbs_(bev);
569eac58b9emaste
570eac58b9emaste	bev_a->ok = fd >= 0;
571eac58b9emaste
572eac58b9emaste	return bev;
573eac58b9emasteerr:
574eac58b9emaste	bufferevent_free(&bev_a->bev.bev);
575eac58b9emaste	return NULL;
576eac58b9emaste}
577eac58b9emaste
578eac58b9emastevoid
579eac58b9emastebufferevent_async_set_connected_(struct bufferevent *bev)
580eac58b9emaste{
581eac58b9emaste	struct bufferevent_async *bev_async = upcast(bev);
582eac58b9emaste	bev_async->ok = 1;
583eac58b9emaste	bufferevent_init_generic_timeout_cbs_(bev);
584eac58b9emaste	/* Now's a good time to consider reading/writing */
585eac58b9emaste	be_async_enable(bev, bev->enabled);
586eac58b9emaste}
587eac58b9emaste
588eac58b9emasteint
589eac58b9emastebufferevent_async_can_connect_(struct bufferevent *bev)
590eac58b9emaste{
591eac58b9emaste	const struct win32_extension_fns *ext =
592eac58b9emaste	    event_get_win32_extension_fns_();
593eac58b9emaste
594eac58b9emaste	if (BEV_IS_ASYNC(bev) &&
595eac58b9emaste	    event_base_get_iocp_(bev->ev_base) &&
596eac58b9emaste	    ext && ext->ConnectEx)
597eac58b9emaste		return 1;
598eac58b9emaste
599eac58b9emaste	return 0;
600eac58b9emaste}
601eac58b9emaste
602eac58b9emasteint
603eac58b9emastebufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
604eac58b9emaste	const struct sockaddr *sa, int socklen)
605eac58b9emaste{
606eac58b9emaste	BOOL rc;
607eac58b9emaste	struct bufferevent_async *bev_async = upcast(bev);
608eac58b9emaste	struct sockaddr_storage ss;
609eac58b9emaste	const struct win32_extension_fns *ext =
610eac58b9emaste	    event_get_win32_extension_fns_();
611eac58b9emaste
612eac58b9emaste	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
613eac58b9emaste
614eac58b9emaste	/* ConnectEx() requires that the socket be bound to an address
615eac58b9emaste	 * with bind() before using, otherwise it will fail. We attempt
616eac58b9emaste	 * to issue a bind() here, taking into account that the error
617eac58b9emaste	 * code is set to WSAEINVAL when the socket is already bound. */
618eac58b9emaste	memset(&ss, 0, sizeof(ss));
619eac58b9emaste	if (sa->sa_family == AF_INET) {
620eac58b9emaste		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
621eac58b9emaste		sin->sin_family = AF_INET;
622eac58b9emaste		sin->sin_addr.s_addr = INADDR_ANY;
623eac58b9emaste	} else if (sa->sa_family == AF_INET6) {
624eac58b9emaste		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
625eac58b9emaste		sin6->sin6_family = AF_INET6;
626eac58b9emaste		sin6->sin6_addr = in6addr_any;
627eac58b9emaste	} else {
628eac58b9emaste		/* Well, the user will have to bind() */
629eac58b9emaste		return -1;
630eac58b9emaste	}
631eac58b9emaste	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
632eac58b9emaste	    WSAGetLastError() != WSAEINVAL)
633eac58b9emaste		return -1;
634eac58b9emaste
635eac58b9emaste	event_base_add_virtual_(bev->ev_base);
636eac58b9emaste	bufferevent_incref_(bev);
637eac58b9emaste	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
638eac58b9emaste			    &bev_async->connect_overlapped.overlapped);
639eac58b9emaste	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
640eac58b9emaste		return 0;
641eac58b9emaste
642eac58b9emaste	event_base_del_virtual_(bev->ev_base);
643eac58b9emaste	bufferevent_decref_(bev);
644eac58b9emaste
645eac58b9emaste	return -1;
646eac58b9emaste}
647eac58b9emaste
648eac58b9emastestatic int
649eac58b9emastebe_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
650eac58b9emaste    union bufferevent_ctrl_data *data)
651eac58b9emaste{
652eac58b9emaste	switch (op) {
653eac58b9emaste	case BEV_CTRL_GET_FD:
654eac58b9emaste		data->fd = evbuffer_overlapped_get_fd_(bev->input);
655eac58b9emaste		return 0;
656eac58b9emaste	case BEV_CTRL_SET_FD: {
657eac58b9emaste		struct event_iocp_port *iocp;
658eac58b9emaste
659eac58b9emaste		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
660eac58b9emaste			return 0;
661eac58b9emaste		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
662eac58b9emaste			return -1;
663eac58b9emaste		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
664eac58b9emaste			return -1;
665eac58b9emaste		evbuffer_overlapped_set_fd_(bev->input, data->fd);
666eac58b9emaste		evbuffer_overlapped_set_fd_(bev->output, data->fd);
667eac58b9emaste		return 0;
668eac58b9emaste	}
669eac58b9emaste	case BEV_CTRL_CANCEL_ALL: {
670eac58b9emaste		struct bufferevent_async *bev_a = upcast(bev);
671eac58b9emaste		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
672eac58b9emaste		if (fd != (evutil_socket_t)INVALID_SOCKET &&
673eac58b9emaste		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
674eac58b9emaste			closesocket(fd);
675eac58b9emaste			evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
676eac58b9emaste		}
677eac58b9emaste		bev_a->ok = 0;
678eac58b9emaste		return 0;
679eac58b9emaste	}
680eac58b9emaste	case BEV_CTRL_GET_UNDERLYING:
681eac58b9emaste	default:
682eac58b9emaste		return -1;
683eac58b9emaste	}
684eac58b9emaste}
685eac58b9emaste
686eac58b9emaste
687