diff options
-rw-r--r-- | nDPId.c | 28 | ||||
-rw-r--r-- | nDPIsrvd.c | 36 |
2 files changed, 57 insertions, 7 deletions
@@ -793,8 +793,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, #if nDPIsrvd_JSON_BYTES != 4 #error "Please do not forget to change the format string if you've changed the value of nDPIsrvd_JSON_BYTES." #endif - s_ret = - snprintf(newline_json_str, sizeof(newline_json_str), "%04zu%.*s\n", json_str_len + 1, (int)json_str_len, json_str); + s_ret = snprintf( + newline_json_str, sizeof(newline_json_str), "%04zu%.*s\n", json_str_len + 1, (int)json_str_len, json_str); if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str)) { syslog(LOG_DAEMON | LOG_ERR, @@ -2334,12 +2334,20 @@ static int validate_options(char const * const arg0) if (max_flows_per_thread < 128 || max_flows_per_thread > nDPId_MAX_FLOWS_PER_THREAD) { - fprintf(stderr, "%s: Value not in range: 128 < max-flows-per-thread[%llu] < %d\n", arg0, max_flows_per_thread, nDPId_MAX_FLOWS_PER_THREAD); + fprintf(stderr, + "%s: Value not in range: 128 < max-flows-per-thread[%llu] < %d\n", + arg0, + max_flows_per_thread, + nDPId_MAX_FLOWS_PER_THREAD); retval = 1; } if (max_idle_flows_per_thread < 64 || max_idle_flows_per_thread > nDPId_MAX_IDLE_FLOWS_PER_THREAD) { - fprintf(stderr, "%s: Value not in range: 64 < max-idle-flows-per-thread[%llu] < %d\n", arg0, max_idle_flows_per_thread, nDPId_MAX_IDLE_FLOWS_PER_THREAD); + fprintf(stderr, + "%s: Value not in range: 64 < max-idle-flows-per-thread[%llu] < %d\n", + arg0, + max_idle_flows_per_thread, + nDPId_MAX_IDLE_FLOWS_PER_THREAD); retval = 1; } if (tick_resolution < 1) @@ -2349,7 +2357,11 @@ static int validate_options(char const * const arg0) } if (reader_thread_count < 1 || reader_thread_count > nDPId_MAX_READER_THREADS) { - fprintf(stderr, "%s: Value not in range: 1 < reader-thread-count[%llu] < %d\n", arg0, reader_thread_count, nDPId_MAX_READER_THREADS); + fprintf(stderr, + "%s: Value not in range: 1 < reader-thread-count[%llu] < %d\n", + arg0, + reader_thread_count, + nDPId_MAX_READER_THREADS); retval = 1; } if (idle_scan_period < 1000) @@ -2364,7 +2376,11 @@ static int validate_options(char const * const arg0) } if (max_post_end_flow_time > max_idle_time) { - fprintf(stderr, "%s: Value not in range: max-post-end-flow-time[%llu] < max_idle_time[%llu]\n", arg0, max_post_end_flow_time, max_idle_time); + fprintf(stderr, + "%s: Value not in range: max-post-end-flow-time[%llu] < max_idle_time[%llu]\n", + arg0, + max_post_end_flow_time, + max_idle_time); retval = 1; } diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 5bbfd9763..379cfc20a 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -643,7 +643,41 @@ int main(int argc, char ** argv) } if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used) { - continue; + syslog(LOG_DAEMON | LOG_ERR, + "Buffer capacity threshold (%zu of max %zu bytes) reached, " + "falling back to blocking mode.", + remotes.desc[i].buf.used, + remotes.desc[i].buf.max); + /* + * FIXME: Maybe switch to a Multithreading distributor data transmission, + * so that we do not have to switch back to blocking mode here! + * NOTE: If *one* distributer peer is too slow, all other distributors are + * affected by this. This causes starvation and leads to a possible data loss on + * the nDPId collector side. + */ + int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0); + if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) != + (ssize_t)remotes.desc[i].buf.used) + { + syslog(LOG_DAEMON | LOG_ERR, + "Could not drain buffer by %zu bytes. (forced)", + remotes.desc[i].buf.used); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } + remotes.desc[i].buf.used = 0; + if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno)); + disconnect_client(epollfd, &remotes.desc[i]); + continue; + } } memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used, |