summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-11-16 15:53:29 +0100
committerToni Uhlig <matzeton@googlemail.com>2021-11-16 15:59:38 +0100
commit25b974af67d98fa19e0953194347a1260d684ef5 (patch)
treecb63248fc29f87fd6b793a4aae82ed35e1084ba8 /nDPId.c
parentd389f04135354bb67ca7495bf97790e817e570e4 (diff)
Use blocking I/O to prevent data loss if nDPIsrvd too slow.
* Fixed MemoryProfiler stack overflow. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c58
1 files changed, 43 insertions, 15 deletions
diff --git a/nDPId.c b/nDPId.c
index 60cb5c190..c58d43dd5 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -39,6 +39,10 @@
#error "Compare and Swap aka __sync_fetch_and_add not available on your platform!"
#endif
+#if nDPId_MAX_READER_THREADS < 0
+#error "Invalid value for nDPId_MAX_READER_THREADS"
+#endif
+
enum nDPId_l3_type
{
L3_IP,
@@ -1039,6 +1043,7 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
{
case FT_UNKNOWN:
case FT_SKIPPED:
+ written = 0;
break;
case FT_FINISHED:
@@ -1046,11 +1051,14 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
struct nDPId_flow_finished const * const flow_finished = (struct nDPId_flow_finished const *)flow_basic;
#if 1
- written = snprintf(output + output_used, BUFSIZ, "%u,", flow_finished->flow_info.flow_extended.flow_id);
+ written = snprintf(output + output_used,
+ BUFSIZ - output_used,
+ "%u,",
+ flow_finished->flow_info.flow_extended.flow_id);
#else
written =
snprintf(output + output_used,
- BUFSIZ,
+ BUFSIZ - output_used,
"[%u, %u, %llu],",
flow_finished->flow_info.flow_extended.flow_id,
flow_finished->flow_info.flow_extended.flow_basic.l4_protocol,
@@ -1064,10 +1072,10 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info const *)flow_basic;
#if 1
- written = snprintf(output + output_used, BUFSIZ, "%u,", flow_info->flow_extended.flow_id);
+ written = snprintf(output + output_used, BUFSIZ - output_used, "%u,", flow_info->flow_extended.flow_id);
#else
written = snprintf(output + output_used,
- BUFSIZ,
+ BUFSIZ - output_used,
"[%u, %u, %llu],",
flow_info->flow_extended.flow_id,
flow_info->flow_extended.flow_basic.l4_protocol,
@@ -1081,7 +1089,14 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
{
break;
}
- output_used += written;
+ else
+ {
+ output_used += written;
+ if (output_used >= BUFSIZ)
+ {
+ break;
+ }
+ }
}
return (output_used > 0 ? output_used - 1 : 0);
@@ -1899,14 +1914,11 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
}
errno = 0;
- if (reader_thread->json_sock_reconnect == 0 && write(reader_thread->json_sockfd, newline_json_str, s_ret) != s_ret)
+ ssize_t written;
+ if (reader_thread->json_sock_reconnect == 0 &&
+ (written = write(reader_thread->json_sockfd, newline_json_str, s_ret)) != s_ret)
{
saved_errno = errno;
- syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %d] Send data to nDPIsrvd Collector failed: %s",
- workflow->packets_captured,
- reader_thread->array_index,
- strerror(saved_errno));
if (saved_errno == EPIPE)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -1920,10 +1932,26 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
}
else
{
- syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %d] Possible data loss detected",
- workflow->packets_captured,
- reader_thread->array_index);
+ fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
+ off_t pos = (written < 0 ? 0 : written);
+ while ((written = write(reader_thread->json_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos)
+ {
+ if (written < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %d] Send data (blocking I/O) to nDPIsrvd Collector failed: %s",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ strerror(saved_errno));
+ reader_thread->json_sock_reconnect = 1;
+ break;
+ }
+ else
+ {
+ pos += written;
+ }
+ }
+ fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & O_NONBLOCK);
}
}
}