From e2e7c82d7fd2f4ace540ea6992f41abea77b9422 Mon Sep 17 00:00:00 2001 From: Toni Uhlig Date: Mon, 15 Aug 2022 11:07:13 +0200 Subject: nDPId: support for custom UDP endpoints Signed-off-by: Toni Uhlig --- nDPId.c | 52 ++++++++++++++++++---------------------------------- 1 file changed, 18 insertions(+), 34 deletions(-) (limited to 'nDPId.c') diff --git a/nDPId.c b/nDPId.c index 32c5f528d..ebef20500 100644 --- a/nDPId.c +++ b/nDPId.c @@ -25,6 +25,7 @@ #endif #include "config.h" +#include "nDPIsrvd.h" #include "utils.h" #ifndef UNIX_PATH_MAX @@ -362,6 +363,7 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = { }; static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; +static struct nDPIsrvd_address collector_address; static volatile int nDPId_main_thread_shutdown = 0; static volatile uint64_t global_flow_id = 1; static int ip4_interface_avail = 0, ip6_interface_avail = 0; @@ -396,7 +398,7 @@ static struct char * custom_categories_file; char * custom_ja3_file; char * custom_sha1_file; - char collector_sockpath[UNIX_PATH_MAX]; + char collector_address[UNIX_PATH_MAX]; #ifdef ENABLE_ZLIB uint8_t enable_zlib_compression; #endif @@ -424,7 +426,7 @@ static struct unsigned long long int max_packets_per_flow_to_process; } nDPId_options = {.pidfile = nDPId_PIDFILE, .user = "nobody", - .collector_sockpath = COLLECTOR_UNIX_SOCKET, + .collector_address = COLLECTOR_UNIX_SOCKET, .max_flows_per_thread = nDPId_MAX_FLOWS_PER_THREAD / 2, .max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2, .tick_resolution = nDPId_TICK_RESOLUTION, @@ -1931,14 +1933,13 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl static int connect_to_collector(struct nDPId_reader_thread * const reader_thread) { - struct sockaddr_un saddr; - if (reader_thread->collector_sockfd >= 0) { close(reader_thread->collector_sockfd); } - reader_thread->collector_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); + int sock_type = (collector_address.raw.sa_family == AF_UNIX ? SOCK_STREAM : SOCK_DGRAM); + reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type | SOCK_CLOEXEC, 0); if (reader_thread->collector_sockfd < 0) { reader_thread->collector_sock_reconnect = 1; @@ -1951,14 +1952,7 @@ static int connect_to_collector(struct nDPId_reader_thread * const reader_thread return 1; } - saddr.sun_family = AF_UNIX; - int written = snprintf(saddr.sun_path, sizeof(saddr.sun_path), "%s", nDPId_options.collector_sockpath); - if (written < 0) - { - return 1; - } - - if (connect(reader_thread->collector_sockfd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) + if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0) { reader_thread->collector_sock_reconnect = 1; return 1; @@ -2748,7 +2742,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); *layer3_type = ntohs(chdlc->proto_code); break; @@ -2770,7 +2764,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (packet[0] == 0x0f || packet[0] == 0x8f) { - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *layer3_type = ntohs(chdlc->proto_code); } @@ -2808,7 +2802,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre } struct ndpi_radiotap_header const * const radiotap = - (struct ndpi_radiotap_header const * const)&packet[eth_offset]; + (struct ndpi_radiotap_header const * const) & packet[eth_offset]; uint16_t radio_len = radiotap->len; /* Check Bad FCS presence */ @@ -4028,7 +4022,7 @@ static void * processing_thread(void * const ndpi_thread_arg) logger(1, "Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", reader_thread->array_index, - nDPId_options.collector_sockpath, + nDPId_options.collector_address, (errno != 0 ? strerror(errno) : "Internal Error.")); } else @@ -4377,7 +4371,7 @@ static int nDPId_parse_options(int argc, char ** argv) "Usage: %s " "[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n" "\t \t" - "[-l] [-L logfile] [-c path-to-unix-sock] " + "[-l] [-L logfile] [-c address] " "[-d] [-p pidfile]\n" "\t \t" "[-u user] [-g group] " @@ -4394,7 +4388,7 @@ static int nDPId_parse_options(int argc, char ** argv) "\t-B\tSet an optional PCAP filter string. (BPF format)\n" "\t-l\tLog all messages to stderr.\n" "\t-L\tLog all messages to a log file.\n" - "\t-c\tPath to the UNIX socket (nDPIsrvd Collector).\n" + "\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n" "\t-d\tForking into background after initialization.\n" "\t-p\tWrite the daemon PID to the given file path.\n" "\t-u\tChange UID to the numeric value of user.\n" @@ -4417,7 +4411,7 @@ static int nDPId_parse_options(int argc, char ** argv) "\t-v\tversion\n" "\t-h\tthis\n\n"; - while ((opt = getopt(argc, argv, "hi:IEB:lL:c:dp:u:g:P:C:J:S:a:zo:vh")) != -1) + while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:zo:vh")) != -1) { switch (opt) { @@ -4443,8 +4437,8 @@ static int nDPId_parse_options(int argc, char ** argv) } break; case 'c': - strncpy(nDPId_options.collector_sockpath, optarg, sizeof(nDPId_options.collector_sockpath) - 1); - nDPId_options.collector_sockpath[sizeof(nDPId_options.collector_sockpath) - 1] = '\0'; + strncpy(nDPId_options.collector_address, optarg, sizeof(nDPId_options.collector_address) - 1); + nDPId_options.collector_address[sizeof(nDPId_options.collector_address) - 1] = '\0'; break; case 'd': daemonize_enable(); @@ -4627,20 +4621,10 @@ static int validate_options(void) } } #endif - if (is_path_absolute("Collector socket", nDPId_options.collector_sockpath) != 0) + if (nDPIsrvd_setup_address(&collector_address, nDPId_options.collector_address) != 0) { retval = 1; - } - { - struct sockaddr_un saddr; - if (strlen(nDPId_options.collector_sockpath) >= sizeof(saddr.sun_path)) - { - logger_early(1, - "Collector socket path too long, current/max: %zu/%zu", - strlen(nDPId_options.collector_sockpath), - sizeof(saddr.sun_path) - 1); - retval = 1; - } + logger_early(1, "Collector socket invalid address: %s.", nDPId_options.collector_address); } if (nDPId_options.instance_alias == NULL) { -- cgit v1.2.3 From d0b0a5060915cce41d24ac2128b227cd7f85fdf1 Mon Sep 17 00:00:00 2001 From: lns Date: Sun, 21 Aug 2022 19:04:16 +0200 Subject: nDPId: improved error messages if UNIX/UDP endpoint refuses connections/datagrams Signed-off-by: lns --- nDPId.c | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) (limited to 'nDPId.c') diff --git a/nDPId.c b/nDPId.c index ebef20500..75f6aacf2 100644 --- a/nDPId.c +++ b/nDPId.c @@ -2008,9 +2008,10 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, if (connect_to_collector(reader_thread) == 0) { logger(1, - "[%8llu, %zu] Reconnected to nDPIsrvd Collector", + "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s", workflow->packets_captured, - reader_thread->array_index); + reader_thread->array_index, + nDPId_options.collector_address); jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); } } @@ -2030,7 +2031,19 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, } if (saved_errno != EAGAIN) { - reader_thread->collector_sock_reconnect = 1; + if (saved_errno == ECONNREFUSED) + { + logger(1, + "[%8llu, %zu] %s to %s refused by endpoint", + workflow->packets_captured, + reader_thread->array_index, + (collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"), + nDPId_options.collector_address); + } + if (collector_address.raw.sa_family == AF_UNIX) + { + reader_thread->collector_sock_reconnect = 1; + } } else { @@ -2044,11 +2057,15 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, if (written < 0) { logger(1, - "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector failed: %s", + "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s", workflow->packets_captured, reader_thread->array_index, + nDPId_options.collector_address, strerror(saved_errno)); - reader_thread->collector_sock_reconnect = 1; + if (collector_address.raw.sa_family == AF_UNIX) + { + reader_thread->collector_sock_reconnect = 1; + } break; } else @@ -2742,7 +2759,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); *layer3_type = ntohs(chdlc->proto_code); break; @@ -2764,7 +2781,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (packet[0] == 0x0f || packet[0] == 0x8f) { - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *layer3_type = ntohs(chdlc->proto_code); } @@ -2802,7 +2819,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre } struct ndpi_radiotap_header const * const radiotap = - (struct ndpi_radiotap_header const * const) & packet[eth_offset]; + (struct ndpi_radiotap_header const * const)&packet[eth_offset]; uint16_t radio_len = radiotap->len; /* Check Bad FCS presence */ -- cgit v1.2.3 From 5e09a00062d52cae0da529c8d61352505d0b5eed Mon Sep 17 00:00:00 2001 From: Toni Uhlig Date: Mon, 15 Aug 2022 11:07:13 +0200 Subject: nDPId: support for custom UDP endpoints Signed-off-by: Toni Uhlig --- nDPId.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'nDPId.c') diff --git a/nDPId.c b/nDPId.c index 75f6aacf2..713b5a63e 100644 --- a/nDPId.c +++ b/nDPId.c @@ -2759,7 +2759,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); *layer3_type = ntohs(chdlc->proto_code); break; @@ -2781,7 +2781,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (packet[0] == 0x0f || packet[0] == 0x8f) { - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *layer3_type = ntohs(chdlc->proto_code); } @@ -2819,7 +2819,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre } struct ndpi_radiotap_header const * const radiotap = - (struct ndpi_radiotap_header const * const)&packet[eth_offset]; + (struct ndpi_radiotap_header const * const) & packet[eth_offset]; uint16_t radio_len = radiotap->len; /* Check Bad FCS presence */ -- cgit v1.2.3 From d9fadae71890c07584337cbbcbe756167b1474ed Mon Sep 17 00:00:00 2001 From: lns Date: Sun, 21 Aug 2022 19:04:16 +0200 Subject: nDPId: improved error messages if UNIX/UDP endpoint refuses connections/datagrams Signed-off-by: lns --- nDPId.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'nDPId.c') diff --git a/nDPId.c b/nDPId.c index 713b5a63e..75f6aacf2 100644 --- a/nDPId.c +++ b/nDPId.c @@ -2759,7 +2759,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); *layer3_type = ntohs(chdlc->proto_code); break; @@ -2781,7 +2781,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (packet[0] == 0x0f || packet[0] == 0x8f) { - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *layer3_type = ntohs(chdlc->proto_code); } @@ -2819,7 +2819,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre } struct ndpi_radiotap_header const * const radiotap = - (struct ndpi_radiotap_header const * const) & packet[eth_offset]; + (struct ndpi_radiotap_header const * const)&packet[eth_offset]; uint16_t radio_len = radiotap->len; /* Check Bad FCS presence */ -- cgit v1.2.3 From d646ec5ab477165cf49f7f69a8ad9df587b9c79a Mon Sep 17 00:00:00 2001 From: lns Date: Mon, 29 Aug 2022 14:34:01 +0200 Subject: nDPId: Fixed fcntl() issue; invalid fcntl() set after a blocking-write. * nDPId: imrpvoed collector socket error messages on connect/write/etc failures * reverted `netcat` parts of the README Signed-off-by: lns --- README.md | 12 +++-- nDPId-test.c | 2 +- nDPId.c | 151 ++++++++++++++++++++++++++++++++++++++++------------------- 3 files changed, 111 insertions(+), 54 deletions(-) (limited to 'nDPId.c') diff --git a/README.md b/README.md index 3e6b3e96b..60975672f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ [![Build](https://github.com/utoni/nDPId/actions/workflows/build.yml/badge.svg)](https://github.com/utoni/nDPId/actions/workflows/build.yml) -[![Gitlab-CI](https://gitlab.com/utoni/nDPId/badges/master/pipeline.svg)](https://gitlab.com/utoni/nDPId/-/pipelines) +[![Gitlab-CI](https://gitlab.com/utoni/nDPId/badges/main/pipeline.svg)](https://gitlab.com/utoni/nDPId/-/pipelines) # Abstract @@ -183,22 +183,26 @@ The CMake cache variable `-DBUILD_NDPI=ON` builds a version of `libnDPI` residin As mentioned above, in order to run `nDPId` a UNIX-socket need to be provided in order to stream our related JSON-data. -Such a UNIX-socket can be provided by both the included `nDPIsrvd` daemon, or, if you simply need a quick check, with the `netcat` utility (preferably `openbsd-netcat`), with a simple `netcat -U /tmp/listen.sock -l -k`. +Such a UNIX-socket can be provided by both the included `nDPIsrvd` daemon, or, if you simply need a quick check, with the [ncat](https://nmap.org/book/ncat-man.html) utility, with a simple `ncat -U /tmp/listen.sock -l -k`. Remember that OpenBSD `netcat` is not able to handle multiple connections reliably. Once the socket is ready, you can run `nDPId` capturing and analyzing your own traffic, with something similar to: -Of course, both `netcat` and `nDPId` need to point to the same UNIX-socket (`nDPId` provides the `-c` option, exactly for this. As a default, `nDPId` refer to `/tmp/ndpid-collector.sock`, and the same default-path is also used by `nDPIsrvd` as for the incoming socket) +Of course, both `ncat` and `nDPId` need to point to the same UNIX-socket (`nDPId` provides the `-c` option, exactly for this. As a default, `nDPId` refer to `/tmp/ndpid-collector.sock`, and the same default-path is also used by `nDPIsrvd` as for the incoming socket). You also need to provide `nDPId` some real-traffic. You can capture your own traffic, with something similar to: ```shell +ncat -U /tmp/listen.sock -l -k +#socat UNIX-Listen:/tmp/listen.sock,fork - # does the same as `ncat` +sudo chown nobody:nobody /tmp/listen.sock # default `nDPId` user/group, see `-u` and `-g` sudo ./nDPId -c /tmp/listen.sock -l ``` `nDPId` supports also UDP collector endpoints: ```shell -netcat -u 127.0.0.1 7000 -l -k +ncat -u 127.0.0.1 7000 -l -k +#socat UDP-Listen:7000,fork - # does the same as `ncat` sudo ./nDPId -c 127.0.0.1:7000 -l ``` diff --git a/nDPId-test.c b/nDPId-test.c index 52f6804c8..da152f17f 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -793,7 +793,7 @@ static void * nDPId_mainloop_thread(void * const arg) /* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */ reader_threads[0].collector_sockfd = mock_pipefds[PIPE_nDPId]; - reader_threads[0].collector_sock_reconnect = 0; + reader_threads[0].collector_sock_last_errno = 0; pthread_mutex_lock(&nDPId_start_mutex); diff --git a/nDPId.c b/nDPId.c index 75f6aacf2..f57419de9 100644 --- a/nDPId.c +++ b/nDPId.c @@ -247,7 +247,7 @@ struct nDPId_reader_thread struct nDPId_workflow * workflow; pthread_t thread_id; int collector_sockfd; - int collector_sock_reconnect; + int collector_sock_last_errno; size_t array_index; }; @@ -504,6 +504,44 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read struct nDPId_flow * const flow, enum flow_event event); +static int set_collector_nonblock(struct nDPId_reader_thread * const reader_thread) +{ + int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0); + + if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags | O_NONBLOCK) == -1) + { + reader_thread->collector_sock_last_errno = errno; + logger(1, + "[%8llu, %zu] Could not set collector fd %d to non-blocking mode: %s", + reader_thread->workflow->packets_processed, + reader_thread->thread_id, + reader_thread->collector_sockfd, + strerror(errno)); + return 1; + } + + return 0; +} + +static int set_collector_block(struct nDPId_reader_thread * const reader_thread) +{ + int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0); + + if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags & ~O_NONBLOCK) == -1) + { + reader_thread->collector_sock_last_errno = errno; + logger(1, + "[%8llu, %zu] Could not set collector fd %d to blocking mode: %s", + reader_thread->workflow->packets_processed, + reader_thread->thread_id, + reader_thread->collector_sockfd, + strerror(errno)); + return 1; + } + + return 0; +} + #ifdef ENABLE_ZLIB static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen) { @@ -1942,36 +1980,34 @@ static int connect_to_collector(struct nDPId_reader_thread * const reader_thread reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type | SOCK_CLOEXEC, 0); if (reader_thread->collector_sockfd < 0) { - reader_thread->collector_sock_reconnect = 1; + reader_thread->collector_sock_last_errno = errno; return 1; } - int opt = NETWORK_BUFFER_MAX_SIZE * 16; + int opt = NETWORK_BUFFER_MAX_SIZE; if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0) { return 1; } - if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0) + if (set_collector_nonblock(reader_thread) != 0) { - reader_thread->collector_sock_reconnect = 1; return 1; } - if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0) + if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0) { + reader_thread->collector_sock_last_errno = errno; return 1; } - if (fcntl(reader_thread->collector_sockfd, - F_SETFL, - fcntl(reader_thread->collector_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) + if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0) { - reader_thread->collector_sock_reconnect = 1; + reader_thread->collector_sock_last_errno = errno; return 1; } - reader_thread->collector_sock_reconnect = 0; + reader_thread->collector_sock_last_errno = 0; return 0; } @@ -2003,26 +2039,46 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, return; } - if (reader_thread->collector_sock_reconnect != 0) + if (reader_thread->collector_sock_last_errno != 0) { + saved_errno = reader_thread->collector_sock_last_errno; + if (connect_to_collector(reader_thread) == 0) { - logger(1, - "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s", - workflow->packets_captured, - reader_thread->array_index, - nDPId_options.collector_address); - jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); + if (collector_address.raw.sa_family == AF_UNIX) + { + logger(1, + "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s", + workflow->packets_captured, + reader_thread->array_index, + nDPId_options.collector_address); + jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); + } + } + else + { + if (saved_errno != reader_thread->collector_sock_last_errno) + { + logger(1, + "[%8llu, %zu] Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", + workflow->packets_captured, + reader_thread->array_index, + nDPId_options.collector_address, + (reader_thread->collector_sock_last_errno != 0 + ? strerror(reader_thread->collector_sock_last_errno) + : "Internal Error.")); + } + return; } } errno = 0; ssize_t written; - if (reader_thread->collector_sock_reconnect == 0 && + if (reader_thread->collector_sock_last_errno == 0 && (written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret) { saved_errno = errno; - if (saved_errno == EPIPE) + if (saved_errno == EPIPE || written == 0) { logger(1, "[%8llu, %zu] Lost connection to nDPIsrvd Collector", @@ -2040,21 +2096,32 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, (collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"), nDPId_options.collector_address); } - if (collector_address.raw.sa_family == AF_UNIX) - { - reader_thread->collector_sock_reconnect = 1; - } + reader_thread->collector_sock_last_errno = saved_errno; } - else + else if (collector_address.raw.sa_family == AF_UNIX) { - fcntl(reader_thread->collector_sockfd, - F_SETFL, - fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK); off_t pos = (written < 0 ? 0 : written); + logger(0, + "[%8llu, %zu] Send less data then expected (%zd < %d bytes), falling back to blocking I/O", + workflow->packets_captured, + reader_thread->array_index, + pos, + s_ret); + set_collector_block(reader_thread); while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos) { - if (written < 0) + saved_errno = errno; + if (saved_errno == EPIPE || written == 0) + { + logger(1, + "[%8llu, %zu] Lost connection to nDPIsrvd Collector", + workflow->packets_captured, + reader_thread->array_index); + reader_thread->collector_sock_last_errno = saved_errno; + break; + } + else if (written < 0) { logger(1, "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s", @@ -2062,10 +2129,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, reader_thread->array_index, nDPId_options.collector_address, strerror(saved_errno)); - if (collector_address.raw.sa_family == AF_UNIX) - { - reader_thread->collector_sock_reconnect = 1; - } + reader_thread->collector_sock_last_errno = saved_errno; break; } else @@ -2073,9 +2137,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread, pos += written; } } - fcntl(reader_thread->collector_sockfd, - F_SETFL, - fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & O_NONBLOCK); + set_collector_nonblock(reader_thread); } } } @@ -4031,16 +4093,15 @@ static void * processing_thread(void * const ndpi_thread_arg) struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg; reader_thread->collector_sockfd = -1; - reader_thread->collector_sock_reconnect = 1; - errno = 0; if (connect_to_collector(reader_thread) != 0) { logger(1, "Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", reader_thread->array_index, nDPId_options.collector_address, - (errno != 0 ? strerror(errno) : "Internal Error.")); + (reader_thread->collector_sock_last_errno != 0 ? strerror(reader_thread->collector_sock_last_errno) + : "Internal Error.")); } else { @@ -4048,7 +4109,7 @@ static void * processing_thread(void * const ndpi_thread_arg) } run_pcap_loop(reader_thread); - fcntl(reader_thread->collector_sockfd, F_SETFL, fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK); + set_collector_block(reader_thread); __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); return NULL; } @@ -4171,15 +4232,7 @@ static void process_remaining_flows(void) { for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { - if (fcntl(reader_threads[i].collector_sockfd, - F_SETFL, - fcntl(reader_threads[i].collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1) - { - logger(1, - "Could not set JSON fd %d to blocking mode for shutdown: %s", - reader_threads[i].collector_sockfd, - strerror(errno)); - } + set_collector_block(&reader_threads[i]); for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows; ++idle_scan_index) -- cgit v1.2.3