#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <syslog.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include "config.h"
#include "nDPIsrvd.h"
#include "utils.h"

enum sock_type
{
    JSON_SOCK,
    SERV_SOCK
};

struct remote_desc
{
    enum sock_type sock_type;
    int fd;
    struct nDPIsrvd_buffer buf;
    union {
        struct
        {
            int json_sockfd;
            struct sockaddr_un peer;
            unsigned long long int json_bytes;
        } event_json;
        struct
        {
            int serv_sockfd;
            struct sockaddr_in peer;
            char peer_addr[INET_ADDRSTRLEN];
        } event_serv;
    };
};

static struct
{
    struct remote_desc * desc;
    size_t desc_size;
    size_t desc_used;
} remotes = {NULL, 0, 0};

static int nDPIsrvd_main_thread_shutdown = 0;
static int json_sockfd;
static int serv_sockfd;
static struct nDPIsrvd_address serv_address = {
    .raw.sa_family = 0xFFFF,
};

static struct
{
    int log_to_stderr;
    char * pidfile;
    char * json_sockpath;
    char * serv_optarg;
    char * user;
    char * group;
} nDPIsrvd_options = {};

static int fcntl_add_flags(int fd, int flags)
{
    int cur_flags = fcntl(fd, F_GETFL, 0);

    if (cur_flags == -1)
    {
        return 1;
    }

    return fcntl(fd, F_SETFL, cur_flags | flags);
}

static int fcntl_del_flags(int fd, int flags)
{
    int cur_flags = fcntl(fd, F_GETFL, 0);

    if (cur_flags == -1)
    {
        return 1;
    }

    return fcntl(fd, F_SETFL, cur_flags & ~flags);
}

static int create_listen_sockets(void)
{
    json_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    serv_sockfd = socket(serv_address.raw.sa_family, SOCK_STREAM, 0);
    if (json_sockfd < 0 || serv_sockfd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error opening socket: %s", strerror(errno));
        return 1;
    }

    int opt = 1;
    if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 ||
        setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "setsockopt with SO_REUSEADDR failed: %s", strerror(errno));
        return 1;
    }

    struct sockaddr_un json_addr;
    json_addr.sun_family = AF_UNIX;
    if (snprintf(json_addr.sun_path, sizeof(json_addr.sun_path), "%s", nDPIsrvd_options.json_sockpath) <= 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "snprintf failed: %s", strerror(errno));
        return 1;
    }

    if (bind(json_sockfd, (struct sockaddr *)&json_addr, sizeof(json_addr)) < 0)
    {
        unlink(nDPIsrvd_options.json_sockpath);
        syslog(LOG_DAEMON | LOG_ERR,
               "Error on binding UNIX socket (collector) to %s: %s",
               nDPIsrvd_options.json_sockpath,
               strerror(errno));
        return 1;
    }

    if (bind(serv_sockfd, &serv_address.raw, serv_address.size) < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "Error on binding socket (distributor) to %s: %s",
               nDPIsrvd_options.serv_optarg,
               strerror(errno));
        unlink(nDPIsrvd_options.json_sockpath);
        return 1;
    }

    if (listen(json_sockfd, 16) < 0 || listen(serv_sockfd, 16) < 0)
    {
        unlink(nDPIsrvd_options.json_sockpath);
        syslog(LOG_DAEMON | LOG_ERR, "Error on listen: %s", strerror(errno));
        return 1;
    }

    if (fcntl_add_flags(json_sockfd, O_NONBLOCK) != 0)
    {
        unlink(nDPIsrvd_options.json_sockpath);
        syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags for the collector socket: %s", strerror(errno));
        return 1;
    }

    if (fcntl_add_flags(serv_sockfd, O_NONBLOCK) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags for the distributor socket: %s", strerror(errno));
        return 1;
    }

    return 0;
}

static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, int remote_fd)
{
    if (remotes.desc_used == remotes.desc_size)
    {
        return NULL;
    }

    for (size_t i = 0; i < remotes.desc_size; ++i)
    {
        if (remotes.desc[i].fd == -1)
        {
            remotes.desc_used++;
            if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, NETWORK_BUFFER_MAX_SIZE) != 0)
            {
                return NULL;
            }
            remotes.desc[i].sock_type = type;
            remotes.desc[i].fd = remote_fd;
            return &remotes.desc[i];
        }
    }

    return NULL;
}

