#include #include #include #include #include #include #include #include "server.h" #include "socket.h" #include "log.h" typedef struct client_thread_args { pthread_t self; psocket client_psock; char host_buf[NI_MAXHOST], service_buf[NI_MAXSERV]; const server_ctx *server_ctx; } client_thread_args; static int server_accept_client(server_ctx *ctx[], size_t siz, struct epoll_event *event); static void * client_mainloop_epoll(void *arg); static int client_io_epoll(struct epoll_event *ev); int server_init_ctx(server_ctx **ctx, forward_ctx *fwd_ctx) { assert(ctx && fwd_ctx); if (!*ctx) *ctx = (server_ctx *) malloc(sizeof(**ctx)); assert(*ctx); memset(*ctx, 0, sizeof(**ctx)); (*ctx)->fwd_ctx = fwd_ctx; return 0; } int server_setup(server_ctx *ctx, const char *listen_addr, const char *listen_port) { int s; struct addrinfo *srv_addr = NULL; assert(ctx); assert(listen_addr || listen_port); D2("Try to listen on %s:%s", (listen_addr ? listen_addr : "*"), listen_port); s = socket_init_in(listen_addr, listen_port, &srv_addr); if (s) { E_GAIERR(s, "Could not initialise server socket"); return 1; } if (socket_bind_in(&ctx->sock, srv_addr)) { E_STRERR("Could not bind server socket"); return 1; } if (socket_listen_in(&ctx->sock)) { E_STRERR("Could not listen on server socket"); return 1; } return 0; } int server_validate_ctx(const server_ctx *ctx) { assert(ctx && ctx->fwd_ctx); assert(ctx->sock.fd >= 0 && ctx->sock.addr_len > 0); return 0; } int server_setup_epoll(server_ctx *ctx[], size_t siz) { int s, fd = epoll_create1(0); /* flags == 0 -> obsolete size arg is dropped */ struct epoll_event ev; assert(ctx); assert(siz > 0 && siz < POTD_MAXFD); if (fd < 0) return -1; for (size_t i = 0; i < siz; ++i) { memset(&ev, 0, sizeof(ev)); ev.data.fd = ctx[i]->sock.fd; ev.events = EPOLLIN | EPOLLET; s = socket_addrtostr_in(&ctx[i]->sock, ctx[i]->host_buf, ctx[i]->service_buf); if (s) { E_GAIERR(s, "Convert socket address to string"); return -2; } N("Redirector service listening on %s:%s", ctx[i]->host_buf, ctx[i]->service_buf); s = epoll_ctl(fd, EPOLL_CTL_ADD, ctx[i]->sock.fd, &ev); if (s) { close(fd); return -3; } } return fd; } int server_mainloop_epoll(int epoll_fd, server_ctx *ctx[], size_t siz) { static struct epoll_event *events = NULL; if (!events) events = (struct epoll_event *) calloc(POTD_MAXEVENTS, sizeof(*events)); assert(events); assert(ctx); assert(siz > 0 && siz < POTD_MAXFD); while (1) { int n, i; n = epoll_wait(epoll_fd, events, POTD_MAXEVENTS, -1); if (n < 0) return 1; for (i = 0; i < n; ++i) { if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { E("Epoll for descriptor %d failed", events[i].data.fd); E_STRERR("epoll_wait"); close(events[i].data.fd); continue; } else { if (server_accept_client(ctx, siz, &events[i])) { /* new client connection, accept succeeded */ continue; } W2("Server accept client failed: [fd: %d , npoll: %d]", events[i].data.fd, n); } } } free(events); return 0; } static int server_accept_client(server_ctx *ctx[], size_t siz, struct epoll_event *event) { size_t i; int s; client_thread_args *args; for (i = 0; i < siz; ++i) { if (ctx[i]->sock.fd == event->data.fd) { args = (client_thread_args *) calloc(1, sizeof(client_thread_args)); if (socket_accept_in(&ctx[i]->sock, &args->client_psock)) { E_STRERR("Could not accept client connection"); return 0; } args->server_ctx = ctx[i]; s = socket_addrtostr_in(&args->client_psock, args->host_buf, args->service_buf); if (s) { E_GAIERR(s, "Convert socket address to string"); goto error; } N("New connection from %s:%s to %s:%s: %d", args->host_buf, args->service_buf, ctx[i]->host_buf, ctx[i]->service_buf, args->client_psock.fd); if (pthread_create(&args->self, NULL, client_mainloop_epoll, args)) { E_STRERR("Thread creation"); goto error; } return 1; error: close(args->client_psock.fd); free(args); return 0; } } return 0; } static void * client_mainloop_epoll(void *arg) { client_thread_args *args; int s, epoll_fd, active = 1; struct epoll_event event = {0,{0}}; struct epoll_event *events; assert(arg); args = (client_thread_args *) arg; pthread_detach(args->self); events = (struct epoll_event *) calloc(POTD_MAXEVENTS, sizeof(*events)); assert(events); epoll_fd = epoll_create1(0); if (epoll_fd < 0) { E_STRERR("Client epoll_create1"); goto finish; } event.data.fd = args->client_psock.fd; event.events = EPOLLIN | EPOLLET; s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->client_psock.fd, &event); if (s) { E_STRERR("Client epoll_ctl"); goto finish; } while (active) { int n, i; n = epoll_wait(epoll_fd, events, POTD_MAXEVENTS, -1); if (n < 0) break; for (i = 0; i < n; ++i) { if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN) && !(events[i].events & EPOLLOUT))) { E("Epoll for descriptor %d failed", events[i].data.fd); E_STRERR("epoll_wait"); close(events[i].data.fd); continue; } else { if (client_io_epoll(&events[i])) { N("Lost connection to %s:%s: %d", args->host_buf, args->service_buf, args->client_psock.fd); active = 0; break; } } W2("I/O forwarder failed: [fd: %d , npoll: %d]", events[i].data.fd, n); } } finish: close(epoll_fd); close(args->client_psock.fd); free(events); free(args); return NULL; } static int client_io_epoll(struct epoll_event *ev) { int data_avail = 1; int has_input; int saved_errno, io_fail = 0; ssize_t siz; char buf[BUFSIZ+sizeof(long)]; while (data_avail) { has_input = 0; saved_errno = 0; siz = -1; if (ev->events & EPOLLIN) { has_input = 1; siz = read(ev->data.fd, &buf[0], BUFSIZ); saved_errno = errno; } else if (ev->events & EPOLLOUT) { W("%s", "Suffering from buffer bloat"); continue; } switch (siz) { case -1: E_STRERR("Client read"); if (saved_errno != EAGAIN) io_fail = 1; break; case 0: printf("DISCONNECT !!!\n"); io_fail = 1; break; default: buf[siz] = 0; break; } if (io_fail) break; if (has_input) { printf("INPUT: ___%s___\n", buf); if (strncmp(buf, "QUIT", 4) == 0) io_fail = 1; if (strncmp(buf, "TEST", 4) == 0) { printf("------------\n"); write(ev->data.fd, "BLABLABLA\n", 10); } } if (io_fail) break; } return io_fail != 0; }