aboutsummaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2020-08-03 16:27:14 +0200
committerToni Uhlig <matzeton@googlemail.com>2020-08-03 16:27:14 +0200
commit92925a83552299a9462566248dee3e16a57434a6 (patch)
tree2467ea470f5183a75770b4f2ce0c3058d7c4ab87 /nDPIsrvd.c
parent536a1c03a55cc30e41dd12041238ec75706a8998 (diff)
remote connection tracking/ event-handling for collector(UNIX) and distributor(TCP) connections
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c152
1 files changed, 133 insertions, 19 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 0c35100eb..b940a4fea 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -15,8 +15,11 @@
enum ev_type { JSON_SOCK, SERV_SOCK };
-struct event {
+struct remote_desc {
enum ev_type type;
+ int fd;
+ uint8_t buf[BUFSIZ];
+ size_t buf_used;
union {
struct {
int json_sockfd;
@@ -29,6 +32,12 @@ struct event {
};
};
+static struct remotes {
+ struct remote_desc * desc;
+ size_t desc_size;
+ size_t desc_used;
+} remotes = {NULL, 0, 0};
+
static char json_sockpath[UNIX_PATH_MAX] = "/tmp/ndpid-collector.sock";
static char serv_listen_addr[INET6_ADDRSTRLEN] = "127.0.0.1";
static uint16_t serv_listen_port = 7000;
@@ -45,8 +54,6 @@ static int create_listen_sockets(void)
return 1;
}
- // This helps avoid spurious EADDRINUSE when the previous instance of this
- // server died.
int opt = 1;
if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 ||
setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
@@ -109,10 +116,57 @@ static int create_listen_sockets(void)
return 0;
}
+static struct remote_desc * get_unused_remote_descriptor(void)
+{
+ if (remotes.desc_used == remotes.desc_size) {
+ return NULL;
+ }
+
+ for (size_t i = 0; i < remotes.desc_size; ++i) {
+ if (remotes.desc[i].fd == -1) {
+ remotes.desc_used++;
+ remotes.desc[i].buf[0] = '\0';
+ remotes.desc[i].buf_used = 0;
+ return &remotes.desc[i];
+ }
+ }
+
+ return NULL;
+}
+
+static void disconnect_client(int epollfd, struct remote_desc * const current)
+{
+ if (current->fd > -1) {
+ if (epoll_ctl(epollfd, EPOLL_CTL_DEL, current->fd, NULL) < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error deleting fd from epollq: %s", strerror(errno));
+ }
+ if (close(current->fd) != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error closing fd: %s", strerror(errno));
+ }
+ }
+ current->fd = -1;
+ remotes.desc_used--;
+}
+
int main(void)
{
openlog("nDPIsrvd", LOG_CONS | LOG_PERROR, LOG_DAEMON);
+ remotes.desc_used = 0;
+ remotes.desc_size = 32;
+ remotes.desc = (struct remote_desc *) malloc(remotes.desc_size * sizeof(*remotes.desc));
+ if (remotes.desc == NULL) {
+ return 1;
+ }
+ for (size_t i = 0; i < remotes.desc_size; ++i)
+ {
+ remotes.desc[i].fd = -1;
+ }
+
+ unlink(json_sockpath);
+
if (create_listen_sockets() != 0)
{
return 1;
@@ -125,7 +179,7 @@ int main(void)
return 1;
}
- struct epoll_event accept_event;
+ struct epoll_event accept_event = {};
accept_event.data.fd = json_sockfd;
accept_event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, json_sockfd, &accept_event) < 0)
@@ -143,12 +197,9 @@ int main(void)
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
- struct event remotes[64];
- size_t const remotes_size = sizeof(remotes) / sizeof(remotes[0]);
- size_t remotes_used = 0;
- struct event * remote_curr;
while (1)
{
+ struct remote_desc * current = NULL;
int nready = epoll_wait(epollfd, events, events_size, -1);
for (int i = 0; i < nready; i++)
@@ -156,33 +207,96 @@ int main(void)
if (events[i].events & EPOLLERR)
{
syslog(LOG_DAEMON | LOG_ERR, "Epoll event error: %s", strerror(errno));
- return 1;
+ continue;
}
+ /* New connection to collector / distributor. */
if (events[i].data.fd == json_sockfd ||
events[i].data.fd == serv_sockfd)
{
- if (remotes_used == remotes_size) {
- syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes_used);
+ current = get_unused_remote_descriptor();
+ if (current == NULL) {
+ syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
continue;
}
- remote_curr = &remotes[remotes_used++];
+ current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK);
- enum ev_type type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK);
int sockfd = (events[i].data.fd == json_sockfd ? json_sockfd : serv_sockfd);
- socklen_t peer_addr_len;
+ socklen_t peer_addr_len = (events[i].data.fd == json_sockfd
+ ? sizeof(current->event_json.peer)
+ : sizeof(current->event_serv.peer));
+
+ current->fd = accept(sockfd,
+ (current->type == JSON_SOCK
+ ? (struct sockaddr *) &current->event_json.peer
+ : (struct sockaddr *) &current->event_serv.peer),
+ &peer_addr_len);
+ if (current->fd < 0) {
+ syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
+ disconnect_client(epollfd, current);
+ continue;
+ }
+
+ syslog(LOG_DAEMON, "New %s connection", (current->type == JSON_SOCK
+ ? "collector"
+ : "distributor"));
+
+ /* nonblocking fd is mandatory */
+ int fd_flags = fcntl(current->fd, F_GETFL, 0);
+ if (fd_flags == -1 || fcntl(current->fd, F_SETFL, fd_flags | O_NONBLOCK) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, current);
+ continue;
+ }
- int newsockfd = accept(sockfd, (type == JSON_SOCK ? (struct sockaddr *)&remote_curr->event_json.peer
- : (struct sockaddr *)&remote_curr->event_serv.peer),
- &peer_addr_len);
- syslog(LOG_DAEMON, "New connection");
+ /* setup epoll event */
+ struct epoll_event accept_event = {};
+ accept_event.data.ptr = current;
+ accept_event.events = EPOLLIN;
+ if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0) {
+ disconnect_client(epollfd, current);
+ continue;
+ }
+ } else {
+ current = (struct remote_desc *) events[i].data.ptr;
+
+ if (current->fd < 0) {
+ syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
+ continue;
+ }
+
+ if (events[i].events & EPOLLHUP) {
+ syslog(LOG_DAEMON, "%s connection closed", (current->type == JSON_SOCK
+ ? "collector"
+ : "distributor"));
+ disconnect_client(epollfd, current);
+ continue;
+ }
+ if (events[i].events & EPOLLIN) {
+ errno = 0;
+ ssize_t bytes_read = read(current->fd,
+ current->buf + current->buf_used,
+ sizeof(current->buf) - current->buf_used);
+ if (bytes_read < 0 || errno != 0) {
+ disconnect_client(epollfd, current);
+ continue;
+ }
+ if (bytes_read == 0) {
+ syslog(LOG_DAEMON, "%s connection closed during read", (current->type == JSON_SOCK
+ ? "collector"
+ : "distributor"));
+ disconnect_client(epollfd, current);
+ continue;
+ }
+ current->buf_used += bytes_read;
+ }
}
}
}
close(json_sockfd);
close(serv_sockfd);
- unlink(json_sockpath);
return 0;
}