static int add_event(int epollfd, int fd, void * ptr)
{
    struct epoll_event event = {};

    if (ptr != NULL)
    {
        event.data.ptr = ptr;
    }
    else
    {
        event.data.fd = fd;
    }
    event.events = EPOLLIN;
    return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}

static int del_event(int epollfd, int fd)
{
    return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
}

static void disconnect_client(int epollfd, struct remote_desc * const current)
{
    if (current->fd > -1)
    {
        del_event(epollfd, current->fd);
        if (close(current->fd) != 0)
        {
            syslog(LOG_DAEMON | LOG_ERR, "Error closing fd: %s", strerror(errno));
        }
        current->fd = -1;
        remotes.desc_used--;
    }
    nDPIsrvd_buffer_free(&current->buf);
}

static int nDPIsrvd_parse_options(int argc, char ** argv)
{
    int opt;

    while ((opt = getopt(argc, argv, "lc:dp:s:u:g:vh")) != -1)
    {
        switch (opt)
        {
            case 'l':
                nDPIsrvd_options.log_to_stderr = 1;
                break;
            case 'c':
                free(nDPIsrvd_options.json_sockpath);
                nDPIsrvd_options.json_sockpath = strdup(optarg);
                break;
            case 'd':
                daemonize_enable();
                break;
            case 'p':
                free(nDPIsrvd_options.pidfile);
                nDPIsrvd_options.pidfile = strdup(optarg);
                break;
            case 's':
                free(nDPIsrvd_options.serv_optarg);
                nDPIsrvd_options.serv_optarg = strdup(optarg);
                break;
            case 'u':
                free(nDPIsrvd_options.user);
                nDPIsrvd_options.user = strdup(optarg);
                break;
            case 'g':
                free(nDPIsrvd_options.group);
                nDPIsrvd_options.group = strdup(optarg);
                break;
            case 'v':
                fprintf(stderr, "%s", get_nDPId_version());
                return 1;
            case 'h':
            default:
                fprintf(stderr, "%s\n", get_nDPId_version());
                fprintf(stderr,
                        "Usage: %s [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n"
                        "\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n"
                        "\t[-v] [-h]\n",
                        argv[0]);
                return 1;
        }
    }

    if (nDPIsrvd_options.pidfile == NULL)
    {
        nDPIsrvd_options.pidfile = strdup(nDPIsrvd_PIDFILE);
    }
    if (is_path_absolute("Pidfile", nDPIsrvd_options.pidfile) != 0)
    {
        return 1;
    }

    if (nDPIsrvd_options.json_sockpath == NULL)
    {
        nDPIsrvd_options.json_sockpath = strdup(COLLECTOR_UNIX_SOCKET);
    }
    if (is_path_absolute("JSON socket", nDPIsrvd_options.json_sockpath) != 0)
    {
        return 1;
    }

    if (nDPIsrvd_options.serv_optarg == NULL)
    {
        nDPIsrvd_options.serv_optarg = strdup(DISTRIBUTOR_UNIX_SOCKET);
    }

    if (nDPIsrvd_setup_address(&serv_address, nDPIsrvd_options.serv_optarg) != 0)
    {
        fprintf(stderr, "%s: Could not parse address `%s'\n", argv[0], nDPIsrvd_options.serv_optarg);
        return 1;
    }
    if (serv_address.raw.sa_family == AF_UNIX && is_path_absolute("SERV socket", nDPIsrvd_options.serv_optarg) != 0)
    {
        return 1;
    }

    if (optind < argc)
    {
        fprintf(stderr, "%s: Unexpected argument after options\n", argv[0]);
        return 1;
    }

    return 0;
}

