summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-06-16 19:25:27 +0200
committerToni Uhlig <matzeton@googlemail.com>2021-06-16 19:28:02 +0200
commitdb87d45edb175572112aa08840f08bc7c61cadcb (patch)
treee5c9ae7ff92404be13ba5b01a791be3f8a22f1f8 /nDPId.c
parentfac7648326c6cea478b92872f7abb3f799961cfc (diff)
Added zLib compression parameters to control compression conditions.
* more structs are now "compressable" * fixed missing DAEMON_RECONNECT event * improved memory profiler Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c361
1 files changed, 236 insertions, 125 deletions
diff --git a/nDPId.c b/nDPId.c
index dc99acb14..f689fe44d 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -104,15 +104,22 @@ struct nDPId_flow_extended
};
/*
- * Structures related to certain flow states.
+ * Skipped flows need at least some information.
*/
struct nDPId_flow_skipped
{
struct nDPId_flow_basic flow_basic;
};
-struct nDPI_data
+/*
+ * Structure which is important for the detection process.
+ * The structure is also a compression target, if activated.
+ */
+struct nDPId_detection_data
{
+ uint32_t last_ndpi_flow_struct_hash;
+ struct ndpi_proto detected_l7_protocol;
+ struct ndpi_proto guessed_l7_protocol;
struct ndpi_flow_struct flow;
struct ndpi_id_struct src;
struct ndpi_id_struct dst;
@@ -126,16 +133,11 @@ struct nDPId_flow_info
uint8_t reserved_00 : 7;
uint8_t reserved_01[1];
#ifdef ENABLE_ZLIB
- uint16_t ndpi_compressed_size;
+ uint16_t detection_data_compressed_size;
#else
uint16_t reserved_02;
#endif
- uint32_t last_ndpi_flow_struct_hash;
-
- struct ndpi_proto detected_l7_protocol;
- struct ndpi_proto guessed_l7_protocol;
-
- struct nDPI_data * ndpi;
+ struct nDPId_detection_data * detection_data;
};
struct nDPId_flow_finished
@@ -156,6 +158,12 @@ struct nDPId_workflow
unsigned long long int total_l4_data_len;
unsigned long long int detected_flow_protocols;
+#ifdef ENABLE_MEMORY_PROFILING
+ uint64_t last_memory_usage_log_time;
+#endif
+#ifdef ENABLE_ZLIB
+ uint64_t last_compression_scan_time;
+#endif
uint64_t last_idle_scan_time;
uint64_t last_time;
@@ -329,6 +337,10 @@ static struct
unsigned long long int max_idle_flows_per_thread;
unsigned long long int tick_resolution;
unsigned long long int reader_thread_count;
+#ifdef ENABLE_ZLIB
+ unsigned long long int compression_scan_period;
+ unsigned long long int compression_flow_inactivity;
+#endif
unsigned long long int idle_scan_period;
unsigned long long int generic_max_idle_time;
unsigned long long int icmp_max_idle_time;
@@ -344,6 +356,10 @@ static struct
.max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2,
.tick_resolution = nDPId_TICK_RESOLUTION,
.reader_thread_count = nDPId_MAX_READER_THREADS / 2,
+#ifdef ENABLE_ZLIB
+ .compression_scan_period = nDPId_COMPRESSION_SCAN_PERIOD,
+ .compression_flow_inactivity = nDPId_COMPRESSION_FLOW_INACTIVITY,
+#endif
.idle_scan_period = nDPId_IDLE_SCAN_PERIOD,
.generic_max_idle_time = nDPId_GENERIC_IDLE_TIME,
.icmp_max_idle_time = nDPId_ICMP_IDLE_TIME,
@@ -360,6 +376,10 @@ enum nDPId_subopts
TICK_RESOLUTION,
MAX_READER_THREADS,
IDLE_SCAN_PERIOD,
+#ifdef ENABLE_ZLIB
+ COMPRESSION_SCAN_PERIOD,
+ COMPRESSION_FLOW_INACTIVITY,
+#endif
GENERIC_MAX_IDLE_TIME,
ICMP_MAX_IDLE_TIME,
UDP_MAX_IDLE_TIME,
@@ -372,6 +392,10 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread",
[TICK_RESOLUTION] = "tick-resolution",
[MAX_READER_THREADS] = "max-reader-threads",
+#ifdef ENABLE_ZLIB
+ [COMPRESSION_SCAN_PERIOD] = "compression-scan-period",
+ [COMPRESSION_FLOW_INACTIVITY] = "compression-flow-activity",
+#endif
[IDLE_SCAN_PERIOD] = "idle-scan-period",
[GENERIC_MAX_IDLE_TIME] = "generic-max-idle-time",
[ICMP_MAX_IDLE_TIME] = "icmp-max-idle-time",
@@ -456,6 +480,7 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING
__sync_fetch_and_add(&zlib_decompressions, 1);
+ __sync_fetch_and_sub(&zlib_compression_diff, ret - srcLen);
#endif
}
else
@@ -474,75 +499,137 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
return ret;
}
-static int ndpi_data_deflate(struct nDPId_flow_info * const flow_info)
+static int detection_data_deflate(struct nDPId_flow_info * const flow_info)
{
- uint8_t tmpOut[sizeof(*flow_info->ndpi)];
+ uint8_t tmpOut[sizeof(*flow_info->detection_data)];
int ret;
- if (flow_info->ndpi_compressed_size != 0)
+ if (flow_info->detection_data_compressed_size > 0)
{
return -7;
}
- ret = zlib_deflate(flow_info->ndpi, sizeof(*flow_info->ndpi), tmpOut, sizeof(tmpOut));
+ ret = zlib_deflate(flow_info->detection_data, sizeof(*flow_info->detection_data), tmpOut, sizeof(tmpOut));
if (ret <= 0)
{
return ret;
}
- struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret);
- if (new_ndpi_data == NULL)
+ struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret);
+ if (new_det_data == NULL)
{
return -8;
}
- ndpi_free(flow_info->ndpi);
- flow_info->ndpi = new_ndpi_data;
+ ndpi_free(flow_info->detection_data);
+ flow_info->detection_data = new_det_data;
- memcpy(flow_info->ndpi, tmpOut, ret);
- flow_info->ndpi_compressed_size = ret;
+ memcpy(flow_info->detection_data, tmpOut, ret);
+ flow_info->detection_data_compressed_size = ret;
return ret;
}
-static int ndpi_data_inflate(struct nDPId_flow_info * const flow_info)
+static int detection_data_inflate(struct nDPId_flow_info * const flow_info)
{
- uint8_t tmpOut[sizeof(*flow_info->ndpi)];
+ uint8_t tmpOut[sizeof(*flow_info->detection_data)];
int ret;
- if (flow_info->ndpi_compressed_size == 0)
+ if (flow_info->detection_data_compressed_size == 0)
{
return -7;
}
- ret = zlib_inflate(flow_info->ndpi, flow_info->ndpi_compressed_size, tmpOut, sizeof(tmpOut));
+ ret = zlib_inflate(flow_info->detection_data, flow_info->detection_data_compressed_size, tmpOut, sizeof(tmpOut));
if (ret <= 0)
{
return ret;
}
- struct nDPI_data * const new_ndpi_data = ndpi_malloc(ret);
- if (new_ndpi_data == NULL)
+ struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret);
+ if (new_det_data == NULL)
{
return -8;
}
- ndpi_free(flow_info->ndpi);
- flow_info->ndpi = new_ndpi_data;
+ ndpi_free(flow_info->detection_data);
+ flow_info->detection_data = new_det_data;
- memcpy(flow_info->ndpi, tmpOut, ret);
- flow_info->ndpi_compressed_size = 0;
+ memcpy(flow_info->detection_data, tmpOut, ret);
+ flow_info->detection_data_compressed_size = 0;
/*
- * Do not use ndpi_id_struct's from ndpi_flow
- * as they may not be valid anymore.
+ * The ndpi_id_struct's from ndpi_flow may not be valid anymore.
* nDPI only updates those pointers while processing packets!
* This is especially important when using compression
* to prevent use of dangling pointers.
*/
- flow_info->ndpi->flow.src = &flow_info->ndpi->src;
- flow_info->ndpi->flow.dst = &flow_info->ndpi->dst;
+ flow_info->detection_data->flow.src = &flow_info->detection_data->src;
+ flow_info->detection_data->flow.dst = &flow_info->detection_data->dst;
return ret;
}
+
+static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
+{
+ struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data;
+ struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A;
+
+ (void)depth;
+
+ if (workflow == NULL || flow_basic == NULL)
+ {
+ return;
+ }
+
+ if (which == ndpi_preorder || which == ndpi_leaf)
+ {
+ switch (flow_basic->type)
+ {
+ case FT_UNKNOWN:
+ case FT_SKIPPED:
+ case FT_FINISHED:
+ break;
+
+ case FT_INFO:
+ {
+ if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_time)
+ {
+ struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
+
+ if (flow_info->detection_data_compressed_size > 0)
+ {
+ break;
+ }
+
+ int ret = detection_data_deflate(flow_info);
+
+ if (ret <= 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "zLib compression failed for flow %u with error code: %d",
+ flow_info->flow_extended.flow_id,
+ ret);
+ }
+ }
+ break;
+ }
+ }
+ }
+}
+
+static void check_for_compressable_flows(struct nDPId_reader_thread * const reader_thread)
+{
+ struct nDPId_workflow * const workflow = reader_thread->workflow;
+
+ if (workflow->last_compression_scan_time + nDPId_options.compression_scan_period < workflow->last_time)
+ {
+ for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index)
+ {
+ ndpi_twalk(workflow->ndpi_flows_active[comp_scan_index], ndpi_comp_scan_walker, workflow);
+ }
+
+ workflow->last_compression_scan_time = workflow->last_time;
+ }
+}
#endif
static void ip_netmask_to_subnet(union nDPId_ip const * const ip,
@@ -829,6 +916,49 @@ static void ndpi_free_wrapper(void * const freeable)
free(p);
}
+
+static void log_memory_usage(struct nDPId_reader_thread * const reader_thread)
+{
+ struct nDPId_workflow * const workflow = reader_thread->workflow;
+
+ if (workflow->last_memory_usage_log_time + nDPId_LOG_MEMORY_USAGE_EVERY < workflow->last_time)
+ {
+ if (reader_thread->array_index == 0)
+ {
+ uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0);
+ uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0);
+ uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0);
+ uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0);
+
+ syslog(LOG_DAEMON,
+ "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
+ "use, "
+ "%llu bytes in use",
+ (long long unsigned int)alloc_count,
+ (long long unsigned int)free_count,
+ (long long unsigned int)alloc_bytes,
+ (long long unsigned int)free_bytes,
+ (long long unsigned int)(alloc_count - free_count),
+ (long long unsigned int)(alloc_bytes - free_bytes));
+#ifdef ENABLE_ZLIB
+ uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0);
+ uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0);
+ uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0);
+
+ syslog(LOG_DAEMON,
+ "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu "
+ "bytes "
+ "diff",
+ (long long unsigned int)zlib_compression_count,
+ (long long unsigned int)zlib_decompression_count,
+ (long long unsigned int)zlib_compression_count - (long long unsigned int)zlib_decompression_count,
+ (long long unsigned int)zlib_bytes_diff);
+#endif
+ }
+
+ workflow->last_memory_usage_log_time = workflow->last_time;
+ }
+}
#endif
static struct nDPId_workflow * init_workflow(char const * const file_or_device)
@@ -950,25 +1080,27 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
return workflow;
}
-static void free_ndpi_structs(struct nDPId_flow_info * const flow_info)
+static void free_detection_data(struct nDPId_flow_info * const flow_info)
{
- ndpi_free_flow_data(&flow_info->ndpi->flow);
- ndpi_free(flow_info->ndpi);
- flow_info->ndpi = NULL;
+ ndpi_free_flow_data(&flow_info->detection_data->flow);
+ ndpi_free(flow_info->detection_data);
+ flow_info->detection_data = NULL;
}
-static int alloc_ndpi_structs(struct nDPId_flow_info * const flow_info)
+static int alloc_detection_data(struct nDPId_flow_info * const flow_info)
{
- flow_info->ndpi = (struct nDPI_data *)ndpi_flow_malloc(sizeof(*flow_info->ndpi));
+ flow_info->detection_data = (struct nDPId_detection_data *)ndpi_flow_malloc(sizeof(*flow_info->detection_data));
- if (flow_info->ndpi == NULL)
+ if (flow_info->detection_data == NULL)
{
goto error;
}
+ memset(flow_info->detection_data, 0, sizeof(*flow_info->detection_data));
+
return 0;
error:
- free_ndpi_structs(flow_info);
+ free_detection_data(flow_info);
return 1;
}
@@ -986,7 +1118,7 @@ static void ndpi_flow_info_freer(void * const node)
case FT_INFO:
{
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
- free_ndpi_structs(flow_info);
+ free_detection_data(flow_info);
break;
}
}
@@ -1295,9 +1427,9 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
#ifdef ENABLE_ZLIB
- if (nDPId_options.enable_zlib_compression != 0 && flow_info->ndpi_compressed_size > 0)
+ if (nDPId_options.enable_zlib_compression != 0 && flow_info->detection_data_compressed_size > 0)
{
- int ret = ndpi_data_inflate(flow_info);
+ int ret = detection_data_inflate(flow_info);
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR, "zLib decompression failed with error code: %d", ret);
@@ -1310,12 +1442,11 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
{
uint8_t protocol_was_guessed = 0;
- if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0)
+ if (ndpi_is_protocol_detected(workflow->ndpi_struct,
+ flow_info->detection_data->guessed_l7_protocol) == 0)
{
- flow_info->guessed_l7_protocol = ndpi_detection_giveup(workflow->ndpi_struct,
- &flow_info->ndpi->flow,
- 1,
- &protocol_was_guessed);
+ flow_info->detection_data->guessed_l7_protocol = ndpi_detection_giveup(
+ workflow->ndpi_struct, &flow_info->detection_data->flow, 1, &protocol_was_guessed);
}
else
{
@@ -1355,38 +1486,6 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
if (workflow->last_idle_scan_time + nDPId_options.idle_scan_period < workflow->last_time)
{
-#ifdef ENABLE_MEMORY_PROFILING
- if (reader_thread->array_index == 0)
- {
- uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0);
- uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0);
- uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0);
- uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0);
-
- syslog(LOG_DAEMON,
- "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
- "use, "
- "%llu bytes in use",
- (long long unsigned int)alloc_count,
- (long long unsigned int)free_count,
- (long long unsigned int)alloc_bytes,
- (long long unsigned int)free_bytes,
- (long long unsigned int)(alloc_count - free_count),
- (long long unsigned int)(alloc_bytes - free_bytes));
-#ifdef ENABLE_ZLIB
- uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0);
- uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0);
- uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0);
-
- syslog(LOG_DAEMON,
- "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu bytes difference",
- (long long unsigned int)zlib_compression_count,
- (long long unsigned int)zlib_decompression_count,
- (long long unsigned int)zlib_bytes_diff);
-#endif
- }
-#endif
-
for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index)
{
ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
@@ -1615,6 +1714,7 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
"[%8llu, %d] Reconnected to JSON sink",
workflow->packets_captured,
reader_thread->array_index);
+ jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT);
}
}
@@ -1883,8 +1983,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_NOT_DETECTED:
case FLOW_EVENT_GUESSED:
if (ndpi_dpi2json(workflow->ndpi_struct,
- &flow->ndpi->flow,
- flow->guessed_l7_protocol,
+ &flow->detection_data->flow,
+ flow->detection_data->guessed_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -1897,8 +1997,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_DETECTED:
case FLOW_EVENT_DETECTION_UPDATE:
if (ndpi_dpi2json(workflow->ndpi_struct,
- &flow->ndpi->flow,
- flow->detected_l7_protocol,
+ &flow->detection_data->flow,
+ flow->detection_data->detected_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -2471,6 +2571,9 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->last_time = time_ms;
check_for_idle_flows(reader_thread);
+#ifdef ENABLE_MEMORY_PROFILING
+ log_memory_usage(reader_thread);
+#endif
if (process_datalink_layer(reader_thread, header, packet, &ip_offset, &type) != 0)
{
@@ -2806,7 +2909,7 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->total_active_flows++;
flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1);
- if (alloc_ndpi_structs(flow_to_process) != 0)
+ if (alloc_detection_data(flow_to_process) != 0)
{
jsonize_packet_event(
reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD);
@@ -2815,10 +2918,8 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- memset(flow_to_process->ndpi, 0, sizeof(*flow_to_process->ndpi));
-
- ndpi_src = &flow_to_process->ndpi->src;
- ndpi_dst = &flow_to_process->ndpi->dst;
+ ndpi_src = &flow_to_process->detection_data->src;
+ ndpi_dst = &flow_to_process->detection_data->dst;
is_new_flow = 1;
}
@@ -2849,9 +2950,9 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process;
#ifdef ENABLE_ZLIB
- if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->ndpi_compressed_size > 0)
+ if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->detection_data_compressed_size > 0)
{
- int ret = ndpi_data_inflate(flow_to_process);
+ int ret = detection_data_inflate(flow_to_process);
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -2865,13 +2966,13 @@ static void ndpi_process_packet(uint8_t * const args,
if (direction_changed != 0)
{
- ndpi_src = &flow_to_process->ndpi->dst;
- ndpi_dst = &flow_to_process->ndpi->src;
+ ndpi_src = &flow_to_process->detection_data->dst;
+ ndpi_dst = &flow_to_process->detection_data->src;
}
else
{
- ndpi_src = &flow_to_process->ndpi->src;
- ndpi_dst = &flow_to_process->ndpi->dst;
+ ndpi_src = &flow_to_process->detection_data->src;
+ ndpi_dst = &flow_to_process->detection_data->dst;
}
}
@@ -2907,7 +3008,7 @@ static void ndpi_process_packet(uint8_t * const args,
&flow_to_process->flow_extended,
PACKET_EVENT_PAYLOAD_FLOW);
- if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
+ if (flow_to_process->detection_data->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process - 1)
{
if (flow_to_process->detection_completed != 0)
{
@@ -2917,8 +3018,8 @@ static void ndpi_process_packet(uint8_t * const args,
{
/* last chance to guess something, better then nothing */
uint8_t protocol_was_guessed = 0;
- flow_to_process->guessed_l7_protocol =
- ndpi_detection_giveup(workflow->ndpi_struct, &flow_to_process->ndpi->flow, 1, &protocol_was_guessed);
+ flow_to_process->detection_data->guessed_l7_protocol = ndpi_detection_giveup(
+ workflow->ndpi_struct, &flow_to_process->detection_data->flow, 1, &protocol_was_guessed);
if (protocol_was_guessed != 0)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED);
@@ -2930,48 +3031,43 @@ static void ndpi_process_packet(uint8_t * const args,
}
}
- flow_to_process->detected_l7_protocol = ndpi_detection_process_packet(workflow->ndpi_struct,
- &flow_to_process->ndpi->flow,
- ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
- ip_size,
- time_ms,
- ndpi_src,
- ndpi_dst);
+ flow_to_process->detection_data->detected_l7_protocol =
+ ndpi_detection_process_packet(workflow->ndpi_struct,
+ &flow_to_process->detection_data->flow,
+ ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
+ ip_size,
+ time_ms,
+ ndpi_src,
+ ndpi_dst);
- if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->detected_l7_protocol) != 0 &&
+ if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->detection_data->detected_l7_protocol) != 0 &&
flow_to_process->detection_completed == 0)
{
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED);
- flow_to_process->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
+ flow_to_process->detection_data->last_ndpi_flow_struct_hash =
+ calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow);
}
else if (flow_to_process->detection_completed == 1)
{
- uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->ndpi->flow);
- if (hash != flow_to_process->last_ndpi_flow_struct_hash)
+ uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow);
+ if (hash != flow_to_process->detection_data->last_ndpi_flow_struct_hash)
{
jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
- flow_to_process->last_ndpi_flow_struct_hash = hash;
+ flow_to_process->detection_data->last_ndpi_flow_struct_hash = hash;
}
}
- if (flow_to_process->ndpi->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
+ if (flow_to_process->detection_data->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process)
{
- free_ndpi_structs(flow_to_process);
+ free_detection_data(flow_to_process);
flow_to_process->flow_extended.flow_basic.type = FT_FINISHED;
}
#ifdef ENABLE_ZLIB
else if (nDPId_options.enable_zlib_compression != 0)
{
- int ret = ndpi_data_deflate(flow_to_process);
- if (ret <= 0)
- {
- syslog(LOG_DAEMON | LOG_ERR,
- "zLib compression failed for flow %u with error code: %d",
- flow_to_process->flow_extended.flow_id,
- ret);
- }
+ check_for_compressable_flows(reader_thread);
}
#endif
}
@@ -3255,7 +3351,8 @@ static void print_subopt_usage(void)
if (*token != NULL)
{
fprintf(stderr, "\t\t%s = ", *token);
- switch (index++)
+ enum nDPId_subopts subopts = index++;
+ switch (subopts)
{
case MAX_FLOWS_PER_THREAD:
fprintf(stderr, "%llu\n", nDPId_options.max_flows_per_thread);
@@ -3272,6 +3369,14 @@ static void print_subopt_usage(void)
case IDLE_SCAN_PERIOD:
fprintf(stderr, "%llu\n", nDPId_options.idle_scan_period);
break;
+#ifdef ENABLE_ZLIB
+ case COMPRESSION_SCAN_PERIOD:
+ fprintf(stderr, "%llu\n", nDPId_options.compression_scan_period);
+ break;
+ case COMPRESSION_FLOW_INACTIVITY:
+ fprintf(stderr, "%llu\n", nDPId_options.compression_flow_inactivity);
+ break;
+#endif
case GENERIC_MAX_IDLE_TIME:
fprintf(stderr, "%llu\n", nDPId_options.generic_max_idle_time);
break;
@@ -3293,8 +3398,6 @@ static void print_subopt_usage(void)
case MAX_PACKETS_PER_FLOW_TO_PROCESS:
fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_process);
break;
- default:
- break;
}
}
else
@@ -3468,6 +3571,14 @@ static int nDPId_parse_options(int argc, char ** argv)
case IDLE_SCAN_PERIOD:
nDPId_options.idle_scan_period = value_llu;
break;
+#ifdef ENABLE_ZLIB
+ case COMPRESSION_SCAN_PERIOD:
+ nDPId_options.compression_scan_period = value_llu;
+ break;
+ case COMPRESSION_FLOW_INACTIVITY:
+ nDPId_options.compression_flow_inactivity = value_llu;
+ break;
+#endif
case GENERIC_MAX_IDLE_TIME:
nDPId_options.generic_max_idle_time = value_llu;
break;
@@ -3665,7 +3776,7 @@ int main(int argc, char ** argv)
openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON);
#ifdef ENABLE_MEMORY_PROFILING
- syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPI_data));
+ syslog(LOG_DAEMON, "size/flow: %zu bytes\n", sizeof(struct nDPId_flow_info) + sizeof(struct nDPId_detection_data));
#endif
if (setup_reader_threads() != 0)