aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-09-04 17:26:21 +0200
committerToni Uhlig <matzeton@googlemail.com>2022-09-04 17:26:21 +0200
commit70f517b0403d7f2504fb18aeae8633365782f03b (patch)
treebd29812a77e12c31cf3c696c12d16d231062dfc8
parentdcf78ad3ed5b758c5be2c4e401b042912340a916 (diff)
parentd646ec5ab477165cf49f7f69a8ad9df587b9c79a (diff)
Merge branch 'main' of github.com:utoni/nDPId
-rw-r--r--CMakeLists.txt41
-rw-r--r--README.md100
-rw-r--r--nDPId-test.c2
-rw-r--r--nDPId.c198
4 files changed, 235 insertions, 106 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index cfc00ac07..e44e223eb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -160,6 +160,10 @@ endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra")
set(NDPID_DEFS -DJSMN_STATIC=1 -DJSMN_STRICT=1 -DJSMN_PARENT_LINKS=1)
+set(NDPID_DEPS_INC "${CMAKE_SOURCE_DIR}"
+ "${CMAKE_SOURCE_DIR}/dependencies"
+ "${CMAKE_SOURCE_DIR}/dependencies/jsmn"
+ "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
if(ENABLE_MEMORY_PROFILING)
message(WARNING "ENABLE_MEMORY_PROFILING should not be used in production environments.")
add_definitions("-DENABLE_MEMORY_PROFILING=1"
@@ -250,29 +254,21 @@ find_package(PCAP "1.8.1" REQUIRED)
target_compile_options(nDPId PRIVATE "-pthread")
target_compile_definitions(nDPId PRIVATE -D_GNU_SOURCE=1 -DGIT_VERSION=\"${GIT_VERSION}\" ${NDPID_DEFS} ${ZLIB_DEFS})
-target_include_directories(nDPId PRIVATE "${STATIC_LIBNDPI_INC}" "${DEFAULT_NDPI_INCLUDE}")
+target_include_directories(nDPId PRIVATE "${STATIC_LIBNDPI_INC}" "${DEFAULT_NDPI_INCLUDE}" ${NDPID_DEPS_INC})
target_link_libraries(nDPId "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}"
"${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" "${pkgcfg_lib_ZLIB_z}"
"${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}" "${LIBM_LIB}"
"-pthread")
target_compile_definitions(nDPIsrvd PRIVATE -D_GNU_SOURCE=1 -DGIT_VERSION=\"${GIT_VERSION}\" ${NDPID_DEFS})
-target_include_directories(nDPIsrvd PRIVATE
- "${CMAKE_SOURCE_DIR}"
- "${CMAKE_SOURCE_DIR}/dependencies"
- "${CMAKE_SOURCE_DIR}/dependencies/jsmn"
- "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
+target_include_directories(nDPIsrvd PRIVATE ${NDPID_DEPS_INC})
-target_include_directories(nDPId-test PRIVATE
- "${CMAKE_SOURCE_DIR}"
- "${CMAKE_SOURCE_DIR}/dependencies"
- "${CMAKE_SOURCE_DIR}/dependencies/jsmn"
- "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
+target_include_directories(nDPId-test PRIVATE ${NDPID_DEPS_INC})
target_compile_options(nDPId-test PRIVATE "-Wno-unused-function" "-pthread")
target_compile_definitions(nDPId-test PRIVATE -D_GNU_SOURCE=1 -DNO_MAIN=1 -DGIT_VERSION=\"${GIT_VERSION}\"
${NDPID_DEFS} ${ZLIB_DEFS} ${NDPID_TEST_MPROF_DEFS})
target_include_directories(nDPId-test PRIVATE
- "${STATIC_LIBNDPI_INC}" "${DEFAULT_NDPI_INCLUDE}" "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
+ "${STATIC_LIBNDPI_INC}" "${DEFAULT_NDPI_INCLUDE}" ${NDPID_DEPS_INC})
target_link_libraries(nDPId-test "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}"
"${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" "${pkgcfg_lib_ZLIB_z}"
"${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}" "${LIBM_LIB}"
@@ -281,11 +277,7 @@ target_link_libraries(nDPId-test "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi
if(BUILD_EXAMPLES)
add_executable(nDPIsrvd-collectd examples/c-collectd/c-collectd.c)
target_compile_definitions(nDPIsrvd-collectd PRIVATE ${NDPID_DEFS})
- target_include_directories(nDPIsrvd-collectd PRIVATE
- "${CMAKE_SOURCE_DIR}"
- "${CMAKE_SOURCE_DIR}/dependencies"
- "${CMAKE_SOURCE_DIR}/dependencies/jsmn"
- "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
+ target_include_directories(nDPIsrvd-collectd PRIVATE ${NDPID_DEPS_INC})
add_executable(nDPIsrvd-captured examples/c-captured/c-captured.c utils.c)
if(BUILD_NDPI)
@@ -293,27 +285,18 @@ if(BUILD_EXAMPLES)
endif()
target_compile_definitions(nDPIsrvd-captured PRIVATE ${NDPID_DEFS})
target_include_directories(nDPIsrvd-captured PRIVATE
- "${STATIC_LIBNDPI_INC}" "${DEFAULT_NDPI_INCLUDE}" "${CMAKE_SOURCE_DIR}"
- "${CMAKE_SOURCE_DIR}/dependencies"
- "${CMAKE_SOURCE_DIR}/dependencies/jsmn"
- "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
+ "${STATIC_LIBNDPI_INC}" "${DEFAULT_NDPI_INCLUDE}" "${CMAKE_SOURCE_DIR}" ${NDPID_DEPS_INC})
target_link_libraries(nDPIsrvd-captured "${pkgcfg_lib_NDPI_ndpi}"
"${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}"
"${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}")
add_executable(nDPIsrvd-json-dump examples/c-json-stdout/c-json-stdout.c)
target_compile_definitions(nDPIsrvd-json-dump PRIVATE ${NDPID_DEFS})
- target_include_directories(nDPIsrvd-json-dump PRIVATE
- "${CMAKE_SOURCE_DIR}"
- "${CMAKE_SOURCE_DIR}/dependencies/jsmn")
+ target_include_directories(nDPIsrvd-json-dump PRIVATE ${NDPID_DEPS_INC})
add_executable(nDPIsrvd-simple examples/c-simple/c-simple.c)
target_compile_definitions(nDPIsrvd-simple PRIVATE ${NDPID_DEFS})
- target_include_directories(nDPIsrvd-simple PRIVATE
- "${CMAKE_SOURCE_DIR}"
- "${CMAKE_SOURCE_DIR}/dependencies"
- "${CMAKE_SOURCE_DIR}/dependencies/jsmn"
- "${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
+ target_include_directories(nDPIsrvd-simple PRIVATE ${NDPID_DEPS_INC})
target_link_libraries(nDPIsrvd-simple "${pkgcfg_lib_NDPI_ndpi}"
"${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}"
"${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}")
diff --git a/README.md b/README.md
index e5214427b..60975672f 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
[![Build](https://github.com/utoni/nDPId/actions/workflows/build.yml/badge.svg)](https://github.com/utoni/nDPId/actions/workflows/build.yml)
-[![Gitlab-CI](https://gitlab.com/utoni/nDPId/badges/master/pipeline.svg)](https://gitlab.com/utoni/nDPId/-/pipelines)
+[![Gitlab-CI](https://gitlab.com/utoni/nDPId/badges/main/pipeline.svg)](https://gitlab.com/utoni/nDPId/-/pipelines)
# Abstract
@@ -69,6 +69,60 @@ The full stream of `nDPId` generated JSON-events can be retrieved directly from
Technical details about JSON-messages format can be obtained from related `.schema` file included in the `schema` directory
+# Events
+
+`nDPId` generates JSON strings whereas each string is assigned to a certain event.
+Those events specify the contents (key-value-pairs) of the JSON string.
+They are divided into four categories, each with a number of events.
+
+Error Events: indicates that layer2 or layer3 packet processing failed or not enough flow memory available
+ 1. Unknown datalink layer packet
+ 2. Unknown L3 protocol
+ 3. Unsupported datalink layer
+ 4. Packet too short
+ 5. Unknown packet type
+ 6. Packet header invalid
+ 7. IP4 packet too short
+ 8. Packet smaller than IP4 header:
+ 9. nDPI IPv4/L4 payload detection failed
+ 10. IP6 packet too short
+ 11. Packet smaller than IP6 header
+ 12. nDPI IPv6/L4 payload detection failed
+ 13. TCP packet smaller than expected
+ 14. UDP packet smaller than expected
+ 15. Captured packet size is smaller than expected packet size
+ 16. Max flows to track reached
+ 17. Flow memory allocation failed
+
+Daemon Events: startup/shutdown or status events as well as a reconnect event if there was a previous connection failure (collector)
+ 1. init: `nDPId` startup
+ 2. reconnect: (UNIX) socket connection lost previously and was established again
+ 3. shutdown: `nDPId` terminates gracefully
+ 4. status: statistics about the daemon itself e.g. memory consumption, zLib compressions (if enabled)
+
+Packet Events: contains base64 encoded packet payload either belonging to a flow or not
+ 1. packet: does not belong to any flow
+ 2. packet-flow: does belong to a flow e.g. TCP/UDP or ICMP
+
+Flow Events: all events related to a flow
+ 1. new: a new TCP/UDP/ICMP flow seen which will be tracked
+ 2. end: a TCP connections terminates
+ 3. idle: a flow timed out, because there was no packet on the wire for a certain amount of time
+ 4. update: inform nDPIsrvd or other apps about a long-lasting flow, whose detection was finished a long time ago but is still active
+ 5. guessed: `libnDPI` was not able to reliable detect a layer7 protocol and falls back to IP/Port based detection
+ 6. detected: `libnDPI` sucessfully detected a layer7 protocol
+ 7. detection-update: `libnDPI` dissected more layer7 protocol data (after detection already done)
+ 8. not-detected: neither detected nor guessed
+
+
+# Flow States
+
+A flow can have three different states while it is been tracked by `nDPId`.
+
+1. skipped: the flow will be tracked, but no detection will happen to safe memory, see command line argument `-I` and `-E`
+2. finished: detection finished and the memory used for the detection is free'd
+3. info: detection is in progress and all flow memory required for `libnDPI` is allocated (this state consumes most memory)
+
# Build (CMake)
`nDPId` build system is based on [CMake](https://cmake.org/)
@@ -129,15 +183,28 @@ The CMake cache variable `-DBUILD_NDPI=ON` builds a version of `libnDPI` residin
As mentioned above, in order to run `nDPId` a UNIX-socket need to be provided in order to stream our related JSON-data.
-Such a UNIX-socket can be provided by both the included `nDPIsrvd` daemon, or, if you simply need a quick check, with the [ncat](https://nmap.org/book/ncat-man.html) utility, with a simple `ncat -U /tmp/listen.sock -l -k`
+Such a UNIX-socket can be provided by both the included `nDPIsrvd` daemon, or, if you simply need a quick check, with the [ncat](https://nmap.org/book/ncat-man.html) utility, with a simple `ncat -U /tmp/listen.sock -l -k`. Remember that OpenBSD `netcat` is not able to handle multiple connections reliably.
Once the socket is ready, you can run `nDPId` capturing and analyzing your own traffic, with something similar to:
-Of course, both `ncat` and `nDPId` need to point to the same UNIX-socket (`nDPId` provides the `-c` option, exactly for this. As a default, `nDPId` refer to `/tmp/ndpid-collector.sock`, and the same default-path is also used by `nDPIsrvd` as for the incoming socket)
+Of course, both `ncat` and `nDPId` need to point to the same UNIX-socket (`nDPId` provides the `-c` option, exactly for this. As a default, `nDPId` refer to `/tmp/ndpid-collector.sock`, and the same default-path is also used by `nDPIsrvd` as for the incoming socket).
You also need to provide `nDPId` some real-traffic. You can capture your own traffic, with something similar to:
- ./nDPId -c /tmp/listen.sock -i wlan0 -l
+```shell
+ncat -U /tmp/listen.sock -l -k
+#socat UNIX-Listen:/tmp/listen.sock,fork - # does the same as `ncat`
+sudo chown nobody:nobody /tmp/listen.sock # default `nDPId` user/group, see `-u` and `-g`
+sudo ./nDPId -c /tmp/listen.sock -l
+```
+
+`nDPId` supports also UDP collector endpoints:
+
+```shell
+ncat -u 127.0.0.1 7000 -l -k
+#socat UDP-Listen:7000,fork - # does the same as `ncat`
+sudo ./nDPId -c 127.0.0.1:7000 -l
+```
or you can generate a nDPId-compatible JSON dump with:
@@ -177,6 +244,31 @@ or
or anything below `./examples`.
+# nDPId tuning
+
+It is possible to change `nDPId` internals w/o recompiling by using `-o subopt=value`.
+But be careful: changing the default values may render `nDPId` useless and is not well tested.
+
+Suboptions for `-o`:
+
+Format: `subopt` (unit, comment): description
+
+ * `max-flows-per-thread` (N, caution advised): affects max. memory usage
+ * `max-idle-flows-per-thread` (N, safe): max. allowed idle flows which memory get's free'd after `flow-scan-interval`
+ * `tick-resolution` (ns, untested): timestamp resolution (applies to **all** timestamps!)
+ * `max-reader-threads` (N, safe): amount of packet processing threads, every thread can have a max. of `max-flows-per-thread` flows
+ * `daemon-status-interval` (ms, safe): specifies how often daemon event `status` will be generated
+ * `compression-scan-interval` (ms, untested): specifies how often `nDPId` should scan for inactive flows ready for compression
+ * `compression-flow-inactivity` (ms, untested): the earliest period of time that must elapse before `nDPId` may consider compressing a flow that did neither send nor receive any data
+ * `flow-scan-interval` (ms, safe): min. amount of time after which `nDPId` will scan for idle or long-lasting flows
+ * `generic-max-idle-time` (ms, untested): time after which a non TCP/UDP/ICMP flow will time out
+ * `icmp-max-idle-time` (ms, untested): time after which an ICMP flow will time out
+ * `udp-max-idle-time` (ms, caution advised): time after which an UDP flow will time out
+ * `tcp-max-idle-time` (ms, caution advised): time after which a TCP flow will time out
+ * `tcp-max-post-end-flow-time` (ms, caution advised): a TCP flow that received a FIN or RST will wait that amount of time before flow tracking will be stopped and the flow memory free'd
+ * `max-packets-per-flow-to-send` (N, safe): max. `packet-flow` events that will be generated for the first N packets of each flow
+ * `max-packets-per-flow-to-process` (N, caution advised): max. packets that will be processed by `libnDPI`
+
# test
The recommended way to run integration / diff tests:
diff --git a/nDPId-test.c b/nDPId-test.c
index 52f6804c8..da152f17f 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -793,7 +793,7 @@ static void * nDPId_mainloop_thread(void * const arg)
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
reader_threads[0].collector_sockfd = mock_pipefds[PIPE_nDPId];
- reader_threads[0].collector_sock_reconnect = 0;
+ reader_threads[0].collector_sock_last_errno = 0;
pthread_mutex_lock(&nDPId_start_mutex);
diff --git a/nDPId.c b/nDPId.c
index 32c5f528d..f57419de9 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -25,6 +25,7 @@
#endif
#include "config.h"
+#include "nDPIsrvd.h"
#include "utils.h"
#ifndef UNIX_PATH_MAX
@@ -246,7 +247,7 @@ struct nDPId_reader_thread
struct nDPId_workflow * workflow;
pthread_t thread_id;
int collector_sockfd;
- int collector_sock_reconnect;
+ int collector_sock_last_errno;
size_t array_index;
};
@@ -362,6 +363,7 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = {
};
static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {};
+static struct nDPIsrvd_address collector_address;
static volatile int nDPId_main_thread_shutdown = 0;
static volatile uint64_t global_flow_id = 1;
static int ip4_interface_avail = 0, ip6_interface_avail = 0;
@@ -396,7 +398,7 @@ static struct
char * custom_categories_file;
char * custom_ja3_file;
char * custom_sha1_file;
- char collector_sockpath[UNIX_PATH_MAX];
+ char collector_address[UNIX_PATH_MAX];
#ifdef ENABLE_ZLIB
uint8_t enable_zlib_compression;
#endif
@@ -424,7 +426,7 @@ static struct
unsigned long long int max_packets_per_flow_to_process;
} nDPId_options = {.pidfile = nDPId_PIDFILE,
.user = "nobody",
- .collector_sockpath = COLLECTOR_UNIX_SOCKET,
+ .collector_address = COLLECTOR_UNIX_SOCKET,
.max_flows_per_thread = nDPId_MAX_FLOWS_PER_THREAD / 2,
.max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2,
.tick_resolution = nDPId_TICK_RESOLUTION,
@@ -502,6 +504,44 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read
struct nDPId_flow * const flow,
enum flow_event event);
+static int set_collector_nonblock(struct nDPId_reader_thread * const reader_thread)
+{
+ int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0);
+
+ if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags | O_NONBLOCK) == -1)
+ {
+ reader_thread->collector_sock_last_errno = errno;
+ logger(1,
+ "[%8llu, %zu] Could not set collector fd %d to non-blocking mode: %s",
+ reader_thread->workflow->packets_processed,
+ reader_thread->thread_id,
+ reader_thread->collector_sockfd,
+ strerror(errno));
+ return 1;
+ }
+
+ return 0;
+}
+
+static int set_collector_block(struct nDPId_reader_thread * const reader_thread)
+{
+ int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0);
+
+ if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags & ~O_NONBLOCK) == -1)
+ {
+ reader_thread->collector_sock_last_errno = errno;
+ logger(1,
+ "[%8llu, %zu] Could not set collector fd %d to blocking mode: %s",
+ reader_thread->workflow->packets_processed,
+ reader_thread->thread_id,
+ reader_thread->collector_sockfd,
+ strerror(errno));
+ return 1;
+ }
+
+ return 0;
+}
+
#ifdef ENABLE_ZLIB
static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
{
@@ -1931,53 +1971,43 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
static int connect_to_collector(struct nDPId_reader_thread * const reader_thread)
{
- struct sockaddr_un saddr;
-
if (reader_thread->collector_sockfd >= 0)
{
close(reader_thread->collector_sockfd);
}
- reader_thread->collector_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+ int sock_type = (collector_address.raw.sa_family == AF_UNIX ? SOCK_STREAM : SOCK_DGRAM);
+ reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type | SOCK_CLOEXEC, 0);
if (reader_thread->collector_sockfd < 0)
{
- reader_thread->collector_sock_reconnect = 1;
+ reader_thread->collector_sock_last_errno = errno;
return 1;
}
- int opt = NETWORK_BUFFER_MAX_SIZE * 16;
+ int opt = NETWORK_BUFFER_MAX_SIZE;
if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0)
{
return 1;
}
- saddr.sun_family = AF_UNIX;
- int written = snprintf(saddr.sun_path, sizeof(saddr.sun_path), "%s", nDPId_options.collector_sockpath);
- if (written < 0)
+ if (set_collector_nonblock(reader_thread) != 0)
{
return 1;
}
- if (connect(reader_thread->collector_sockfd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0)
+ if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0)
{
- reader_thread->collector_sock_reconnect = 1;
+ reader_thread->collector_sock_last_errno = errno;
return 1;
}
if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0)
{
+ reader_thread->collector_sock_last_errno = errno;
return 1;
}
- if (fcntl(reader_thread->collector_sockfd,
- F_SETFL,
- fcntl(reader_thread->collector_sockfd, F_GETFL, 0) | O_NONBLOCK) == -1)
- {
- reader_thread->collector_sock_reconnect = 1;
- return 1;
- }
-
- reader_thread->collector_sock_reconnect = 0;
+ reader_thread->collector_sock_last_errno = 0;
return 0;
}
@@ -2009,25 +2039,46 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
return;
}
- if (reader_thread->collector_sock_reconnect != 0)
+ if (reader_thread->collector_sock_last_errno != 0)
{
+ saved_errno = reader_thread->collector_sock_last_errno;
+
if (connect_to_collector(reader_thread) == 0)
{
- logger(1,
- "[%8llu, %zu] Reconnected to nDPIsrvd Collector",
- workflow->packets_captured,
- reader_thread->array_index);
- jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
+ if (collector_address.raw.sa_family == AF_UNIX)
+ {
+ logger(1,
+ "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ nDPId_options.collector_address);
+ jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
+ }
+ }
+ else
+ {
+ if (saved_errno != reader_thread->collector_sock_last_errno)
+ {
+ logger(1,
+ "[%8llu, %zu] Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ nDPId_options.collector_address,
+ (reader_thread->collector_sock_last_errno != 0
+ ? strerror(reader_thread->collector_sock_last_errno)
+ : "Internal Error."));
+ }
+ return;
}
}
errno = 0;
ssize_t written;
- if (reader_thread->collector_sock_reconnect == 0 &&
+ if (reader_thread->collector_sock_last_errno == 0 &&
(written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret)
{
saved_errno = errno;
- if (saved_errno == EPIPE)
+ if (saved_errno == EPIPE || written == 0)
{
logger(1,
"[%8llu, %zu] Lost connection to nDPIsrvd Collector",
@@ -2036,25 +2087,49 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
}
if (saved_errno != EAGAIN)
{
- reader_thread->collector_sock_reconnect = 1;
+ if (saved_errno == ECONNREFUSED)
+ {
+ logger(1,
+ "[%8llu, %zu] %s to %s refused by endpoint",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ (collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"),
+ nDPId_options.collector_address);
+ }
+ reader_thread->collector_sock_last_errno = saved_errno;
}
- else
+ else if (collector_address.raw.sa_family == AF_UNIX)
{
- fcntl(reader_thread->collector_sockfd,
- F_SETFL,
- fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
off_t pos = (written < 0 ? 0 : written);
+ logger(0,
+ "[%8llu, %zu] Send less data then expected (%zd < %d bytes), falling back to blocking I/O",
+ workflow->packets_captured,
+ reader_thread->array_index,
+ pos,
+ s_ret);
+ set_collector_block(reader_thread);
while ((written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) !=
s_ret - pos)
{
- if (written < 0)
+ saved_errno = errno;
+ if (saved_errno == EPIPE || written == 0)
{
logger(1,
- "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector failed: %s",
+ "[%8llu, %zu] Lost connection to nDPIsrvd Collector",
+ workflow->packets_captured,
+ reader_thread->array_index);
+ reader_thread->collector_sock_last_errno = saved_errno;
+ break;
+ }
+ else if (written < 0)
+ {
+ logger(1,
+ "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s",
workflow->packets_captured,
reader_thread->array_index,
+ nDPId_options.collector_address,
strerror(saved_errno));
- reader_thread->collector_sock_reconnect = 1;
+ reader_thread->collector_sock_last_errno = saved_errno;
break;
}
else
@@ -2062,9 +2137,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
pos += written;
}
}
- fcntl(reader_thread->collector_sockfd,
- F_SETFL,
- fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & O_NONBLOCK);
+ set_collector_nonblock(reader_thread);
}
}
}
@@ -4020,16 +4093,15 @@ static void * processing_thread(void * const ndpi_thread_arg)
struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg;
reader_thread->collector_sockfd = -1;
- reader_thread->collector_sock_reconnect = 1;
- errno = 0;
if (connect_to_collector(reader_thread) != 0)
{
logger(1,
"Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s",
reader_thread->array_index,
- nDPId_options.collector_sockpath,
- (errno != 0 ? strerror(errno) : "Internal Error."));
+ nDPId_options.collector_address,
+ (reader_thread->collector_sock_last_errno != 0 ? strerror(reader_thread->collector_sock_last_errno)
+ : "Internal Error."));
}
else
{
@@ -4037,7 +4109,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
}
run_pcap_loop(reader_thread);
- fcntl(reader_thread->collector_sockfd, F_SETFL, fcntl(reader_thread->collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
+ set_collector_block(reader_thread);
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return NULL;
}
@@ -4160,15 +4232,7 @@ static void process_remaining_flows(void)
{
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
{
- if (fcntl(reader_threads[i].collector_sockfd,
- F_SETFL,
- fcntl(reader_threads[i].collector_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
- {
- logger(1,
- "Could not set JSON fd %d to blocking mode for shutdown: %s",
- reader_threads[i].collector_sockfd,
- strerror(errno));
- }
+ set_collector_block(&reader_threads[i]);
for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows;
++idle_scan_index)
@@ -4377,7 +4441,7 @@ static int nDPId_parse_options(int argc, char ** argv)
"Usage: %s "
"[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n"
"\t \t"
- "[-l] [-L logfile] [-c path-to-unix-sock] "
+ "[-l] [-L logfile] [-c address] "
"[-d] [-p pidfile]\n"
"\t \t"
"[-u user] [-g group] "
@@ -4394,7 +4458,7 @@ static int nDPId_parse_options(int argc, char ** argv)
"\t-B\tSet an optional PCAP filter string. (BPF format)\n"
"\t-l\tLog all messages to stderr.\n"
"\t-L\tLog all messages to a log file.\n"
- "\t-c\tPath to the UNIX socket (nDPIsrvd Collector).\n"
+ "\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n"
"\t-d\tForking into background after initialization.\n"
"\t-p\tWrite the daemon PID to the given file path.\n"
"\t-u\tChange UID to the numeric value of user.\n"
@@ -4417,7 +4481,7 @@ static int nDPId_parse_options(int argc, char ** argv)
"\t-v\tversion\n"
"\t-h\tthis\n\n";
- while ((opt = getopt(argc, argv, "hi:IEB:lL:c:dp:u:g:P:C:J:S:a:zo:vh")) != -1)
+ while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:zo:vh")) != -1)
{
switch (opt)
{
@@ -4443,8 +4507,8 @@ static int nDPId_parse_options(int argc, char ** argv)
}
break;
case 'c':
- strncpy(nDPId_options.collector_sockpath, optarg, sizeof(nDPId_options.collector_sockpath) - 1);
- nDPId_options.collector_sockpath[sizeof(nDPId_options.collector_sockpath) - 1] = '\0';
+ strncpy(nDPId_options.collector_address, optarg, sizeof(nDPId_options.collector_address) - 1);
+ nDPId_options.collector_address[sizeof(nDPId_options.collector_address) - 1] = '\0';
break;
case 'd':
daemonize_enable();
@@ -4627,20 +4691,10 @@ static int validate_options(void)
}
}
#endif
- if (is_path_absolute("Collector socket", nDPId_options.collector_sockpath) != 0)
+ if (nDPIsrvd_setup_address(&collector_address, nDPId_options.collector_address) != 0)
{
retval = 1;
- }
- {
- struct sockaddr_un saddr;
- if (strlen(nDPId_options.collector_sockpath) >= sizeof(saddr.sun_path))
- {
- logger_early(1,
- "Collector socket path too long, current/max: %zu/%zu",
- strlen(nDPId_options.collector_sockpath),
- sizeof(saddr.sun_path) - 1);
- retval = 1;
- }
+ logger_early(1, "Collector socket invalid address: %s.", nDPId_options.collector_address);
}
if (nDPId_options.instance_alias == NULL)
{