static struct remote_desc * accept_remote(int server_fd,
                                          enum sock_type socktype,
                                          struct sockaddr * const sockaddr,
                                          socklen_t * const addrlen)
{
    int client_fd = accept(server_fd, sockaddr, addrlen);
    if (client_fd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
        return NULL;
    }

    struct remote_desc * current = get_unused_remote_descriptor(socktype, client_fd);
    if (current == NULL)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
        return NULL;
    }

    return current;
}

static int new_connection(int epollfd, int eventfd)
{
    union {
        struct sockaddr_un event_json;
        struct sockaddr_un event_serv;
    } sockaddr;

    socklen_t peer_addr_len;
    enum sock_type stype;
    int server_fd;
    if (eventfd == json_sockfd)
    {
        peer_addr_len = sizeof(sockaddr.event_json);
        stype = JSON_SOCK;
        server_fd = json_sockfd;
    }
    else if (eventfd == serv_sockfd)
    {
        peer_addr_len = sizeof(sockaddr.event_serv);
        stype = SERV_SOCK;
        server_fd = serv_sockfd;
    }
    else
    {
        return 1;
    }

    struct remote_desc * current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len);
    if (current == NULL)
    {
        return 1;
    }

    char const * sock_type = NULL;
    switch (current->sock_type)
    {
        case JSON_SOCK:
            sock_type = "collector";
            current->event_json.json_bytes = 0;
            syslog(LOG_DAEMON, "New collector connection");
            break;
        case SERV_SOCK:
            sock_type = "distributor";
            if (inet_ntop(current->event_serv.peer.sin_family,
                          &current->event_serv.peer.sin_addr,
                          &current->event_serv.peer_addr[0],
                          sizeof(current->event_serv.peer_addr)) == NULL)
            {
                if (errno == EAFNOSUPPORT)
                {
                    syslog(LOG_DAEMON | LOG_ERR, "New distributor connection.");
                }
                else
                {
                    syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
                }
                current->event_serv.peer_addr[0] = '\0';
            }
            else
            {
                syslog(LOG_DAEMON,
                       "New distributor connection from %.*s:%u",
                       (int)sizeof(current->event_serv.peer_addr),
                       current->event_serv.peer_addr,
                       ntohs(current->event_serv.peer.sin_port));
            }
            break;
    }

    /* nonblocking fd is mandatory */
    if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error setting %s fd flags: %s", sock_type, strerror(errno));
        disconnect_client(epollfd, current);
        return 1;
    }

    /* shutdown writing end for collector clients */
    if (current->sock_type == JSON_SOCK)
    {
        shutdown(current->fd, SHUT_WR); // collector
        /* setup epoll event */
        if (add_event(epollfd, current->fd, current) != 0)
        {
            disconnect_client(epollfd, current);
            return 1;
        }
    }
    else
    {
        shutdown(current->fd, SHUT_RD); // distributor
    }

    return 0;
}

static int handle_collector_protocol(int epollfd, struct remote_desc * const current)
{
    char * json_str_start = NULL;

    if (current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: JSON invalid opening character: '%c'",
               current->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
        disconnect_client(epollfd, current);
        return 1;
    }

    errno = 0;
    current->event_json.json_bytes = strtoull((char *)current->buf.ptr.text, &json_str_start, 10);
    current->event_json.json_bytes += json_str_start - current->buf.ptr.text;

    if (errno == ERANGE)
    {
        syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit");
        disconnect_client(epollfd, current);
        return 1;
    }

    if (json_str_start == current->buf.ptr.text)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: Missing size before JSON string: \"%.*s\"",
               NETWORK_BUFFER_LENGTH_DIGITS,
               current->buf.ptr.text);
        disconnect_client(epollfd, current);
        return 1;
    }

    if (current->event_json.json_bytes > current->buf.max)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: JSON string too big: %llu > %zu",
               current->event_json.json_bytes,
               current->buf.max);
        disconnect_client(epollfd, current);
        return 1;
    }

    if (current->event_json.json_bytes > current->buf.used)
    {
        return 1;
    }

    if (current->buf.ptr.text[current->event_json.json_bytes - 2] != '}' ||
        current->buf.ptr.text[current->event_json.json_bytes - 1] != '\n')
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "BUG: Invalid JSON string: %.*s",
               (int)current->event_json.json_bytes,
               current->buf.ptr.text);
        disconnect_client(epollfd, current);
        return 1;
    }

    return 0;
}

