summaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-03-23 14:16:17 +0100
committerToni Uhlig <matzeton@googlemail.com>2021-03-23 14:25:56 +0100
commitbdc8c5df2a25f2b626e8f5d6672d11f3ac35a694 (patch)
treeea33c49a947fdd6ef73910ef03cdb3a7b8575389 /nDPIsrvd.c
parentc68c1750ba4b00096efab74829f5be7408261eed (diff)
Reduced code duplication. Preps for nDPId-test.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c552
1 files changed, 350 insertions, 202 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 0d51bce00..27e68ecf2 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -9,6 +9,7 @@
#include <string.h>
#include <syslog.h>
#include <sys/epoll.h>
+#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
@@ -71,6 +72,30 @@ static int serv_sockfd;
static char * user = NULL;
static char * group = NULL;
+static int fcntl_add_flags(int fd, int flags)
+{
+ int cur_flags = fcntl(fd, F_GETFL, 0);
+
+ if (cur_flags == -1)
+ {
+ return 1;
+ }
+
+ return fcntl(fd, F_SETFL, cur_flags | flags);
+}
+
+static int fcntl_del_flags(int fd, int flags)
+{
+ int cur_flags = fcntl(fd, F_GETFL, 0);
+
+ if (cur_flags == -1)
+ {
+ return 1;
+ }
+
+ return fcntl(fd, F_SETFL, cur_flags & ~flags);
+}
+
static int create_listen_sockets(void)
{
json_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
@@ -121,26 +146,23 @@ static int create_listen_sockets(void)
return 1;
}
- int json_flags = fcntl(json_sockfd, F_GETFL, 0);
- int serv_flags = fcntl(serv_sockfd, F_GETFL, 0);
- if (json_flags == -1 || serv_flags == -1)
+ if (fcntl_add_flags(json_sockfd, O_NONBLOCK) != 0)
{
unlink(json_sockpath);
- syslog(LOG_DAEMON | LOG_ERR, "Error getting fd flags: %s", strerror(errno));
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags for the collector socket: %s", strerror(errno));
return 1;
}
- if (fcntl(json_sockfd, F_SETFL, json_flags | O_NONBLOCK) == -1 ||
- fcntl(serv_sockfd, F_SETFL, serv_flags | O_NONBLOCK) == -1)
+
+ if (fcntl_add_flags(serv_sockfd, O_NONBLOCK) != 0)
{
- unlink(json_sockpath);
- syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags for the distributor socket: %s", strerror(errno));
return 1;
}
return 0;
}
-static struct remote_desc * get_unused_remote_descriptor(void)
+static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, int remote_fd)
{
if (remotes.desc_used == remotes.desc_size)
{
@@ -155,6 +177,8 @@ static struct remote_desc * get_unused_remote_descriptor(void)
remotes.desc[i].buf.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE);
remotes.desc[i].buf.max = NETWORK_BUFFER_MAX_SIZE;
remotes.desc[i].buf.used = 0;
+ remotes.desc[i].sock_type = type;
+ remotes.desc[i].fd = remote_fd;
return &remotes.desc[i];
}
}
@@ -162,35 +186,41 @@ static struct remote_desc * get_unused_remote_descriptor(void)
return NULL;
}
+static int add_event(int epollfd, int fd, void * ptr)
+{
+ struct epoll_event event = {};
+
+ if (ptr != NULL)
+ {
+ event.data.ptr = ptr;
+ }
+ else
+ {
+ event.data.fd = fd;
+ }
+ event.events = EPOLLIN;
+ return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
+}
+
+static int del_event(int epollfd, int fd)
+{
+ return epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
+}
+
static void disconnect_client(int epollfd, struct remote_desc * const current)
{
if (current->fd > -1)
{
- epoll_ctl(epollfd, EPOLL_CTL_DEL, current->fd, NULL);
+ del_event(epollfd, current->fd);
if (close(current->fd) != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Error closing fd: %s", strerror(errno));
}
+ current->fd = -1;
+ remotes.desc_used--;
}
free(current->buf.ptr);
current->buf.ptr = NULL;
- current->fd = -1;
- remotes.desc_used--;
-}
-
-static void sighandler(int signum)
-{
- syslog(LOG_DAEMON | LOG_NOTICE, "Received SIGNAL %d", signum);
-
- if (main_thread_shutdown == 0)
- {
- syslog(LOG_DAEMON | LOG_NOTICE, "Shutting down ..");
- main_thread_shutdown = 1;
- }
- else
- {
- syslog(LOG_DAEMON | LOG_NOTICE, "Reader threads are already shutting down, please be patient.");
- }
}
static int parse_options(int argc, char ** argv)
@@ -268,37 +298,55 @@ static int parse_options(int argc, char ** argv)
static int new_connection(int epollfd, int eventfd)
{
- struct remote_desc * current = get_unused_remote_descriptor();
+ union {
+ struct sockaddr_un event_json;
+ struct sockaddr_un event_serv;
+ } sockaddr;
- if (current == NULL)
+ socklen_t peer_addr_len;
+ enum sock_type stype;
+ int server_fd;
+ if (eventfd == json_sockfd)
+ {
+ peer_addr_len = sizeof(sockaddr.event_json);
+ stype = JSON_SOCK;
+ server_fd = json_sockfd;
+ }
+ else if (eventfd == serv_sockfd)
+ {
+ peer_addr_len = sizeof(sockaddr.event_serv);
+ stype = SERV_SOCK;
+ server_fd = serv_sockfd;
+ }
+ else
{
- syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
return 1;
}
- current->sock_type = (eventfd == json_sockfd ? JSON_SOCK : SERV_SOCK);
- int sockfd = (current->sock_type == JSON_SOCK ? json_sockfd : serv_sockfd);
- socklen_t peer_addr_len =
- (current->sock_type == JSON_SOCK ? sizeof(current->event_json.peer) : sizeof(current->event_serv.peer));
-
- current->fd = accept(sockfd,
- (current->sock_type == JSON_SOCK ? (struct sockaddr *)&current->event_json.peer
- : (struct sockaddr *)&current->event_serv.peer),
- &peer_addr_len);
- if (current->fd < 0)
+ int client_fd = accept(server_fd, (struct sockaddr *)&sockaddr, &peer_addr_len);
+ if (client_fd < 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
- disconnect_client(epollfd, current);
return 1;
}
+ struct remote_desc * current = get_unused_remote_descriptor(stype, client_fd);
+ if (current == NULL)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
+ return 1;
+ }
+
+ char const * sock_type = NULL;
switch (current->sock_type)
{
case JSON_SOCK:
+ sock_type = "collector";
current->event_json.json_bytes = 0;
syslog(LOG_DAEMON, "New collector connection");
break;
case SERV_SOCK:
+ sock_type = "distributor";
if (inet_ntop(current->event_serv.peer.sin_family,
&current->event_serv.peer.sin_addr,
&current->event_serv.peer_addr[0],
@@ -326,10 +374,9 @@ static int new_connection(int epollfd, int eventfd)
}
/* nonblocking fd is mandatory */
- int fd_flags = fcntl(current->fd, F_GETFL, 0);
- if (fd_flags == -1 || fcntl(current->fd, F_SETFL, fd_flags | O_NONBLOCK) == -1)
+ if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0)
{
- syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting %s fd flags: %s", sock_type, strerror(errno));
disconnect_client(epollfd, current);
return 1;
}
@@ -339,10 +386,7 @@ static int new_connection(int epollfd, int eventfd)
{
shutdown(current->fd, SHUT_WR); // collector
/* setup epoll event */
- struct epoll_event accept_event = {};
- accept_event.data.ptr = current;
- accept_event.events = EPOLLIN;
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0)
+ if (add_event(epollfd, current->fd, current) != 0)
{
disconnect_client(epollfd, current);
return 1;
@@ -419,171 +463,222 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
return 0;
}
-static int handle_incoming_data(int epollfd, struct epoll_event * event)
+static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
- struct remote_desc * current = (struct remote_desc *)event->data.ptr;
-
- if (current->fd < 0)
+ if (current->sock_type != JSON_SOCK)
{
- syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
- return 1;
+ return 0;
}
- if (event->events & EPOLLIN && current->sock_type == JSON_SOCK)
+ /* read JSON strings (or parts) from the UNIX socket (collecting) */
+ if (current->buf.used == current->buf.max)
+ {
+ syslog(LOG_DAEMON, "Collector read buffer full. No more read possible.");
+ }
+ else
{
- /* read JSON strings (or parts) from the UNIX socket (collecting) */
- if (current->buf.used == current->buf.max)
+ errno = 0;
+ ssize_t bytes_read =
+ read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used);
+ if (bytes_read < 0 || errno != 0)
{
- syslog(LOG_DAEMON, "Collector read buffer full. No more read possible.");
+ disconnect_client(epollfd, current);
+ return 1;
}
- else
+ if (bytes_read == 0)
{
- errno = 0;
- ssize_t bytes_read =
- read(current->fd, current->buf.ptr + current->buf.used, current->buf.max - current->buf.used);
- if (bytes_read < 0 || errno != 0)
- {
- disconnect_client(epollfd, current);
- return 1;
- }
- if (bytes_read == 0)
- {
- syslog(LOG_DAEMON, "Collector connection closed during read");
- disconnect_client(epollfd, current);
- return 1;
- }
- current->buf.used += bytes_read;
+ syslog(LOG_DAEMON, "Collector connection closed during read");
+ disconnect_client(epollfd, current);
+ return 1;
+ }
+ current->buf.used += bytes_read;
+ }
+
+ while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ {
+ if (handle_collector_protocol(epollfd, current) != 0)
+ {
+ break;
}
- while (current->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ for (size_t i = 0; i < remotes.desc_size; ++i)
{
- if (handle_collector_protocol(epollfd, current) != 0)
+ if (remotes.desc[i].fd < 0)
{
- break;
+ continue;
}
-
- for (size_t i = 0; i < remotes.desc_size; ++i)
+ if (remotes.desc[i].sock_type != SERV_SOCK)
{
- if (remotes.desc[i].fd < 0)
- {
- continue;
- }
- if (remotes.desc[i].sock_type != SERV_SOCK)
+ continue;
+ }
+ if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Buffer capacity threshold (%zu of max %zu bytes) reached, "
+ "falling back to blocking mode.",
+ remotes.desc[i].buf.used,
+ remotes.desc[i].buf.max);
+ /*
+ * FIXME: Maybe switch to a Multithreading distributor data transmission,
+ * so that we do not have to switch back to blocking mode here!
+ * NOTE: If *one* distributer peer is too slow, all other distributors are
+ * affected by this. This causes starvation and leads to a possible data loss on
+ * the nDPId collector side.
+ */
+ if (fcntl_del_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
{
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
- if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
+ if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) !=
+ (ssize_t)remotes.desc[i].buf.used)
{
syslog(LOG_DAEMON | LOG_ERR,
- "Buffer capacity threshold (%zu of max %zu bytes) reached, "
- "falling back to blocking mode.",
- remotes.desc[i].buf.used,
- remotes.desc[i].buf.max);
- /*
- * FIXME: Maybe switch to a Multithreading distributor data transmission,
- * so that we do not have to switch back to blocking mode here!
- * NOTE: If *one* distributer peer is too slow, all other distributors are
- * affected by this. This causes starvation and leads to a possible data loss on
- * the nDPId collector side.
- */
- int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0);
- if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) !=
- (ssize_t)remotes.desc[i].buf.used)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Could not drain buffer by %zu bytes. (forced)",
- remotes.desc[i].buf.used);
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- remotes.desc[i].buf.used = 0;
- if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- }
-
- memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
- current->buf.ptr,
- current->event_json.json_bytes);
- remotes.desc[i].buf.used += current->event_json.json_bytes;
-
- errno = 0;
- ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used);
- if (errno == EAGAIN)
- {
+ "Could not drain buffer by %zu bytes. (forced)",
+ remotes.desc[i].buf.used);
+ disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
- if (bytes_written < 0 || errno != 0)
+ remotes.desc[i].buf.used = 0;
+ if (fcntl_add_flags(remotes.desc[i].fd, O_NONBLOCK) != 0)
{
- if (remotes.desc[i].event_serv.peer_addr[0] == '\0')
- {
- syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno));
- }
- else
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Distributor connection to %.*s:%u closed, send failed: %s",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port),
- strerror(errno));
- }
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting distributor fd flags: %s", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
- if (bytes_written == 0)
+ }
+
+ memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
+ current->buf.ptr,
+ current->event_json.json_bytes);
+ remotes.desc[i].buf.used += current->event_json.json_bytes;
+
+ errno = 0;
+ ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used);
+ if (errno == EAGAIN)
+ {
+ continue;
+ }
+ if (bytes_written < 0 || errno != 0)
+ {
+ if (remotes.desc[i].event_serv.peer_addr[0] == '\0')
{
- syslog(LOG_DAEMON,
- "Distributor connection to %.*s:%u closed during write",
- (int)sizeof(remotes.desc[i].event_serv.peer_addr),
- remotes.desc[i].event_serv.peer_addr,
- ntohs(remotes.desc[i].event_serv.peer.sin_port));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
+ syslog(LOG_DAEMON | LOG_ERR, "Distributor connection closed, send failed: %s", strerror(errno));
}
- if ((size_t)bytes_written < remotes.desc[i].buf.used)
+ else
{
- syslog(LOG_DAEMON,
- "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Distributor connection to %.*s:%u closed, send failed: %s",
(int)sizeof(remotes.desc[i].event_serv.peer_addr),
remotes.desc[i].event_serv.peer_addr,
ntohs(remotes.desc[i].event_serv.peer.sin_port),
- bytes_written,
- remotes.desc[i].buf.used);
- memmove(remotes.desc[i].buf.ptr,
- remotes.desc[i].buf.ptr + bytes_written,
- remotes.desc[i].buf.used - bytes_written);
- remotes.desc[i].buf.used -= bytes_written;
- continue;
+ strerror(errno));
}
-
- remotes.desc[i].buf.used = 0;
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if (bytes_written == 0)
+ {
+ syslog(LOG_DAEMON,
+ "Distributor connection to %.*s:%u closed during write",
+ (int)sizeof(remotes.desc[i].event_serv.peer_addr),
+ remotes.desc[i].event_serv.peer_addr,
+ ntohs(remotes.desc[i].event_serv.peer.sin_port));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if ((size_t)bytes_written < remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON,
+ "Distributor wrote less than expected to %.*s:%u: %zd < %zu",
+ (int)sizeof(remotes.desc[i].event_serv.peer_addr),
+ remotes.desc[i].event_serv.peer_addr,
+ ntohs(remotes.desc[i].event_serv.peer.sin_port),
+ bytes_written,
+ remotes.desc[i].buf.used);
+ memmove(remotes.desc[i].buf.ptr,
+ remotes.desc[i].buf.ptr + bytes_written,
+ remotes.desc[i].buf.used - bytes_written);
+ remotes.desc[i].buf.used -= bytes_written;
+ continue;
}
- memmove(current->buf.ptr,
- current->buf.ptr + current->event_json.json_bytes,
- current->buf.used - current->event_json.json_bytes);
- current->buf.used -= current->event_json.json_bytes;
- current->event_json.json_bytes = 0;
+ remotes.desc[i].buf.used = 0;
}
+
+ memmove(current->buf.ptr,
+ current->buf.ptr + current->event_json.json_bytes,
+ current->buf.used - current->event_json.json_bytes);
+ current->buf.used -= current->event_json.json_bytes;
+ current->event_json.json_bytes = 0;
}
return 0;
}
+static int handle_incoming_data_event(int epollfd, struct epoll_event * const event)
+{
+ struct remote_desc * current = (struct remote_desc *)event->data.ptr;
+
+ if (current == NULL)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid");
+ return 1;
+ }
+
+ if (current->fd < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
+ return 1;
+ }
+
+ if ((event->events & EPOLLIN) == 0)
+ {
+ return 1;
+ }
+
+ return handle_incoming_data(epollfd, current);
+}
+
+static int setup_signalfd(int epollfd)
+{
+ sigset_t mask;
+ int sfd;
+
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGINT);
+ sigaddset(&mask, SIGTERM);
+ sigaddset(&mask, SIGQUIT);
+
+ if (sigprocmask(SIG_BLOCK, &mask, NULL) == -1)
+ {
+ return -1;
+ }
+ sfd = signalfd(-1, &mask, 0);
+ if (sfd == -1)
+ {
+ return -1;
+ }
+
+ if (add_event(epollfd, sfd, NULL) != 0)
+ {
+ return -1;
+ }
+
+ if (fcntl_add_flags(sfd, O_NONBLOCK) != 0)
+ {
+ return -1;
+ }
+
+ return sfd;
+}
+
static int mainloop(int epollfd)
{
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
+ int signalfd = setup_signalfd(epollfd);
while (main_thread_shutdown == 0)
{
@@ -612,10 +707,31 @@ static int mainloop(int epollfd)
continue;
}
}
+ else if (events[i].data.fd == signalfd)
+ {
+ struct signalfd_siginfo fdsi;
+ ssize_t s;
+
+ s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo));
+ if (s != sizeof(struct signalfd_siginfo))
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Invalid signal fd read size. Got %zd, wanted %zu bytes.",
+ s,
+ sizeof(struct signalfd_siginfo));
+ continue;
+ }
+
+ if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
+ {
+ main_thread_shutdown = 1;
+ break;
+ }
+ }
else
{
/* Incoming data. */
- if (handle_incoming_data(epollfd, &events[i]) != 0)
+ if (handle_incoming_data_event(epollfd, &events[i]) != 0)
{
continue;
}
@@ -623,9 +739,69 @@ static int mainloop(int epollfd)
}
}
+ close(signalfd);
+
return 0;
}
+static int create_evq(void)
+{
+ return epoll_create1(EPOLL_CLOEXEC);
+}
+
+static int setup_event_queue(void)
+{
+ int epollfd = create_evq();
+ if (epollfd < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error creating epoll: %s", strerror(errno));
+ return -1;
+ }
+
+ if (add_event(epollfd, json_sockfd, NULL) != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error adding JSON fd to epoll: %s", strerror(errno));
+ return -1;
+ }
+
+ if (add_event(epollfd, serv_sockfd, NULL) != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error adding SERV fd to epoll: %s", strerror(errno));
+ return -1;
+ }
+
+ return epollfd;
+}
+
+static void close_event_queue(int epollfd)
+{
+ for (size_t i = 0; i < remotes.desc_size; ++i)
+ {
+ disconnect_client(epollfd, &remotes.desc[i]);
+ }
+ close(epollfd);
+}
+
+static int setup_remote_descriptors(size_t max_descriptors)
+{
+ remotes.desc_used = 0;
+ remotes.desc_size = max_descriptors;
+ remotes.desc = (struct remote_desc *)malloc(remotes.desc_size * sizeof(*remotes.desc));
+ if (remotes.desc == NULL)
+ {
+ return -1;
+ }
+ for (size_t i = 0; i < remotes.desc_size; ++i)
+ {
+ remotes.desc[i].fd = -1;
+ remotes.desc[i].buf.ptr = NULL;
+ remotes.desc[i].buf.max = 0;
+ }
+
+ return 0;
+}
+
+#ifndef NO_MAIN
int main(int argc, char ** argv)
{
int retval = 1;
@@ -658,19 +834,10 @@ int main(int argc, char ** argv)
closelog();
openlog("nDPIsrvd", LOG_CONS | (log_to_stderr != 0 ? LOG_PERROR : 0), LOG_DAEMON);
- remotes.desc_used = 0;
- remotes.desc_size = 32;
- remotes.desc = (struct remote_desc *)malloc(remotes.desc_size * sizeof(*remotes.desc));
- if (remotes.desc == NULL)
+ if (setup_remote_descriptors(32) != 0)
{
goto error;
}
- for (size_t i = 0; i < remotes.desc_size; ++i)
- {
- remotes.desc[i].fd = -1;
- remotes.desc[i].buf.ptr = NULL;
- remotes.desc[i].buf.max = 0;
- }
if (create_listen_sockets() != 0)
{
@@ -708,36 +875,16 @@ int main(int argc, char ** argv)
goto error;
}
- signal(SIGINT, sighandler);
- signal(SIGTERM, sighandler);
signal(SIGPIPE, SIG_IGN);
- int epollfd = epoll_create1(0);
+ int epollfd = setup_event_queue();
if (epollfd < 0)
{
- syslog(LOG_DAEMON | LOG_ERR, "Error creating epoll: %s", strerror(errno));
goto error;
}
- {
- struct epoll_event accept_event = {.data.fd = json_sockfd, .events = EPOLLIN};
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, json_sockfd, &accept_event) < 0)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error adding JSON fd to epoll: %s", strerror(errno));
- goto error;
- }
- }
-
- {
- struct epoll_event accept_event = {.data.fd = serv_sockfd, .events = EPOLLIN};
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, serv_sockfd, &accept_event) < 0)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Error adding INET fd to epoll: %s", strerror(errno));
- goto error;
- }
- }
-
retval = mainloop(epollfd);
+ close_event_queue(epollfd);
error:
close(json_sockfd);
close(serv_sockfd);
@@ -751,3 +898,4 @@ error:
return retval;
}
+#endif