summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
author: Toni Uhlig <matzeton@googlemail.com>, 2021-06-14 14:43:29 +0200
committer: Toni Uhlig <matzeton@googlemail.com>, 2021-06-14 15:33:29 +0200
commit: fac7648326c6cea478b92872f7abb3f799961cfc (patch)
tree: d2ad3e1c3952f787b1b25259971b04f313d8e388 /nDPId.c
parent: 98b11f814f54bb23cdd58299e63dc49264e3b5bc (diff)
Support for zLib flow memory compression. Experimental.
Please use this feature only for testing purposes. It will change or be removed in the future. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- nDPId.c 320
1 files changed, 281 insertions, 39 deletions
diff --git a/nDPId.c b/nDPId.c
index 427f3ab27..dc99acb14 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -18,6 +18,9 @@
#include <sys/un.h>
#include <syslog.h>
#include <unistd.h>
+#ifdef ENABLE_ZLIB
+#include <zlib.h>
+#endif
#include "config.h"
#include "utils.h"
@@ -108,21 +111,31 @@ struct nDPId_flow_skipped
struct nDPId_flow_basic flow_basic;
};
+struct nDPI_data /* nDPI per-flow state bundled into one object: allocated, compressed and freed as a unit */
+{
+ struct ndpi_flow_struct flow; /* handed to the ndpi_* library calls as &ndpi->flow */
+ struct ndpi_id_struct src; /* flow.src is re-pointed to this member after decompression */
+ struct ndpi_id_struct dst; /* flow.dst is re-pointed to this member after decompression */
+};
+
struct nDPId_flow_info
{
struct nDPId_flow_extended flow_extended;
uint8_t detection_completed : 1;
uint8_t reserved_00 : 7;
- uint8_t reserved_01[3];
+ uint8_t reserved_01[1]; /* shrunk from 3 bytes to 1; the other two bytes now hold the uint16_t below */
+#ifdef ENABLE_ZLIB
+ uint16_t ndpi_compressed_size; /* 0 while `ndpi` holds plain data, otherwise the byte length of the compressed buffer */
+#else
+ uint16_t reserved_02; /* occupies the same two bytes when zlib support is compiled out */
+#endif
uint32_t last_ndpi_flow_struct_hash;
struct ndpi_proto detected_l7_protocol;
struct ndpi_proto guessed_l7_protocol;
- struct ndpi_flow_struct * ndpi_flow;
- struct ndpi_id_struct * ndpi_src;
- struct ndpi_id_struct * ndpi_dst;
+ struct nDPI_data * ndpi; /* plain struct nDPI_data, or its zlib-compressed bytes while ndpi_compressed_size != 0 */
};
struct nDPId_flow_finished
@@ -281,6 +294,11 @@ static uint64_t ndpi_memory_alloc_count = 0;
static uint64_t ndpi_memory_alloc_bytes = 0;
static uint64_t ndpi_memory_free_count = 0;
static uint64_t ndpi_memory_free_bytes = 0;
+#ifdef ENABLE_ZLIB
+static uint64_t zlib_compressions = 0; /* number of zlib_deflate() calls that ended with Z_STREAM_END */
+static uint64_t zlib_decompressions = 0; /* number of zlib_inflate() calls that ended with Z_STREAM_END */
+static uint64_t zlib_compression_diff = 0; /* accumulated (input length - compressed length) in bytes */
+#endif
#endif
static struct
@@ -302,6 +320,9 @@ static struct
char * custom_ja3_file;
char * custom_sha1_file;
char json_sockpath[UNIX_PATH_MAX];
+#ifdef ENABLE_ZLIB
+ uint8_t enable_zlib_compression;
+#endif
/* subopts */
char * instance_alias;
unsigned long long int max_flows_per_thread;
@@ -367,6 +388,163 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
struct nDPId_flow_info * const flow,
enum flow_event event);
+#ifdef ENABLE_ZLIB
+/*
+ * Compress `srcLen` bytes from `src` into `dst` (capacity `dstLen`) with a
+ * single deflate(Z_FINISH) pass at Z_BEST_COMPRESSION.
+ *
+ * Returns the number of bytes written to `dst` when the stream was finished
+ * (Z_STREAM_END). Any other zlib status, from deflateInit() or deflate(), is
+ * returned unchanged, so callers have to treat every value <= 0 as failure.
+ */
+static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
+{
+    z_stream stream = {0};
+    int status;
+    int result;
+
+    /* Use zlib's default allocator. */
+    stream.zalloc = Z_NULL;
+    stream.zfree = Z_NULL;
+    stream.opaque = Z_NULL;
+
+    stream.next_in = (Bytef *)src;
+    stream.avail_in = srcLen;
+    stream.total_in = stream.avail_in;
+    stream.next_out = (Bytef *)dst;
+    stream.avail_out = dstLen;
+    stream.total_out = stream.avail_out;
+
+    status = deflateInit(&stream, Z_BEST_COMPRESSION);
+    if (status == Z_OK)
+    {
+        status = deflate(&stream, Z_FINISH);
+    }
+
+    if (status == Z_STREAM_END)
+    {
+        result = stream.total_out;
+#ifdef ENABLE_MEMORY_PROFILING
+        __sync_fetch_and_add(&zlib_compressions, 1);
+        __sync_fetch_and_add(&zlib_compression_diff, srcLen - result);
+#endif
+    }
+    else
+    {
+        /* Initialisation failed or the output did not fit: report zlib's status. */
+        result = status;
+    }
+
+    /* Always called, exactly as before, even if deflateInit() failed. */
+    deflateEnd(&stream);
+    return result;
+}
+
+/*
+ * Decompress `srcLen` bytes from `src` into `dst` (capacity `dstLen`) with a
+ * single inflate(Z_FINISH) pass.
+ *
+ * Returns the number of bytes written to `dst` when the end of the stream was
+ * reached (Z_STREAM_END). Any other zlib status, from inflateInit2() or
+ * inflate(), is returned unchanged, so callers have to treat every value <= 0
+ * as failure.
+ */
+static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
+{
+    z_stream stream = {0};
+    int status;
+    int result;
+
+    /* Use zlib's default allocator. */
+    stream.zalloc = Z_NULL;
+    stream.zfree = Z_NULL;
+    stream.opaque = Z_NULL;
+
+    stream.next_in = (Bytef *)src;
+    stream.avail_in = srcLen;
+    stream.total_in = stream.avail_in;
+    stream.next_out = (Bytef *)dst;
+    stream.avail_out = dstLen;
+    stream.total_out = stream.avail_out;
+
+    /* 15 window bits; the additional 32 tells zlib to detect a zlib or gzip header automatically. */
+    status = inflateInit2(&stream, (15 + 32));
+    if (status == Z_OK)
+    {
+        status = inflate(&stream, Z_FINISH);
+    }
+
+    if (status == Z_STREAM_END)
+    {
+        result = stream.total_out;
+#ifdef ENABLE_MEMORY_PROFILING
+        __sync_fetch_and_add(&zlib_decompressions, 1);
+#endif
+    }
+    else
+    {
+        /* Initialisation failed or the stream could not be finished: report zlib's status. */
+        result = status;
+    }
+
+    /* Always called, exactly as before, even if inflateInit2() failed. */
+    inflateEnd(&stream);
+    return result;
+}
+
+/*
+ * Replace flow_info->ndpi with a zlib-compressed copy of itself and record
+ * the compressed length in flow_info->ndpi_compressed_size.
+ *
+ * Returns the compressed size in bytes (> 0) on success. On failure
+ * flow_info is left untouched and one of the following is returned:
+ *   -7     the flow data is already compressed
+ *   -8     allocating the buffer for the compressed data failed
+ *   -9     the compressed size does not fit into ndpi_compressed_size
+ *   other  value <= 0 as reported by zlib_deflate()
+ */
+static int ndpi_data_deflate(struct nDPId_flow_info * const flow_info)
+{
+    uint8_t tmpOut[sizeof(*flow_info->ndpi)];
+    int ret;
+
+    if (flow_info->ndpi_compressed_size != 0)
+    {
+        return -7;
+    }
+
+    ret = zlib_deflate(flow_info->ndpi, sizeof(*flow_info->ndpi), tmpOut, sizeof(tmpOut));
+    if (ret <= 0)
+    {
+        return ret;
+    }
+
+    /*
+     * ndpi_compressed_size is a uint16_t. Storing a larger value would
+     * silently truncate it and a later inflate would read a cut-off stream.
+     */
+    if (ret > UINT16_MAX)
+    {
+        return -9;
+    }
+
+    struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret);
+    if (new_ndpi_data == NULL)
+    {
+        return -8;
+    }
+
+    /* Fill the new buffer first, then swap it in and release the old one. */
+    memcpy(new_ndpi_data, tmpOut, ret);
+    ndpi_free(flow_info->ndpi);
+    flow_info->ndpi = new_ndpi_data;
+    flow_info->ndpi_compressed_size = (uint16_t)ret;
+
+    return ret;
+}
+
+/*
+ * Replace the compressed buffer in flow_info->ndpi with the decompressed
+ * struct nDPI_data and reset flow_info->ndpi_compressed_size to 0.
+ *
+ * Returns the decompressed size in bytes (> 0) on success. On failure
+ * flow_info is left untouched and one of the following is returned:
+ *   -7     the flow data is not compressed
+ *   -8     allocating the buffer for the decompressed data failed
+ *   -9     the decompressed size is not sizeof(struct nDPI_data)
+ *   other  value <= 0 as reported by zlib_inflate()
+ */
+static int ndpi_data_inflate(struct nDPId_flow_info * const flow_info)
+{
+    uint8_t tmpOut[sizeof(*flow_info->ndpi)];
+    int ret;
+
+    if (flow_info->ndpi_compressed_size == 0)
+    {
+        return -7;
+    }
+
+    ret = zlib_inflate(flow_info->ndpi, flow_info->ndpi_compressed_size, tmpOut, sizeof(tmpOut));
+    if (ret <= 0)
+    {
+        return ret;
+    }
+
+    /*
+     * The result is used as a complete struct nDPI_data below and by every
+     * caller. A shorter stream would leave the allocation too small and
+     * turn those accesses into heap overruns.
+     */
+    if ((size_t)ret != sizeof(*flow_info->ndpi))
+    {
+        return -9;
+    }
+
+    struct nDPI_data * const new_ndpi_data = ndpi_malloc(sizeof(*new_ndpi_data));
+    if (new_ndpi_data == NULL)
+    {
+        return -8;
+    }
+
+    /* Fill the new buffer first, then swap it in and release the old one. */
+    memcpy(new_ndpi_data, tmpOut, sizeof(*new_ndpi_data));
+    ndpi_free(flow_info->ndpi);
+    flow_info->ndpi = new_ndpi_data;
+    flow_info->ndpi_compressed_size = 0;
+
+    /*
+     * Do not use ndpi_id_struct's from ndpi_flow
+     * as they may not be valid anymore.
+     * nDPI only updates those pointers while processing packets!
+     * This is especially important when using compression
+     * to prevent use of dangling pointers.
+     */
+    flow_info->ndpi->flow.src = &flow_info->ndpi->src;
+    flow_info->ndpi->flow.dst = &flow_info->ndpi->dst;
+
+    return ret;
+}
+#endif
+
static void ip_netmask_to_subnet(union nDPId_ip const * const ip,
union nDPId_ip const * const netmask,
union nDPId_ip * const subnet,
@@ -774,21 +952,16 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
static void free_ndpi_structs(struct nDPId_flow_info * const flow_info)
{
- ndpi_free(flow_info->ndpi_dst);
- flow_info->ndpi_dst = NULL;
- ndpi_free(flow_info->ndpi_src);
- flow_info->ndpi_src = NULL;
- ndpi_free_flow(flow_info->ndpi_flow);
- flow_info->ndpi_flow = NULL;
+ ndpi_free_flow_data(&flow_info->ndpi->flow); /* NOTE(review): assumes ndpi is non-NULL and holds uncompressed data here; confirm for all callers */
+ ndpi_free(flow_info->ndpi); /* releases the combined flow/src/dst object in one call */
+ flow_info->ndpi = NULL; /* cleared so the freed pointer cannot be reused */
}
static int alloc_ndpi_structs(struct nDPId_flow_info * const flow_info)
{
- flow_info->ndpi_dst = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
- flow_info->ndpi_src = (struct ndpi_id_struct *)ndpi_calloc(1, SIZEOF_ID_STRUCT);
- flow_info->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT);
+ flow_info->ndpi = (struct nDPI_data *)ndpi_flow_malloc(sizeof(*flow_info->ndpi));
- if (flow_info->ndpi_dst == NULL || flow_info->ndpi_src == NULL || flow_info->ndpi_flow == NULL)
+ if (flow_info->ndpi == NULL)
{
goto error;
}
@@ -1068,7 +1241,7 @@ static int ndpi_workflow_node_cmp(void const * const A, void const * const B)
return 1;
}
- /* Flows have the same hash */
+ /* flows have the same hash */
if (flow_basic_a->l4_protocol < flow_basic_b->l4_protocol)
{
return -1;
@@ -1121,6 +1294,18 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
{
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
+#ifdef ENABLE_ZLIB
+ if (nDPId_options.enable_zlib_compression != 0 && flow_info->ndpi_compressed_size > 0)
+ {
+ int ret = ndpi_data_inflate(flow_info);
+ if (ret <= 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "zLib decompression failed with error code: %d", ret);
+ return;
+ }
+ }
+#endif
+
if (flow_info->detection_completed == 0)
{
uint8_t protocol_was_guessed = 0;
@@ -1128,7 +1313,7 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0)
{
flow_info->guessed_l7_protocol = ndpi_detection_giveup(workflow->ndpi_struct,
- flow_info->ndpi_flow,
+ &flow_info->ndpi->flow,
1,
&protocol_was_guessed);
}
@@ -1188,6 +1373,17 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
(long long unsigned int)free_bytes,
(long long unsigned int)(alloc_count - free_count),
(long long unsigned int)(alloc_bytes - free_bytes));
+#ifdef ENABLE_ZLIB
+ uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0);
+ uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0);
+ uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0);
+
+ syslog(LOG_DAEMON,
+ "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu bytes difference",
+ (long long unsigned int)zlib_compression_count,
+ (long long unsigned int)zlib_decompression_count,
+ (long long unsigned int)zlib_bytes_diff);
+#endif
}
#endif
@@ -1686,8 +1882,10 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_NOT_DETECTED:
case FLOW_EVENT_GUESSED:
- if (ndpi_dpi2json(
- workflow->ndpi_struct, flow->ndpi_flow, flow->guessed_l7_protocol, &workflow->ndpi_serializer) != 0)
+ if (ndpi_dpi2json(workflow->ndpi_struct,
+ &flow->ndpi->flow,
+ flow->guessed_l7_protocol,
+ &workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
"[%8llu, %4u] ndpi_dpi2json failed for not-detected/guessed flow",
@@ -1699,7 +1897,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_DETECTED:
case FLOW_EVENT_DETECTION_UPDATE:
if (ndpi_dpi2json(workflow->ndpi_struct,
- flow->ndpi_flow,
+ &flow->ndpi->flow,
flow->detected_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
@@ -2516,8 +2714,8 @@ static void ndpi_process_packet(uint8_t * const args,
{
/* flow still not found, must be new or midstream */
- union nDPId_ip const * netmask;
- union nDPId_ip const * subnet;
+ union nDPId_ip const * netmask = NULL;
+ union nDPId_ip const * subnet = NULL;
switch (flow_basic.l3_type)
{
case L3_IP:
@@ -2617,18 +2815,17 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- memset(flow_to_process->ndpi_flow,
- 0,
- (SIZEOF_FLOW_STRUCT > sizeof(struct ndpi_flow_struct) ? SIZEOF_FLOW_STRUCT
- : sizeof(struct ndpi_flow_struct)));
+ memset(flow_to_process->ndpi, 0, sizeof(*flow_to_process->ndpi));
- ndpi_src = flow_to_process->ndpi_src;
- ndpi_dst = flow_to_process->ndpi_dst;
+ ndpi_src = &flow_to_process->ndpi->src;
+ ndpi_dst = &flow_to_process->ndpi->dst;
is_new_flow = 1;
}
else
{
+ /* flow already exists in the tree */
+
struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result;
/* Update last seen timestamp for timeout handling. */
flow_basic_to_process->last_seen = time_ms;
@@ -2651,15 +2848,30 @@ static void ndpi_process_packet(uint8_t * const args,
}
flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process;
+#ifdef ENABLE_ZLIB
+ if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->ndpi_compressed_size > 0)
+ {
+ int ret = ndpi_data_inflate(flow_to_process);
+ if (ret <= 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "zLib decompression failed for existing flow %u with error code: %d",
+ flow_to_process->flow_extended.flow_id,
+ ret);
+ return;
+ }
+ }
+#endif
+
if (direction_changed != 0)
{
- ndpi_src = flow_to_process->ndpi_dst;
- ndpi_dst = flow_to_process->ndpi_src;
+ ndpi_src = &flow_to_process->ndpi->dst;
+ ndpi_dst = &flow_to_process->ndpi->src;
}
else
{
- ndpi_src = flow_to_process->ndpi_src;
- ndpi_dst = flow_to_process->ndpi_dst;
+ ndpi_src = &flow_to_process->ndpi->src;
+ ndpi_dst = &flow_to_process->ndpi->dst;
}
}
@@ -2695,7 +2907,7 @@ static void ndpi_process_packet(uint8_t * const args,
&flow_to_process->flow_extended,
PACKET_EVENT_PAYLOAD_FLOW);
- if (flow_to_process->ndpi_flow->num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
+ if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
{
if (flow_to_process->detection_completed != 0)
{
@@ -2706,7 +2918,7 @@ static void ndpi_process_packet(uint8_t * const args,
/* last chance to guess something, better than nothing */
uint8_t protocol_was_guessed = 0;
flow_to_process->guessed_l7_protocol =
- ndpi_detection_giveup(workflow->ndpi_struct, flow_to_process->ndpi_flow, 1, &protocol_was_guessed);
+ ndpi_detection_giveup(workflow->ndpi_struct, &flow_to_process->ndpi->flow, 1, &protocol_was_guessed);
if (protocol_was_guessed != 0)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED);
@@ -2719,7 +2931,7 @@ static void ndpi_process_packet(uint8_t * const args,
}
flow_to_process->detected_l7_protocol = ndpi_detection_process_packet(workflow->ndpi_struct,
- flow_to_process->ndpi_flow,
+ &flow_to_process->ndpi->flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size,
time_ms,
@@ -2732,11 +2944,11 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED);
- flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow);
+ flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
}
else if (flow_to_process->detection_completed == 1)
{
- uint32_t hash = calculate_ndpi_flow_struct_hash(flow_to_process->ndpi_flow);
+ uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
if (hash != flow_to_process->last_ndpi_flow_struct_hash)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
@@ -2744,11 +2956,24 @@ static void ndpi_process_packet(uint8_t * const args,
}
}
- if (flow_to_process->ndpi_flow->num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
+ if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
{
free_ndpi_structs(flow_to_process);
flow_to_process->flow_extended.flow_basic.type = FT_FINISHED;
}
+#ifdef ENABLE_ZLIB
+ else if (nDPId_options.enable_zlib_compression != 0)
+ {
+ int ret = ndpi_data_deflate(flow_to_process);
+ if (ret <= 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "zLib compression failed for flow %u with error code: %d",
+ flow_to_process->flow_extended.flow_id,
+ ret);
+ }
+ }
+#endif
}
static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread)
@@ -3120,11 +3345,14 @@ static int nDPId_parse_options(int argc, char ** argv)
"\t \tThis value is required for correct flow handling of\n"
"\t \tmultiple instances and should be unique.\n"
"\t \tDefaults to your hostname.\n"
+#ifdef ENABLE_ZLIB
+ "\t-z\tEnable flow memory zLib compression. (Experimental!)\n"
+#endif
"\t-o\t(Carefully) Tune some daemon options. See subopts below.\n"
"\t-v\tversion\n"
"\t-h\tthis\n\n";
- while ((opt = getopt(argc, argv, "hi:IEB:lc:dp:u:g:P:C:J:S:a:o:vh")) != -1)
+ while ((opt = getopt(argc, argv, "hi:IEB:lc:dp:u:g:P:C:J:S:a:zo:vh")) != -1)
{
switch (opt)
{
@@ -3182,6 +3410,14 @@ static int nDPId_parse_options(int argc, char ** argv)
case 'a':
nDPId_options.instance_alias = strdup(optarg);
break;
+ case 'z':
+#ifdef ENABLE_ZLIB
+ nDPId_options.enable_zlib_compression = 1;
+ break;
+#else
+ fprintf(stderr, "nDPId was built w/o zLib compression\n");
+ return 1;
+#endif
case 'o':
{
int errfnd = 0;
@@ -3283,6 +3519,12 @@ static int validate_options(char const * const arg0)
{
int retval = 0;
+#ifdef ENABLE_ZLIB
+ if (nDPId_options.enable_zlib_compression != 0)
+ {
+ fprintf(stderr, "%s: WARNING: zLib compression is an experimental feature! Expect random crashes.\n", arg0);
+ }
+#endif
if (is_path_absolute("JSON socket", nDPId_options.json_sockpath) != 0)
{
retval = 1;
@@ -3423,7 +3665,7 @@ int main(int argc, char ** argv)
openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON);
#ifdef ENABLE_MEMORY_PROFILING
- syslog(LOG_DAEMON, "size/processed-flow: %zu bytes\n", sizeof(struct nDPId_flow_info));
+ syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPI_data));
#endif
if (setup_reader_threads() != 0)