aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nDPId.c28
-rw-r--r--nDPIsrvd.c36
2 files changed, 57 insertions, 7 deletions
diff --git a/nDPId.c b/nDPId.c
index 2dfeb6c39..34826254d 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -793,8 +793,8 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
#if nDPIsrvd_JSON_BYTES != 4
#error "Please do not forget to change the format string if you've changed the value of nDPIsrvd_JSON_BYTES."
#endif
- s_ret =
- snprintf(newline_json_str, sizeof(newline_json_str), "%04zu%.*s\n", json_str_len + 1, (int)json_str_len, json_str);
+ s_ret = snprintf(
+ newline_json_str, sizeof(newline_json_str), "%04zu%.*s\n", json_str_len + 1, (int)json_str_len, json_str);
if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str))
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -2334,12 +2334,20 @@ static int validate_options(char const * const arg0)
if (max_flows_per_thread < 128 || max_flows_per_thread > nDPId_MAX_FLOWS_PER_THREAD)
{
- fprintf(stderr, "%s: Value not in range: 128 < max-flows-per-thread[%llu] < %d\n", arg0, max_flows_per_thread, nDPId_MAX_FLOWS_PER_THREAD);
+ fprintf(stderr,
+ "%s: Value not in range: 128 < max-flows-per-thread[%llu] < %d\n",
+ arg0,
+ max_flows_per_thread,
+ nDPId_MAX_FLOWS_PER_THREAD);
retval = 1;
}
if (max_idle_flows_per_thread < 64 || max_idle_flows_per_thread > nDPId_MAX_IDLE_FLOWS_PER_THREAD)
{
- fprintf(stderr, "%s: Value not in range: 64 < max-idle-flows-per-thread[%llu] < %d\n", arg0, max_idle_flows_per_thread, nDPId_MAX_IDLE_FLOWS_PER_THREAD);
+ fprintf(stderr,
+ "%s: Value not in range: 64 < max-idle-flows-per-thread[%llu] < %d\n",
+ arg0,
+ max_idle_flows_per_thread,
+ nDPId_MAX_IDLE_FLOWS_PER_THREAD);
retval = 1;
}
if (tick_resolution < 1)
@@ -2349,7 +2357,11 @@ static int validate_options(char const * const arg0)
}
if (reader_thread_count < 1 || reader_thread_count > nDPId_MAX_READER_THREADS)
{
- fprintf(stderr, "%s: Value not in range: 1 < reader-thread-count[%llu] < %d\n", arg0, reader_thread_count, nDPId_MAX_READER_THREADS);
+ fprintf(stderr,
+ "%s: Value not in range: 1 < reader-thread-count[%llu] < %d\n",
+ arg0,
+ reader_thread_count,
+ nDPId_MAX_READER_THREADS);
retval = 1;
}
if (idle_scan_period < 1000)
@@ -2364,7 +2376,11 @@ static int validate_options(char const * const arg0)
}
if (max_post_end_flow_time > max_idle_time)
{
- fprintf(stderr, "%s: Value not in range: max-post-end-flow-time[%llu] < max_idle_time[%llu]\n", arg0, max_post_end_flow_time, max_idle_time);
+ fprintf(stderr,
+ "%s: Value not in range: max-post-end-flow-time[%llu] < max_idle_time[%llu]\n",
+ arg0,
+ max_post_end_flow_time,
+ max_idle_time);
retval = 1;
}
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 5bbfd9763..379cfc20a 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -643,7 +643,41 @@ int main(int argc, char ** argv)
}
if (current->event_json.json_bytes > remotes.desc[i].buf.max - remotes.desc[i].buf.used)
{
- continue;
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Buffer capacity threshold (%zu of max %zu bytes) reached, "
+ "falling back to blocking mode.",
+ remotes.desc[i].buf.used,
+ remotes.desc[i].buf.max);
+ /*
+ * FIXME: Maybe switch to a Multithreading distributor data transmission,
+ * so that we do not have to switch back to blocking mode here!
+ * NOTE: If *one* distributer peer is too slow, all other distributors are
+ * affected by this. This causes starvation and leads to a possible data loss on
+ * the nDPId collector side.
+ */
+ int fd_flags = fcntl(remotes.desc[i].fd, F_GETFL, 0);
+ if (fd_flags == -1 || fcntl(remotes.desc[i].fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ if (write(remotes.desc[i].fd, remotes.desc[i].buf.ptr, remotes.desc[i].buf.used) !=
+ (ssize_t)remotes.desc[i].buf.used)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Could not drain buffer by %zu bytes. (forced)",
+ remotes.desc[i].buf.used);
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
+ remotes.desc[i].buf.used = 0;
+ if (fcntl(remotes.desc[i].fd, F_SETFL, fd_flags) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Error setting fd flags: %s", strerror(errno));
+ disconnect_client(epollfd, &remotes.desc[i]);
+ continue;
+ }
}
memcpy(remotes.desc[i].buf.ptr + remotes.desc[i].buf.used,