diff options
-rw-r--r-- | nDPIsrvd.c | 92 |
1 files changed, 82 insertions, 10 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 06fe08574..1c078ad1d 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -20,6 +20,9 @@ enum ev_type { JSON_SOCK, SERV_SOCK }; struct remote_desc { enum ev_type type; int fd; + uint8_t buf[BUFSIZ]; + size_t buf_used; + unsigned long long int buf_wanted; union { struct { int json_sockfd; @@ -28,6 +31,7 @@ struct remote_desc { struct { int serv_sockfd; struct sockaddr_in peer; + char peer_addr[INET_ADDRSTRLEN]; } event_serv; }; }; @@ -39,7 +43,7 @@ static struct remotes { } remotes = {NULL, 0, 0}; static char json_sockpath[UNIX_PATH_MAX] = COLLECTOR_UNIX_SOCKET; -static char serv_listen_addr[INET6_ADDRSTRLEN] = DISTRIBUTOR_HOST; +static char serv_listen_addr[INET_ADDRSTRLEN] = DISTRIBUTOR_HOST; static uint16_t serv_listen_port = DISTRIBUTOR_PORT; static int json_sockfd; static int serv_sockfd; @@ -75,7 +79,7 @@ static int create_listen_sockets(void) serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_port = htons(serv_listen_port); - if (inet_ntop(AF_INET, &serv_addr.sin_addr, &serv_listen_addr[0], INET_ADDRSTRLEN) == NULL) + if (inet_ntop(AF_INET, &serv_addr.sin_addr, &serv_listen_addr[0], sizeof(serv_listen_addr)) == NULL) { syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno)); return 1; @@ -125,6 +129,9 @@ static struct remote_desc * get_unused_remote_descriptor(void) for (size_t i = 0; i < remotes.desc_size; ++i) { if (remotes.desc[i].fd == -1) { remotes.desc_used++; + remotes.desc[i].buf[0] = '\0'; + remotes.desc[i].buf_used = 0; + remotes.desc[i].buf_wanted = 0; return &remotes.desc[i]; } } @@ -169,6 +176,9 @@ int main(void) { return 1; } + syslog(LOG_DAEMON, "collector listen on %s", json_sockpath); + syslog(LOG_DAEMON, "distributor listen on %.*s:%u", + (int) sizeof(serv_listen_addr), serv_listen_addr, serv_listen_port); int epollfd = epoll_create1(0); if (epollfd < 0) @@ -234,10 +244,25 @@ int main(void) disconnect_client(epollfd, current); continue; } + if (events[i].data.fd == serv_sockfd && + inet_ntop(current->event_serv.peer.sin_family, ¤t->event_serv.peer.sin_addr, + ¤t->event_serv.peer_addr[0], sizeof(current->event_serv.peer_addr)) == NULL) + { + syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno)); + disconnect_client(epollfd, current); + continue; + } - syslog(LOG_DAEMON, "New %s connection", (current->type == JSON_SOCK - ? "collector" - : "distributor")); + switch (current->type) { + case JSON_SOCK: + syslog(LOG_DAEMON, "New collector connection"); + break; + case SERV_SOCK: + syslog(LOG_DAEMON, "New distributor connection from %.*s:%u", + (int) sizeof(current->event_serv.peer_addr), current->event_serv.peer_addr, + ntohs(current->event_serv.peer.sin_port)); + break; + } /* nonblocking fd is mandatory */ int fd_flags = fcntl(current->fd, F_GETFL, 0); @@ -277,9 +302,11 @@ int main(void) continue; } if (events[i].events & EPOLLIN) { + /* read JSON strings (or parts) from the UNIX socket (collecting) */ errno = 0; - char buf[BUFSIZ]; - ssize_t bytes_read = read(current->fd, buf, sizeof(buf)); + ssize_t bytes_read = read(current->fd, + current->buf + current->buf_used, + sizeof(current->buf) - current->buf_used); if (bytes_read < 0 || errno != 0) { disconnect_client(epollfd, current); continue; @@ -294,25 +321,70 @@ int main(void) /* broadcast data coming from the json-collector socket to all tcp clients */ if (current->type == JSON_SOCK) { + /* buffer all data until we got the whole JSON string */ + current->buf_used += bytes_read; + if (current->buf_wanted == 0) { + char * json_str_start = NULL; + errno = 0; + /* the first bytes are the textual representation of the following JSON string */ + current->buf_wanted = strtoull((char *) current->buf, &json_str_start, 10); + current->buf_wanted += (uint8_t *) json_str_start - current->buf; + if (errno == ERANGE) { + current->buf_used = 0; + current->buf_wanted = 0; + syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit"); + continue; + } + if ((uint8_t *) json_str_start == current->buf) { + current->buf_used = 0; + current->buf_wanted = 0; + syslog(LOG_DAEMON | LOG_ERR, "Missing size before JSON string, got: '%c'", current->buf[0]); + continue; + } + if (current->buf_wanted > BUFSIZ) { + current->buf_used = 0; + current->buf_wanted = 0; + syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON string too big"); + continue; + } + } + /* buffered enough data (full JSON String) ? */ + if (current->buf_wanted > current->buf_used) { + continue; + } + /* after buffering complete, last character should always be a '}' (end of object) */ + if (current->buf[current->buf_wanted - 1] != '}') { + current->buf_used = 0; + current->buf_wanted = 0; + syslog(LOG_DAEMON | LOG_ERR, "Invalid JSON string"); + continue; + } + + /* the essence: broadcast buffered JSON string to all connected TCP clients (distribution) */ for (size_t i = 0; i < remotes.desc_size; ++i) { if (remotes.desc[i].fd < 0) { continue; } if (remotes.desc[i].type == SERV_SOCK) { - ssize_t bytes_written = write(remotes.desc[i].fd, buf, bytes_read); + ssize_t bytes_written = write(remotes.desc[i].fd, current->buf, current->buf_used); if (bytes_written < 0 || errno != 0) { - disconnect_client(epollfd, current); + syslog(LOG_DAEMON | LOG_ERR, "Written %zd of %zu bytes to fd %d: %s", + bytes_written, current->buf_used, remotes.desc[i].fd, strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); continue; } if (bytes_written == 0) { syslog(LOG_DAEMON, "%s connection closed during write", (current->type == JSON_SOCK ? "collector" : "distributor")); - disconnect_client(epollfd, current); + disconnect_client(epollfd, &remotes.desc[i]); continue; } } } + + current->buf_used = 0; + current->buf_wanted = 0; } } } |