diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2020-08-03 16:27:14 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2020-08-03 16:27:14 +0200 |
commit | 92925a83552299a9462566248dee3e16a57434a6 (patch) | |
tree | 2467ea470f5183a75770b4f2ce0c3058d7c4ab87 /nDPIsrvd.c | |
parent | 536a1c03a55cc30e41dd12041238ec75706a8998 (diff) |
remote connection tracking/ event-handling for collector(UNIX) and distributor(TCP) connections
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 152 |
1 files changed, 133 insertions, 19 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 0c35100eb..b940a4fea 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -15,8 +15,11 @@ enum ev_type { JSON_SOCK, SERV_SOCK }; -struct event { +struct remote_desc { enum ev_type type; + int fd; + uint8_t buf[BUFSIZ]; + size_t buf_used; union { struct { int json_sockfd; @@ -29,6 +32,12 @@ struct event { }; }; +static struct remotes { + struct remote_desc * desc; + size_t desc_size; + size_t desc_used; +} remotes = {NULL, 0, 0}; + static char json_sockpath[UNIX_PATH_MAX] = "/tmp/ndpid-collector.sock"; static char serv_listen_addr[INET6_ADDRSTRLEN] = "127.0.0.1"; static uint16_t serv_listen_port = 7000; @@ -45,8 +54,6 @@ static int create_listen_sockets(void) return 1; } - // This helps avoid spurious EADDRINUSE when the previous instance of this - // server died. int opt = 1; if (setsockopt(json_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0 || setsockopt(serv_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) @@ -109,10 +116,57 @@ static int create_listen_sockets(void) return 0; } +static struct remote_desc * get_unused_remote_descriptor(void) +{ + if (remotes.desc_used == remotes.desc_size) { + return NULL; + } + + for (size_t i = 0; i < remotes.desc_size; ++i) { + if (remotes.desc[i].fd == -1) { + remotes.desc_used++; + remotes.desc[i].buf[0] = '\0'; + remotes.desc[i].buf_used = 0; + return &remotes.desc[i]; + } + } + + return NULL; +} + +static void disconnect_client(int epollfd, struct remote_desc * const current) +{ + if (current->fd > -1) { + if (epoll_ctl(epollfd, EPOLL_CTL_DEL, current->fd, NULL) < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error deleting fd from epollq: %s", strerror(errno)); + } + if (close(current->fd) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Error closing fd: %s", strerror(errno)); + } + } + current->fd = -1; + remotes.desc_used--; +} + int main(void) { openlog("nDPIsrvd", LOG_CONS | LOG_PERROR, LOG_DAEMON); + remotes.desc_used = 0; + remotes.desc_size = 32; + remotes.desc = (struct remote_desc *) malloc(remotes.desc_size * sizeof(*remotes.desc)); + if (remotes.desc == NULL) { + return 1; + } + for (size_t i = 0; i < remotes.desc_size; ++i) + { + remotes.desc[i].fd = -1; + } + + unlink(json_sockpath); + if (create_listen_sockets() != 0) { return 1; @@ -125,7 +179,7 @@ int main(void) return 1; } - struct epoll_event accept_event; + struct epoll_event accept_event = {}; accept_event.data.fd = json_sockfd; accept_event.events = EPOLLIN; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, json_sockfd, &accept_event) < 0) @@ -143,12 +197,9 @@ int main(void) struct epoll_event events[32]; size_t const events_size = sizeof(events) / sizeof(events[0]); - struct event remotes[64]; - size_t const remotes_size = sizeof(remotes) / sizeof(remotes[0]); - size_t remotes_used = 0; - struct event * remote_curr; while (1) { + struct remote_desc * current = NULL; int nready = epoll_wait(epollfd, events, events_size, -1); for (int i = 0; i < nready; i++) @@ -156,33 +207,96 @@ int main(void) if (events[i].events & EPOLLERR) { syslog(LOG_DAEMON | LOG_ERR, "Epoll event error: %s", strerror(errno)); - return 1; + continue; } + /* New connection to collector / distributor. */ if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd) { - if (remotes_used == remotes_size) { - syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes_used); + current = get_unused_remote_descriptor(); + if (current == NULL) { + syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used); continue; } - remote_curr = &remotes[remotes_used++]; + current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK); - enum ev_type type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK); int sockfd = (events[i].data.fd == json_sockfd ? json_sockfd : serv_sockfd); - socklen_t peer_addr_len; + socklen_t peer_addr_len = (events[i].data.fd == json_sockfd + ? sizeof(current->event_json.peer) + : sizeof(current->event_serv.peer)); + + current->fd = accept(sockfd, + (current->type == JSON_SOCK + ? (struct sockaddr *) ¤t->event_json.peer + : (struct sockaddr *) ¤t->event_serv.peer), + &peer_addr_len); + if (current->fd < 0) { + syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno)); + disconnect_client(epollfd, current); + continue; + } + + syslog(LOG_DAEMON, "New %s connection", (current->type == JSON_SOCK + ? "collector" + : "distributor")); + + /* nonblocking fd is mandatory */ + int fd_flags = fcntl(current->fd, F_GETFL, 0); + if (fd_flags == -1 || fcntl(current->fd, F_SETFL, fd_flags | O_NONBLOCK) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, current); + continue; + } - int newsockfd = accept(sockfd, (type == JSON_SOCK ? (struct sockaddr *)&remote_curr->event_json.peer - : (struct sockaddr *)&remote_curr->event_serv.peer), - &peer_addr_len); - syslog(LOG_DAEMON, "New connection"); + /* setup epoll event */ + struct epoll_event accept_event = {}; + accept_event.data.ptr = current; + accept_event.events = EPOLLIN; + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0) { + disconnect_client(epollfd, current); + continue; + } + } else { + current = (struct remote_desc *) events[i].data.ptr; + + if (current->fd < 0) { + syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd); + continue; + } + + if (events[i].events & EPOLLHUP) { + syslog(LOG_DAEMON, "%s connection closed", (current->type == JSON_SOCK + ? "collector" + : "distributor")); + disconnect_client(epollfd, current); + continue; + } + if (events[i].events & EPOLLIN) { + errno = 0; + ssize_t bytes_read = read(current->fd, + current->buf + current->buf_used, + sizeof(current->buf) - current->buf_used); + if (bytes_read < 0 || errno != 0) { + disconnect_client(epollfd, current); + continue; + } + if (bytes_read == 0) { + syslog(LOG_DAEMON, "%s connection closed during read", (current->type == JSON_SOCK + ? "collector" + : "distributor")); + disconnect_client(epollfd, current); + continue; + } + current->buf_used += bytes_read; + } } } } close(json_sockfd); close(serv_sockfd); - unlink(json_sockpath); return 0; } |