static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
    if (current->sock_type != JSON_SOCK)
    {
        return 0;
    }

    /* read JSON strings (or parts) from the UNIX socket (collecting) */
    if (current->buf.used == current->buf.max)
    {
        syslog(LOG_DAEMON, "Collector read buffer full. No more read possible.");
    }
    else
    {
        errno = 0;
        ssize_t bytes_read =
            read(current->fd, current->buf.ptr.raw + current->buf.used, current->buf.max - current->buf.used);
        if (bytes_read < 0 || errno != 0)
        {
            disconnect_client(epollfd, current);
            return 1;
        }
        if (bytes_read == 0)
        {
            syslog(LOG_DAEMON, "Collector connection closed during read");
            disconnect_client(epollfd, current);
            return 1;
        }
        current->buf.used += bytes_read;
    }

    while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
    {
        if (handle_collector_protocol(epollfd, current) != 0)
        {
            break;
        }

        for (size_t i = 0; i < remotes.desc_size; ++i)
        {
            if (remotes.desc[i].fd < 0)
            {
                continue;
            }
            if (remotes.desc[i].sock_type != SERV_SOCK)
            {
                continue;
            }
            if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
            {
                syslog(LOG_DAEMON | LOG_ERR,
                       "Buffer capacity threshold (%zu of max %zu bytes) reached, "
                       "falling back to blocking mode.",
                       remotes.desc[i].buf.used,
                       remotes.desc[i].buf.max);
                /*
                 * FIXME: Maybe switch to a Multithreading distributor data transmission,
                 *        so that we do not have to switch back to blocking mode here!
                 * NOTE: If *one* distributer peer is too slow, all other distributors are
                 *       affected by this. This causes starvation and leads to a possible data loss on
                 *       the nDPId collector side.
                 */
                if (fcntl_del_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
                {
                    syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
                    disconnect_client(epollfd, &remotes.desc[i]);
                    continue;
                }
                if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used) !=
                    (ssize_t)remotes.desc[i].buf.used)
                {
                    syslog(LOG_DAEMON | LOG_ERR,
                           "Could not drain buffer by %zu bytes. (forced)",
                           remotes.desc[i].buf.used);
                    disconnect_client(epollfd, &remotes.desc[i]);
                    continue;
                }
                remotes.desc[i].buf.used = 0;
                if (fcntl_add_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
                {
                    syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
                    disconnect_client(epollfd, &remotes.desc[i]);
                    continue;
                }
            }

            memcpy(remotes.desc[i].buf.ptr.raw + remotes.desc[i].buf.used,
                   current->buf.ptr.raw,
                   current->event_json.json_bytes);
            remotes.desc[i].buf.used += current->event_json.json_bytes;

            errno = 0;
            ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr.raw, remotes.desc[i].buf.used);
            if (errno == EAGAIN)
            {
                continue;
            }
            if (bytes_written < 0 || errno != 0)
            {
                if (remotes.desc[i].event_serv.peer_addr[0] == '\0')
                {
                    syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno));
                }
                else
                {
                    syslog(LOG_DAEMON | LOG_ERR,
                           "Distributor connection to %.*s:%u closed, send failed: %s",
                           (int)sizeof(remotes.desc[i].event_serv.peer_addr),
                           remotes.desc[i].event_serv.peer_addr,
                           ntohs(remotes.desc[i].event_serv.peer.sin_port),
                           strerror(errno));
                }
                disconnect_client(epollfd, &remotes.desc[i]);
                continue;
            }
            if (bytes_written == 0)
            {
                syslog(LOG_DAEMON,
                       "Distributor connection to %.*s:%u closed during write",
                       (int)sizeof(remotes.desc[i].event_serv.peer_addr),
                       remotes.desc[i].event_serv.peer_addr,
                       ntohs(remotes.desc[i].event_serv.peer.sin_port));
                disconnect_client(epollfd, &remotes.desc[i]);
                continue;
            }
            if ((size_t)bytes_written < remotes.desc[i].buf.used)
            {
                syslog(LOG_DAEMON,
                       "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
                       (int)sizeof(remotes.desc[i].event_serv.peer_addr),
                       remotes.desc[i].event_serv.peer_addr,
                       ntohs(remotes.desc[i].event_serv.peer.sin_port),
                       bytes_written,
                       remotes.desc[i].buf.used);
                memmove(remotes.desc[i].buf.ptr.raw,
                        remotes.desc[i].buf.ptr.raw + bytes_written,
                        remotes.desc[i].buf.used - bytes_written);
                remotes.desc[i].buf.used -= bytes_written;
                continue;
            }

            remotes.desc[i].buf.used = 0;
        }

        memmove(current->buf.ptr.raw,
                current->buf.ptr.raw + current->event_json.json_bytes,
                current->buf.used - current->event_json.json_bytes);
        current->buf.used -= current->event_json.json_bytes;
        current->event_json.json_bytes = 0;
    }

    return 0;
}

