aboutsummaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2020-08-04 10:27:18 +0200
committerToni Uhlig <matzeton@googlemail.com>2020-08-04 10:27:18 +0200
commitbbeb147cdec61d16d0c7dd4215cabb97f5492ee7 (patch)
treefdf1e8767545b20601a7677ecf4da3a5aba7cd95 /nDPIsrvd.c
parent913c8d5a18258d0b6fa0b7240d5d3a122b6910a3 (diff)
nDPIsrvd: collect, buffer and distribute JSON strings
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c92
1 files changed, 82 insertions, 10 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 06fe08574..1c078ad1d 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -20,6 +20,9 @@ enum ev_type { JSON_SOCK, SERV_SOCK };
struct remote_desc {
enum ev_type type;
int fd;
+ uint8_t buf[BUFSIZ];
+ size_t buf_used;
+ unsigned long long int buf_wanted;
union {
struct {
int json_sockfd;
@@ -28,6 +31,7 @@ struct remote_desc {
struct {
int serv_sockfd;
struct sockaddr_in peer;
+ char peer_addr[INET_ADDRSTRLEN];
} event_serv;
};
};
@@ -39,7 +43,7 @@ static struct remotes {
} remotes = {NULL, 0, 0};
static char json_sockpath[UNIX_PATH_MAX] = COLLECTOR_UNIX_SOCKET;
-static char serv_listen_addr[INET6_ADDRSTRLEN] = DISTRIBUTOR_HOST;
+static char serv_listen_addr[INET_ADDRSTRLEN] = DISTRIBUTOR_HOST;
static uint16_t serv_listen_port = DISTRIBUTOR_PORT;
static int json_sockfd;
static int serv_sockfd;
@@ -75,7 +79,7 @@ static int create_listen_sockets(void)
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(serv_listen_port);
- if (inet_ntop(AF_INET, &serv_addr.sin_addr, &serv_listen_addr[0], INET_ADDRSTRLEN) == NULL)
+ if (inet_ntop(AF_INET, &serv_addr.sin_addr, &serv_listen_addr[0], sizeof(serv_listen_addr)) == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
return 1;
@@ -125,6 +129,9 @@ static struct remote_desc * get_unused_remote_descriptor(void)
for (size_t i = 0; i < remotes.desc_size; ++i) {
if (remotes.desc[i].fd == -1) {
remotes.desc_used++;
+ remotes.desc[i].buf[0] = '\0';
+ remotes.desc[i].buf_used = 0;
+ remotes.desc[i].buf_wanted = 0;
return &remotes.desc[i];
}
}
@@ -169,6 +176,9 @@ int main(void)
{
return 1;
}
+ syslog(LOG_DAEMON, "collector listen on %s", json_sockpath);
+ syslog(LOG_DAEMON, "distributor listen on %.*s:%u",
+ (int) sizeof(serv_listen_addr), serv_listen_addr, serv_listen_port);
int epollfd = epoll_create1(0);
if (epollfd < 0)
@@ -234,10 +244,25 @@ int main(void)
disconnect_client(epollfd, current);
continue;
}
+ if (events[i].data.fd == serv_sockfd &&
+ inet_ntop(current->event_serv.peer.sin_family, &current->event_serv.peer.sin_addr,
+ &current->event_serv.peer_addr[0], sizeof(current->event_serv.peer_addr)) == NULL)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error converting an internet address: %s", strerror(errno));
+ disconnect_client(epollfd, current);
+ continue;
+ }
- syslog(LOG_DAEMON, "New %s connection", (current->type == JSON_SOCK
- ? "collector"
- : "distributor"));
+ switch (current->type) {
+ case JSON_SOCK:
+ syslog(LOG_DAEMON, "New collector connection");
+ break;
+ case SERV_SOCK:
+ syslog(LOG_DAEMON, "New distributor connection from %.*s:%u",
+ (int) sizeof(current->event_serv.peer_addr), current->event_serv.peer_addr,
+ ntohs(current->event_serv.peer.sin_port));
+ break;
+ }
/* nonblocking fd is mandatory */
int fd_flags = fcntl(current->fd, F_GETFL, 0);
@@ -277,9 +302,11 @@ int main(void)
continue;
}
if (events[i].events & EPOLLIN) {
+ /* read JSON strings (or parts) from the UNIX socket (collecting) */
errno = 0;
- char buf[BUFSIZ];
- ssize_t bytes_read = read(current->fd, buf, sizeof(buf));
+ ssize_t bytes_read = read(current->fd,
+ current->buf + current->buf_used,
+ sizeof(current->buf) - current->buf_used);
if (bytes_read < 0 || errno != 0) {
disconnect_client(epollfd, current);
continue;
@@ -294,25 +321,70 @@ int main(void)
/* broadcast data coming from the json-collector socket to all tcp clients */
if (current->type == JSON_SOCK) {
+ /* buffer all data until we got the whole JSON string */
+ current->buf_used += bytes_read;
+ if (current->buf_wanted == 0) {
+ char * json_str_start = NULL;
+ errno = 0;
+ /* the first bytes are the textual representation of the following JSON string */
+ current->buf_wanted = strtoull((char *) current->buf, &json_str_start, 10);
+ current->buf_wanted += (uint8_t *) json_str_start - current->buf;
+ if (errno == ERANGE) {
+ current->buf_used = 0;
+ current->buf_wanted = 0;
+ syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit");
+ continue;
+ }
+ if ((uint8_t *) json_str_start == current->buf) {
+ current->buf_used = 0;
+ current->buf_wanted = 0;
+ syslog(LOG_DAEMON | LOG_ERR, "Missing size before JSON string, got: '%c'", current->buf[0]);
+ continue;
+ }
+ if (current->buf_wanted > BUFSIZ) {
+ current->buf_used = 0;
+ current->buf_wanted = 0;
+ syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON string too big");
+ continue;
+ }
+ }
+ /* buffered enough data (full JSON String) ? */
+ if (current->buf_wanted > current->buf_used) {
+ continue;
+ }
+ /* after buffering complete, last character should always be a '}' (end of object) */
+ if (current->buf[current->buf_wanted - 1] != '}') {
+ current->buf_used = 0;
+ current->buf_wanted = 0;
+ syslog(LOG_DAEMON | LOG_ERR, "Invalid JSON string");
+ continue;
+ }
+
+ /* the essence: broadcast buffered JSON string to all connected TCP clients (distribution) */
for (size_t i = 0; i < remotes.desc_size; ++i) {
if (remotes.desc[i].fd < 0) {
continue;
}
if (remotes.desc[i].type == SERV_SOCK) {
- ssize_t bytes_written = write(remotes.desc[i].fd, buf, bytes_read);
+ ssize_t bytes_written = write(remotes.desc[i].fd, current->buf, current->buf_used);
if (bytes_written < 0 || errno != 0) {
- disconnect_client(epollfd, current);
+ syslog(LOG_DAEMON | LOG_ERR, "Written %zd of %zu bytes to fd %d: %s",
+ bytes_written, current->buf_used, remotes.desc[i].fd, strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
if (bytes_written == 0) {
syslog(LOG_DAEMON, "%s connection closed during write", (current->type == JSON_SOCK
? "collector"
: "distributor"));
- disconnect_client(epollfd, current);
+ disconnect_client(epollfd, &remotes.desc[i]);
continue;
}
}
}
+
+ current->buf_used = 0;
+ current->buf_wanted = 0;
}
}
}