diff options
-rw-r--r-- | nDPIsrvd.c | 176 |
1 files changed, 84 insertions, 92 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 5aecb0bc3..f1c450443 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -383,6 +383,8 @@ int main(int argc, char ** argv) if (current->type == JSON_SOCK) { shutdown(current->fd, SHUT_WR); // collector + } else { + shutdown(current->fd, SHUT_RD); // descriptor } /* setup epoll event */ @@ -413,7 +415,8 @@ int main(int argc, char ** argv) disconnect_client(epollfd, current); continue; } - if (events[i].events & EPOLLIN) + + if (events[i].events & EPOLLIN && current->type == JSON_SOCK) { /* read JSON strings (or parts) from the UNIX socket (collecting) */ errno = 0; @@ -433,120 +436,109 @@ int main(int argc, char ** argv) continue; } - /* broadcast data coming from the json-collector socket to all tcp clients */ - if (current->type == JSON_SOCK) + current->buf.used += bytes_read; + while (current->event_json.json_bytes == 0 && + current->buf.used >= nDPIsrvd_JSON_BYTES + 1) { - /* buffer all data until we got the whole JSON string */ - current->buf.used += bytes_read; - if (current->event_json.json_bytes == 0) + if (current->buf.ptr[nDPIsrvd_JSON_BYTES] != '{') { - char * json_str_start = NULL; - errno = 0; - /* the first bytes are the textual representation of the following JSON string */ - current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10); - current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr; - if (errno == ERANGE) - { - current->buf.used = 0; - current->event_json.json_bytes = 0; - syslog(LOG_DAEMON | LOG_ERR, "Size of JSON exceeds limit"); - continue; - } - if ((uint8_t *)json_str_start == current->buf.ptr) - { - syslog(LOG_DAEMON | LOG_ERR, - "Missing size before JSON string: %.*s", - (int)current->buf.used, - current->buf.ptr); - current->buf.used = 0; - current->event_json.json_bytes = 0; - continue; - } - if (current->event_json.json_bytes > current->buf.max) - { - syslog(LOG_DAEMON | LOG_ERR, - "BUG: JSON string too big: %llu > %zu", - current->event_json.json_bytes, - current->buf.max); - current->buf.used = 0; - current->event_json.json_bytes = 0; - continue; - } + syslog(LOG_DAEMON | LOG_ERR, "BUG: JSON invalid opening character: '%c'", + current->buf.ptr[nDPIsrvd_JSON_BYTES]); + disconnect_client(epollfd, current); + break; + } + + char * json_str_start = NULL; + current->event_json.json_bytes = strtoull((char *)current->buf.ptr, &json_str_start, 10); + current->event_json.json_bytes += (uint8_t *)json_str_start - current->buf.ptr; + + if (errno == ERANGE) + { + syslog(LOG_DAEMON | LOG_ERR, "BUG: Size of JSON exceeds limit"); + disconnect_client(epollfd, current); + break; + } + if ((uint8_t *)json_str_start == current->buf.ptr) + { + syslog(LOG_DAEMON | LOG_ERR, + "BUG: Missing size before JSON string: \"%.*s\"", + nDPIsrvd_JSON_BYTES, current->buf.ptr); + disconnect_client(epollfd, current); + break; + } + if (current->event_json.json_bytes > current->buf.max) + { + syslog(LOG_DAEMON | LOG_ERR, + "BUG: JSON string too big: %llu > %zu", + current->event_json.json_bytes, + current->buf.max); + disconnect_client(epollfd, current); + break; } - /* buffered enough data (full JSON String) ? */ if (current->event_json.json_bytes > current->buf.used) { - continue; + break; } - /* after buffering complete, last character should always be a '}' (end of object) */ + if (current->buf.ptr[current->event_json.json_bytes - 1] != '}') { syslog(LOG_DAEMON | LOG_ERR, - "Invalid JSON string: %.*s", + "BUG: Invalid JSON string: %.*s", (int)current->event_json.json_bytes, current->buf.ptr); - current->buf.used = 0; - current->event_json.json_bytes = 0; - continue; + disconnect_client(epollfd, current); + break; } - /* the essence: broadcast buffered JSON string to all connected TCP clients (distribution) */ for (size_t i = 0; i < remotes.desc_size; ++i) { if (remotes.desc[i].fd < 0) { continue; } - if (remotes.desc[i].type == SERV_SOCK) + if (remotes.desc[i].type != SERV_SOCK) + { + continue; + } + if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used) { - ssize_t bytes_written; + continue; + } + + memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, + current->buf.ptr, current->event_json.json_bytes); + remotes.desc[i].buf.used += current->event_json.json_bytes; - if (remotes.desc[i].buf.used > 0) { - bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, + ssize_t bytes_written = write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used); - if (bytes_written > 0) { - memmove(remotes.desc[i].buf.ptr, - remotes.desc[i].buf.ptr + remotes.desc[i].event_json.json_bytes, - remotes.desc[i].buf.used - remotes.desc[i].event_json.json_bytes); - remotes.desc[i].buf.used -= remotes.desc[i].event_json.json_bytes; - } - } - - bytes_written = write(remotes.desc[i].fd, current->buf.ptr, - current->event_json.json_bytes); - if (errno == EAGAIN) - { - if ((unsigned long long int)bytes_written < current->event_json.json_bytes) { - if (remotes.desc[i].buf.max - remotes.desc[i].buf.used < current->event_json.json_bytes) - { - syslog(LOG_DAEMON | LOG_ERR, "Distributor write buffer bloat, no more space available."); - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - syslog(LOG_DAEMON | LOG_ERR, "Distributor write temporarily failed, buffering .."); - memcpy(remotes.desc[i].buf.ptr, - remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, - current->event_json.json_bytes); - remotes.desc[i].buf.used += current->event_json.json_bytes; - } - continue; - } - if (bytes_written < 0 || errno != 0) - { - syslog(LOG_DAEMON | LOG_ERR, - "Distributor connection closed, send failed: %s", - strerror(errno)); - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } - if (bytes_written == 0) - { - syslog(LOG_DAEMON, - "Distributor connection closed during write"); - disconnect_client(epollfd, &remotes.desc[i]); - continue; - } + if (errno == EAGAIN) { + continue; + } + if (bytes_written < 0 || errno != 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "Distributor connection closed, send failed: %s", + strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + if (bytes_written == 0) + { + syslog(LOG_DAEMON, + "Distributor connection closed during write"); + disconnect_client(epollfd, &remotes.desc[i]); + continue; } + if ((size_t)bytes_written != remotes.desc[i].buf.used) + { + syslog(LOG_DAEMON, + "Distributor connection wrote less bytes than expected: %zd < %zu", + bytes_written, remotes.desc[i].buf.used); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + + remotes.desc[i].buf.used = 0; } memmove(current->buf.ptr, |