static int handle_incoming_data_event(int epollfd, struct epoll_event * const event)
{
    struct remote_desc * current = (struct remote_desc *)event->data.ptr;

    if ((event->events & EPOLLIN) == 0)
    {
        return 1;
    }

    if (current == NULL)
    {
        syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid");
        return 1;
    }

    if (current->fd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
        return 1;
    }

    return handle_incoming_data(epollfd, current);
}

static int setup_signalfd(int epollfd)
{
    sigset_t mask;
    int sfd;

    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGTERM);
    sigaddset(&mask, SIGQUIT);

    if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1)
    {
        return -1;
    }
    sfd = signalfd(-1, &mask, 0);
    if (sfd == -1)
    {
        return -1;
    }

    if (add_event(epollfd, sfd, NULL) != 0)
    {
        return -1;
    }

    if (fcntl_add_flags(sfd, O_NONBLOCK) != 0)
    {
        return -1;
    }

    return sfd;
}

static int mainloop(int epollfd)
{
    struct epoll_event events[32];
    size_t const events_size = sizeof(events) / sizeof(events[0]);
    int signalfd = setup_signalfd(epollfd);

    while (nDPIsrvd_main_thread_shutdown == 0)
    {
        int nready = epoll_wait(epollfd, events, events_size, -1);

        for (int i = 0; i < nready; i++)
        {
            if (events[i].events & EPOLLERR)
            {
                syslog(LOG_DAEMON | LOG_ERR,
                       "Epoll event error: %s",
                       (errno != 0 ? strerror(errno) : "Client disconnected"));
                if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd)
                {
                    struct remote_desc * current = (struct remote_desc *)events[i].data.ptr;
                    disconnect_client(epollfd, current);
                }
                continue;
            }

            if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd)
            {
                /* New connection to collector / distributor. */
                if (new_connection(epollfd, events[i].data.fd) != 0)
                {
                    continue;
                }
            }
            else if (events[i].data.fd == signalfd)
            {
                struct signalfd_siginfo fdsi;
                ssize_t s;

                s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo));
                if (s != sizeof(struct signalfd_siginfo))
                {
                    syslog(LOG_DAEMON | LOG_ERR,
                           "Invalid signal fd read size. Got %zd, wanted %zu bytes.",
                           s,
                           sizeof(struct signalfd_siginfo));
                    continue;
                }

                if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
                {
                    nDPIsrvd_main_thread_shutdown = 1;
                    break;
                }
            }
            else
            {
                /* Incoming data. */
                if (handle_incoming_data_event(epollfd, &events[i]) != 0)
                {
                    continue;
                }
            }
        }
    }

    close(signalfd);

    return 0;
}

static int create_evq(void)
{
    return epoll_create1(EPOLL_CLOEXEC);
}

static int setup_event_queue(void)
{
    int epollfd = create_evq();
    if (epollfd < 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error creating epoll: %s", strerror(errno));
        return -1;
    }

    if (add_event(epollfd, json_sockfd, NULL) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error adding JSON fd to epoll: %s", strerror(errno));
        return -1;
    }

    if (add_event(epollfd, serv_sockfd, NULL) != 0)
    {
        syslog(LOG_DAEMON | LOG_ERR, "Error adding SERV fd to epoll: %s", strerror(errno));
        return -1;
    }

    return epollfd;
}

