summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorlns <matzeton@googlemail.com>2022-08-29 14:34:01 +0200
committerlns <matzeton@googlemail.com>2022-08-29 15:29:07 +0200
commitd646ec5ab477165cf49f7f69a8ad9df587b9c79a (patch)
treea4af3822295229b4e7230922c7fd7d33c5dd8c10 /nDPId.c
parentdea30501a496a2ddc906b7843bf78067fe375b01 (diff)
nDPId: Fixed fcntl() issue; invalid fcntl() set after a blocking-write.
* nDPId: imrpvoed collector socket error messages on connect/write/etc failures * reverted `netcat` parts of the README Signed-off-by: lns <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c151
1 files changed, 102 insertions, 49 deletions
diff --git a/nDPId.c b/nDPId.c
index 75f6aacf2..f57419de9 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -247,7 +247,7 @@ struct nDPId_reader_thread
struct nDPId_workflow * workflow;
pthread_t thread_id;
int collector_sockfd;
- int collector_sock_reconnect;
+ int collector_sock_last_errno;
size_t array_index;
};
@@ -504,6 +504,44 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read
struct nDPId_flow * const flow,
enum flow_event event);
+static int set_collector_nonblock(struct nDPId_reader_thread * const reader_thread)
+{
+ int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0);
+
+ if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags | O_NONBLOCK) == -1)
+ {
+ reader_thread->collector_sock_last_errno = errno;
+ logger(1,
+ "[%8llu, %zu] Could not set collector fd %d to non-blocking mode: %s",
+ reader_thread->workflow->packets_processed,
+ reader_thread->thread_id,
+ reader_thread->collector_sockfd,
+ strerror(errno));
+ return 1;
+ }
+
+ return 0;
+}
+
+static int set_collector_block(struct nDPId_reader_thread * const reader_thread)
+{
+ int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0);
+
+ if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags & ~O_NONBLOCK) == -1)
+ {
+ reader_thread->collector_sock_last_errno = errno;
+ logger(1,
+ "[%8llu, %zu] Could not set collector fd %d to blocking mode: %s",
+ reader_thread->workflow->packets_processed,
+ reader_thread->thread_id,
+ reader_thread->collector_sockfd,
+ strerror(errno));
+ return 1;
+ }
+
+ return 0;
+}
+
#ifdef ENABLE_ZLIB
static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
{
@@ -1942,36 +1980,34 @@ static int connect_to_collector(struct nDPId_reader_thread * const reader_thread
reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type | SOCK_CLOEXEC, 0);
if (reader_thread->collector_sockfd < 0)
{
- reader_thread->collector_sock_reconnect = 1;
+ reader_thread->collector_sock_last_errno = errno;
return 1;
}
- int opt = NETWORK_BUFFER_MAX_SIZE * 16;
+ int opt = NETWORK_BUFFER_MAX_SIZE;
if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0)
{
return 1;
}
- if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0)
+ if (set_collector_nonblock(reader_thread) != 0)
{
- reader_thread->collector_sock_reconnect = 1;
return 1;
}
- if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0)
+ if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0)
{
+ reader_thread->collector_sock_last_errno = errno;
return 1;
}
- if (fcntl(reader_thread->collector_sockfd,
- F_SETFL,
- fcntl(reader_thread->collector_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
+ if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0)
{
- reader_thread->collector_sock_reconnect = 1;
+ reader_thread->collector_sock_last_errno = errno;
return 1;
}
- reader_thread->collector_sock_reconnect = 0;
+ reader_thread->collector_sock_last_errno = 0;
return 0;
}
@@ -2003,26 +2039,46 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
return;
}
- if (reader_thread->collector_sock_reconnect != 0)
+ if (reader_thread->collector_sock_last_errno != 0)
{
+ saved_errno = reader_thread->collector_sock_last_errno;
+
if (connect_to_collector(reader_thread) == 0)
{
- logger(1,
- "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s",
- workflow->packets_captured,
- reader_thread->array_index,
- nDPId_options.collector_address);
- jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
+ if (collector_address.raw.sa_family == AF_UNIX)
+ {
+ logger(1,
+ "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ nDPId_options.collector_address);
+ jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
+ }
+ }
+ else
+ {
+ if (saved_errno != reader_thread->collector_sock_last_errno)
+ {
+ logger(1,
+ "[%8llu, %zu] Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ nDPId_options.collector_address,
+ (reader_thread->collector_sock_last_errno != 0
+ ? strerror(reader_thread->collector_sock_last_errno)
+ : "Internal Error."));
+ }
+ return;
}
}
errno = 0;
ssize_t written;
- if (reader_thread->collector_sock_reconnect == 0 &&
+ if (reader_thread->collector_sock_last_errno == 0 &&
(written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret)
{
saved_errno = errno;
- if (saved_errno == EPIPE)
+ if (saved_errno == EPIPE || written == 0)
{
logger(1,
"[%8llu, %zu] Lost connection to nDPIsrvd Collector",
@@ -2040,21 +2096,32 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
(collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"),
nDPId_options.collector_address);
}
- if (collector_address.raw.sa_family == AF_UNIX)
- {
- reader_thread->collector_sock_reconnect = 1;
- }
+ reader_thread->collector_sock_last_errno = saved_errno;
}
- else
+ else if (collector_address.raw.sa_family == AF_UNIX)
{
- fcntl(reader_thread->collector_sockfd,
- F_SETFL,
- fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
off_t pos = (written < 0 ? 0 : written);
+ logger(0,
+ "[%8llu, %zu] Send less data then expected (%zd < %d bytes), falling back to blocking I/O",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ pos,
+ s_ret);
+ set_collector_block(reader_thread);
while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) !=
s_ret - pos)
{
- if (written < 0)
+ saved_errno = errno;
+ if (saved_errno == EPIPE || written == 0)
+ {
+ logger(1,
+ "[%8llu, %zu] Lost connection to nDPIsrvd Collector",
+ workflow->packets_captured,
+ reader_thread->array_index);
+ reader_thread->collector_sock_last_errno = saved_errno;
+ break;
+ }
+ else if (written < 0)
{
logger(1,
"[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s",
@@ -2062,10 +2129,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
reader_thread->array_index,
nDPId_options.collector_address,
strerror(saved_errno));
- if (collector_address.raw.sa_family == AF_UNIX)
- {
- reader_thread->collector_sock_reconnect = 1;
- }
+ reader_thread->collector_sock_last_errno = saved_errno;
break;
}
else
@@ -2073,9 +2137,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
pos += written;
}
}
- fcntl(reader_thread->collector_sockfd,
- F_SETFL,
- fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & O_NONBLOCK);
+ set_collector_nonblock(reader_thread);
}
}
}
@@ -4031,16 +4093,15 @@ static void * processing_thread(void * const ndpi_thread_arg)
struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg;
reader_thread->collector_sockfd = -1;
- reader_thread->collector_sock_reconnect = 1;
- errno = 0;
if (connect_to_collector(reader_thread) != 0)
{
logger(1,
"Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
reader_thread->array_index,
nDPId_options.collector_address,
- (errno != 0 ? strerror(errno) : "Internal Error."));
+ (reader_thread->collector_sock_last_errno != 0 ? strerror(reader_thread->collector_sock_last_errno)
+ : "Internal Error."));
}
else
{
@@ -4048,7 +4109,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
}
run_pcap_loop(reader_thread);
- fcntl(reader_thread->collector_sockfd, F_SETFL, fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
+ set_collector_block(reader_thread);
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return NULL;
}
@@ -4171,15 +4232,7 @@ static void process_remaining_flows(void)
{
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
{
- if (fcntl(reader_threads[i].collector_sockfd,
- F_SETFL,
- fcntl(reader_threads[i].collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
- {
- logger(1,
- "Could not set JSON fd %d to blocking mode for shutdown: %s",
- reader_threads[i].collector_sockfd,
- strerror(errno));
- }
+ set_collector_block(&reader_threads[i]);
for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows;
++idle_scan_index)