aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-06-14 14:43:29 +0200
committerToni Uhlig <matzeton@googlemail.com>2021-06-14 15:33:29 +0200
commitfac7648326c6cea478b92872f7abb3f799961cfc (patch)
treed2ad3e1c3952f787b1b25259971b04f313d8e388
parent98b11f814f54bb23cdd58299e63dc49264e3b5bc (diff)
Support for zLib flow memory compression. Experimental.
Please use this feature only for testing purposes. It will change or be removed in the future. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r--CMakeLists.txt48
-rw-r--r--nDPId.c320
2 files changed, 309 insertions, 59 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 46df61753..997c45162 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,6 +6,7 @@ find_package(PkgConfig REQUIRED)
option(ENABLE_SANITIZER "Enable ASAN/LSAN/UBSAN." OFF)
option(ENABLE_SANITIZER_THREAD "Enable TSAN (does not work together with ASAN)." OFF)
option(ENABLE_MEMORY_PROFILING "Enable dynamic memory tracking." OFF)
+option(ENABLE_ZLIB "Enable zlib support for nDPId (experimental)." OFF)
option(BUILD_EXAMPLES "Build C examples." ON)
option(BUILD_NDPI "Clone and build nDPI from github." OFF)
option(NDPI_NO_PKGCONFIG "Do not use pkgconfig to search for libnDPI." OFF)
@@ -57,16 +58,19 @@ if(GIT_VERSION STREQUAL "" OR NOT IS_DIRECTORY "${CMAKE_SOURCE_DIR}/.git")
endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra")
-set(NDPID_C_FLAGS -DJSMN_STATIC=1 -DJSMN_STRICT=1)
+set(NDPID_DEFS -DJSMN_STATIC=1 -DJSMN_STRICT=1)
if(ENABLE_MEMORY_PROFILING)
- set(MEMORY_PROFILING_CFLAGS "-DENABLE_MEMORY_PROFILING=1"
- "-Duthash_malloc=nDPIsrvd_uthash_malloc"
- "-Duthash_free=nDPIsrvd_uthash_free")
-else()
- set(MEMORY_PROFILING_CFLAGS "")
+ add_definitions("-DENABLE_MEMORY_PROFILING=1"
+ "-Duthash_malloc=nDPIsrvd_uthash_malloc"
+ "-Duthash_free=nDPIsrvd_uthash_free")
+endif()
+
+if(ENABLE_ZLIB)
+ set(ZLIB_DEFS "-DENABLE_ZLIB=1")
+ pkg_check_modules(ZLIB REQUIRED zlib)
endif()
-if(CMAKE_BUILD_TYPE STREQUAL "Debug")
+if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_BUILD_TYPE STREQUAL "")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0 -g3 -fno-omit-frame-pointer -fno-inline")
endif()
@@ -88,8 +92,12 @@ if(STATIC_LIBNDPI_INSTALLDIR STREQUAL "" AND BUILD_NDPI)
libnDPI
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/libnDPI
DOWNLOAD_COMMAND ""
- CONFIGURE_COMMAND env CROSS_COMPILE_TRIPLET=${CROSS_COMPILE_TRIPLET}
- MAKE_PROGRAM=make DEST_INSTALL=${CMAKE_BINARY_DIR}/libnDPI
+ CONFIGURE_COMMAND env
+ CFLAGS=${CMAKE_C_FLAGS}
+ LDFLAGS=${CMAKE_MODULE_LINKER_FLAGS}
+ CROSS_COMPILE_TRIPLET=${CROSS_COMPILE_TRIPLET}
+ MAKE_PROGRAM=make
+ DEST_INSTALL=${CMAKE_BINARY_DIR}/libnDPI
${CMAKE_CURRENT_SOURCE_DIR}/scripts/get-and-build-libndpi.sh
BUILD_BYPRODUCTS ${CMAKE_BINARY_DIR}/libnDPI/lib/libndpi.a
BUILD_COMMAND ""
@@ -142,14 +150,15 @@ else()
endif()
find_package(PCAP "1.8.1" REQUIRED)
-target_compile_options(nDPId PRIVATE ${NDPID_C_FLAGS} -DGIT_VERSION=\"${GIT_VERSION}\" ${MEMORY_PROFILING_CFLAGS} "-pthread")
+target_compile_options(nDPId PRIVATE "-pthread")
+target_compile_definitions(nDPId PRIVATE -DGIT_VERSION=\"${GIT_VERSION}\" ${NDPID_DEFS} ${ZLIB_DEFS})
target_include_directories(nDPId PRIVATE "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi")
target_link_libraries(nDPId "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}"
- "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}"
+ "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" "${pkgcfg_lib_ZLIB_z}"
"${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}"
"-pthread")
-target_compile_options(nDPIsrvd PRIVATE ${NDPID_C_FLAGS} -DGIT_VERSION=\"${GIT_VERSION}\" ${MEMORY_PROFILING_CFLAGS})
+target_compile_definitions(nDPIsrvd PRIVATE -DGIT_VERSION=\"${GIT_VERSION}\" ${NDPID_DEFS})
target_include_directories(nDPIsrvd PRIVATE
"${CMAKE_SOURCE_DIR}"
"${CMAKE_SOURCE_DIR}/dependencies"
@@ -161,17 +170,18 @@ target_include_directories(nDPId-test PRIVATE
"${CMAKE_SOURCE_DIR}/dependencies"
"${CMAKE_SOURCE_DIR}/dependencies/jsmn"
"${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
-target_compile_options(nDPId-test PRIVATE ${NDPID_C_FLAGS} ${MEMORY_PROFILING_CFLAGS} "-Wno-unused-function" "-pthread")
+target_compile_options(nDPId-test PRIVATE "-Wno-unused-function" "-pthread")
+target_compile_definitions(nDPId-test PRIVATE ${NDPID_DEFS} ${ZLIB_DEFS})
target_include_directories(nDPId-test PRIVATE "${STATIC_LIBNDPI_INC}" "${NDPI_INCLUDEDIR}" "${NDPI_INCLUDEDIR}/ndpi")
target_compile_definitions(nDPId-test PRIVATE "-D_GNU_SOURCE=1" "-DNO_MAIN=1" "-Dsyslog=mock_syslog_stderr")
target_link_libraries(nDPId-test "${STATIC_LIBNDPI_LIB}" "${pkgcfg_lib_NDPI_ndpi}"
- "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}"
+ "${pkgcfg_lib_PCRE_pcre}" "${pkgcfg_lib_MAXMINDDB_maxminddb}" "${pkgcfg_lib_ZLIB_z}"
"${GCRYPT_LIBRARY}" "${GCRYPT_ERROR_LIBRARY}" "${PCAP_LIBRARY}"
"-pthread")
if(BUILD_EXAMPLES)
add_executable(nDPIsrvd-collectd examples/c-collectd/c-collectd.c)
- target_compile_options(nDPIsrvd-collectd PRIVATE ${NDPID_C_FLAGS} ${MEMORY_PROFILING_CFLAGS})
+ target_compile_definitions(nDPIsrvd-collectd PRIVATE ${NDPID_DEFS})
target_include_directories(nDPIsrvd-collectd PRIVATE
"${CMAKE_SOURCE_DIR}"
"${CMAKE_SOURCE_DIR}/dependencies"
@@ -179,7 +189,7 @@ if(BUILD_EXAMPLES)
"${CMAKE_SOURCE_DIR}/dependencies/uthash/src")
add_executable(nDPIsrvd-captured examples/c-captured/c-captured.c utils.c)
- target_compile_options(nDPIsrvd-captured PRIVATE ${NDPID_C_FLAGS} ${MEMORY_PROFILING_CFLAGS})
+ target_compile_definitions(nDPIsrvd-captured PRIVATE ${NDPID_DEFS})
target_include_directories(nDPIsrvd-captured PRIVATE
"${CMAKE_SOURCE_DIR}"
"${CMAKE_SOURCE_DIR}/dependencies"
@@ -205,13 +215,11 @@ message(STATUS "nDPId GIT_VERSION........: ${GIT_VERSION}")
message(STATUS "CROSS_COMPILE_TRIPLET....: ${CROSS_COMPILE_TRIPLET}")
message(STATUS "CMAKE_BUILD_TYPE.........: ${CMAKE_BUILD_TYPE}")
message(STATUS "CMAKE_C_FLAGS............: ${CMAKE_C_FLAGS}")
-message(STATUS "NDPID_C_FLAGS............: ${NDPID_C_FLAGS}")
-if(ENABLE_MEMORY_PROFILING)
-message(STATUS "MEMORY_PROFILING_CFLAGS..: ${MEMORY_PROFILING_CFLAGS}")
-endif()
+message(STATUS "NDPID_DEFS...............: ${NDPID_DEFS}")
message(STATUS "ENABLE_SANITIZER.........: ${ENABLE_SANITIZER}")
message(STATUS "ENABLE_SANITIZER_THREAD..: ${ENABLE_SANITIZER_THREAD}")
message(STATUS "ENABLE_MEMORY_PROFILING..: ${ENABLE_MEMORY_PROFILING}")
+message(STATUS "ENABLE_ZLIB..............: ${ENABLE_ZLIB}")
if(NOT BUILD_NDPI AND NOT STATIC_LIBNDPI_INSTALLDIR STREQUAL "")
message(STATUS "STATIC_LIBNDPI_INSTALLDIR: ${STATIC_LIBNDPI_INSTALLDIR}")
endif()
diff --git a/nDPId.c b/nDPId.c
index 427f3ab27..dc99acb14 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -18,6 +18,9 @@
#include <sys/un.h>
#include <syslog.h>
#include <unistd.h>
+#ifdef ENABLE_ZLIB
+#include <zlib.h>
+#endif
#include "config.h"
#include "utils.h"
@@ -108,21 +111,31 @@ struct nDPId_flow_skipped
struct nDPId_flow_basic flow_basic;
};
+struct nDPI_data
+{
+ struct ndpi_flow_struct flow;
+ struct ndpi_id_struct src;
+ struct ndpi_id_struct dst;
+};
+
struct nDPId_flow_info
{
struct nDPId_flow_extended flow_extended;
uint8_t detection_completed : 1;
uint8_t reserved_00 : 7;
- uint8_t reserved_01[3];
+ uint8_t reserved_01[1];
+#ifdef ENABLE_ZLIB
+ uint16_t ndpi_compressed_size;
+#else
+ uint16_t reserved_02;
+#endif
uint32_t last_ndpi_flow_struct_hash;
struct ndpi_proto detected_l7_protocol;
struct ndpi_proto guessed_l7_protocol;
- struct ndpi_flow_struct * ndpi_flow;
- struct ndpi_id_struct * ndpi_src;
- struct ndpi_id_struct * ndpi_dst;
+ struct nDPI_data * ndpi;
};
struct nDPId_flow_finished
@@ -281,6 +294,11 @@ static uint64_t ndpi_memory_alloc_count = 0;
static uint64_t ndpi_memory_alloc_bytes = 0;
static uint64_t ndpi_memory_free_count = 0;
static uint64_t ndpi_memory_free_bytes = 0;
+#ifdef ENABLE_ZLIB
+static uint64_t zlib_compressions = 0;
+static uint64_t zlib_decompressions = 0;
+static uint64_t zlib_compression_diff = 0;
+#endif
#endif
static struct
@@ -302,6 +320,9 @@ static struct
char * custom_ja3_file;
char * custom_sha1_file;
char json_sockpath[UNIX_PATH_MAX];
+#ifdef ENABLE_ZLIB
+ uint8_t enable_zlib_compression;
+#endif
/* subopts */
char * instance_alias;
unsigned long long int max_flows_per_thread;
@@ -367,6 +388,163 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
struct nDPId_flow_info * const flow,
enum flow_event event);
+#ifdef ENABLE_ZLIB
+static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
+{
+ z_stream strm = {0};
+ strm.total_in = strm.avail_in = srcLen;
+ strm.total_out = strm.avail_out = dstLen;
+ strm.next_in = (Bytef *)src;
+ strm.next_out = (Bytef *)dst;
+
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+
+ int err = -1;
+ int ret = -1;
+
+ err = deflateInit(&strm, Z_BEST_COMPRESSION);
+ if (err == Z_OK)
+ {
+ err = deflate(&strm, Z_FINISH);
+ if (err == Z_STREAM_END)
+ {
+ ret = strm.total_out;
+#ifdef ENABLE_MEMORY_PROFILING
+ __sync_fetch_and_add(&zlib_compressions, 1);
+ __sync_fetch_and_add(&zlib_compression_diff, srcLen - ret);
+#endif
+ }
+ else
+ {
+ deflateEnd(&strm);
+ return err;
+ }
+ }
+ else
+ {
+ deflateEnd(&strm);
+ return err;
+ }
+
+ deflateEnd(&strm);
+ return ret;
+}
+
+static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
+{
+ z_stream strm = {0};
+ strm.total_in = strm.avail_in = srcLen;
+ strm.total_out = strm.avail_out = dstLen;
+ strm.next_in = (Bytef *)src;
+ strm.next_out = (Bytef *)dst;
+
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+
+ int err = -1;
+ int ret = -1;
+
+ err = inflateInit2(&strm, (15 + 32)); // 15 window bits, and the +32 tells zlib to to detect if using gzip or zlib
+ if (err == Z_OK)
+ {
+ err = inflate(&strm, Z_FINISH);
+ if (err == Z_STREAM_END)
+ {
+ ret = strm.total_out;
+#ifdef ENABLE_MEMORY_PROFILING
+ __sync_fetch_and_add(&zlib_decompressions, 1);
+#endif
+ }
+ else
+ {
+ inflateEnd(&strm);
+ return err;
+ }
+ }
+ else
+ {
+ inflateEnd(&strm);
+ return err;
+ }
+
+ inflateEnd(&strm);
+ return ret;
+}
+
+static int ndpi_data_deflate(struct nDPId_flow_info * const flow_info)
+{
+ uint8_t tmpOut[sizeof(*flow_info->ndpi)];
+ int ret;
+
+ if (flow_info->ndpi_compressed_size != 0)
+ {
+ return -7;
+ }
+
+ ret = zlib_deflate(flow_info->ndpi, sizeof(*flow_info->ndpi), tmpOut, sizeof(tmpOut));
+ if (ret <= 0)
+ {
+ return ret;
+ }
+
+ struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret);
+ if (new_ndpi_data == NULL)
+ {
+ return -8;
+ }
+ ndpi_free(flow_info->ndpi);
+ flow_info->ndpi = new_ndpi_data;
+
+ memcpy(flow_info->ndpi, tmpOut, ret);
+ flow_info->ndpi_compressed_size = ret;
+
+ return ret;
+}
+
+static int ndpi_data_inflate(struct nDPId_flow_info * const flow_info)
+{
+ uint8_t tmpOut[sizeof(*flow_info->ndpi)];
+ int ret;
+
+ if (flow_info->ndpi_compressed_size == 0)
+ {
+ return -7;
+ }
+
+ ret = zlib_inflate(flow_info->ndpi, flow_info->ndpi_compressed_size, tmpOut, sizeof(tmpOut));
+ if (ret <= 0)
+ {
+ return ret;
+ }
+
+ struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret);
+ if (new_ndpi_data == NULL)
+ {
+ return -8;
+ }
+ ndpi_free(flow_info->ndpi);
+ flow_info->ndpi = new_ndpi_data;
+
+ memcpy(flow_info->ndpi, tmpOut, ret);
+ flow_info->ndpi_compressed_size = 0;
+
+ /*
+ * Do not use ndpi_id_struct's from ndpi_flow
+ * as they may not be valid anymore.
+ * nDPI only updates those pointers while processing packets!
+ * This is especially important when using compression
+ * to prevent use of dangling pointers.
+ */
+ flow_info->ndpi->flow.src = &flow_info->ndpi->src;
+ flow_info->ndpi->flow.dst = &flow_info->ndpi->dst;
+
+ return ret;
+}
+#endif
+
static void ip_netmask_to_subnet(union nDPId_ip const * const ip,
union nDPId_ip const * const netmask,
union nDPId_ip * const subnet,
@@ -774,21 +952,16 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
static void free_ndpi_structs(struct nDPId_flow_info * const flow_info)
{
- ndpi_free(flow_info->ndpi_dst);
- flow_info->ndpi_dst = NULL;
- ndpi_free(flow_info->ndpi_src);
- flow_info->ndpi_src = NULL;
- ndpi_free_flow(flow_info->ndpi_flow);
- flow_info->ndpi_flow = NULL;
+ ndpi_free_flow_data(&flow_info->ndpi->flow);
+ ndpi_free(flow_info->ndpi);
+ flow_info->ndpi = NULL;
}
static int alloc_ndpi_structs(struct nDPId_flow_info * const flow_info)
{
- flow_info->ndpi_dst = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
- flow_info->ndpi_src = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
- flow_info->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT);
+ flow_info->ndpi = (struct nDPI_data *)ndpi_flow_malloc(sizeof(*flow_info->ndpi));
- if (flow_info->ndpi_dst == NULL || flow_info->ndpi_src == NULL || flow_info->ndpi_flow == NULL)
+ if (flow_info->ndpi == NULL)
{
goto error;
}
@@ -1068,7 +1241,7 @@ static int ndpi_workflow_node_cmp(void const * const A, void const * const B)
return 1;
}
- /* Flows have the same hash */
+ /* flows have the same hash */
if (flow_basic_a->l4_protocol < flow_basic_b->l4_protocol)
{
return -1;
@@ -1121,6 +1294,18 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
{
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
+#ifdef ENABLE_ZLIB
+ if (nDPId_options.enable_zlib_compression != 0 && flow_info->ndpi_compressed_size > 0)
+ {
+ int ret = ndpi_data_inflate(flow_info);
+ if (ret <= 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "zLib decompression failed with error code: %d", ret);
+ return;
+ }
+ }
+#endif
+
if (flow_info->detection_completed == 0)
{
uint8_t protocol_was_guessed = 0;
@@ -1128,7 +1313,7 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0)
{
flow_info->guessed_l7_protocol = ndpi_detection_giveup(workflow->ndpi_struct,
- flow_info->ndpi_flow,
+ &flow_info->ndpi->flow,
1,
&protocol_was_guessed);
}
@@ -1188,6 +1373,17 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
(long long unsigned int)free_bytes,
(long long unsigned int)(alloc_count - free_count),
(long long unsigned int)(alloc_bytes - free_bytes));
+#ifdef ENABLE_ZLIB
+ uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0);
+ uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0);
+ uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0);
+
+ syslog(LOG_DAEMON,
+ "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu bytes difference",
+ (long long unsigned int)zlib_compression_count,
+ (long long unsigned int)zlib_decompression_count,
+ (long long unsigned int)zlib_bytes_diff);
+#endif
}
#endif
@@ -1686,8 +1882,10 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_NOT_DETECTED:
case FLOW_EVENT_GUESSED:
- if (ndpi_dpi2json(
- workflow->ndpi_struct, flow->ndpi_flow, flow->guessed_l7_protocol, &workflow->ndpi_serializer) != 0)
+ if (ndpi_dpi2json(workflow->ndpi_struct,
+ &flow->ndpi->flow,
+ flow->guessed_l7_protocol,
+ &workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %4u] ndpi_dpi2json failed for not-detected/guessed flow",
@@ -1699,7 +1897,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_DETECTED:
case FLOW_EVENT_DETECTION_UPDATE:
if (ndpi_dpi2json(workflow->ndpi_struct,
- flow->ndpi_flow,
+ &flow->ndpi->flow,
flow->detected_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
@@ -2516,8 +2714,8 @@ static void ndpi_process_packet(uint8_t * const args,
{
/* flow still not found, must be new or midstream */
- union nDPId_ip const * netmask;
- union nDPId_ip const * subnet;
+ union nDPId_ip const * netmask = NULL;
+ union nDPId_ip const * subnet = NULL;
switch (flow_basic.l3_type)
{
case L3_IP:
@@ -2617,18 +2815,17 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- memset(flow_to_process->ndpi_flow,
- 0,
- (SIZEOF_FLOW_STRUCT > sizeof(struct ndpi_flow_struct) ? SIZEOF_FLOW_STRUCT
- : sizeof(struct ndpi_flow_struct)));
+ memset(flow_to_process->ndpi, 0, sizeof(*flow_to_process->ndpi));
- ndpi_src = flow_to_process->ndpi_src;
- ndpi_dst = flow_to_process->ndpi_dst;
+ ndpi_src = &flow_to_process->ndpi->src;
+ ndpi_dst = &flow_to_process->ndpi->dst;
is_new_flow = 1;
}
else
{
+ /* flow already exists in the tree */
+
struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result;
/* Update last seen timestamp for timeout handling. */
flow_basic_to_process->last_seen = time_ms;
@@ -2651,15 +2848,30 @@ static void ndpi_process_packet(uint8_t * const args,
}
flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process;
+#ifdef ENABLE_ZLIB
+ if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->ndpi_compressed_size > 0)
+ {
+ int ret = ndpi_data_inflate(flow_to_process);
+ if (ret <= 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "zLib decompression failed for existing flow %u with error code: %d",
+ flow_to_process->flow_extended.flow_id,
+ ret);
+ return;
+ }
+ }
+#endif
+
if (direction_changed != 0)
{
- ndpi_src = flow_to_process->ndpi_dst;
- ndpi_dst = flow_to_process->ndpi_src;
+ ndpi_src = &flow_to_process->ndpi->dst;
+ ndpi_dst = &flow_to_process->ndpi->src;
}
else
{
- ndpi_src = flow_to_process->ndpi_src;
- ndpi_dst = flow_to_process->ndpi_dst;
+ ndpi_src = &flow_to_process->ndpi->src;
+ ndpi_dst = &flow_to_process->ndpi->dst;
}
}
@@ -2695,7 +2907,7 @@ static void ndpi_process_packet(uint8_t * const args,
&flow_to_process->flow_extended,
PACKET_EVENT_PAYLOAD_FLOW);
- if (flow_to_process->ndpi_flow->num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
+ if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
{
if (flow_to_process->detection_completed != 0)
{
@@ -2706,7 +2918,7 @@ static void ndpi_process_packet(uint8_t * const args,
/* last chance to guess something, better then nothing */
uint8_t protocol_was_guessed = 0;
flow_to_process->guessed_l7_protocol =
- ndpi_detection_giveup(workflow->ndpi_struct, flow_to_process->ndpi_flow, 1, &protocol_was_guessed);
+ ndpi_detection_giveup(workflow->ndpi_struct, &flow_to_process->ndpi->flow, 1, &protocol_was_guessed);
if (protocol_was_guessed != 0)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED);
@@ -2719,7 +2931,7 @@ static void ndpi_process_packet(uint8_t * const args,
}
flow_to_process->detected_l7_protocol = ndpi_detection_process_packet(workflow->ndpi_struct,
- flow_to_process->ndpi_flow,
+ &flow_to_process->ndpi->flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size,
time_ms,
@@ -2732,11 +2944,11 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED);
- flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow);
+ flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
}
else if (flow_to_process->detection_completed == 1)
{
- uint32_t hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow);
+ uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
if (hash != flow_to_process->last_ndpi_flow_struct_hash)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
@@ -2744,11 +2956,24 @@ static void ndpi_process_packet(uint8_t * const args,
}
}
- if (flow_to_process->ndpi_flow->num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
+ if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
{
free_ndpi_structs(flow_to_process);
flow_to_process->flow_extended.flow_basic.type = FT_FINISHED;
}
+#ifdef ENABLE_ZLIB
+ else if (nDPId_options.enable_zlib_compression != 0)
+ {
+ int ret = ndpi_data_deflate(flow_to_process);
+ if (ret <= 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "zLib compression failed for flow %u with error code: %d",
+ flow_to_process->flow_extended.flow_id,
+ ret);
+ }
+ }
+#endif
}
static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread)
@@ -3120,11 +3345,14 @@ static int nDPId_parse_options(int argc, char ** argv)
"\t \tThis value is required for correct flow handling of\n"
"\t \tmultiple instances and should be unique.\n"
"\t \tDefaults to your hostname.\n"
+#ifdef ENABLE_ZLIB
+ "\t-z\tEnable flow memory zLib compression. (Experimental!)\n"
+#endif
"\t-o\t(Carefully) Tune some daemon options. See subopts below.\n"
"\t-v\tversion\n"
"\t-h\tthis\n\n";
- while ((opt = getopt(argc, argv, "hi:IEB:lc:dp:u:g:P:C:J:S:a:o:vh")) != -1)
+ while ((opt = getopt(argc, argv, "hi:IEB:lc:dp:u:g:P:C:J:S:a:zo:vh")) != -1)
{
switch (opt)
{
@@ -3182,6 +3410,14 @@ static int nDPId_parse_options(int argc, char ** argv)
case 'a':
nDPId_options.instance_alias = strdup(optarg);
break;
+ case 'z':
+#ifdef ENABLE_ZLIB
+ nDPId_options.enable_zlib_compression = 1;
+ break;
+#else
+ fprintf(stderr, "nDPId was built w/o zLib compression\n");
+ return 1;
+#endif
case 'o':
{
int errfnd = 0;
@@ -3283,6 +3519,12 @@ static int validate_options(char const * const arg0)
{
int retval = 0;
+#ifdef ENABLE_ZLIB
+ if (nDPId_options.enable_zlib_compression != 0)
+ {
+ fprintf(stderr, "%s: WARNING: zLib compression is an experimental feature! Expect random crashes.\n", arg0);
+ }
+#endif
if (is_path_absolute("JSON socket", nDPId_options.json_sockpath) != 0)
{
retval = 1;
@@ -3423,7 +3665,7 @@ int main(int argc, char ** argv)
openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON);
#ifdef ENABLE_MEMORY_PROFILING
- syslog(LOG_DAEMON, "size/processed-flow: %zu bytes\n", sizeof(struct nDPId_flow_info));
+ syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPI_data));
#endif
if (setup_reader_threads() != 0)