aboutsummaryrefslogtreecommitdiff
path: root/src/pevent.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pevent.c')
-rw-r--r--src/pevent.c96
1 files changed, 60 insertions, 36 deletions
diff --git a/src/pevent.c b/src/pevent.c
index 452b621..64b2d0b 100644
--- a/src/pevent.c
+++ b/src/pevent.c
@@ -226,7 +226,7 @@ int event_loop(event_ctx *ctx, on_event_cb on_event, void *user_data)
ctx->has_error = 1;
} else {
- if (!on_event(ctx, (event_buf *) ctx->events[i].data.ptr,
+ if (!on_event(ctx, buf->fd,
user_data) && !ctx->has_error)
{
W2("Event callback failed: [fd: %d , npoll: %d]",
@@ -251,21 +251,21 @@ event_forward_connection(event_ctx *ctx, int dest_fd, on_data_cb on_data,
forward_state rc = CON_OK;
ssize_t siz;
struct epoll_event *ev;
- struct event_buf *ev_buf;
+ struct event_buf *read_buf, write_buf = WRITE_BUF(dest_fd);
+ assert(dest_fd >= 0);
assert(ctx->current_event >= 0 &&
ctx->current_event < POTD_MAXEVENTS);
ev = &ctx->events[ctx->current_event];
- ev_buf = (event_buf *) ev->data.ptr;
+ read_buf = (event_buf *) ev->data.ptr;
- while (data_avail) {
+ while (data_avail && ctx->active && !ctx->has_error) {
saved_errno = 0;
siz = -1;
if (ev->events & EPOLLIN) {
errno = 0;
- ev_buf->buf_used = 0;
- siz = read(ev_buf->fd, ev_buf->buf, sizeof(ev_buf->buf));
+ siz = event_buf_read(read_buf);
saved_errno = errno;
} else break;
if (saved_errno == EAGAIN)
@@ -273,16 +273,16 @@ event_forward_connection(event_ctx *ctx, int dest_fd, on_data_cb on_data,
switch (siz) {
case -1:
- E_STRERR("Client read from fd %d", ev_buf->fd);
+ E_STRERR("Client read from fd %d", read_buf->fd);
ctx->has_error = 1;
rc = CON_IN_ERROR;
break;
case 0:
+ ctx->active = 0;
rc = CON_IN_TERMINATED;
break;
default:
- D2("Read %zu bytes from fd %d", siz, ev_buf->fd);
- ev_buf->buf_used = siz;
+ D2("Read %zu bytes from fd %d", siz, read_buf->fd);
break;
}
@@ -290,29 +290,47 @@ event_forward_connection(event_ctx *ctx, int dest_fd, on_data_cb on_data,
break;
if (on_data &&
- on_data(ctx, ev_buf->fd, dest_fd, ev_buf->buf, ev_buf->buf_used,
- user_data))
+ on_data(ctx, read_buf, &write_buf, user_data))
{
W2("On data callback failed, not forwarding from %d to %d",
- ev_buf->fd, dest_fd);
+ read_buf->fd, dest_fd);
continue;
+ } else if (!on_data) {
+ if (event_buf_fill(&write_buf, read_buf->buf,
+ read_buf->buf_used))
+ {
+ W2("Data copy failed, not forwarding from %d to %d",
+ read_buf->fd, dest_fd);
+ continue;
+ } else {
+ event_buf_discardall(read_buf);
+ }
}
- errno = 0;
- siz = write(dest_fd, ev_buf->buf, ev_buf->buf_used);
-
- switch (siz) {
- case -1:
- ctx->has_error = 1;
- rc = CON_OUT_ERROR;
- break;
- case 0:
- rc = CON_OUT_TERMINATED;
- break;
- default:
- D2("Written %zu bytes from fd %d to fd %d",
- siz, ev_buf->fd, dest_fd);
- break;
+ if (write_buf.buf_used) {
+ errno = 0;
+ siz = event_buf_drain(&write_buf);
+
+ switch (siz) {
+ case -1:
+ ctx->has_error = 1;
+ rc = CON_OUT_ERROR;
+ break;
+ case 0:
+ ctx->active = 0;
+ rc = CON_OUT_TERMINATED;
+ break;
+ default:
+ if (write_buf.buf_used) {
+ W2("Written only %zd bytes (remaining %zu bytes) "
+ "from %d to %d", siz, write_buf.buf_used,
+ read_buf->fd, write_buf.fd);
+ } else {
+ D2("Written %zd bytes from fd %d to fd %d",
+ siz, read_buf->fd, dest_fd);
+ }
+ break;
+ }
}
if (rc != CON_OK)
@@ -321,18 +339,19 @@ event_forward_connection(event_ctx *ctx, int dest_fd, on_data_cb on_data,
D2("Connection state: %d", rc);
if (rc != CON_OK) {
- shutdown(ev_buf->fd, SHUT_RDWR);
+ shutdown(read_buf->fd, SHUT_RDWR);
shutdown(dest_fd, SHUT_RDWR);
}
return rc;
}
-int event_buf_fill(event_buf *buf, unsigned char *data, size_t size)
+int event_buf_fill(event_buf *buf, char *data, size_t size)
{
- if (size > buf->buf_used && event_buf_drain(buf) < 0)
- return 1;
- if (size > sizeof(buf->buf))
+ if (size > event_buf_avail(buf) &&
+ event_buf_drain(buf) < 0)
+ {
return 1;
+ }
memcpy(buf->buf + buf->buf_used, data, size);
buf->buf_used += size;
@@ -343,13 +362,18 @@ ssize_t event_buf_drain(event_buf *buf)
{
ssize_t written;
- if (!buf->buf_used)
+ if (!buf->buf_used || buf->fd < 0)
return 0;
written = write(buf->fd, buf->buf, buf->buf_used);
- if (written < 0)
- return -1;
- buf->buf_used -= written;
+ switch (written) {
+ case 0:
+ case -1:
+ return written;
+ default:
+ break;
+ }
+ event_buf_discard(buf, written);
return written;
}