diff options
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 1933 |
1 files changed, 1933 insertions, 0 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c new file mode 100644 index 000000000..1e675ba5b --- /dev/null +++ b/nDPIsrvd.c @@ -0,0 +1,1933 @@ +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <netdb.h> +#include <netinet/tcp.h> +#include <pwd.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <string.h> +#if !defined(__FreeBSD__) && !defined(__APPLE__) +#include <sys/signalfd.h> +#endif +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include "config.h" +#include "nDPIsrvd.h" +#include "nio.h" +#include "utils.h" + +enum sock_type +{ + COLLECTOR_UN, + DISTRIBUTOR_UN, + DISTRIBUTOR_IN, +}; + +struct nDPIsrvd_write_buffer +{ + struct nDPIsrvd_buffer buf; + size_t written; +}; + +struct remote_desc +{ + enum sock_type sock_type; + int fd; + + union + { + struct + { + struct sockaddr_un peer; + unsigned long long int json_bytes; +#if !defined(__FreeBSD__) && !defined(__APPLE__) + pid_t pid; +#endif + + struct nDPIsrvd_json_buffer main_read_buffer; + } event_collector_un; + struct + { + struct sockaddr_un peer; +#if !defined(__FreeBSD__) && !defined(__APPLE__) + pid_t pid; + char * user_name; +#endif + + struct nDPIsrvd_write_buffer main_write_buffer; + UT_array * additional_write_buffers; + } event_distributor_un; /* UNIX socket */ + struct + { + struct sockaddr_in peer; + char peer_addr[INET_ADDRSTRLEN]; + + struct nDPIsrvd_write_buffer main_write_buffer; + UT_array * additional_write_buffers; + } event_distributor_in; /* TCP/IP socket */ + }; +}; + +static struct +{ + struct remote_desc * desc; + nDPIsrvd_ull desc_size; + nDPIsrvd_ull desc_used; +} remotes = {NULL, 0, 0}; + +static int nDPIsrvd_main_thread_shutdown = 0; +static int collector_un_sockfd = -1; +static int distributor_un_sockfd = -1; +static int distributor_in_sockfd = -1; +static struct nDPIsrvd_address distributor_in_address = { + .raw.sa_family = (sa_family_t)0xFFFF, +}; + +static struct +{ + struct cmdarg config_file; + struct cmdarg pidfile; + struct cmdarg collector_un_sockpath; + struct cmdarg distributor_un_sockpath; + struct cmdarg distributor_in_address; + struct cmdarg user; + struct cmdarg group; + struct cmdarg collector_group; + struct cmdarg distributor_group; + struct cmdarg max_remote_descriptors; + struct cmdarg max_write_buffers; + struct cmdarg bufferbloat_fallback_to_blocking; +#ifdef ENABLE_EPOLL + struct cmdarg use_poll; +#endif +} nDPIsrvd_options = {.config_file = CMDARG_STR(NULL), + .pidfile = CMDARG_STR(nDPIsrvd_PIDFILE), + .collector_un_sockpath = CMDARG_STR(COLLECTOR_UNIX_SOCKET), + .distributor_un_sockpath = CMDARG_STR(DISTRIBUTOR_UNIX_SOCKET), + .distributor_in_address = CMDARG_STR(NULL), + .user = CMDARG_STR(DEFAULT_CHUSER), + .group = CMDARG_STR(NULL), + .collector_group = CMDARG_STR(NULL), + .distributor_group = CMDARG_STR(NULL), + .max_remote_descriptors = CMDARG_ULL(nDPIsrvd_MAX_REMOTE_DESCRIPTORS), + .max_write_buffers = CMDARG_ULL(nDPIsrvd_MAX_WRITE_BUFFERS), + .bufferbloat_fallback_to_blocking = CMDARG_BOOL(1) +#ifdef ENABLE_EPOLL + , + .use_poll = CMDARG_BOOL(0) +#endif +}; +struct confopt config_map[] = {CONFOPT("pidfile", &nDPIsrvd_options.pidfile), + CONFOPT("collector", &nDPIsrvd_options.collector_un_sockpath), + CONFOPT("distributor-unix", &nDPIsrvd_options.distributor_un_sockpath), + CONFOPT("distributor-in", &nDPIsrvd_options.distributor_in_address), + CONFOPT("user", &nDPIsrvd_options.user), + CONFOPT("group", &nDPIsrvd_options.group), + CONFOPT("collector-group", &nDPIsrvd_options.collector_group), + CONFOPT("distributor-group", &nDPIsrvd_options.distributor_group), + CONFOPT("max-remote-descriptors", &nDPIsrvd_options.max_remote_descriptors), + CONFOPT("max-write-buffers", &nDPIsrvd_options.max_write_buffers), + CONFOPT("blocking-io-fallback", &nDPIsrvd_options.bufferbloat_fallback_to_blocking) +#ifdef ENABLE_EPOLL + , + CONFOPT("poll", &nDPIsrvd_options.use_poll) +#endif +}; + +static void logger_nDPIsrvd(struct remote_desc const * const remote, + char const * const prefix, + char const * const format, + ...); +static int fcntl_add_flags(int fd, int flags); +static int fcntl_del_flags(int fd, int flags); +static int add_in_event_fd(struct nio * const io, int fd); +static int add_in_event(struct nio * const io, struct remote_desc * const remote); +static int del_event(struct nio * const io, int fd); +static int set_in_event(struct nio * const io, struct remote_desc * const remote); +static void disconnect_client(struct nio * const io, struct remote_desc * const current); +static int drain_write_buffers_blocking(struct remote_desc * const remote); + +static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) +{ + struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)dst; + struct nDPIsrvd_write_buffer const * const buf_src = (struct nDPIsrvd_write_buffer *)src; + + buf_dst->buf.ptr.raw = NULL; + if (nDPIsrvd_buffer_init(&buf_dst->buf, buf_src->buf.used) != 0) + { + logger(1, "Additional write buffer init failed, size: %zu bytes", buf_src->buf.used); + return; + } + + buf_dst->written = buf_src->written; + buf_dst->buf.used = buf_src->buf.used; + memcpy(buf_dst->buf.ptr.raw, buf_src->buf.ptr.raw, buf_src->buf.used); +} + +static void nDPIsrvd_buffer_array_dtor(void * elt) +{ + struct nDPIsrvd_write_buffer * const buf_dst = (struct nDPIsrvd_write_buffer *)elt; + + nDPIsrvd_buffer_free(&buf_dst->buf); + buf_dst->written = 0; +} + +static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_write_buffer), + NULL, + nDPIsrvd_buffer_array_copy, + nDPIsrvd_buffer_array_dtor}; + +#ifndef NO_MAIN +#ifdef ENABLE_MEMORY_PROFILING +void nDPIsrvd_memprof_log_alloc(size_t alloc_size) +{ + (void)alloc_size; +} + +void nDPIsrvd_memprof_log_free(size_t free_size) +{ + (void)free_size; +} + +void nDPIsrvd_memprof_log(char const * const format, ...) +{ + va_list ap; + + va_start(ap, format); + vlogger(0, format, ap); + va_end(ap); +} +#endif +#endif + +static struct nDPIsrvd_json_buffer * get_read_buffer(struct remote_desc * const remote) +{ + switch (remote->sock_type) + { + case COLLECTOR_UN: + return &remote->event_collector_un.main_read_buffer; + + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + return NULL; + } + + return NULL; +} + +static struct nDPIsrvd_write_buffer * get_write_buffer(struct remote_desc * const remote) +{ + switch (remote->sock_type) + { + case COLLECTOR_UN: + return NULL; + + case DISTRIBUTOR_UN: + return &remote->event_distributor_un.main_write_buffer; + + case DISTRIBUTOR_IN: + return &remote->event_distributor_in.main_write_buffer; + } + + return NULL; +} + +static UT_array * get_additional_write_buffers(struct remote_desc * const remote) +{ + switch (remote->sock_type) + { + case COLLECTOR_UN: + return NULL; + + case DISTRIBUTOR_UN: + return remote->event_distributor_un.additional_write_buffers; + + case DISTRIBUTOR_IN: + return remote->event_distributor_in.additional_write_buffers; + } + + return NULL; +} + +static int add_to_additional_write_buffers(struct remote_desc * const remote, + uint8_t * const buf, + nDPIsrvd_ull json_message_length) +{ + struct nDPIsrvd_write_buffer buf_src = {}; + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + + if (additional_write_buffers == NULL) + { + return -1; + } + + if (utarray_len(additional_write_buffers) >= GET_CMDARG_ULL(nDPIsrvd_options.max_write_buffers)) + { + if (GET_CMDARG_BOOL(nDPIsrvd_options.bufferbloat_fallback_to_blocking) == 0) + { + logger_nDPIsrvd(remote, + "Buffer limit for", + "for reached, remote too slow: %u lines", + utarray_len(additional_write_buffers)); + logger_nDPIsrvd(remote, "%s", "You can try to increase buffer limits with `-C'."); + return -1; + } + else + { + logger_nDPIsrvd(remote, + "Buffer limit for", + "reached, falling back to blocking I/O: %u lines", + utarray_len(additional_write_buffers)); + if (drain_write_buffers_blocking(remote) != 0) + { + return -1; + } + } + } + + buf_src.buf.ptr.raw = buf; + buf_src.buf.used = buf_src.buf.max = json_message_length; + utarray_push_back(additional_write_buffers, &buf_src); + + return 0; +} + +static void logger_nDPIsrvd(struct remote_desc const * const remote, + char const * const prefix, + char const * const format, + ...) +{ + char logbuf[512]; + va_list ap; + + va_start(ap, format); + vsnprintf(logbuf, sizeof(logbuf), format, ap); + + switch (remote->sock_type) + { + case DISTRIBUTOR_UN: +#if !defined(__FreeBSD__) && !defined(__APPLE__) + logger(1, + "%s PID %d (User: %s) %s", + prefix, + remote->event_distributor_un.pid, + remote->event_distributor_un.user_name, + logbuf); +#else + logger(1, "%s %s", prefix, logbuf); +#endif + break; + case DISTRIBUTOR_IN: + logger(1, + "%s %.*s:%u %s", + prefix, + (int)sizeof(remote->event_distributor_in.peer_addr), + remote->event_distributor_in.peer_addr, + ntohs(remote->event_distributor_in.peer.sin_port), + logbuf); + break; + case COLLECTOR_UN: +#if !defined(__FreeBSD__) && !defined(__APPLE__) + logger(1, "%s PID %d %s", prefix, remote->event_collector_un.pid, logbuf); +#else + logger(1, "%s %s", prefix, logbuf); +#endif + break; + } + + va_end(ap); +} + +static int drain_main_buffer(struct remote_desc * const remote) +{ + ssize_t bytes_written; + struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote); + + if (write_buffer == NULL) + { + return -1; + } + + if (write_buffer->buf.used == 0) + { + return 0; + } + + errno = 0; + while ((bytes_written = write(remote->fd, write_buffer->buf.ptr.raw, write_buffer->buf.used)) < 0 && errno == EINTR) + { + errno = 0; + } + if (errno == EAGAIN) + { + return 0; + } + if (bytes_written < 0 || errno != 0) + { + logger_nDPIsrvd(remote, "Distributor connection", "closed, send failed: %s", strerror(errno)); + return -1; + } + if (bytes_written == 0) + { + logger_nDPIsrvd(remote, "Distributor connection", "closed"); + return -1; + } + if ((size_t)bytes_written < write_buffer->buf.used) + { +#if 0 + logger_nDPIsrvd( + remote, "Distributor", "wrote less than expected: %zd < %zu", bytes_written, remote->buf.used); +#endif + memmove(write_buffer->buf.ptr.raw, + write_buffer->buf.ptr.raw + bytes_written, + write_buffer->buf.used - bytes_written); + } + + write_buffer->buf.used -= bytes_written; + return 0; +} + +static int drain_write_buffers(struct remote_desc * const remote) +{ + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + + errno = 0; + + if (drain_main_buffer(remote) != 0 || additional_write_buffers == NULL) + { + return -1; + } + + while (utarray_len(additional_write_buffers) > 0) + { + struct nDPIsrvd_write_buffer * buf = (struct nDPIsrvd_write_buffer *)utarray_front(additional_write_buffers); + ssize_t written; + + while ((written = write(remote->fd, buf->buf.ptr.raw + buf->written, buf->buf.used - buf->written)) < 0 && + errno == EINTR) + { + // Retry if interrupted by a signal. + } + switch (written) + { + case -1: + if (errno == EAGAIN) + { + return 0; + } + return -1; + case 0: + return -1; + default: + buf->written += written; + if (buf->written == buf->buf.max) + { + utarray_erase(additional_write_buffers, 0, 1); + } + break; + } + } + + return 0; +} + +static int drain_write_buffers_blocking(struct remote_desc * const remote) +{ + int retval = 0; + + if (fcntl_del_flags(remote->fd, O_NONBLOCK) != 0) + { + logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to blocking mode: %s", strerror(errno)); + return -1; + } + if (drain_write_buffers(remote) != 0) + { + logger_nDPIsrvd(remote, "Could not drain buffers for", "in blocking I/O: %s", strerror(errno)); + retval = -1; + } + if (fcntl_add_flags(remote->fd, O_NONBLOCK) != 0) + { + logger_nDPIsrvd(remote, "Error setting distributor", "fd flags to non-blocking mode: %s", strerror(errno)); + return -1; + } + + return retval; +} + +static int handle_outgoing_data(struct nio * const io, struct remote_desc * const remote) +{ + UT_array * const additional_write_buffers = get_additional_write_buffers(remote); + + if (additional_write_buffers == NULL) + { + return -1; + } + if (drain_write_buffers(remote) != 0) + { + logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno)); + disconnect_client(io, remote); + return -1; + } + if (utarray_len(additional_write_buffers) == 0) + { + struct nDPIsrvd_write_buffer const * const write_buffer = get_write_buffer(remote); + + if (write_buffer->buf.used == 0) + { + return set_in_event(io, remote); + } + else + { + return drain_main_buffer(remote); + } + } + + return 0; +} + +static int fcntl_add_flags(int fd, int flags) +{ + int cur_flags = fcntl(fd, F_GETFL, 0); + + if (cur_flags == -1) + { + return 1; + } + + return fcntl(fd, F_SETFL, cur_flags | flags); +} + +static int fcntl_del_flags(int fd, int flags) +{ + int cur_flags = fcntl(fd, F_GETFL, 0); + + if (cur_flags == -1) + { + return -1; + } + + return fcntl(fd, F_SETFL, cur_flags & ~flags); +} + +static int create_listen_sockets(void) +{ + collector_un_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + distributor_un_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (collector_un_sockfd < 0 || distributor_un_sockfd < 0 || set_fd_cloexec(collector_un_sockfd) < 0 || + set_fd_cloexec(distributor_un_sockfd) < 0) + { + logger(1, "Error creating UNIX socket: %s", strerror(errno)); + return 1; + } + + if (IS_CMDARG_SET(nDPIsrvd_options.distributor_in_address) != 0) + { + distributor_in_sockfd = socket(distributor_in_address.raw.sa_family, SOCK_STREAM, 0); + if (distributor_in_sockfd < 0 || set_fd_cloexec(distributor_in_sockfd) < 0) + { + logger(1, "Error creating TCP/IP socket: %s", strerror(errno)); + return 1; + } + int opt = 1; + if (setsockopt(distributor_in_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) + { + logger(1, "Setting TCP/IP socket option SO_REUSEADDR failed: %s", strerror(errno)); + } + } + + { + int opt = 1; + if (setsockopt(collector_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || + setsockopt(distributor_un_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) + { + logger(1, "Setting UNIX socket option SO_REUSEADDR failed: %s", strerror(errno)); + } + } + + { + struct sockaddr_un collector_addr; + collector_addr.sun_family = AF_UNIX; + int written = snprintf(collector_addr.sun_path, + sizeof(collector_addr.sun_path), + "%s", + GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath)); + if (written < 0) + { + logger(1, "snprintf failed: %s", strerror(errno)); + return 1; + } + else if (written == sizeof(collector_addr.sun_path)) + { + logger(1, "Collector UNIX socket path too long, max: %zu characters", sizeof(collector_addr.sun_path) - 1); + return 1; + } + + if (bind(collector_un_sockfd, (struct sockaddr *)&collector_addr, sizeof(collector_addr)) < 0) + { + logger(1, + "Error binding Collector UNIX socket to `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), + strerror(errno)); + return 1; + } + } + + { + struct sockaddr_un distributor_addr; + distributor_addr.sun_family = AF_UNIX; + int written = snprintf(distributor_addr.sun_path, + sizeof(distributor_addr.sun_path), + "%s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath)); + if (written < 0) + { + logger(1, "snprintf failed: %s", strerror(errno)); + return 2; + } + else if (written == sizeof(distributor_addr.sun_path)) + { + logger(1, + "Distributor UNIX socket path too long, max: %zu characters", + sizeof(distributor_addr.sun_path) - 1); + return 2; + } + + if (bind(distributor_un_sockfd, (struct sockaddr *)&distributor_addr, sizeof(distributor_addr)) < 0) + { + logger(1, + "Error binding Distributor socket to `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath), + strerror(errno)); + return 2; + } + } + + if (IS_CMDARG_SET(nDPIsrvd_options.distributor_in_address) != 0) + { + if (bind(distributor_in_sockfd, &distributor_in_address.raw, distributor_in_address.size) < 0) + { + logger(1, + "Error binding Distributor TCP/IP socket to %s: %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_in_address), + strerror(errno)); + return 3; + } + if (listen(distributor_in_sockfd, 16) < 0) + { + logger(1, + "Error listening Distributor TCP/IP socket to %s: %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_in_address), + strerror(errno)); + return 3; + } + if (fcntl_add_flags(distributor_in_sockfd, O_NONBLOCK) != 0) + { + logger(1, + "Error setting Distributor TCP/IP socket %s to non-blocking mode: %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_in_address), + strerror(errno)); + return 3; + } + } + + if (listen(collector_un_sockfd, 16) < 0 || listen(distributor_un_sockfd, 16) < 0) + { + logger(1, "Error listening UNIX socket: %s", strerror(errno)); + return 3; + } + + if (fcntl_add_flags(collector_un_sockfd, O_NONBLOCK) != 0) + { + logger(1, + "Error setting Collector UNIX socket `%s' to non-blocking mode: %s", + GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), + strerror(errno)); + return 3; + } + + if (fcntl_add_flags(distributor_un_sockfd, O_NONBLOCK) != 0) + { + logger(1, + "Error setting Distributor UNIX socket `%s' to non-blocking mode: %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath), + strerror(errno)); + return 3; + } + + return 0; +} + +static struct remote_desc * get_remote_descriptor(enum sock_type type, int remote_fd, size_t max_buffer_size) +{ + if (remotes.desc_used == remotes.desc_size) + { + logger(1, "Max number of connections reached: %llu", remotes.desc_used); + return NULL; + } + + for (size_t i = 0; i < remotes.desc_size; ++i) + { + if (remotes.desc[i].fd == -1) + { + remotes.desc_used++; + + struct nDPIsrvd_write_buffer * write_buffer = NULL; + UT_array ** additional_write_buffers = NULL; + + switch (type) + { + case COLLECTOR_UN: + if (nDPIsrvd_json_buffer_init(&remotes.desc[i].event_collector_un.main_read_buffer, + max_buffer_size) != 0) + { + logger(1, "Read/JSON buffer init failed, size: %zu bytes", max_buffer_size); + return NULL; + } + break; + case DISTRIBUTOR_UN: + write_buffer = &remotes.desc[i].event_distributor_un.main_write_buffer; + additional_write_buffers = &remotes.desc[i].event_distributor_un.additional_write_buffers; + break; + case DISTRIBUTOR_IN: + write_buffer = &remotes.desc[i].event_distributor_in.main_write_buffer; + additional_write_buffers = &remotes.desc[i].event_distributor_in.additional_write_buffers; + break; + } + + if (additional_write_buffers != NULL && *additional_write_buffers == NULL) + { + utarray_new(*additional_write_buffers, &nDPIsrvd_buffer_array_icd); + if (*additional_write_buffers == NULL) + { + logger(1, "%s", "Could not create additional write buffers"); + return NULL; + } + } + if (write_buffer != NULL && nDPIsrvd_buffer_init(&write_buffer->buf, max_buffer_size) != 0) + { + logger(1, "Write buffer init failed, size: %zu bytes", max_buffer_size); + return NULL; + } + + remotes.desc[i].sock_type = type; + remotes.desc[i].fd = remote_fd; + return &remotes.desc[i]; + } + } + + logger(1, "%s", "BUG: Unknown error while finding the remote descriptor"); + return NULL; +} + +static void free_remote(struct nio * const io, struct remote_desc * remote) +{ + if (remote->fd > -1) + { + errno = 0; + if (del_event(io, remote->fd) != 0) + { + logger_nDPIsrvd(remote, + "Could not delete event from queue for connection", + ": %s", + (errno != 0 ? strerror(errno) : "Internal Error")); + } + errno = 0; + close(remote->fd); + + switch (remote->sock_type) + { + case COLLECTOR_UN: + if (errno != 0) + { + logger_nDPIsrvd(remote, "Error closing collector connection", ": %s", strerror(errno)); + } + nDPIsrvd_json_buffer_free(&remote->event_collector_un.main_read_buffer); + break; + case DISTRIBUTOR_UN: + if (errno != 0) + { + logger_nDPIsrvd(remote, "Error closing distributor connection", ": %s", strerror(errno)); + } + if (remote->event_distributor_un.additional_write_buffers != NULL) + { + utarray_free(remote->event_distributor_un.additional_write_buffers); + } + nDPIsrvd_buffer_free(&remote->event_distributor_un.main_write_buffer.buf); +#if !defined(__FreeBSD__) && !defined(__APPLE__) + free(remote->event_distributor_un.user_name); +#endif + break; + case DISTRIBUTOR_IN: + if (errno != 0) + { + logger_nDPIsrvd(remote, "Error closing distributor connection", ": %s", strerror(errno)); + } + if (remote->event_distributor_in.additional_write_buffers != NULL) + { + utarray_free(remote->event_distributor_in.additional_write_buffers); + } + nDPIsrvd_buffer_free(&remote->event_distributor_in.main_write_buffer.buf); + break; + } + + memset(remote, 0, sizeof(*remote)); + remote->fd = -1; + remotes.desc_used--; + } +} + +static void free_remotes(struct nio * const io) +{ + for (size_t i = 0; i < remotes.desc_size; ++i) + { + free_remote(io, &remotes.desc[i]); + } + nDPIsrvd_free(remotes.desc); + remotes.desc = NULL; + remotes.desc_used = 0; + remotes.desc_size = 0; +} + +static int add_in_event_fd(struct nio * const io, int fd) +{ + return nio_add_fd(io, fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS; +} + +static int add_in_event(struct nio * const io, struct remote_desc * const remote) +{ + return nio_add_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS; +} + +static int set_out_event(struct nio * const io, struct remote_desc * const remote) +{ + return nio_mod_fd(io, remote->fd, NIO_EVENT_OUTPUT, remote) != NIO_SUCCESS; +} + +static int set_in_event(struct nio * const io, struct remote_desc * const remote) +{ + return nio_mod_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS; +} + +static int del_event(struct nio * const io, int fd) +{ + return nio_del_fd(io, fd) != NIO_SUCCESS; +} + +static void disconnect_client(struct nio * const io, struct remote_desc * const remote) +{ + free_remote(io, remote); +} + +static int nDPIsrvd_parse_options(int argc, char ** argv) +{ + int opt; + + while ((opt = getopt(argc, argv, "f:lL:c:dp:s:S:G:m:u:g:C:Dvh")) != -1) + { + switch (opt) + { + case 'f': + set_cmdarg_string(&nDPIsrvd_options.config_file, optarg); + break; + case 'l': + enable_console_logger(); + break; + case 'L': + if (enable_file_logger(optarg) != 0) + { + return 1; + } + break; + case 'c': + set_cmdarg_string(&nDPIsrvd_options.collector_un_sockpath, optarg); + break; + case 'e': +#ifdef ENABLE_EPOLL + set_cmdarg_boolean(&nDPIsrvd_options.use_poll, 1); +#else + logger_early(1, "%s", "nDPIsrvd was built w/o epoll() support, poll() is already the default"); +#endif + break; + case 'd': + daemonize_enable(); + break; + case 'p': + set_cmdarg_string(&nDPIsrvd_options.pidfile, optarg); + break; + case 's': + set_cmdarg_string(&nDPIsrvd_options.distributor_un_sockpath, optarg); + break; + case 'S': + set_cmdarg_string(&nDPIsrvd_options.distributor_in_address, optarg); + break; + case 'G': + { + char const * const sep = strchr(optarg, ':'); + char group[256]; + + if (sep == NULL) + { + fprintf(stderr, "%s: Argument for `-G' is not in the format group:group\n", argv[0]); + return 1; + } + + if (snprintf(group, sizeof(group), "%.*s", (int)(sep - optarg), optarg) > 0) + { + set_cmdarg_string(&nDPIsrvd_options.collector_group, group); + } + if (snprintf(group, sizeof(group), "%s", sep + 1) > 0) + { + set_cmdarg_string(&nDPIsrvd_options.distributor_group, group); + } + break; + } + case 'm': + { + nDPIsrvd_ull tmp; + + if (str_value_to_ull(optarg, &tmp) != CONVERSION_OK) + { + fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); + return 1; + } + set_cmdarg_ull(&nDPIsrvd_options.max_remote_descriptors, tmp); + break; + } + case 'u': + set_cmdarg_string(&nDPIsrvd_options.user, optarg); + break; + case 'g': + set_cmdarg_string(&nDPIsrvd_options.group, optarg); + break; + case 'C': + { + nDPIsrvd_ull tmp; + + if (str_value_to_ull(optarg, &tmp) != CONVERSION_OK) + { + fprintf(stderr, "%s: Argument for `-C' is not a number: %s\n", argv[0], optarg); + return 1; + } + set_cmdarg_ull(&nDPIsrvd_options.max_write_buffers, tmp); + break; + } + case 'D': + set_cmdarg_boolean(&nDPIsrvd_options.bufferbloat_fallback_to_blocking, 0); + break; + case 'v': + fprintf(stderr, "%s", get_nDPId_version()); + return 1; + case 'h': + default: + fprintf(stderr, "%s\n", get_nDPId_version()); + fprintf(stderr, + "Usage: %s [-f config-file] [-l] [-L logfile]\n" + "\t[-c path-to-unix-sock] [-e] [-d] [-p pidfile]\n" + "\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n" + "\t[-G collector-unix-socket-group:distributor-unix-socket-group]\n" + "\t[-m max-remote-descriptors] [-u user] [-g group]\n" + "\t[-C max-buffered-json-lines] [-D]\n" + "\t[-v] [-h]\n\n" + "\t-f\tLoad nDPIsrvd options from a configuration file.\n" + "\t-l\tLog all messages to stderr.\n" + "\t-L\tLog all messages to a log file.\n" + "\t-c\tPath to a listening UNIX socket (nDPIsrvd Collector).\n" + "\t \tDefault: %s\n" + "\t-e\tUse poll() instead of epoll().\n" + "\t \tDefault: epoll() on Linux, poll() otherwise\n" + "\t-d\tFork into background after initialization.\n" + "\t-p\tWrite the daemon PID to the given file path.\n" + "\t \tDefault: %s\n" + "\t-m\tMax accepted (Collector and Distributor) clients.\n" + "\t-u\tChange UID to the numeric value of user.\n" + "\t \tDefault: %s\n" + "\t-g\tChange GID to the numeric value of group.\n" + "\t-C\tMax buffered JSON lines before nDPIsrvd disconnects/blocking-IO a client.\n" + "\t-D\tDisconnect a slow client instead of falling back to blocking-IO.\n" + "\t-s\tPath to a listening UNIX socket (nDPIsrvd Distributor).\n" + "\t \tDefault: %s\n" + "\t-S\tAddress:Port of the listening TCP/IP socket (nDPIsrvd Distributor).\n" + "\t-G\tGroup owner of the UNIX collector/distributor socket.\n" + "\t \tDefault: Either the group set via `-g', otherwise the primary group of `-u'\n" + "\t-v\tversion\n" + "\t-h\tthis\n\n", + argv[0], + nDPIsrvd_options.collector_un_sockpath.string.default_value, + nDPIsrvd_options.pidfile.string.default_value, + nDPIsrvd_options.user.string.default_value, + nDPIsrvd_options.distributor_un_sockpath.string.default_value); + return 1; + } + } + + set_config_defaults(&config_map[0], nDPIsrvd_ARRAY_LENGTH(config_map)); + + if (is_path_absolute("Pidfile", GET_CMDARG_STR(nDPIsrvd_options.pidfile)) != 0) + { + return 1; + } + + if (is_path_absolute("Collector UNIX socket", GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath)) != 0) + { + return 1; + } + + if (is_path_absolute("Distributor UNIX socket", GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath)) != 0) + { + return 1; + } + + if (IS_CMDARG_SET(nDPIsrvd_options.distributor_in_address) != 0) + { + if (nDPIsrvd_setup_address(&distributor_in_address, GET_CMDARG_STR(nDPIsrvd_options.distributor_in_address)) != + 0) + { + logger_early(1, + "%s: Could not parse address %s", + argv[0], + GET_CMDARG_STR(nDPIsrvd_options.distributor_in_address)); + return 1; + } + if (distributor_in_address.raw.sa_family == AF_UNIX) + { + logger_early(1, + "%s: You've requested to setup another UNIX socket `%s', but there is already one at `%s'", + argv[0], + GET_CMDARG_STR(nDPIsrvd_options.distributor_in_address), + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath)); + return 1; + } + } + + if (optind < argc) + { + logger_early(1, "%s: Unexpected argument after options", argv[0]); + return 1; + } + + return 0; +} + +static struct remote_desc * accept_remote(int server_fd, + enum sock_type socktype, + struct sockaddr * const sockaddr, + socklen_t * const addrlen) +{ + int client_fd; + + while ((client_fd = accept(server_fd, sockaddr, addrlen)) < 0 && errno == EINTR) {} + if (client_fd < 0 || set_fd_cloexec(client_fd) < 0) + { + logger(1, "Accept failed: %s", strerror(errno)); + return NULL; + } + + struct remote_desc * current = get_remote_descriptor(socktype, client_fd, NETWORK_BUFFER_MAX_SIZE); + if (current == NULL) + { + return NULL; + } + + return current; +} + +static int new_connection(struct nio * const io, int eventfd) +{ + union + { + struct sockaddr_un saddr_collector_un; + struct sockaddr_un saddr_distributor_un; + struct sockaddr_in saddr_distributor_in; + } sockaddr; + + socklen_t peer_addr_len; + enum sock_type stype; + int server_fd; + if (eventfd == collector_un_sockfd) + { + peer_addr_len = sizeof(sockaddr.saddr_collector_un); + stype = COLLECTOR_UN; + server_fd = collector_un_sockfd; + } + else if (eventfd == distributor_un_sockfd) + { + peer_addr_len = sizeof(sockaddr.saddr_distributor_un); + stype = DISTRIBUTOR_UN; + server_fd = distributor_un_sockfd; + } + else if (eventfd == distributor_in_sockfd) + { + peer_addr_len = sizeof(sockaddr.saddr_distributor_in); + stype = DISTRIBUTOR_IN; + server_fd = distributor_in_sockfd; + } + else + { + return 1; + } + + struct remote_desc * const current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len); + if (current == NULL) + { + return 1; + } + + int sockopt; + switch (current->sock_type) + { + case COLLECTOR_UN: + current->event_collector_un.peer = sockaddr.saddr_collector_un; + current->event_collector_un.json_bytes = 0; + + sockopt = NETWORK_BUFFER_MAX_SIZE; + if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0) + { + logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno)); + return 1; + } + +#if !defined(__FreeBSD__) && !defined(__APPLE__) + struct ucred ucred = {}; + socklen_t ucred_len = sizeof(ucred); + if (getsockopt(current->fd, SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_len) == -1) + { + logger(1, "Error getting credentials from UNIX socket: %s", strerror(errno)); + return 1; + } + current->event_collector_un.pid = ucred.pid; +#endif + + logger_nDPIsrvd(current, "New collector connection from", ""); + break; + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + if (current->sock_type == DISTRIBUTOR_UN) + { + current->event_distributor_un.peer = sockaddr.saddr_distributor_un; + +#if !defined(__FreeBSD__) && !defined(__APPLE__) + struct ucred ucred = {}; + socklen_t ucred_len = sizeof(ucred); + if (getsockopt(current->fd, SOL_SOCKET, SO_PEERCRED, &ucred, &ucred_len) == -1) + { + logger(1, "Error getting credentials from UNIX socket: %s", strerror(errno)); + return 1; + } + + struct passwd pwnam = {}; + struct passwd * pwres = NULL; + ssize_t pwsiz = sysconf(_SC_GETPW_R_SIZE_MAX); + if (pwsiz == -1) + { + pwsiz = BUFSIZ; + } + char buf[pwsiz]; + if (getpwuid_r(ucred.uid, &pwnam, &buf[0], pwsiz, &pwres) != 0 || pwres == NULL) + { + logger(1, "Could not get passwd entry for user id %u", ucred.uid); + return 1; + } + + current->event_distributor_un.pid = ucred.pid; + current->event_distributor_un.user_name = strdup(pwres->pw_name); +#endif + } + else + { + current->event_distributor_in.peer = sockaddr.saddr_distributor_in; + + sockopt = 1; + if (setsockopt(current->fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof(sockopt)) < 0) + { + logger(1, "Error setting socket option SO_RCVBUF: %s", strerror(errno)); + return 1; + } + + if (inet_ntop(current->event_distributor_in.peer.sin_family, + ¤t->event_distributor_in.peer.sin_addr, + ¤t->event_distributor_in.peer_addr[0], + sizeof(current->event_distributor_in.peer_addr)) == NULL) + { + logger(1, "Error converting an internet address: %s", strerror(errno)); + return 1; + } + } + + sockopt = NETWORK_BUFFER_MAX_SIZE; + if (setsockopt(current->fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof(sockopt)) < 0) + { + logger(1, "Error setting socket option SO_SNDBUF: %s", strerror(errno)); + return 1; + } + + { + struct timeval send_timeout = {1, 0}; + if (setsockopt(current->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_timeout, sizeof(send_timeout)) != 0) + { + logger(1, "Error setting socket option send timeout: %s", strerror(errno)); + } + } + + logger_nDPIsrvd(current, "New distributor connection from", ""); + break; + } + + /* nonblocking fd is mandatory */ + if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0) + { + logger(1, "Error setting fd flags to non-blocking mode: %s", strerror(errno)); + disconnect_client(io, current); + return 1; + } + + /* shutdown writing end for collector clients */ + if (current->sock_type == COLLECTOR_UN) + { + shutdown(current->fd, SHUT_WR); // collector + /* shutdown reading end for distributor clients does not work due to epoll usage */ + } + + /* setup event I/O */ + errno = 0; + if (add_in_event(io, current) != NIO_SUCCESS) + { + logger(1, "Error adding input event to %d: %s", current->fd, (errno != 0 ? strerror(errno) : "Internal Error")); + disconnect_client(io, current); + return 1; + } + + return 0; +} + +static int handle_collector_protocol(struct nio * const io, struct remote_desc * const current) +{ + struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); + char * json_msg_start = NULL; + + if (json_read_buffer == NULL) + { + return 1; + } + + if (json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{') + { + logger_nDPIsrvd(current, + "BUG: Collector connection", + "JSON invalid opening character: '%c'", + json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); + disconnect_client(io, current); + return 1; + } + + errno = 0; + current->event_collector_un.json_bytes = strtoull(json_read_buffer->buf.ptr.text, &json_msg_start, 10); + current->event_collector_un.json_bytes += json_msg_start - json_read_buffer->buf.ptr.text; + + if (errno == ERANGE) + { + logger_nDPIsrvd(current, "BUG: Collector connection", "JSON message length exceeds numceric limits"); + disconnect_client(io, current); + return 1; + } + + if (json_msg_start == json_read_buffer->buf.ptr.text) + { + logger_nDPIsrvd(current, + "BUG: Collector connection", + "missing JSON message length in protocol preamble: \"%.*s\"", + NETWORK_BUFFER_LENGTH_DIGITS, + json_read_buffer->buf.ptr.text); + disconnect_client(io, current); + return 1; + } + + if (json_msg_start - json_read_buffer->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) + { + logger_nDPIsrvd(current, + "BUG: Collector connection", + "invalid collector protocol data received. Expected protocol preamble of size %u bytes, got " + "%ld " + "bytes", + NETWORK_BUFFER_LENGTH_DIGITS, + (long int)(json_msg_start - json_read_buffer->buf.ptr.text)); + } + + if (current->event_collector_un.json_bytes > json_read_buffer->buf.max) + { + logger_nDPIsrvd(current, + "BUG: Collector connection", + "JSON message too big: %llu > %zu", + current->event_collector_un.json_bytes, + json_read_buffer->buf.max); + disconnect_client(io, current); + return 1; + } + + if (current->event_collector_un.json_bytes > json_read_buffer->buf.used) + { + return 1; + } + + if (json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 2] != '}' || + json_read_buffer->buf.ptr.text[current->event_collector_un.json_bytes - 1] != '\n') + { + logger_nDPIsrvd(current, + "BUG: Collector connection", + "invalid JSON message: %.*s...", + (int)current->event_collector_un.json_bytes > 512 ? 512 + : (int)current->event_collector_un.json_bytes, + json_read_buffer->buf.ptr.text); + disconnect_client(io, current); + return 1; + } + + return 0; +} + +static int handle_incoming_data(struct nio * const io, struct remote_desc * const current) +{ + struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); + + if (json_read_buffer == NULL) + { + unsigned char garbage = 0; + + if (read(current->fd, &garbage, sizeof(garbage)) == sizeof(garbage)) + { + logger_nDPIsrvd(current, "Received data from", "who is not allowed to send us some."); + } + else + { + logger_nDPIsrvd(current, "Distributor connection", "closed"); + } + disconnect_client(io, current); + return 1; + } + + /* read JSON messages (or parts) from the UNIX socket (collecting) */ + if (json_read_buffer->buf.used == json_read_buffer->buf.max) + { + logger_nDPIsrvd(current, + "Collector connection", + "read buffer (%zu bytes) full. No more read possible.", + json_read_buffer->buf.max); + } + else + { + errno = 0; + ssize_t bytes_read; + + while ((bytes_read = read(current->fd, + json_read_buffer->buf.ptr.raw + json_read_buffer->buf.used, + json_read_buffer->buf.max - json_read_buffer->buf.used)) < 0 && + errno == EINTR) + { + // Retry if interrupted by a signal. + } + if (bytes_read < 0 || errno != 0) + { + logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); + disconnect_client(io, current); + return 1; + } + if (bytes_read == 0) + { + logger_nDPIsrvd(current, "Collector connection", "closed during read"); + disconnect_client(io, current); + return 1; + } + json_read_buffer->buf.used += bytes_read; + } + + while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) + { + if (handle_collector_protocol(io, current) != 0) + { + break; + } + + for (size_t i = 0; i < remotes.desc_size; ++i) + { + struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(&remotes.desc[i]); + UT_array * const additional_write_buffers = get_additional_write_buffers(&remotes.desc[i]); + + if (remotes.desc[i].fd < 0 || write_buffer == NULL || additional_write_buffers == NULL) + { + continue; + } + + if (current->event_collector_un.json_bytes > write_buffer->buf.max - write_buffer->buf.used || + utarray_len(additional_write_buffers) > 0) + { + if (utarray_len(additional_write_buffers) == 0) + { + errno = 0; + if (set_out_event(io, &remotes.desc[i]) != 0) + { + logger_nDPIsrvd(&remotes.desc[i], + "Could not add event to", + ", disconnecting: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); + disconnect_client(io, &remotes.desc[i]); + continue; + } + } + if (add_to_additional_write_buffers(&remotes.desc[i], + json_read_buffer->buf.ptr.raw, + current->event_collector_un.json_bytes) != 0) + { + disconnect_client(io, &remotes.desc[i]); + continue; + } + } + else + { + memcpy(write_buffer->buf.ptr.raw + write_buffer->buf.used, + json_read_buffer->buf.ptr.raw, + current->event_collector_un.json_bytes); + write_buffer->buf.used += current->event_collector_un.json_bytes; + } + + if (drain_main_buffer(&remotes.desc[i]) != 0) + { + disconnect_client(io, &remotes.desc[i]); + } + } + + memmove(json_read_buffer->buf.ptr.raw, + json_read_buffer->buf.ptr.raw + current->event_collector_un.json_bytes, + json_read_buffer->buf.used - current->event_collector_un.json_bytes); + json_read_buffer->buf.used -= current->event_collector_un.json_bytes; + current->event_collector_un.json_bytes = 0; + } + + return 0; +} + +static int handle_data_event(struct nio * const io, int index) +{ + struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, index); + + if (nio_has_input(io, index) != NIO_SUCCESS && nio_can_output(io, index) != NIO_SUCCESS) + { + logger(1, "%s", "Neither input nor output event set."); + return 1; + } + + if (current == NULL) + { + logger(1, "%s", "Remote descriptor got from event data invalid."); + return 1; + } + + if (current->fd < 0) + { + logger(1, "File descriptor `%d' got from event data invalid.", current->fd); + return 1; + } + + if (nio_has_input(io, index) == NIO_SUCCESS) + { + return handle_incoming_data(io, current); + } + else + { + return handle_outgoing_data(io, current); + } +} + +#if !defined(__FreeBSD__) && !defined(__APPLE__) +static int setup_signalfd(struct nio * const io) +{ + sigset_t mask; + int sfd; + + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGTERM); + sigaddset(&mask, SIGQUIT); + + if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1) + { + return -1; + } + sfd = signalfd(-1, &mask, 0); + if (sfd == -1) + { + return -1; + } + + if (add_in_event_fd(io, sfd) != 0) + { + return -1; + } + + if (fcntl_add_flags(sfd, O_NONBLOCK) != 0) + { + return -1; + } + + return sfd; +} +#endif + +static int mainloop(struct nio * const io) +{ +#if !defined(__FreeBSD__) && !defined(__APPLE__) + int signalfd = setup_signalfd(io); +#endif + + while (nDPIsrvd_main_thread_shutdown == 0) + { + if (nio_run(io, 1000) != NIO_SUCCESS) + { + logger(1, "Event I/O returned error: %s", strerror(errno)); + } + + int nready = nio_get_nready(io); + + for (int i = 0; i < nready; i++) + { + int fd = nio_get_fd(io, i); + + if (nio_has_error(io, i) == NIO_SUCCESS) + { + if (fd != collector_un_sockfd && fd != distributor_un_sockfd && fd != distributor_in_sockfd) + { + struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, i); + switch (current->sock_type) + { + case COLLECTOR_UN: + logger_nDPIsrvd(current, "Collector connection", "closed"); + break; + case DISTRIBUTOR_UN: + case DISTRIBUTOR_IN: + logger_nDPIsrvd(current, "Distributor connection", "closed"); + break; + } + disconnect_client(io, current); + } + else + { + logger(1, "Event I/O error: %s", (errno != 0 ? strerror(errno) : "unknown")); + } + break; + } + + if (fd == collector_un_sockfd || fd == distributor_un_sockfd || fd == distributor_in_sockfd) + { + /* New connection to collector / distributor. */ + if (new_connection(io, fd) != 0) + { + continue; + } + } +#if !defined(__FreeBSD__) && !defined(__APPLE__) + else if (fd == signalfd) + { + struct signalfd_siginfo fdsi; + ssize_t s; + + s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo)); + if (s != sizeof(struct signalfd_siginfo)) + { + if (s < 0) + { + logger(1, "Read from signal fd returned: %s", strerror(errno)); + nDPIsrvd_main_thread_shutdown = 1; + } + else + { + logger(1, + "Invalid signal fd read size. Got %zd, wanted %zu bytes.", + s, + sizeof(struct signalfd_siginfo)); + } + continue; + } + + if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT) + { + nDPIsrvd_main_thread_shutdown = 1; + continue; + } + } +#endif + else + { + /* Incoming data / Outoing data ready to receive / send. */ + if (handle_data_event(io, i) != 0) + { + /* do nothing */ + } + } + } + } + + free_remotes(io); + nio_free(io); +#if !defined(__FreeBSD__) && !defined(__APPLE__) + close(signalfd); +#endif + + return 0; +} + +static int setup_event_queue(struct nio * const io) +{ +#ifdef ENABLE_EPOLL + if ((GET_CMDARG_BOOL(nDPIsrvd_options.use_poll) == 0 && nio_use_epoll(io, 32) != NIO_SUCCESS) || + (GET_CMDARG_BOOL(nDPIsrvd_options.use_poll) != 0 && + nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)) +#else + if (nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) +#endif + { + logger(1, "%s", "Event I/O poll/epoll setup failed"); + return -1; + } + + errno = 0; + if (add_in_event_fd(io, collector_un_sockfd) != 0) + { + logger(1, + "Error adding collector UNIX socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); + return -1; + } + + errno = 0; + if (add_in_event_fd(io, distributor_un_sockfd) != 0) + { + logger(1, + "Error adding distributor UNIX socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); + return -1; + } + + if (distributor_in_sockfd >= 0) + { + errno = 0; + if (add_in_event_fd(io, distributor_in_sockfd) != 0) + { + logger(1, + "Error adding distributor TCP/IP socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); + return -1; + } + } + + return 0; +} + +static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors) +{ + remotes.desc_used = 0; + remotes.desc_size = max_remote_descriptors; + remotes.desc = (struct remote_desc *)nDPIsrvd_calloc(remotes.desc_size, sizeof(*remotes.desc)); + if (remotes.desc == NULL) + { + return -1; + } + for (size_t i = 0; i < remotes.desc_size; ++i) + { + remotes.desc[i].fd = -1; + } + + return 0; +} + +static int nDPIsrvd_parsed_config_line( + int lineno, char const * const section, char const * const name, char const * const value, void * const user_data) +{ + (void)user_data; + + if (strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("general") && + strncmp(section, "general", INI_MAX_SECTION) == 0) + { + size_t i; + + for (i = 0; i < nDPIsrvd_ARRAY_LENGTH(config_map); ++i) + { + if (strnlen(name, INI_MAX_NAME) == strnlen(config_map[i].key, INI_MAX_NAME) && + strncmp(name, config_map[i].key, INI_MAX_NAME) == 0) + { + if (IS_CMDARG_SET(*config_map[i].opt) != 0) + { + logger_early(1, "General config key `%s' already set, ignoring value `%s'", name, value); + } + else + { + if (set_config_from(&config_map[i], value) != 0) + { + return 0; + } + } + break; + } + } + if (i == nDPIsrvd_ARRAY_LENGTH(config_map)) + { + logger_early(1, "Invalid general config key `%s' at line %d", name, lineno); + } + } + else + { + logger_early( + 1, "Invalid config section `%s' at line %d with key `%s' and value `%s'", section, lineno, name, value); + } + + return 1; +} + +#ifndef NO_MAIN +int main(int argc, char ** argv) +{ + int retval = 1; + struct nio io; + + if (argc == 0) + { + return 1; + } + + nio_init(&io); + init_logging("nDPIsrvd"); + + if (nDPIsrvd_parse_options(argc, argv) != 0) + { + return 1; + } + { + int ret; + + if (IS_CMDARG_SET(nDPIsrvd_options.config_file) != 0 && + (ret = + parse_config_file(GET_CMDARG_STR(nDPIsrvd_options.config_file), nDPIsrvd_parsed_config_line, NULL)) != + 0) + { + if (ret > 0) + { + logger_early(1, "Config file `%s' is malformed", GET_CMDARG_STR(nDPIsrvd_options.config_file)); + } + else if (ret == -ENOENT) + { + logger_early(1, "Path `%s' is not a regular file", GET_CMDARG_STR(nDPIsrvd_options.config_file)); + } + else + { + logger_early(1, + "Could not open file `%s' for reading: %s", + GET_CMDARG_STR(nDPIsrvd_options.config_file), + strerror(errno)); + } + return 1; + } + } + + if (is_daemonize_enabled() != 0 && is_console_logger_enabled() != 0) + { + logger_early(1, + "%s", + "Daemon mode `-d' and `-l' can not be used together, " + "because stdout/stderr is beeing redirected to /dev/null"); + return 1; + } + + if (access(GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), F_OK) == 0) + { + logger_early(1, + "UNIX socket `%s' exists; nDPIsrvd already running? " + "Please remove the socket manually or change socket path.", + GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath)); + return 1; + } + + if (access(GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath), F_OK) == 0) + { + logger_early(1, + "UNIX socket `%s' exists; nDPIsrvd already running? " + "Please remove the socket manually or change socket path.", + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath)); + return 1; + } + + log_app_info(); + + if (daemonize_with_pidfile(GET_CMDARG_STR(nDPIsrvd_options.pidfile)) != 0) + { + goto error; + } + + if (setup_remote_descriptors(GET_CMDARG_ULL(nDPIsrvd_options.max_remote_descriptors)) != 0) + { + goto error; + } + + switch (create_listen_sockets()) + { + case 0: + break; + case 1: + goto error; + case 2: + if (unlink(GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath)) != 0) + { + logger(1, + "Could not unlink `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), + strerror(errno)); + } + goto error; + case 3: + goto error_unlink_sockets; + default: + goto error; + } + + logger(0, "collector UNIX socket listen on `%s'", GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath)); + logger(0, "distributor UNIX listen on `%s'", GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath)); + switch (distributor_in_address.raw.sa_family) + { + default: + goto error_unlink_sockets; + case AF_INET: + case AF_INET6: + logger(1, + "Please keep in mind that using a TCP Socket may leak sensitive information to " + "everyone with access to the device/network. You've been warned!"); + break; + case AF_UNIX: + case (sa_family_t)0xFFFF: + break; + } + + int ret = chmod_chown(GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), + S_IRUSR | S_IWUSR | S_IWGRP, + GET_CMDARG_STR(nDPIsrvd_options.user), + IS_CMDARG_SET(nDPIsrvd_options.collector_group) != 0 + ? GET_CMDARG_STR(nDPIsrvd_options.collector_group) + : GET_CMDARG_STR(nDPIsrvd_options.group)); + if (ret != 0) + { + if (IS_CMDARG_SET(nDPIsrvd_options.collector_group) != 0 || IS_CMDARG_SET(nDPIsrvd_options.group) != 0) + { + logger(1, + "Could not chmod/chown `%s' to user `%s' and group `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), + GET_CMDARG_STR(nDPIsrvd_options.user), + IS_CMDARG_SET(nDPIsrvd_options.collector_group) != 0 + ? GET_CMDARG_STR(nDPIsrvd_options.collector_group) + : GET_CMDARG_STR(nDPIsrvd_options.group), + strerror(errno)); + } + else + { + logger(1, + "Could not chmod/chown `%s' to user `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), + GET_CMDARG_STR(nDPIsrvd_options.user), + strerror(errno)); + } + if (ret != -EPERM) + { + goto error_unlink_sockets; + } + } + + ret = chmod_chown(GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath), + S_IRUSR | S_IWUSR | S_IWGRP, + GET_CMDARG_STR(nDPIsrvd_options.user), + IS_CMDARG_SET(nDPIsrvd_options.distributor_group) != 0 + ? GET_CMDARG_STR(nDPIsrvd_options.distributor_group) + : GET_CMDARG_STR(nDPIsrvd_options.group)); + if (ret != 0) + { + if (IS_CMDARG_SET(nDPIsrvd_options.distributor_group) != 0 || IS_CMDARG_SET(nDPIsrvd_options.group) != 0) + { + logger(1, + "Could not chmod/chown `%s' to user `%s' and group `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath), + GET_CMDARG_STR(nDPIsrvd_options.user), + IS_CMDARG_SET(nDPIsrvd_options.distributor_group) != 0 + ? GET_CMDARG_STR(nDPIsrvd_options.distributor_group) + : GET_CMDARG_STR(nDPIsrvd_options.group), + strerror(errno)); + } + else + { + logger(1, + "Could not chmod/chown `%s' to user `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath), + GET_CMDARG_STR(nDPIsrvd_options.user), + strerror(errno)); + } + if (ret != -EPERM) + { + goto error_unlink_sockets; + } + } + + ret = change_user_group(GET_CMDARG_STR(nDPIsrvd_options.user), + GET_CMDARG_STR(nDPIsrvd_options.group), + GET_CMDARG_STR(nDPIsrvd_options.pidfile)); + if (ret != 0 && ret != -EPERM) + { + if (GET_CMDARG_STR(nDPIsrvd_options.group) != NULL) + { + logger(1, + "Change user/group to %s/%s failed: %s", + GET_CMDARG_STR(nDPIsrvd_options.user), + GET_CMDARG_STR(nDPIsrvd_options.group), + strerror(-ret)); + } + else + { + logger(1, "Change user to %s failed: %s", GET_CMDARG_STR(nDPIsrvd_options.user), strerror(-ret)); + } + goto error_unlink_sockets; + } + + signal(SIGPIPE, SIG_IGN); +#if !defined(__FreeBSD__) && !defined(__APPLE__) + signal(SIGINT, SIG_IGN); + signal(SIGTERM, SIG_IGN); + signal(SIGQUIT, SIG_IGN); +#endif + + if (setup_event_queue(&io) != 0) + { + goto error_unlink_sockets; + } + + retval = mainloop(&io); + +error_unlink_sockets: + if (unlink(GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath)) != 0) + { + logger(1, "Could not unlink `%s': %s", GET_CMDARG_STR(nDPIsrvd_options.collector_un_sockpath), strerror(errno)); + } + if (unlink(GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath)) != 0) + { + logger(1, + "Could not unlink `%s': %s", + GET_CMDARG_STR(nDPIsrvd_options.distributor_un_sockpath), + strerror(errno)); + } +error: + close(collector_un_sockfd); + close(distributor_un_sockfd); + close(distributor_in_sockfd); + + daemonize_shutdown(GET_CMDARG_STR(nDPIsrvd_options.pidfile)); + logger(0, "Bye."); + shutdown_logging(); + + return retval; +} +#endif |