summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nDPIsrvd.c176
1 files changed, 84 insertions, 92 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 5aecb0bc3..f1c450443 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -383,6 +383,8 @@ int main(int argc, char ** argv)
if (current->type == JSON_SOCK)
{
shutdown(current->fd, SHUT_WR); // collector
+ } else {
+ shutdown(current->fd, SHUT_RD); // descriptor
}
/* setup epoll event */
@@ -413,7 +415,8 @@ int main(int argc, char ** argv)
disconnect_client(epollfd, current);
continue;
}
- if (events[i].events & EPOLLIN)
+
+ if (events[i].events & EPOLLIN && current->type == JSON_SOCK)
{
/* read JSON strings (or parts) from the UNIX socket (collecting) */
errno = 0;
@@ -433,120 +436,109 @@ int main(int argc, char ** argv)
continue;
}
- /* broadcast data coming from the json-collector socket to all tcp clients */
- if (current->type == JSON_SOCK)
+ current->buf.used += bytes_read;
+ while (current->event_json.json_bytes == 0 &&
+ current->buf.used >= nDPIsrvd_JSON_BYTES + 1)
{
- /* buffer all data until we got the whole JSON string */
- current->buf.used += bytes_read;
- if (current->event_json.json_bytes == 0)
+ if (current->buf.ptr[nDPIsrvd_JSON_BYTES] != '{')
{
- char * json_str_start = NULL;
- errno = 0;
- /* the first bytes are the textual representation of the following JSON string */
- current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
- current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
- if (errno == ERANGE)
- {
- current->buf.used = 0;
- current->event_json.json_bytes = 0;
- syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit");
- continue;
- }
- if ((uint8_t *)json_str_start == current->buf.ptr)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Missing size before JSON string: %.*s",
- (int)current->buf.used,
- current->buf.ptr);
- current->buf.used = 0;
- current->event_json.json_bytes = 0;
- continue;
- }
- if (current->event_json.json_bytes > current->buf.max)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "BUG: JSON string too big: %llu > %zu",
- current->event_json.json_bytes,
- current->buf.max);
- current->buf.used = 0;
- current->event_json.json_bytes = 0;
- continue;
- }
+ syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON invalid opening character: '%c'",
+ current->buf.ptr[nDPIsrvd_JSON_BYTES]);
+ disconnect_client(epollfd, current);
+ break;
+ }
+
+ char * json_str_start = NULL;
+ current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10);
+ current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr;
+
+ if (errno == ERANGE)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit");
+ disconnect_client(epollfd, current);
+ break;
+ }
+ if ((uint8_t *)json_str_start == current->buf.ptr)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: Missing size before JSON string: \"%.*s\"",
+ nDPIsrvd_JSON_BYTES, current->buf.ptr);
+ disconnect_client(epollfd, current);
+ break;
+ }
+ if (current->event_json.json_bytes > current->buf.max)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: JSON string too big: %llu > %zu",
+ current->event_json.json_bytes,
+ current->buf.max);
+ disconnect_client(epollfd, current);
+ break;
}
- /* buffered enough data (full JSON String) ? */
if (current->event_json.json_bytes > current->buf.used)
{
- continue;
+ break;
}
- /* after buffering complete, last character should always be a '}' (end of object) */
+
if (current->buf.ptr[current->event_json.json_bytes - 1] != '}')
{
syslog(LOG_DAEMON | LOG_ERR,
- "Invalid JSON string: %.*s",
+ "BUG: Invalid JSON string: %.*s",
(int)current->event_json.json_bytes,
current->buf.ptr);
- current->buf.used = 0;
- current->event_json.json_bytes = 0;
- continue;
+ disconnect_client(epollfd, current);
+ break;
}
- /* the essence: broadcast buffered JSON string to all connected TCP clients (distribution) */
for (size_t i = 0; i < remotes.desc_size; ++i)
{
if (remotes.desc[i].fd < 0)
{
continue;
}
- if (remotes.desc[i].type == SERV_SOCK)
+ if (remotes.desc[i].type != SERV_SOCK)
+ {
+ continue;
+ }
+ if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
{
- ssize_t bytes_written;
+ continue;
+ }
+
+ memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
+ current->buf.ptr, current->event_json.json_bytes);
+ remotes.desc[i].buf.used += current->event_json.json_bytes;
- if (remotes.desc[i].buf.used > 0) {
- bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr,
+ ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr,
remotes.desc[i].buf.used);
- if (bytes_written > 0) {
- memmove(remotes.desc[i].buf.ptr,
- remotes.desc[i].buf.ptr + remotes.desc[i].event_json.json_bytes,
- remotes.desc[i].buf.used - remotes.desc[i].event_json.json_bytes);
- remotes.desc[i].buf.used -= remotes.desc[i].event_json.json_bytes;
- }
- }
-
- bytes_written = write(remotes.desc[i].fd, current->buf.ptr,
- current->event_json.json_bytes);
- if (errno == EAGAIN)
- {
- if ((unsigned long long int)bytes_written < current->event_json.json_bytes) {
- if (remotes.desc[i].buf.max - remotes.desc[i].buf.used < current->event_json.json_bytes)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Distributor write buffer bloat, no more space available.");
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- syslog(LOG_DAEMON | LOG_ERR, "Distributor write temporarily failed, buffering ..");
- memcpy(remotes.desc[i].buf.ptr,
- remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,
- current->event_json.json_bytes);
- remotes.desc[i].buf.used += current->event_json.json_bytes;
- }
- continue;
- }
- if (bytes_written < 0 || errno != 0)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "Distributor connection closed, send failed: %s",
- strerror(errno));
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
- if (bytes_written == 0)
- {
- syslog(LOG_DAEMON,
- "Distributor connection closed during write");
- disconnect_client(epollfd, &remotes.desc[i]);
- continue;
- }
+ if (errno == EAGAIN) {
+ continue;
+ }
+ if (bytes_written < 0 || errno != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Distributor connection closed, send failed: %s",
+ strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if (bytes_written == 0)
+ {
+ syslog(LOG_DAEMON,
+ "Distributor connection closed during write");
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
}
+ if ((size_t)bytes_written != remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON,
+ "Distributor connection wrote less bytes than expected: %zd < %zu",
+ bytes_written, remotes.desc[i].buf.used);
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+
+ remotes.desc[i].buf.used = 0;
}
memmove(current->buf.ptr,