summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nDPId.c7
-rw-r--r--nDPIsrvd.c176
2 files changed, 113 insertions, 70 deletions
diff --git a/nDPId.c b/nDPId.c
index 2f8ffb778..9c432ea8a 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -673,8 +673,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
int s_ret;
char newline_json_str[BUFSIZ];
- s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%zu%.*s\n",
- json_str_len, (int)json_str_len, json_str);
+ s_ret =
+ snprintf(newline_json_str, sizeof(newline_json_str), "%zu%.*s\n", json_str_len, (int)json_str_len, json_str);
if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str))
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -1104,7 +1104,8 @@ static void ndpi_process_packet(uint8_t * const args,
}
workflow->packets_captured++;
- time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_TICK_RESOLUTION + header->ts.tv_usec / (1000000 / nDPId_TICK_RESOLUTION);
+ time_ms =
+ ((uint64_t)header->ts.tv_sec) * nDPId_TICK_RESOLUTION + header->ts.tv_usec / (1000000 / nDPId_TICK_RESOLUTION);
workflow->last_time = time_ms;
check_for_idle_flows(reader_thread);
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 1c078ad1d..b2d8eef20 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -15,20 +15,27 @@
#include "config.h"
-enum ev_type { JSON_SOCK, SERV_SOCK };
+enum ev_type
+{
+ JSON_SOCK,
+ SERV_SOCK
+};
-struct remote_desc {
+struct remote_desc
+{
enum ev_type type;
int fd;
uint8_t buf[BUFSIZ];
size_t buf_used;
unsigned long long int buf_wanted;
union {
- struct {
+ struct
+ {
int json_sockfd;
struct sockaddr_un peer;
} event_json;
- struct {
+ struct
+ {
int serv_sockfd;
struct sockaddr_in peer;
char peer_addr[INET_ADDRSTRLEN];
@@ -36,7 +43,8 @@ struct remote_desc {
};
};
-static struct remotes {
+static struct remotes
+{
struct remote_desc * desc;
size_t desc_size;
size_t desc_used;
@@ -122,12 +130,15 @@ static int create_listen_sockets(void)
static struct remote_desc * get_unused_remote_descriptor(void)
{
- if (remotes.desc_used == remotes.desc_size) {
+ if (remotes.desc_used == remotes.desc_size)
+ {
return NULL;
}
- for (size_t i = 0; i < remotes.desc_size; ++i) {
- if (remotes.desc[i].fd == -1) {
+ for (size_t i = 0; i < remotes.desc_size; ++i)
+ {
+ if (remotes.desc[i].fd == -1)
+ {
remotes.desc_used++;
remotes.desc[i].buf[0] = '\0';
remotes.desc[i].buf_used = 0;
@@ -141,7 +152,8 @@ static struct remote_desc * get_unused_remote_descriptor(void)
static void disconnect_client(int epollfd, struct remote_desc * const current)
{
- if (current->fd > -1) {
+ if (current->fd > -1)
+ {
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, current->fd, NULL) < 0)
{
syslog(LOG_DAEMON | LOG_ERR, "Error deleting fd from epollq: %s", strerror(errno));
@@ -161,8 +173,9 @@ int main(void)
remotes.desc_used = 0;
remotes.desc_size = 32;
- remotes.desc = (struct remote_desc *) malloc(remotes.desc_size * sizeof(*remotes.desc));
- if (remotes.desc == NULL) {
+ remotes.desc = (struct remote_desc *)malloc(remotes.desc_size * sizeof(*remotes.desc));
+ if (remotes.desc == NULL)
+ {
return 1;
}
for (size_t i = 0; i < remotes.desc_size; ++i)
@@ -177,8 +190,8 @@ int main(void)
return 1;
}
syslog(LOG_DAEMON, "collector listen on %s", json_sockpath);
- syslog(LOG_DAEMON, "distributor listen on %.*s:%u",
- (int) sizeof(serv_listen_addr), serv_listen_addr, serv_listen_port);
+ syslog(
+ LOG_DAEMON, "distributor listen on %.*s:%u", (int)sizeof(serv_listen_addr), serv_listen_addr, serv_listen_port);
int epollfd = epoll_create1(0);
if (epollfd < 0)
@@ -219,47 +232,50 @@ int main(void)
}
/* New connection to collector / distributor. */
- if (events[i].data.fd == json_sockfd ||
- events[i].data.fd == serv_sockfd)
+ if (events[i].data.fd == json_sockfd || events[i].data.fd == serv_sockfd)
{
current = get_unused_remote_descriptor();
- if (current == NULL) {
+ if (current == NULL)
+ {
syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
continue;
}
current->type = (events[i].data.fd == json_sockfd ? JSON_SOCK : SERV_SOCK);
int sockfd = (events[i].data.fd == json_sockfd ? json_sockfd : serv_sockfd);
- socklen_t peer_addr_len = (events[i].data.fd == json_sockfd
- ? sizeof(current->event_json.peer)
- : sizeof(current->event_serv.peer));
+ socklen_t peer_addr_len = (events[i].data.fd == json_sockfd ? sizeof(current->event_json.peer)
+ : sizeof(current->event_serv.peer));
current->fd = accept(sockfd,
- (current->type == JSON_SOCK
- ? (struct sockaddr *) &current->event_json.peer
- : (struct sockaddr *) &current->event_serv.peer),
+ (current->type == JSON_SOCK ? (struct sockaddr *)&current->event_json.peer
+ : (struct sockaddr *)&current->event_serv.peer),
&peer_addr_len);
- if (current->fd < 0) {
+ if (current->fd < 0)
+ {
syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
disconnect_client(epollfd, current);
continue;
}
- if (events[i].data.fd == serv_sockfd &&
- inet_ntop(current->event_serv.peer.sin_family, &current->event_serv.peer.sin_addr,
- &current->event_serv.peer_addr[0], sizeof(current->event_serv.peer_addr)) == NULL)
+ if (events[i].data.fd == serv_sockfd && inet_ntop(current->event_serv.peer.sin_family,
+ &current->event_serv.peer.sin_addr,
+ &current->event_serv.peer_addr[0],
+ sizeof(current->event_serv.peer_addr)) == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
disconnect_client(epollfd, current);
continue;
}
- switch (current->type) {
+ switch (current->type)
+ {
case JSON_SOCK:
syslog(LOG_DAEMON, "New collector connection");
break;
case SERV_SOCK:
- syslog(LOG_DAEMON, "New distributor connection from %.*s:%u",
- (int) sizeof(current->event_serv.peer_addr), current->event_serv.peer_addr,
+ syslog(LOG_DAEMON,
+ "New distributor connection from %.*s:%u",
+ (int)sizeof(current->event_serv.peer_addr),
+ current->event_serv.peer_addr,
ntohs(current->event_serv.peer.sin_port));
break;
}
@@ -274,7 +290,8 @@ int main(void)
}
/* shutdown writing end for collector clients */
- if (current->type == JSON_SOCK) {
+ if (current->type == JSON_SOCK)
+ {
shutdown(current->fd, SHUT_WR); // collector
}
@@ -282,66 +299,80 @@ int main(void)
struct epoll_event accept_event = {};
accept_event.data.ptr = current;
accept_event.events = EPOLLIN;
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0) {
+ if (epoll_ctl(epollfd, EPOLL_CTL_ADD, current->fd, &accept_event) < 0)
+ {
disconnect_client(epollfd, current);
continue;
}
- } else {
- current = (struct remote_desc *) events[i].data.ptr;
+ }
+ else
+ {
+ current = (struct remote_desc *)events[i].data.ptr;
- if (current->fd < 0) {
+ if (current->fd < 0)
+ {
syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
continue;
}
- if (events[i].events & EPOLLHUP) {
- syslog(LOG_DAEMON, "%s connection closed", (current->type == JSON_SOCK
- ? "collector"
- : "distributor"));
+ if (events[i].events & EPOLLHUP)
+ {
+ syslog(LOG_DAEMON,
+ "%s connection closed",
+ (current->type == JSON_SOCK ? "collector" : "distributor"));
disconnect_client(epollfd, current);
continue;
}
- if (events[i].events & EPOLLIN) {
+ if (events[i].events & EPOLLIN)
+ {
/* read JSON strings (or parts) from the UNIX socket (collecting) */
errno = 0;
- ssize_t bytes_read = read(current->fd,
- current->buf + current->buf_used,
- sizeof(current->buf) - current->buf_used);
- if (bytes_read < 0 || errno != 0) {
+ ssize_t bytes_read =
+ read(current->fd, current->buf + current->buf_used, sizeof(current->buf) - current->buf_used);
+ if (bytes_read < 0 || errno != 0)
+ {
disconnect_client(epollfd, current);
continue;
}
- if (bytes_read == 0) {
- syslog(LOG_DAEMON, "%s connection closed during read", (current->type == JSON_SOCK
- ? "collector"
- : "distributor"));
+ if (bytes_read == 0)
+ {
+ syslog(LOG_DAEMON,
+ "%s connection closed during read",
+ (current->type == JSON_SOCK ? "collector" : "distributor"));
disconnect_client(epollfd, current);
continue;
}
/* broadcast data coming from the json-collector socket to all tcp clients */
- if (current->type == JSON_SOCK) {
+ if (current->type == JSON_SOCK)
+ {
/* buffer all data until we got the whole JSON string */
current->buf_used += bytes_read;
- if (current->buf_wanted == 0) {
+ if (current->buf_wanted == 0)
+ {
char * json_str_start = NULL;
errno = 0;
/* the first bytes are the textual representation of the following JSON string */
- current->buf_wanted = strtoull((char *) current->buf, &json_str_start, 10);
- current->buf_wanted += (uint8_t *) json_str_start - current->buf;
- if (errno == ERANGE) {
+ current->buf_wanted = strtoull((char *)current->buf, &json_str_start, 10);
+ current->buf_wanted += (uint8_t *)json_str_start - current->buf;
+ if (errno == ERANGE)
+ {
current->buf_used = 0;
current->buf_wanted = 0;
syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit");
continue;
}
- if ((uint8_t *) json_str_start == current->buf) {
+ if ((uint8_t *)json_str_start == current->buf)
+ {
current->buf_used = 0;
current->buf_wanted = 0;
- syslog(LOG_DAEMON | LOG_ERR, "Missing size before JSON string, got: '%c'", current->buf[0]);
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Missing size before JSON string, got: '%c'",
+ current->buf[0]);
continue;
}
- if (current->buf_wanted > BUFSIZ) {
+ if (current->buf_wanted > BUFSIZ)
+ {
current->buf_used = 0;
current->buf_wanted = 0;
syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON string too big");
@@ -349,11 +380,13 @@ int main(void)
}
}
/* buffered enough data (full JSON String) ? */
- if (current->buf_wanted > current->buf_used) {
+ if (current->buf_wanted > current->buf_used)
+ {
continue;
}
/* after buffering complete, last character should always be a '}' (end of object) */
- if (current->buf[current->buf_wanted - 1] != '}') {
+ if (current->buf[current->buf_wanted - 1] != '}')
+ {
current->buf_used = 0;
current->buf_wanted = 0;
syslog(LOG_DAEMON | LOG_ERR, "Invalid JSON string");
@@ -361,22 +394,31 @@ int main(void)
}
/* the essence: broadcast buffered JSON string to all connected TCP clients (distribution) */
- for (size_t i = 0; i < remotes.desc_size; ++i) {
- if (remotes.desc[i].fd < 0) {
+ for (size_t i = 0; i < remotes.desc_size; ++i)
+ {
+ if (remotes.desc[i].fd < 0)
+ {
continue;
}
- if (remotes.desc[i].type == SERV_SOCK) {
+ if (remotes.desc[i].type == SERV_SOCK)
+ {
ssize_t bytes_written = write(remotes.desc[i].fd, current->buf, current->buf_used);
- if (bytes_written < 0 || errno != 0) {
- syslog(LOG_DAEMON | LOG_ERR, "Written %zd of %zu bytes to fd %d: %s",
- bytes_written, current->buf_used, remotes.desc[i].fd, strerror(errno));
+ if (bytes_written < 0 || errno != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Written %zd of %zu bytes to fd %d: %s",
+ bytes_written,
+ current->buf_used,
+ remotes.desc[i].fd,
+ strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
- if (bytes_written == 0) {
- syslog(LOG_DAEMON, "%s connection closed during write", (current->type == JSON_SOCK
- ? "collector"
- : "distributor"));
+ if (bytes_written == 0)
+ {
+ syslog(LOG_DAEMON,
+ "%s connection closed during write",
+ (current->type == JSON_SOCK ? "collector" : "distributor"));
disconnect_client(epollfd, &remotes.desc[i]);
continue;
}