aboutsummaryrefslogtreecommitdiff
path: root/src/pevent.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pevent.c')
-rw-r--r--src/pevent.c232
1 files changed, 232 insertions, 0 deletions
diff --git a/src/pevent.c b/src/pevent.c
new file mode 100644
index 0000000..e377f29
--- /dev/null
+++ b/src/pevent.c
@@ -0,0 +1,232 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <signal.h>
+#include <sys/epoll.h>
+#include <assert.h>
+
+#include "pevent.h"
+#include "log.h"
+
+static int epoll_event_string(uint32_t event, char *buf, size_t siz);
+
+
+static int epoll_event_string(uint32_t event, char *buf, size_t siz)
+{
+ if (event & EPOLLERR) {
+ snprintf(buf, siz, "%s", "EPOLLERR");
+ } else if (event & EPOLLHUP) {
+ snprintf(buf, siz, "%s", "EPOLLHUP");
+ } else if (event & EPOLLRDHUP) {
+ snprintf(buf, siz, "%s", "EPOLLRDHUP");
+ } else if (event & EPOLLIN) {
+ snprintf(buf, siz, "%s", "EPOLLIN");
+ } else return 1;
+
+ return 0;
+}
+
+void event_init(event_ctx **ctx)
+{
+ assert(ctx);
+ if (!*ctx)
+ *ctx = (event_ctx *) malloc(sizeof(**ctx));
+ assert(*ctx);
+
+ memset(*ctx, 0, sizeof(**ctx));
+ (*ctx)->epoll_fd = -1;
+}
+
+void event_free(event_ctx **ctx)
+{
+ assert(ctx && *ctx);
+
+ close((*ctx)->epoll_fd);
+ free((*ctx));
+ *ctx = NULL;
+}
+
+int event_setup(event_ctx *ctx)
+{
+ assert(ctx);
+
+ if (ctx->epoll_fd < 0)
+ /* flags == 0 -> obsolete size arg is dropped */
+ ctx->epoll_fd = epoll_create1(0);
+
+ return ctx->epoll_fd < 0;
+}
+
+int event_add_sock(event_ctx *ctx, psocket *sock)
+{
+ int s;
+ struct epoll_event ev = {0,{0}};
+
+ assert(ctx && sock);
+
+ ev.data.fd = sock->fd;
+ ev.events = EPOLLIN /*| EPOLLET*/; /* EPOLLET: broken */
+ s = epoll_ctl(ctx->epoll_fd, EPOLL_CTL_ADD, sock->fd, &ev);
+ if (s)
+ return 1;
+
+ return 0;
+}
+
+int event_add_fd(event_ctx *ctx, int fd)
+{
+ int s;
+ struct epoll_event ev = {0,{0}};
+
+ assert(ctx);
+
+ ev.data.fd = fd;
+ ev.events = EPOLLIN | EPOLLET;
+ s = epoll_ctl(ctx->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+ if (s)
+ return 1;
+
+ return 0;
+}
+
+int event_loop(event_ctx *ctx, on_event_cb on_event, void *user_data)
+{
+ int n, i, saved_errno;
+ char ev_err[16];
+ sigset_t eset;
+
+ assert(ctx && on_event);
+ sigemptyset(&eset);
+ ctx->active = 1;
+ ctx->has_error = 0;
+
+ while (ctx->active && !ctx->has_error) {
+ errno = 0;
+ n = epoll_pwait(ctx->epoll_fd, ctx->events, POTD_MAXEVENTS, -1, &eset);
+ saved_errno = errno;
+ if (errno == EINTR)
+ continue;
+ if (n < 0) {
+ ctx->has_error = 1;
+ break;
+ }
+
+ for (i = 0; i < n; ++i) {
+ ctx->current_event = i;
+
+ if ((ctx->events[i].events & EPOLLERR) ||
+ (ctx->events[i].events & EPOLLHUP) ||
+ (ctx->events[i].events & EPOLLRDHUP) ||
+ (!(ctx->events[i].events & EPOLLIN)))
+ {
+ if (epoll_event_string(ctx->events[i].events, ev_err, sizeof ev_err)) {
+ errno = saved_errno;
+ E_STRERR("Event for descriptor %d",
+ ctx->events[i].data.fd);
+ } else {
+ errno = saved_errno;
+ E_STRERR("Event [%s] for descriptor %d",
+ ev_err, ctx->events[i].data.fd);
+ }
+
+ ctx->has_error = 1;
+ } else {
+ if (!on_event(ctx, ctx->events[i].data.fd, user_data) && !ctx->has_error)
+ W2("Event callback failed: [fd: %d , npoll: %d]",
+ ctx->events[i].data.fd, n);
+ }
+
+ if (!ctx->active || ctx->has_error)
+ break;
+ }
+ }
+
+ return ctx->has_error != 0;
+}
+
+forward_state
+event_forward_connection(event_ctx *ctx, int dest_fd, on_data_cb on_data,
+ void *user_data)
+{
+ int data_avail = 1;
+ int has_input;
+ int saved_errno;
+ forward_state rc = CON_OK;
+ ssize_t siz;
+ char buf[BUFSIZ];
+ struct epoll_event *ev;
+
+ assert(ctx->current_event >= 0 &&
+ ctx->current_event < POTD_MAXEVENTS);
+ ev = &ctx->events[ctx->current_event];
+
+ while (data_avail) {
+ has_input = 0;
+ saved_errno = 0;
+ siz = -1;
+
+ if (ev->events & EPOLLIN) {
+ has_input = 1;
+ errno = 0;
+ siz = read(ev->data.fd, &buf[0], BUFSIZ);
+ saved_errno = errno;
+ } else break;
+ if (saved_errno == EAGAIN)
+ break;
+
+ switch (siz) {
+ case -1:
+ E_STRERR("Client read from fd %d", ev->data.fd);
+ ctx->has_error = 1;
+ rc = CON_IN_ERROR;
+ break;
+ case 0:
+ rc = CON_IN_TERMINATED;
+ break;
+ default:
+ D2("Read %zu bytes from fd %d", siz, ev->data.fd);
+ break;
+ }
+
+ if (rc != CON_OK)
+ break;
+
+ if (on_data &&
+ on_data(ctx, ev->data.fd, dest_fd, buf, siz, user_data))
+ {
+ W2("On data callback failed, not forwarding from %d to %d",
+ ev->data.fd, dest_fd);
+ continue;
+ }
+
+ if (has_input) {
+ errno = 0;
+ siz = write(dest_fd, &buf[0], siz);
+
+ switch (siz) {
+ case -1:
+ ctx->has_error = 1;
+ rc = CON_OUT_ERROR;
+ break;
+ case 0:
+ rc = CON_OUT_TERMINATED;
+ break;
+ default:
+ D2("Written %zu bytes from fd %d to fd %d",
+ siz, ev->data.fd, dest_fd);
+ break;
+ }
+ }
+
+ if (rc != CON_OK)
+ break;
+ }
+
+ D2("Connection state: %d", rc);
+ if (rc != CON_OK) {
+ shutdown(ev->data.fd, SHUT_RDWR);
+ shutdown(dest_fd, SHUT_RDWR);
+ }
+ return rc;
+}