static void close_event_queue(int epollfd)
{
    for (size_t i = 0; i < remotes.desc_size; ++i)
    {
        disconnect_client(epollfd, &remotes.desc[i]);
    }
    close(epollfd);
}

static int setup_remote_descriptors(size_t max_descriptors)
{
    remotes.desc_used = 0;
    remotes.desc_size = max_descriptors;
    remotes.desc = (struct remote_desc *)calloc(remotes.desc_size, sizeof(*remotes.desc));
    if (remotes.desc == NULL)
    {
        return -1;
    }
    for (size_t i = 0; i < remotes.desc_size; ++i)
    {
        remotes.desc[i].fd = -1;
    }

    return 0;
}

#ifndef NO_MAIN
int main(int argc, char ** argv)
{
    int retval = 1;
    int epollfd;

    if (argc == 0)
    {
        return 1;
    }

    if (nDPIsrvd_parse_options(argc, argv) != 0)
    {
        return 1;
    }

    openlog("nDPIsrvd", LOG_CONS | LOG_PERROR, LOG_DAEMON);

    if (access(nDPIsrvd_options.json_sockpath, F_OK) == 0)
    {
        syslog(LOG_DAEMON | LOG_ERR,
               "UNIX socket %s exists; nDPIsrvd already running? "
               "Please remove the socket manually or change socket path.",
               nDPIsrvd_options.json_sockpath);
        return 1;
    }

    if (daemonize_with_pidfile(nDPIsrvd_options.pidfile) != 0)
    {
        goto error;
    }
    closelog();
    openlog("nDPIsrvd", LOG_CONS | (nDPIsrvd_options.log_to_stderr != 0 ? LOG_PERROR : 0), LOG_DAEMON);

    if (setup_remote_descriptors(32) != 0)
    {
        goto error;
    }

    if (create_listen_sockets() != 0)
    {
        goto error;
    }

    syslog(LOG_DAEMON, "collector listen on %s", nDPIsrvd_options.json_sockpath);
    syslog(LOG_DAEMON, "distributor listen on %s", nDPIsrvd_options.serv_optarg);
    switch (serv_address.raw.sa_family)
    {
        default:
            goto error;
        case AF_INET:
        case AF_INET6:
            syslog(LOG_DAEMON | LOG_ERR,
                   "Please keep in mind that using a TCP Socket may leak sensitive information to "
                   "everyone with access to the device/network. You've been warned!");
            break;
        case AF_UNIX:
            break;
    }

    errno = 0;
    if (change_user_group(nDPIsrvd_options.user,
                          nDPIsrvd_options.group,
                          nDPIsrvd_options.pidfile,
                          nDPIsrvd_options.json_sockpath,
                          (serv_address.raw.sa_family == AF_UNIX ? nDPIsrvd_options.serv_optarg : NULL)) != 0)
    {
        if (errno != 0)
        {
            syslog(LOG_DAEMON | LOG_ERR, "Change user/group failed: %s", strerror(errno));
        }
        else
        {
            syslog(LOG_DAEMON | LOG_ERR, "Change user/group failed.");
        }
        goto error;
    }

    signal(SIGPIPE, SIG_IGN);

    signal(SIGINT, SIG_IGN);
    signal(SIGTERM, SIG_IGN);
    signal(SIGQUIT, SIG_IGN);

    epollfd = setup_event_queue();
    if (epollfd < 0)
    {
        goto error;
    }

    retval = mainloop(epollfd);
    close_event_queue(epollfd);
error:
    close(json_sockfd);
    close(serv_sockfd);

    daemonize_shutdown(nDPIsrvd_options.pidfile);
    syslog(LOG_DAEMON | LOG_NOTICE, "Bye.");
    closelog();

    unlink(nDPIsrvd_options.json_sockpath);
    unlink(nDPIsrvd_options.serv_optarg);

    return retval;
}
#endif