diff options
-rw-r--r-- | nDPId.c | 7 | ||||
-rw-r--r-- | nDPIsrvd.c | 176 |
2 files changed, 113 insertions, 70 deletions
@@ -673,8 +673,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, int s_ret; char newline_json_str[BUFSIZ]; - s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%zu%.*s\n", - json_str_len, (int)json_str_len, json_str); + s_ret = + snprintf(newline_json_str, sizeof(newline_json_str), "%zu%.*s\n", json_str_len, (int)json_str_len, json_str); if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str)) { syslog(LOG_DAEMON | LOG_ERR, @@ -1104,7 +1104,8 @@ static void ndpi_process_packet(uint8_t * const args, } workflow->packets_captured++; - time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_TICK_RESOLUTION + header->ts.tv_usec / (1000000 / nDPId_TICK_RESOLUTION); + time_ms = + ((uint64_t)header->ts.tv_sec) * nDPId_TICK_RESOLUTION + header->ts.tv_usec / (1000000 / nDPId_TICK_RESOLUTION); workflow->last_time = time_ms; check_for_idle_flows(reader_thread); diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 1c078ad1d..b2d8eef20 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -15,20 +15,27 @@ #include "config.h" -enum ev_type { JSON_SOCK, SERV_SOCK }; +enum ev_type +{ + JSON_SOCK, + SERV_SOCK +}; -struct remote_desc { +struct remote_desc +{ enum ev_type type; int fd; uint8_t buf[BUFSIZ]; size_t buf_used; unsigned long long int buf_wanted; union { - struct { + struct + { int json_sockfd; struct sockaddr_un peer; } event_json; - struct { + struct + { int serv_sockfd; struct sockaddr_in peer; char peer_addr[INET_ADDRSTRLEN]; @@ -36,7 +43,8 @@ struct remote_desc { }; }; -static struct remotes { +static struct remotes +{ struct remote_desc * desc; size_t desc_size; size_t desc_used; @@ -122,12 +130,15 @@ static int create_listen_sockets(void) static struct remote_desc * get_unused_remote_descriptor(void) { - if (remotes.desc_used == remotes.desc_size) { + if (remotes.desc_used == remotes.desc_size) + { return NULL; } - for (size_t i = 0; i < remotes.desc_size; ++i) { - if (remotes.desc[i].fd == -1) { + for (size_t i = 0; i < remotes.desc_size; ++i) + { + if (remotes.desc[i].fd == -1) + { remotes.desc_used++; remotes.desc[i].buf[0] = '\0'; remotes.desc[i].buf_used = 0; @@ -141,7 +152,8 @@ static struct remote_desc * get_unused_remote_descriptor(void) static void disconnect_client(int epollfd, struct remote_desc * const current) { - if (current->fd > -1) { + if (current->fd > -1) + { if (epoll_ctl(epollfd, EPOLL_CTL_DEL, current->fd, NULL) < 0) { syslog(LOG_DAEMON | LOG_ERR, "Error deleting fd from epollq: %s", strerror(errno)); @@ -161,8 +173,9 @@ int main(void) remotes.desc_used = 0; remotes.desc_size = 32; - remotes.desc = (struct remote_desc *) malloc(remotes.desc_size * sizeof(*remotes.desc)); - if (remotes.desc == NULL) { + remotes.desc = (struct remote_desc *)malloc(remotes.desc_size * sizeof(*remotes.desc)); + if (remotes.desc == NULL) + { return 1; } for (size_t i = 0; i < remotes.desc_size; ++i) @@ -177,8 +190,8 @@ int main(void) return 1; } syslog(LOG_DAEMON, "collector listen on %s", json_sockpath); - syslog(LOG_DAEMON, "distributor listen on %.*s:%u", - (int) sizeof(serv_listen_addr), serv_listen_addr, serv_listen_port); + syslog( + LOG_DAEMON, "distributor listen on %.*s:%u", (int)sizeof(serv_listen_addr), serv_listen_addr, serv_listen_port); int epollfd = epoll_create1(0); if (epollfd < 0) @@ -219,47 +232,50 @@ int main(void) } /* New connection to collector / distributor. */ - if (events[i].data.fd == json_sockfd || - events[i].data.fd == serv_sockfd) + if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd) { current = get_unused_remote_descriptor(); - if (current == NULL) { + if (current == NULL) + { syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used); continue; } current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK); int sockfd = (events[i].data.fd == json_sockfd ? json_sockfd : serv_sockfd); - socklen_t peer_addr_len = (events[i].data.fd == json_sockfd - ? sizeof(current->event_json.peer) - : sizeof(current->event_serv.peer)); + socklen_t peer_addr_len = (events[i].data.fd == json_sockfd ? sizeof(current->event_json.peer) + : sizeof(current->event_serv.peer)); current->fd = accept(sockfd, - (current->type == JSON_SOCK - ? (struct sockaddr *) ¤t->event_json.peer - : (struct sockaddr *) ¤t->event_serv.peer), + (current->type == JSON_SOCK ? (struct sockaddr *)¤t->event_json.peer + : (struct sockaddr *)¤t->event_serv.peer), &peer_addr_len); - if (current->fd < 0) { + if (current->fd < 0) + { syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno)); disconnect_client(epollfd, current); continue; } - if (events[i].data.fd == serv_sockfd && - inet_ntop(current->event_serv.peer.sin_family, ¤t->event_serv.peer.sin_addr, - ¤t->event_serv.peer_addr[0], sizeof(current->event_serv.peer_addr)) == NULL) + if (events[i].data.fd == serv_sockfd && inet_ntop(current->event_serv.peer.sin_family, + ¤t->event_serv.peer.sin_addr, + ¤t->event_serv.peer_addr[0], + sizeof(current->event_serv.peer_addr)) == NULL) { syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno)); disconnect_client(epollfd, current); continue; } - switch (current->type) { + switch (current->type) + { case JSON_SOCK: syslog(LOG_DAEMON, "New collector connection"); break; case SERV_SOCK: - syslog(LOG_DAEMON, "New distributor connection from %.*s:%u", - (int) sizeof(current->event_serv.peer_addr), current->event_serv.peer_addr, + syslog(LOG_DAEMON, + "New distributor connection from %.*s:%u", + (int)sizeof(current->event_serv.peer_addr), + current->event_serv.peer_addr, ntohs(current->event_serv.peer.sin_port)); break; } @@ -274,7 +290,8 @@ int main(void) } /* shutdown writing end for collector clients */ - if (current->type == JSON_SOCK) { + if (current->type == JSON_SOCK) + { shutdown(current->fd, SHUT_WR); // collector } @@ -282,66 +299,80 @@ int main(void) struct epoll_event accept_event = {}; accept_event.data.ptr = current; accept_event.events = EPOLLIN; - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0) { + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0) + { disconnect_client(epollfd, current); continue; } - } else { - current = (struct remote_desc *) events[i].data.ptr; + } + else + { + current = (struct remote_desc *)events[i].data.ptr; - if (current->fd < 0) { + if (current->fd < 0) + { syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd); continue; } - if (events[i].events & EPOLLHUP) { - syslog(LOG_DAEMON, "%s connection closed", (current->type == JSON_SOCK - ? "collector" - : "distributor")); + if (events[i].events & EPOLLHUP) + { + syslog(LOG_DAEMON, + "%s connection closed", + (current->type == JSON_SOCK ? "collector" : "distributor")); disconnect_client(epollfd, current); continue; } - if (events[i].events & EPOLLIN) { + if (events[i].events & EPOLLIN) + { /* read JSON strings (or parts) from the UNIX socket (collecting) */ errno = 0; - ssize_t bytes_read = read(current->fd, - current->buf + current->buf_used, - sizeof(current->buf) - current->buf_used); - if (bytes_read < 0 || errno != 0) { + ssize_t bytes_read = + read(current->fd, current->buf + current->buf_used, sizeof(current->buf) - current->buf_used); + if (bytes_read < 0 || errno != 0) + { disconnect_client(epollfd, current); continue; } - if (bytes_read == 0) { - syslog(LOG_DAEMON, "%s connection closed during read", (current->type == JSON_SOCK - ? "collector" - : "distributor")); + if (bytes_read == 0) + { + syslog(LOG_DAEMON, + "%s connection closed during read", + (current->type == JSON_SOCK ? "collector" : "distributor")); disconnect_client(epollfd, current); continue; } /* broadcast data coming from the json-collector socket to all tcp clients */ - if (current->type == JSON_SOCK) { + if (current->type == JSON_SOCK) + { /* buffer all data until we got the whole JSON string */ current->buf_used += bytes_read; - if (current->buf_wanted == 0) { + if (current->buf_wanted == 0) + { char * json_str_start = NULL; errno = 0; /* the first bytes are the textual representation of the following JSON string */ - current->buf_wanted = strtoull((char *) current->buf, &json_str_start, 10); - current->buf_wanted += (uint8_t *) json_str_start - current->buf; - if (errno == ERANGE) { + current->buf_wanted = strtoull((char *)current->buf, &json_str_start, 10); + current->buf_wanted += (uint8_t *)json_str_start - current->buf; + if (errno == ERANGE) + { current->buf_used = 0; current->buf_wanted = 0; syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit"); continue; } - if ((uint8_t *) json_str_start == current->buf) { + if ((uint8_t *)json_str_start == current->buf) + { current->buf_used = 0; current->buf_wanted = 0; - syslog(LOG_DAEMON | LOG_ERR, "Missing size before JSON string, got: '%c'", current->buf[0]); + syslog(LOG_DAEMON | LOG_ERR, + "Missing size before JSON string, got: '%c'", + current->buf[0]); continue; } - if (current->buf_wanted > BUFSIZ) { + if (current->buf_wanted > BUFSIZ) + { current->buf_used = 0; current->buf_wanted = 0; syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON string too big"); @@ -349,11 +380,13 @@ int main(void) } } /* buffered enough data (full JSON String) ? */ - if (current->buf_wanted > current->buf_used) { + if (current->buf_wanted > current->buf_used) + { continue; } /* after buffering complete, last character should always be a '}' (end of object) */ - if (current->buf[current->buf_wanted - 1] != '}') { + if (current->buf[current->buf_wanted - 1] != '}') + { current->buf_used = 0; current->buf_wanted = 0; syslog(LOG_DAEMON | LOG_ERR, "Invalid JSON string"); @@ -361,22 +394,31 @@ int main(void) } /* the essence: broadcast buffered JSON string to all connected TCP clients (distribution) */ - for (size_t i = 0; i < remotes.desc_size; ++i) { - if (remotes.desc[i].fd < 0) { + for (size_t i = 0; i < remotes.desc_size; ++i) + { + if (remotes.desc[i].fd < 0) + { continue; } - if (remotes.desc[i].type == SERV_SOCK) { + if (remotes.desc[i].type == SERV_SOCK) + { ssize_t bytes_written = write(remotes.desc[i].fd, current->buf, current->buf_used); - if (bytes_written < 0 || errno != 0) { - syslog(LOG_DAEMON | LOG_ERR, "Written %zd of %zu bytes to fd %d: %s", - bytes_written, current->buf_used, remotes.desc[i].fd, strerror(errno)); + if (bytes_written < 0 || errno != 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "Written %zd of %zu bytes to fd %d: %s", + bytes_written, + current->buf_used, + remotes.desc[i].fd, + strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); continue; } - if (bytes_written == 0) { - syslog(LOG_DAEMON, "%s connection closed during write", (current->type == JSON_SOCK - ? "collector" - : "distributor")); + if (bytes_written == 0) + { + syslog(LOG_DAEMON, + "%s connection closed during write", + (current->type == JSON_SOCK ? "collector" : "distributor")); disconnect_client(epollfd, &remotes.desc[i]); continue; } |