diff options
author | lns <matzeton@googlemail.com> | 2022-08-29 14:34:01 +0200 |
---|---|---|
committer | lns <matzeton@googlemail.com> | 2022-08-29 15:29:07 +0200 |
commit | d646ec5ab477165cf49f7f69a8ad9df587b9c79a (patch) | |
tree | a4af3822295229b4e7230922c7fd7d33c5dd8c10 /nDPId.c | |
parent | dea30501a496a2ddc906b7843bf78067fe375b01 (diff) |
nDPId: Fixed fcntl() issue; invalid fcntl() set after a blocking-write.
* nDPId: imrpvoed collector socket error messages on connect/write/etc failures
* reverted `netcat` parts of the README
Signed-off-by: lns <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 151 |
1 files changed, 102 insertions, 49 deletions
@@ -247,7 +247,7 @@ struct nDPId_reader_thread struct nDPId_workflow * workflow; pthread_t thread_id; int collector_sockfd; - int collector_sock_reconnect; + int collector_sock_last_errno; size_t array_index; }; @@ -504,6 +504,44 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read struct nDPId_flow * const flow, enum flow_event event); +static int set_collector_nonblock(struct nDPId_reader_thread * const reader_thread) +{ + int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0); + + if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags | O_NONBLOCK) == -1) + { + reader_thread->collector_sock_last_errno = errno; + logger(1, + "[%8llu, %zu] Could not set collector fd %d to non-blocking mode: %s", + reader_thread->workflow->packets_processed, + reader_thread->thread_id, + reader_thread->collector_sockfd, + strerror(errno)); + return 1; + } + + return 0; +} + +static int set_collector_block(struct nDPId_reader_thread * const reader_thread) +{ + int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0); + + if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags & ~O_NONBLOCK) == -1) + { + reader_thread->collector_sock_last_errno = errno; + logger(1, + "[%8llu, %zu] Could not set collector fd %d to blocking mode: %s", + reader_thread->workflow->packets_processed, + reader_thread->thread_id, + reader_thread->collector_sockfd, + strerror(errno)); + return 1; + } + + return 0; +} + #ifdef ENABLE_ZLIB static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen) { @@ -1942,36 +1980,34 @@ static int connect_to_collector(struct nDPId_reader_thread * const reader_thread reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type | SOCK_CLOEXEC, 0); if (reader_thread->collector_sockfd < 0) { - reader_thread->collector_sock_reconnect = 1; + reader_thread->collector_sock_last_errno = errno; return 1; } - int opt = NETWORK_BUFFER_MAX_SIZE * 16; + int opt = NETWORK_BUFFER_MAX_SIZE; if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0) { return 1; } - if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0) + if (set_collector_nonblock(reader_thread) != 0) { - reader_thread->collector_sock_reconnect = 1; return 1; } - if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0) + if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0) { + reader_thread->collector_sock_last_errno = errno; return 1; } - if (fcntl(reader_thread->collector_sockfd, - F_SETFL, - fcntl(reader_thread->collector_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) + if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0) { - reader_thread->collector_sock_reconnect = 1; + reader_thread->collector_sock_last_errno = errno; return 1; } - reader_thread->collector_sock_reconnect = 0; + reader_thread->collector_sock_last_errno = 0; return 0; } @@ -2003,26 +2039,46 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, return; } - if (reader_thread->collector_sock_reconnect != 0) + if (reader_thread->collector_sock_last_errno != 0) { + saved_errno = reader_thread->collector_sock_last_errno; + if (connect_to_collector(reader_thread) == 0) { - logger(1, - "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s", - workflow->packets_captured, - reader_thread->array_index, - nDPId_options.collector_address); - jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); + if (collector_address.raw.sa_family == AF_UNIX) + { + logger(1, + "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s", + workflow->packets_captured, + reader_thread->array_index, + nDPId_options.collector_address); + jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); + } + } + else + { + if (saved_errno != reader_thread->collector_sock_last_errno) + { + logger(1, + "[%8llu, %zu] Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", + workflow->packets_captured, + reader_thread->array_index, + nDPId_options.collector_address, + (reader_thread->collector_sock_last_errno != 0 + ? strerror(reader_thread->collector_sock_last_errno) + : "Internal Error.")); + } + return; } } errno = 0; ssize_t written; - if (reader_thread->collector_sock_reconnect == 0 && + if (reader_thread->collector_sock_last_errno == 0 && (written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret) { saved_errno = errno; - if (saved_errno == EPIPE) + if (saved_errno == EPIPE || written == 0) { logger(1, "[%8llu, %zu] Lost connection to nDPIsrvd Collector", @@ -2040,21 +2096,32 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, (collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"), nDPId_options.collector_address); } - if (collector_address.raw.sa_family == AF_UNIX) - { - reader_thread->collector_sock_reconnect = 1; - } + reader_thread->collector_sock_last_errno = saved_errno; } - else + else if (collector_address.raw.sa_family == AF_UNIX) { - fcntl(reader_thread->collector_sockfd, - F_SETFL, - fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK); off_t pos = (written < 0 ? 0 : written); + logger(0, + "[%8llu, %zu] Send less data then expected (%zd < %d bytes), falling back to blocking I/O", + workflow->packets_captured, + reader_thread->array_index, + pos, + s_ret); + set_collector_block(reader_thread); while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos) { - if (written < 0) + saved_errno = errno; + if (saved_errno == EPIPE || written == 0) + { + logger(1, + "[%8llu, %zu] Lost connection to nDPIsrvd Collector", + workflow->packets_captured, + reader_thread->array_index); + reader_thread->collector_sock_last_errno = saved_errno; + break; + } + else if (written < 0) { logger(1, "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s", @@ -2062,10 +2129,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, reader_thread->array_index, nDPId_options.collector_address, strerror(saved_errno)); - if (collector_address.raw.sa_family == AF_UNIX) - { - reader_thread->collector_sock_reconnect = 1; - } + reader_thread->collector_sock_last_errno = saved_errno; break; } else @@ -2073,9 +2137,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, pos += written; } } - fcntl(reader_thread->collector_sockfd, - F_SETFL, - fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & O_NONBLOCK); + set_collector_nonblock(reader_thread); } } } @@ -4031,16 +4093,15 @@ static void * processing_thread(void * const ndpi_thread_arg) struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg; reader_thread->collector_sockfd = -1; - reader_thread->collector_sock_reconnect = 1; - errno = 0; if (connect_to_collector(reader_thread) != 0) { logger(1, "Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", reader_thread->array_index, nDPId_options.collector_address, - (errno != 0 ? strerror(errno) : "Internal Error.")); + (reader_thread->collector_sock_last_errno != 0 ? strerror(reader_thread->collector_sock_last_errno) + : "Internal Error.")); } else { @@ -4048,7 +4109,7 @@ static void * processing_thread(void * const ndpi_thread_arg) } run_pcap_loop(reader_thread); - fcntl(reader_thread->collector_sockfd, F_SETFL, fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK); + set_collector_block(reader_thread); __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); return NULL; } @@ -4171,15 +4232,7 @@ static void process_remaining_flows(void) { for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { - if (fcntl(reader_threads[i].collector_sockfd, - F_SETFL, - fcntl(reader_threads[i].collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1) - { - logger(1, - "Could not set JSON fd %d to blocking mode for shutdown: %s", - reader_threads[i].collector_sockfd, - strerror(errno)); - } + set_collector_block(&reader_threads[i]); for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows; ++idle_scan_index) |