summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-12-15 23:25:32 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-01-20 00:50:38 +0100
commit9e07a57566cc45bf92a845d8cee968d72e0f314e (patch)
tree8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /nDPId.c
parenta35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff)
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare - nDPIsrvd: fixed caching issue (finally) - added tiny c example (can be used to check flow manager sanity) - c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow` - README.md update: added example JSON sequence - nDPId: added new flow event `update` necessary for correct timeout handling (and other future use-cases) - nDPIsrvd.h and nDPIsrvd.py: switched to an instance (consists of an alias/source tuple) based flow manager - every flow related event **must** now serialize `alias`, `source`, `flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout handling and verification process work correctly - nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation - nDPIsrvd.py: removed PcapPacket class (unused) - py-flow-dashboard and py-flow-multiprocess: fixed race condition - py-flow-info: print statusbar with probably useful information - nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`) to a generic flow event timestamp `ts_msec` - nDPId-test: added additional checks - nDPId: increased ICMP flow timeout - nDPId: using event based i/o if capturing packets from a device - nDPIsrvd: fixed memory leak on shutdown if remote descriptors were still connected Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c769
1 files changed, 560 insertions, 209 deletions
diff --git a/nDPId.c b/nDPId.c
index c58d43dd5..3320f5733 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -14,6 +14,7 @@
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
+#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/un.h>
#ifndef NO_MAIN
@@ -39,10 +40,15 @@
#error "Compare and Swap aka __sync_fetch_and_add not available on your platform!"
#endif
-#if nDPId_MAX_READER_THREADS < 0
+#if nDPId_MAX_READER_THREADS <= 0
#error "Invalid value for nDPId_MAX_READER_THREADS"
#endif
+#if nDPId_FLOW_SCAN_INTERVAL > nDPId_GENERIC_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_ICMP_IDLE_TIME || \
+ nDPId_FLOW_SCAN_INTERVAL > nDPId_TCP_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_UDP_IDLE_TIME
+#error "Invalid value for nDPId_FLOW_SCAN_INTERVAL"
+#endif
+
enum nDPId_l3_type
{
L3_IP,
@@ -67,10 +73,10 @@ union nDPId_ip
enum nDPId_flow_type
{
- FT_UNKNOWN = 0,
- FT_SKIPPED,
- FT_FINISHED,
- FT_INFO
+ FT_UNKNOWN = 0, // should never happen, bug otherwise
+ FT_SKIPPED, // flow should not be processed, see command line args -I and -E
+ FT_FINISHED, // detection done and detection data free'd
+ FT_INFO // detection in progress, detection data allocated
};
/*
@@ -100,13 +106,14 @@ struct nDPId_flow_extended
{
struct nDPId_flow_basic flow_basic;
- uint32_t flow_id;
+ unsigned int long long flow_id;
uint16_t min_l4_payload_len;
uint16_t max_l4_payload_len;
unsigned long long int packets_processed;
uint64_t first_seen;
+ uint64_t last_flow_update;
unsigned long long int total_l4_payload_len;
};
@@ -157,14 +164,19 @@ struct nDPId_workflow
{
pcap_t * pcap_handle;
- int error_or_eof;
- int reserved_00;
+ uint16_t error_or_eof;
+ uint16_t is_pcap_file;
unsigned long long int packets_captured;
unsigned long long int packets_processed;
unsigned long long int total_skipped_flows;
unsigned long long int total_l4_data_len;
- unsigned long long int detected_flow_protocols;
+
+ unsigned long long int total_not_detected_flows;
+ unsigned long long int total_guessed_flows;
+ unsigned long long int total_detected_flows;
+ unsigned long long int total_flow_detection_updates;
+ unsigned long long int total_flow_updates;
#ifdef ENABLE_MEMORY_PROFILING
uint64_t last_memory_usage_log_time;
@@ -172,7 +184,7 @@ struct nDPId_workflow
#ifdef ENABLE_ZLIB
uint64_t last_compression_scan_time;
#endif
- uint64_t last_idle_scan_time;
+ uint64_t last_scan_time;
uint64_t last_time;
void ** ndpi_flows_active;
@@ -185,6 +197,8 @@ struct nDPId_workflow
unsigned long long int cur_idle_flows;
unsigned long long int total_idle_flows;
+ unsigned long long int total_events_serialized;
+
ndpi_serializer ndpi_serializer;
struct ndpi_detection_module_struct * ndpi_struct;
};
@@ -202,8 +216,11 @@ enum packet_event
{
PACKET_EVENT_INVALID = 0,
- PACKET_EVENT_PAYLOAD,
- PACKET_EVENT_PAYLOAD_FLOW,
+ PACKET_EVENT_PAYLOAD, // A single packet that does not belong to a flow for whatever reasons.
+ // E.g. it could be malformed and thus no flow handling is done.
+ // There may be additional use-cases in the future.
+ PACKET_EVENT_PAYLOAD_FLOW, // Special case; A packet event that belongs to a flow but does not include all
+ // information a flow event requires.
PACKET_EVENT_COUNT
};
@@ -215,6 +232,7 @@ enum flow_event
FLOW_EVENT_NEW,
FLOW_EVENT_END,
FLOW_EVENT_IDLE,
+ FLOW_EVENT_UPDATE, // Inform distributor applications about flows with a long lifetime.
FLOW_EVENT_GUESSED,
FLOW_EVENT_DETECTED,
@@ -267,6 +285,7 @@ static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT
[FLOW_EVENT_NEW] = "new",
[FLOW_EVENT_END] = "end",
[FLOW_EVENT_IDLE] = "idle",
+ [FLOW_EVENT_UPDATE] = "update",
[FLOW_EVENT_GUESSED] = "guessed",
[FLOW_EVENT_DETECTED] = "detected",
[FLOW_EVENT_DETECTION_UPDATE] = "detection-update",
@@ -345,13 +364,13 @@ static struct
unsigned long long int tick_resolution;
unsigned long long int reader_thread_count;
#ifdef ENABLE_MEMORY_PROFILING
- unsigned long long int memory_profiling_print_every;
+ unsigned long long int memory_profiling_log_interval;
#endif
#ifdef ENABLE_ZLIB
- unsigned long long int compression_scan_period;
+ unsigned long long int compression_scan_interval;
unsigned long long int compression_flow_inactivity;
#endif
- unsigned long long int idle_scan_period;
+ unsigned long long int flow_scan_interval;
unsigned long long int generic_max_idle_time;
unsigned long long int icmp_max_idle_time;
unsigned long long int udp_max_idle_time;
@@ -367,13 +386,13 @@ static struct
.tick_resolution = nDPId_TICK_RESOLUTION,
.reader_thread_count = nDPId_MAX_READER_THREADS / 2,
#ifdef ENABLE_MEMORY_PROFILING
- .memory_profiling_print_every = nDPId_LOG_MEMORY_USAGE_EVERY,
+ .memory_profiling_log_interval = nDPId_MEMORY_PROFILING_LOG_INTERVAL,
#endif
#ifdef ENABLE_ZLIB
- .compression_scan_period = nDPId_COMPRESSION_SCAN_PERIOD,
+ .compression_scan_interval = nDPId_COMPRESSION_SCAN_INTERVAL,
.compression_flow_inactivity = nDPId_COMPRESSION_FLOW_INACTIVITY,
#endif
- .idle_scan_period = nDPId_IDLE_SCAN_PERIOD,
+ .flow_scan_interval = nDPId_FLOW_SCAN_INTERVAL,
.generic_max_idle_time = nDPId_GENERIC_IDLE_TIME,
.icmp_max_idle_time = nDPId_ICMP_IDLE_TIME,
.udp_max_idle_time = nDPId_UDP_IDLE_TIME,
@@ -388,14 +407,14 @@ enum nDPId_subopts
MAX_IDLE_FLOWS_PER_THREAD,
TICK_RESOLUTION,
MAX_READER_THREADS,
- IDLE_SCAN_PERIOD,
#ifdef ENABLE_MEMORY_PROFILING
- MEMORY_PROFILING_PRINT_EVERY,
+ MEMORY_PROFILING_LOG_INTERVAL,
#endif
#ifdef ENABLE_ZLIB
- COMPRESSION_SCAN_PERIOD,
+ COMPRESSION_SCAN_INTERVAL,
COMPRESSION_FLOW_INACTIVITY,
#endif
+ FLOW_SCAN_INTVERAL,
GENERIC_MAX_IDLE_TIME,
ICMP_MAX_IDLE_TIME,
UDP_MAX_IDLE_TIME,
@@ -409,13 +428,13 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[TICK_RESOLUTION] = "tick-resolution",
[MAX_READER_THREADS] = "max-reader-threads",
#ifdef ENABLE_MEMORY_PROFILING
- [MEMORY_PROFILING_PRINT_EVERY] = "memory-profiling-print-every",
+ [MEMORY_PROFILING_LOG_INTERVAL] = "memory-profiling-log-interval",
#endif
#ifdef ENABLE_ZLIB
- [COMPRESSION_SCAN_PERIOD] = "compression-scan-period",
+ [COMPRESSION_SCAN_INTERVAL] = "compression-scan-interval",
[COMPRESSION_FLOW_INACTIVITY] = "compression-flow-activity",
#endif
- [IDLE_SCAN_PERIOD] = "idle-scan-period",
+ [FLOW_SCAN_INTVERAL] = "flow-scan-interval",
[GENERIC_MAX_IDLE_TIME] = "generic-max-idle-time",
[ICMP_MAX_IDLE_TIME] = "icmp-max-idle-time",
[UDP_MAX_IDLE_TIME] = "udp-max-idle-time",
@@ -425,11 +444,15 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[MAX_PACKETS_PER_FLOW_TO_PROCESS] = "max-packets-per-flow-to-process",
NULL};
+static int processing_threads_error_or_eof(void);
static void free_workflow(struct nDPId_workflow ** const workflow);
static void serialize_and_send(struct nDPId_reader_thread * const reader_thread);
static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
- struct nDPId_flow_info * const flow,
+ struct nDPId_flow_extended * const flow_ext,
enum flow_event event);
+static void jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread,
+ struct nDPId_flow_info * const flow_info,
+ enum flow_event event);
#ifdef ENABLE_ZLIB
static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
@@ -624,7 +647,7 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR,
- "zLib compression failed for flow %u with error code: %d",
+ "zLib compression failed for flow %llu with error code: %d",
flow_info->flow_extended.flow_id,
ret);
}
@@ -639,7 +662,7 @@ static void check_for_compressable_flows(struct nDPId_reader_thread * const read
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
- if (workflow->last_compression_scan_time + nDPId_options.compression_scan_period < workflow->last_time)
+ if (workflow->last_compression_scan_time + nDPId_options.compression_scan_interval < workflow->last_time)
{
for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index)
{
@@ -1053,13 +1076,13 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
#if 1
written = snprintf(output + output_used,
BUFSIZ - output_used,
- "%u,",
+ "%llu,",
flow_finished->flow_info.flow_extended.flow_id);
#else
written =
snprintf(output + output_used,
BUFSIZ - output_used,
- "[%u, %u, %llu],",
+ "[%llu, %u, %llu],",
flow_finished->flow_info.flow_extended.flow_id,
flow_finished->flow_info.flow_extended.flow_basic.l4_protocol,
(unsigned long long int const)flow_finished->flow_info.flow_extended.flow_basic.last_seen);
@@ -1072,11 +1095,12 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F
struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info const *)flow_basic;
#if 1
- written = snprintf(output + output_used, BUFSIZ - output_used, "%u,", flow_info->flow_extended.flow_id);
+ written =
+ snprintf(output + output_used, BUFSIZ - output_used, "%llu,", flow_info->flow_extended.flow_id);
#else
written = snprintf(output + output_used,
BUFSIZ - output_used,
- "[%u, %u, %llu],",
+ "[%llu, %u, %llu],",
flow_info->flow_extended.flow_id,
flow_info->flow_extended.flow_basic.l4_protocol,
(unsigned long long int const)flow_info->flow_extended.flow_basic.last_seen);
@@ -1152,7 +1176,6 @@ static void log_flows(struct nDPId_reader_thread const * const reader_thread)
static struct nDPId_workflow * init_workflow(char const * const file_or_device)
{
- int pcap_argument_is_file = 0;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
struct nDPId_workflow * workflow;
@@ -1178,19 +1201,26 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
{
workflow->pcap_handle =
pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO, pcap_error_buffer);
- pcap_argument_is_file = 1;
+ workflow->is_pcap_file = 1;
}
if (workflow->pcap_handle == NULL)
{
syslog(LOG_DAEMON | LOG_ERR,
- (pcap_argument_is_file == 0 ? "pcap_open_live: %.*s" : "pcap_open_offline_with_tstamp_precision: %.*s"),
+ (workflow->is_pcap_file == 0 ? "pcap_open_live: %.*s" : "pcap_open_offline_with_tstamp_precision: %.*s"),
(int)PCAP_ERRBUF_SIZE,
pcap_error_buffer);
free_workflow(&workflow);
return NULL;
}
+ if (workflow->is_pcap_file == 0 && pcap_setnonblock(workflow->pcap_handle, 1, pcap_error_buffer) == PCAP_ERROR)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "pcap_setnonblock: %.*s", (int)PCAP_ERRBUF_SIZE, pcap_error_buffer);
+ free_workflow(&workflow);
+ return NULL;
+ }
+
if (nDPId_options.bpf_str != NULL)
{
struct bpf_program fp;
@@ -1422,27 +1452,6 @@ static int setup_reader_threads(void)
return 0;
}
-static int ip_tuples_equal(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B)
-{
- // generate a warning if the enum changes
- switch (A->l3_type)
- {
- case L3_IP:
- case L3_IP6:
- break;
- }
- if (A->l3_type == L3_IP && B->l3_type == L3_IP6)
- {
- return A->src.v4.ip == B->src.v4.ip && A->dst.v4.ip == B->dst.v4.ip;
- }
- else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6)
- {
- return A->src.v6.ip[0] == B->src.v6.ip[0] && A->src.v6.ip[1] == B->src.v6.ip[1] &&
- A->dst.v6.ip[0] == B->dst.v6.ip[0] && A->dst.v6.ip[1] == B->dst.v6.ip[1];
- }
- return 0;
-}
-
static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B)
{
// generate a warning if the enum changes
@@ -1452,38 +1461,63 @@ static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDP
case L3_IP6:
break;
}
- if (A->l3_type == L3_IP && B->l3_type == L3_IP6)
+
+ if (A->l3_type == L3_IP && B->l3_type == L3_IP)
{
- if (A->src.v4.ip < B->src.v4.ip || A->dst.v4.ip < B->dst.v4.ip)
+ if (A->src.v4.ip < B->src.v4.ip)
{
return -1;
}
- if (A->src.v4.ip > B->src.v4.ip || A->dst.v4.ip > B->dst.v4.ip)
+ if (A->src.v4.ip > B->src.v4.ip)
+ {
+ return 1;
+ }
+ if (A->dst.v4.ip < B->dst.v4.ip)
+ {
+ return -1;
+ }
+ if (A->dst.v4.ip > B->dst.v4.ip)
{
return 1;
}
}
else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6)
{
- if ((A->src.v6.ip[0] < B->src.v6.ip[0] && A->src.v6.ip[1] < B->src.v6.ip[1]) ||
- (A->dst.v6.ip[0] < B->dst.v6.ip[0] && A->dst.v6.ip[1] < B->dst.v6.ip[1]))
+ if (A->src.v6.ip[0] < B->src.v6.ip[0] && A->src.v6.ip[1] < B->src.v6.ip[1])
+ {
+ return -1;
+ }
+ if (A->src.v6.ip[0] > B->src.v6.ip[0] && A->src.v6.ip[1] > B->src.v6.ip[1])
+ {
+ return 1;
+ }
+ if (A->dst.v6.ip[0] < B->dst.v6.ip[0] && A->dst.v6.ip[1] < B->dst.v6.ip[1])
{
return -1;
}
- if ((A->src.v6.ip[0] > B->src.v6.ip[0] && A->src.v6.ip[1] > B->src.v6.ip[1]) ||
- (A->dst.v6.ip[0] > B->dst.v6.ip[0] && A->dst.v6.ip[1] > B->dst.v6.ip[1]))
+ if (A->dst.v6.ip[0] > B->dst.v6.ip[0] && A->dst.v6.ip[1] > B->dst.v6.ip[1])
{
return 1;
}
}
- if (A->src_port < B->src_port || A->dst_port < B->dst_port)
+
+ if (A->src_port < B->src_port)
+ {
+ return -1;
+ }
+ if (A->src_port > B->src_port)
+ {
+ return 1;
+ }
+ if (A->dst_port < B->dst_port)
{
return -1;
}
- else if (A->src_port > B->src_port || A->dst_port > B->dst_port)
+ if (A->dst_port > B->dst_port)
{
return 1;
}
+
return 0;
}
@@ -1506,9 +1540,25 @@ static uint64_t get_l4_protocol_idle_time(uint8_t l4_protocol)
static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow,
struct nDPId_flow_basic const * const flow_basic)
{
- return flow_basic->last_seen + get_l4_protocol_idle_time(flow_basic->l4_protocol) < workflow->last_time ||
+ uint64_t itime =
+ get_l4_protocol_idle_time(flow_basic->l4_protocol) - (flow_basic->last_seen % nDPId_options.flow_scan_interval);
+
+ return (flow_basic->last_seen + itime <= workflow->last_time) ||
(flow_basic->tcp_fin_rst_seen == 1 &&
- flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time < workflow->last_time);
+ flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time <= workflow->last_time);
+}
+
+static int is_flow_update_required(struct nDPId_workflow const * const workflow,
+ struct nDPId_flow_extended const * const flow_ext)
+{
+ uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol) / 2;
+
+ if (is_l4_protocol_timed_out(workflow, &flow_ext->flow_basic) != 0)
+ {
+ return 0;
+ }
+
+ return flow_ext->last_flow_update + itime <= workflow->last_time;
}
static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
@@ -1572,12 +1622,6 @@ static int ndpi_workflow_node_cmp(void const * const A, void const * const B)
return 1;
}
- if (ip_tuples_equal(flow_basic_a, flow_basic_b) != 0 && flow_basic_a->src_port == flow_basic_b->src_port &&
- flow_basic_a->dst_port == flow_basic_b->dst_port)
- {
- return 0;
- }
-
return ip_tuples_compare(flow_basic_a, flow_basic_b);
}
@@ -1602,11 +1646,11 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
if (flow_basic->tcp_fin_rst_seen != 0)
{
- jsonize_flow_event(reader_thread, &flow_finished->flow_info, FLOW_EVENT_END);
+ jsonize_flow_event(reader_thread, &flow_finished->flow_info.flow_extended, FLOW_EVENT_END);
}
else
{
- jsonize_flow_event(reader_thread, &flow_finished->flow_info, FLOW_EVENT_IDLE);
+ jsonize_flow_event(reader_thread, &flow_finished->flow_info.flow_extended, FLOW_EVENT_IDLE);
}
break;
}
@@ -1644,20 +1688,22 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
if (protocol_was_guessed != 0)
{
- jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_GUESSED);
+ workflow->total_guessed_flows++;
+ jsonize_flow_detection_event(reader_thread, flow_info, FLOW_EVENT_GUESSED);
}
else
{
- jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_NOT_DETECTED);
+ workflow->total_not_detected_flows++;
+ jsonize_flow_detection_event(reader_thread, flow_info, FLOW_EVENT_NOT_DETECTED);
}
}
if (flow_basic->tcp_fin_rst_seen != 0)
{
- jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_END);
+ jsonize_flow_event(reader_thread, &flow_info->flow_extended, FLOW_EVENT_END);
}
else
{
- jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_IDLE);
+ jsonize_flow_event(reader_thread, &flow_info->flow_extended, FLOW_EVENT_IDLE);
}
break;
}
@@ -1673,15 +1719,58 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
- if (workflow->last_idle_scan_time + nDPId_options.idle_scan_period < workflow->last_time)
+ for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index)
+ {
+ ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
+ process_idle_flow(reader_thread, idle_scan_index);
+ }
+}
+
+static void ndpi_flow_update_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
+{
+ struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)user_data;
+ struct nDPId_workflow * const workflow = reader_thread->workflow;
+ struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A;
+
+ (void)depth;
+
+ if (workflow == NULL || flow_basic == NULL)
+ {
+ return;
+ }
+
+ if (which == ndpi_preorder || which == ndpi_leaf)
{
- for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index)
+ switch (flow_basic->type)
{
- ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
- process_idle_flow(reader_thread, idle_scan_index);
+ case FT_UNKNOWN:
+ case FT_SKIPPED:
+ break;
+
+ case FT_FINISHED:
+ case FT_INFO:
+ {
+ struct nDPId_flow_extended * const flow_ext = (struct nDPId_flow_extended *)flow_basic;
+
+ if (is_flow_update_required(workflow, flow_ext) != 0)
+ {
+ workflow->total_flow_updates++;
+ jsonize_flow_event(reader_thread, flow_ext, FLOW_EVENT_UPDATE);
+ flow_ext->last_flow_update = workflow->last_time;
+ }
+ break;
+ }
}
+ }
+}
- workflow->last_idle_scan_time = workflow->last_time;
+static void check_for_flow_updates(struct nDPId_reader_thread * const reader_thread)
+{
+ struct nDPId_workflow * const workflow = reader_thread->workflow;
+
+ for (size_t update_scan_index = 0; update_scan_index < workflow->max_active_flows; ++update_scan_index)
+ {
+ ndpi_twalk(workflow->ndpi_flows_active[update_scan_index], ndpi_flow_update_scan_walker, reader_thread);
}
}
@@ -1782,42 +1871,55 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
if (event == DAEMON_EVENT_INIT)
{
- ndpi_serialize_string_int64(&workflow->ndpi_serializer,
- "max-flows-per-thread",
- nDPId_options.max_flows_per_thread);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer,
- "max-idle-flows-per-thread",
- nDPId_options.max_idle_flows_per_thread);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer,
- "reader-thread-count",
- nDPId_options.reader_thread_count);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer, "idle-scan-period", nDPId_options.idle_scan_period);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer,
- "generic-max-idle-time",
- nDPId_options.generic_max_idle_time);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer, "icmp-max-idle-time", nDPId_options.icmp_max_idle_time);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer, "tcp-max-idle-time", nDPId_options.tcp_max_idle_time);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer,
- "tcp-max-post-end-flow-time",
- nDPId_options.tcp_max_post_end_flow_time);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer,
- "max-packets-per-flow-to-send",
- nDPId_options.max_packets_per_flow_to_send);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer,
- "max-packets-per-flow-to-process",
- nDPId_options.max_packets_per_flow_to_process);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-flows-per-thread",
+ nDPId_options.max_flows_per_thread);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-idle-flows-per-thread",
+ nDPId_options.max_idle_flows_per_thread);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "reader-thread-count",
+ nDPId_options.reader_thread_count);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow-scan-interval",
+ nDPId_options.flow_scan_interval);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "generic-max-idle-time",
+ nDPId_options.generic_max_idle_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "icmp-max-idle-time",
+ nDPId_options.icmp_max_idle_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "tcp-max-idle-time",
+ nDPId_options.tcp_max_idle_time + nDPId_options.tcp_max_post_end_flow_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-packets-per-flow-to-send",
+ nDPId_options.max_packets_per_flow_to_send);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-packets-per-flow-to-process",
+ nDPId_options.max_packets_per_flow_to_process);
+ }
+ else if (event == DAEMON_EVENT_SHUTDOWN)
+ {
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-events-serialized",
+ workflow->total_events_serialized +
+ 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */);
}
serialize_and_send(reader_thread);
}
static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_flow_extended const * const flow_ext)
{
- ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packets_processed", flow_ext->packets_processed);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow_ext->first_seen);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow_idle_time",
+ get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol));
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_min_l4_payload_len", flow_ext->min_l4_payload_len);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_max_l4_payload_len", flow_ext->max_l4_payload_len);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_tot_l4_payload_len", flow_ext->total_l4_payload_len);
@@ -1827,6 +1929,7 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
? flow_ext->total_l4_payload_len / flow_ext->packets_processed
: 0));
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time);
}
static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread)
@@ -1890,6 +1993,7 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread,
json_str_len + 1,
(int)json_str_len,
json_str);
+
if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str))
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -1972,12 +2076,14 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread)
}
else
{
+ reader_thread->workflow->total_events_serialized++;
send_to_json_sink(reader_thread, json_str, json_str_len);
}
ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
}
/* Slightly modified code from: https://en.wikibooks.org/wiki/Algorithm_Implementation/Miscellaneous/Base64 */
+static char const * const base64_ret_strings[] = {"Success", "Buffer too small"};
static int base64encode(uint8_t const * const data_buf,
size_t dataLength,
char * const result,
@@ -2109,8 +2215,6 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
{
return;
}
- ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed);
}
ndpi_serialize_string_int32(&workflow->ndpi_serializer, "packet_event_id", event);
@@ -2125,34 +2229,53 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
jsonize_basic(reader_thread);
+ if (event == PACKET_EVENT_PAYLOAD_FLOW)
+ {
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow_idle_time",
+ get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol));
+ }
+
char base64_data[NETWORK_BUFFER_MAX_SIZE];
size_t base64_data_len = sizeof(base64_data);
int base64_retval = base64encode(packet, header->caplen, base64_data, &base64_data_len);
ndpi_serialize_string_boolean(&workflow->ndpi_serializer, "pkt_oversize", base64_data_len > sizeof(base64_data));
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pkt_ts_sec", header->ts.tv_sec);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pkt_ts_usec", header->ts.tv_usec);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_caplen", header->caplen);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_type", pkt_type);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l3_offset", pkt_l3_offset);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->len);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time);
- if (base64_retval == 0 && base64_data_len > 0 &&
- ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0)
+ if (base64_retval == 0 && base64_data_len > 0)
+ {
+ if (ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %d] JSON serializing base64 packet buffer failed",
+ reader_thread->workflow->packets_captured,
+ reader_thread->array_index);
+ }
+ }
+ else
{
syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %d] JSON serializing base64 packet buffer failed",
+ "[%8llu, %d] Base64 encoding failed with: %s.",
reader_thread->workflow->packets_captured,
- reader_thread->array_index);
+ reader_thread->array_index,
+ base64_ret_strings[base64_retval]);
}
serialize_and_send(reader_thread);
}
-/* I decided against ndpi_flow2json as does not fulfill my needs. */
+/* I decided against ndpi_flow2json as it does not fulfill my needs. */
static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
- struct nDPId_flow_info * const flow,
+ struct nDPId_flow_extended * const flow_ext,
enum flow_event event)
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
@@ -2168,8 +2291,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]);
}
jsonize_basic(reader_thread);
- jsonize_flow(workflow, &flow->flow_extended);
- jsonize_l3_l4(workflow, &flow->flow_extended.flow_basic);
+ jsonize_flow(workflow, flow_ext);
+ jsonize_l3_l4(workflow, &flow_ext->flow_basic);
switch (event)
{
@@ -2180,6 +2303,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_NEW:
case FLOW_EVENT_END:
case FLOW_EVENT_IDLE:
+ case FLOW_EVENT_UPDATE:
ndpi_serialize_string_int32(&workflow->ndpi_serializer,
"flow_datalink",
pcap_datalink(reader_thread->workflow->pcap_handle));
@@ -2190,29 +2314,79 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_NOT_DETECTED:
case FLOW_EVENT_GUESSED:
+ case FLOW_EVENT_DETECTED:
+ case FLOW_EVENT_DETECTION_UPDATE:
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %4llu] internal error / invalid function call",
+ workflow->packets_captured,
+ flow_ext->flow_id);
+ break;
+ }
+
+ serialize_and_send(reader_thread);
+}
+
+static void jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread,
+ struct nDPId_flow_info * const flow_info,
+ enum flow_event event)
+{
+ struct nDPId_workflow * const workflow = reader_thread->workflow;
+ char const ev[] = "flow_event_name";
+
+ ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_event_id", event);
+ if (event > FLOW_EVENT_INVALID && event < FLOW_EVENT_COUNT)
+ {
+ ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[event]);
+ }
+ else
+ {
+ ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]);
+ }
+ jsonize_basic(reader_thread);
+ jsonize_flow(workflow, &flow_info->flow_extended);
+ jsonize_l3_l4(workflow, &flow_info->flow_extended.flow_basic);
+
+ switch (event)
+ {
+ case FLOW_EVENT_INVALID:
+ case FLOW_EVENT_COUNT:
+ break;
+
+ case FLOW_EVENT_NEW:
+ case FLOW_EVENT_END:
+ case FLOW_EVENT_IDLE:
+ case FLOW_EVENT_UPDATE:
+ syslog(LOG_DAEMON | LOG_ERR,
+ "[%8llu, %4llu] internal error / invalid function call",
+ workflow->packets_captured,
+ flow_info->flow_extended.flow_id);
+ break;
+
+ case FLOW_EVENT_NOT_DETECTED:
+ case FLOW_EVENT_GUESSED:
if (ndpi_dpi2json(workflow->ndpi_struct,
- &flow->detection_data->flow,
- flow->detection_data->guessed_l7_protocol,
+ &flow_info->detection_data->flow,
+ flow_info->detection_data->guessed_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %4u] ndpi_dpi2json failed for not-detected/guessed flow",
+ "[%8llu, %4llu] ndpi_dpi2json failed for not-detected/guessed flow",
workflow->packets_captured,
- flow->flow_extended.flow_id);
+ flow_info->flow_extended.flow_id);
}
break;
case FLOW_EVENT_DETECTED:
case FLOW_EVENT_DETECTION_UPDATE:
if (ndpi_dpi2json(workflow->ndpi_struct,
- &flow->detection_data->flow,
- flow->detection_data->detected_l7_protocol,
+ &flow_info->detection_data->flow,
+ flow_info->detection_data->detected_l7_protocol,
&workflow->ndpi_serializer) != 0)
{
syslog(LOG_DAEMON | LOG_ERR,
- "[%8llu, %4u] ndpi_dpi2json failed for detected/detection-update flow",
+ "[%8llu, %4llu] ndpi_dpi2json failed for detected/detection-update flow",
workflow->packets_captured,
- flow->flow_extended.flow_id);
+ flow_info->flow_extended.flow_id);
}
break;
}
@@ -2465,16 +2639,20 @@ static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const *
hash += ndpi_flow->dst->detected_protocol_bitmask.fds_bits[i];
}
- /* FIXME: Adding the first character and the length of the host/server name is not stable, but seems to work. */
- hash += ndpi_flow->host_server_name[0];
- hash += strnlen((const char *)ndpi_flow->host_server_name, sizeof(ndpi_flow->host_server_name));
+ size_t host_server_name_len =
+ strnlen((const char *)ndpi_flow->host_server_name, sizeof(ndpi_flow->host_server_name));
+ hash += host_server_name_len;
+ hash += murmur3_32((uint8_t const *)&ndpi_flow->host_server_name,
+ sizeof(ndpi_flow->host_server_name),
+ nDPId_FLOW_STRUCT_SEED);
return hash;
}
+/* Some constants stolen from ndpiReader. */
#define SNAP 0xaa
/* mask for FCF */
-#define WIFI_DATA 0x2 /* 0000 0010 */
+#define WIFI_DATA 0x2
#define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */
#define FCF_TO_DS(fc) ((fc)&0x0100)
#define FCF_FROM_DS(fc) ((fc)&0x0200)
@@ -2731,6 +2909,26 @@ static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const work
return flow_basic;
}
+static void do_periodically_work(struct nDPId_reader_thread * const reader_thread)
+{
+ if (reader_thread->workflow->last_scan_time + nDPId_options.flow_scan_interval <=
+ reader_thread->workflow->last_time)
+ {
+ check_for_idle_flows(reader_thread);
+ check_for_flow_updates(reader_thread);
+ reader_thread->workflow->last_scan_time = reader_thread->workflow->last_time;
+ }
+#ifdef ENABLE_MEMORY_PROFILING
+ if (reader_thread->workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_log_interval <=
+ reader_thread->workflow->last_time)
+ {
+ log_memory_usage(reader_thread);
+ log_flows(reader_thread);
+ reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_time;
+ }
+#endif
+}
+
static void ndpi_process_packet(uint8_t * const args,
struct pcap_pkthdr const * const header,
uint8_t const * const packet)
@@ -2776,17 +2974,12 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->packets_captured++;
time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_options.tick_resolution +
header->ts.tv_usec / (1000000 / nDPId_options.tick_resolution);
- workflow->last_time = time_ms;
-
- check_for_idle_flows(reader_thread);
-#ifdef ENABLE_MEMORY_PROFILING
- if (workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_print_every < workflow->last_time)
+ if (workflow->last_time < time_ms)
{
- log_memory_usage(reader_thread);
- log_flows(reader_thread);
- workflow->last_memory_usage_log_time = workflow->last_time;
+ workflow->last_time = time_ms;
}
-#endif
+
+ do_periodically_work(reader_thread);
if (process_datalink_layer(reader_thread, header, packet, &ip_offset, &type) != 0)
{
@@ -2949,6 +3142,11 @@ static void ndpi_process_packet(uint8_t * const args,
flow_basic.src_port = ntohs(udp->source);
flow_basic.dst_port = ntohs(udp->dest);
}
+ else
+ {
+ /* Use layer4 length returned from libnDPI. */
+ l4_payload_len = l4_len;
+ }
/* distribute flows to threads while keeping stability (same flow goes always to same thread) */
thread_index += (flow_basic.src_port < flow_basic.dst_port ? flow_basic.dst_port : flow_basic.src_port);
@@ -2957,8 +3155,6 @@ static void ndpi_process_packet(uint8_t * const args,
{
return;
}
- workflow->packets_processed++;
- workflow->total_l4_data_len += l4_len;
/* calculate flow hash for btree find, search(insert) */
switch (flow_basic.l3_type)
@@ -3142,8 +3338,8 @@ static void ndpi_process_packet(uint8_t * const args,
struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result;
/* Update last seen timestamp for timeout handling. */
- flow_basic_to_process->last_seen = time_ms;
- /* TCP-FIN: indicates that at least one side wants to end the connection (timeout handling as well) */
+ flow_basic_to_process->last_seen = workflow->last_time;
+ /* TCP-FIN: indicates that at least one side wants to end the connection. */
if (flow_basic.tcp_fin_rst_seen != 0)
{
flow_basic_to_process->tcp_fin_rst_seen = 1;
@@ -3152,11 +3348,10 @@ static void ndpi_process_packet(uint8_t * const args,
switch (flow_basic_to_process->type)
{
case FT_UNKNOWN:
- return;
case FT_SKIPPED:
return;
+
case FT_FINISHED:
- return;
case FT_INFO:
break;
}
@@ -3169,7 +3364,7 @@ static void ndpi_process_packet(uint8_t * const args,
if (ret <= 0)
{
syslog(LOG_DAEMON | LOG_ERR,
- "zLib decompression failed for existing flow %u with error code: %d",
+ "zLib decompression failed for existing flow %llu with error code: %d",
flow_to_process->flow_extended.flow_id,
ret);
return;
@@ -3177,23 +3372,35 @@ static void ndpi_process_packet(uint8_t * const args,
}
#endif
- if (direction_changed != 0)
+ if (flow_to_process->detection_data != NULL)
{
- ndpi_src = &flow_to_process->detection_data->dst;
- ndpi_dst = &flow_to_process->detection_data->src;
+ if (direction_changed != 0)
+ {
+ ndpi_src = &flow_to_process->detection_data->dst;
+ ndpi_dst = &flow_to_process->detection_data->src;
+ }
+ else
+ {
+ ndpi_src = &flow_to_process->detection_data->src;
+ ndpi_dst = &flow_to_process->detection_data->dst;
+ }
}
else
{
- ndpi_src = &flow_to_process->detection_data->src;
- ndpi_dst = &flow_to_process->detection_data->dst;
+ ndpi_src = NULL;
+ ndpi_dst = NULL;
}
}
flow_to_process->flow_extended.packets_processed++;
flow_to_process->flow_extended.total_l4_payload_len += l4_payload_len;
+ workflow->packets_processed++;
+ workflow->total_l4_data_len += l4_payload_len;
+
if (flow_to_process->flow_extended.first_seen == 0)
{
- flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen = time_ms;
+ flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen =
+ flow_to_process->flow_extended.last_flow_update = workflow->last_time;
}
if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len)
{
@@ -3204,11 +3411,17 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->flow_extended.min_l4_payload_len = l4_payload_len;
}
+ if (flow_to_process->flow_extended.flow_basic.type != FT_INFO)
+ {
+ /* Only FT_INFO goes through the whole detection process. */
+ return;
+ }
+
if (is_new_flow != 0)
{
flow_to_process->flow_extended.max_l4_payload_len = l4_payload_len;
flow_to_process->flow_extended.min_l4_payload_len = l4_payload_len;
- jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_NEW);
+ jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_NEW);
}
jsonize_packet_event(reader_thread,
@@ -3225,7 +3438,8 @@ static void ndpi_process_packet(uint8_t * const args,
{
if (flow_to_process->detection_completed != 0)
{
- jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
+ reader_thread->workflow->total_flow_detection_updates++;
+ jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
}
else
{
@@ -3235,11 +3449,13 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->ndpi_struct, &flow_to_process->detection_data->flow, 1, &protocol_was_guessed);
if (protocol_was_guessed != 0)
{
- jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED);
+ workflow->total_guessed_flows++;
+ jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED);
}
else
{
- jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_NOT_DETECTED);
+ reader_thread->workflow->total_not_detected_flows++;
+ jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_NOT_DETECTED);
}
}
}
@@ -3249,7 +3465,7 @@ static void ndpi_process_packet(uint8_t * const args,
&flow_to_process->detection_data->flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size,
- time_ms,
+ workflow->last_time,
ndpi_src,
ndpi_dst);
@@ -3257,8 +3473,8 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->detection_completed == 0)
{
flow_to_process->detection_completed = 1;
- workflow->detected_flow_protocols++;
- jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED);
+ workflow->total_detected_flows++;
+ jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED);
flow_to_process->detection_data->last_ndpi_flow_struct_hash =
calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow);
}
@@ -3267,7 +3483,8 @@ static void ndpi_process_packet(uint8_t * const args,
uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow);
if (hash != flow_to_process->detection_data->last_ndpi_flow_struct_hash)
{
- jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
+ workflow->total_flow_detection_updates++;
+ jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
flow_to_process->detection_data->last_ndpi_flow_struct_hash = hash;
}
}
@@ -3285,19 +3502,93 @@ static void ndpi_process_packet(uint8_t * const args,
#endif
}
-static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread)
+static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
{
if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL)
{
-
- if (pcap_loop(reader_thread->workflow->pcap_handle, -1, &ndpi_process_packet, (uint8_t *)reader_thread) ==
- PCAP_ERROR)
+ if (reader_thread->workflow->is_pcap_file != 0)
+ {
+ switch (pcap_loop(reader_thread->workflow->pcap_handle, -1, &ndpi_process_packet, (uint8_t *)reader_thread))
+ {
+ case PCAP_ERROR:
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Error while reading pcap file: '%s'",
+ pcap_geterr(reader_thread->workflow->pcap_handle));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ case PCAP_ERROR_BREAK:
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ default:
+ return;
+ }
+ }
+ else
{
+ int pcap_fd = pcap_get_selectable_fd(reader_thread->workflow->pcap_handle);
+ if (pcap_fd < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "%s", "Got an invalid PCAP fd");
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ }
- syslog(LOG_DAEMON | LOG_ERR,
- "Error while reading pcap file: '%s'",
- pcap_geterr(reader_thread->workflow->pcap_handle));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (epoll_fd < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Got an invalid epoll fd: %s", strerror(errno));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ }
+
+ struct epoll_event event = {};
+ event.events = EPOLLIN;
+ event.data.fd = pcap_fd;
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Could not add pcap fd %d to epoll fd %d: %s",
+ epoll_fd,
+ pcap_fd,
+ strerror(errno));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ }
+
+ struct epoll_event events[32];
+ size_t const events_size = sizeof(events) / sizeof(events[0]);
+ int const timeout_ms = 1000; /* TODO: Configurable? */
+ int nready;
+ while (nDPId_main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0)
+ {
+ nready = epoll_wait(epoll_fd, events, events_size, timeout_ms);
+
+ if (nready == 0)
+ {
+ reader_thread->workflow->last_time += timeout_ms;
+
+ do_periodically_work(reader_thread);
+ }
+
+ for (int i = 0; i < nready; ++i)
+ {
+ switch (pcap_dispatch(
+ reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread))
+ {
+ case PCAP_ERROR:
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Error while reading from pcap device: '%s'",
+ pcap_geterr(reader_thread->workflow->pcap_handle));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ break;
+ case PCAP_ERROR_BREAK:
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ default:
+ break;
+ }
+ }
+ }
}
}
}
@@ -3446,6 +3737,16 @@ static void process_remaining_flows(void)
{
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
{
+ if (fcntl(reader_threads[i].json_sockfd,
+ F_SETFL,
+ fcntl(reader_threads[i].json_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Could not set JSON fd %d to blocking mode for shutdown: %s",
+ reader_threads[i].json_sockfd,
+ strerror(errno));
+ }
+
for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows;
++idle_scan_index)
{
@@ -3456,8 +3757,6 @@ static void process_remaining_flows(void)
}
jsonize_daemon(&reader_threads[i], DAEMON_EVENT_SHUTDOWN);
- close(reader_threads[i].json_sockfd);
- reader_threads[i].json_sockfd = -1;
}
}
@@ -3468,7 +3767,11 @@ static int stop_reader_threads(void)
unsigned long long int total_flows_skipped = 0;
unsigned long long int total_flows_captured = 0;
unsigned long long int total_flows_idle = 0;
+ unsigned long long int total_not_detected = 0;
+ unsigned long long int total_flows_guessed = 0;
unsigned long long int total_flows_detected = 0;
+ unsigned long long int total_flow_detection_updates = 0;
+ unsigned long long int total_flow_updates = 0;
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
{
@@ -3505,27 +3808,41 @@ static int stop_reader_threads(void)
total_flows_skipped += reader_threads[i].workflow->total_skipped_flows;
total_flows_captured += reader_threads[i].workflow->total_active_flows;
total_flows_idle += reader_threads[i].workflow->total_idle_flows;
- total_flows_detected += reader_threads[i].workflow->detected_flow_protocols;
+ total_not_detected += reader_threads[i].workflow->total_not_detected_flows;
+ total_flows_guessed += reader_threads[i].workflow->total_guessed_flows;
+ total_flows_detected += reader_threads[i].workflow->total_detected_flows;
+ total_flow_detection_updates += reader_threads[i].workflow->total_flow_detection_updates;
+ total_flow_updates += reader_threads[i].workflow->total_flow_updates;
printf(
- "Stopping Thread %d, processed %10llu packets, %12llu bytes, skipped flows: %8llu, processed flows: %8llu, "
- "idle flows: %8llu, detected flows: %8llu\n",
+ "Stopping Thread %2d, processed %llu packets, %llu bytes\n"
+ "\tskipped flows.....: %8llu, processed flows: %8llu, idle flows....: %8llu\n"
+ "\tnot detected flows: %8llu, guessed flows..: %8llu, detected flows: %8llu\n"
+ "\tdetection updates.: %8llu, updated flows..: %8llu\n",
reader_threads[i].array_index,
reader_threads[i].workflow->packets_processed,
reader_threads[i].workflow->total_l4_data_len,
reader_threads[i].workflow->total_skipped_flows,
reader_threads[i].workflow->total_active_flows,
reader_threads[i].workflow->total_idle_flows,
- reader_threads[i].workflow->detected_flow_protocols);
+ reader_threads[i].workflow->total_not_detected_flows,
+ reader_threads[i].workflow->total_guessed_flows,
+ reader_threads[i].workflow->total_detected_flows,
+ reader_threads[i].workflow->total_flow_detection_updates,
+ reader_threads[i].workflow->total_flow_updates);
}
/* total packets captured: same value for all threads as packet2thread distribution happens later */
- printf("Total packets captured.: %llu\n", reader_threads[0].workflow->packets_captured);
- printf("Total packets processed: %llu\n", total_packets_processed);
- printf("Total layer4 data size.: %llu\n", total_l4_data_len);
- printf("Total flows ignopred...: %llu\n", total_flows_skipped);
- printf("Total flows processed..: %llu\n", total_flows_captured);
- printf("Total flows timed out..: %llu\n", total_flows_idle);
- printf("Total flows detected...: %llu\n", total_flows_detected);
+ printf("Total packets captured.......: %llu\n", reader_threads[0].workflow->packets_captured);
+ printf("Total packets processed......: %llu\n", total_packets_processed);
+ printf("Total layer4 data size.......: %llu\n", total_l4_data_len);
+ printf("Total flows ignopred.........: %llu\n", total_flows_skipped);
+ printf("Total flows processed........: %llu\n", total_flows_captured);
+ printf("Total flows timed out........: %llu\n", total_flows_idle);
+ printf("Total flows detected.........: %llu\n", total_flows_detected);
+ printf("Total flows guessed..........: %llu\n", total_flows_guessed);
+ printf("Total flows not detected.....: %llu\n", total_not_detected);
+ printf("Total flow updates...........: %llu\n", total_flow_updates);
+ printf("Total flow detections updates: %llu\n", total_flow_detection_updates);
return 0;
}
@@ -3579,22 +3896,22 @@ static void print_subopt_usage(void)
case MAX_READER_THREADS:
fprintf(stderr, "%llu\n", nDPId_options.reader_thread_count);
break;
- case IDLE_SCAN_PERIOD:
- fprintf(stderr, "%llu\n", nDPId_options.idle_scan_period);
- break;
#ifdef ENABLE_MEMORY_PROFILING
- case MEMORY_PROFILING_PRINT_EVERY:
- fprintf(stderr, "%llu\n", nDPId_options.memory_profiling_print_every);
+ case MEMORY_PROFILING_LOG_INTERVAL:
+ fprintf(stderr, "%llu\n", nDPId_options.memory_profiling_log_interval);
break;
#endif
#ifdef ENABLE_ZLIB
- case COMPRESSION_SCAN_PERIOD:
- fprintf(stderr, "%llu\n", nDPId_options.compression_scan_period);
+ case COMPRESSION_SCAN_INTERVAL:
+ fprintf(stderr, "%llu\n", nDPId_options.compression_scan_interval);
break;
case COMPRESSION_FLOW_INACTIVITY:
fprintf(stderr, "%llu\n", nDPId_options.compression_flow_inactivity);
break;
#endif
+ case FLOW_SCAN_INTVERAL:
+ fprintf(stderr, "%llu\n", nDPId_options.flow_scan_interval);
+ break;
case GENERIC_MAX_IDLE_TIME:
fprintf(stderr, "%llu\n", nDPId_options.generic_max_idle_time);
break;
@@ -3786,22 +4103,22 @@ static int nDPId_parse_options(int argc, char ** argv)
case MAX_READER_THREADS:
nDPId_options.reader_thread_count = value_llu;
break;
- case IDLE_SCAN_PERIOD:
- nDPId_options.idle_scan_period = value_llu;
- break;
#ifdef ENABLE_MEMORY_PROFILING
- case MEMORY_PROFILING_PRINT_EVERY:
- nDPId_options.memory_profiling_print_every = value_llu;
+ case MEMORY_PROFILING_LOG_INTERVAL:
+ nDPId_options.memory_profiling_log_interval = value_llu;
break;
#endif
#ifdef ENABLE_ZLIB
- case COMPRESSION_SCAN_PERIOD:
- nDPId_options.compression_scan_period = value_llu;
+ case COMPRESSION_SCAN_INTERVAL:
+ nDPId_options.compression_scan_interval = value_llu;
break;
case COMPRESSION_FLOW_INACTIVITY:
nDPId_options.compression_flow_inactivity = value_llu;
break;
#endif
+ case FLOW_SCAN_INTVERAL:
+ nDPId_options.flow_scan_interval = value_llu;
+ break;
case GENERIC_MAX_IDLE_TIME:
nDPId_options.generic_max_idle_time = value_llu;
break;
@@ -3856,10 +4173,11 @@ static int validate_options(char const * const arg0)
#ifdef ENABLE_ZLIB
if (nDPId_options.enable_zlib_compression != 0)
{
- if (nDPId_options.compression_flow_inactivity < 10000 || nDPId_options.compression_scan_period < 10000)
+ if (nDPId_options.compression_flow_inactivity < 10000 || nDPId_options.compression_scan_interval < 10000)
{
fprintf(stderr,
- "%s: Setting compression-scan-period / compression-flow-activity to values lower than 10000 is not "
+ "%s: Setting compression-scan-interval / compression-flow-activity to values lower than 10000 is "
+ "not "
"recommended.\n"
"%s: Your CPU usage may increase heavily.\n",
arg0,
@@ -3927,23 +4245,50 @@ static int validate_options(char const * const arg0)
nDPId_MAX_READER_THREADS);
retval = 1;
}
- if (nDPId_options.idle_scan_period < 1000)
+ if (nDPId_options.flow_scan_interval < 1000)
{
fprintf(stderr,
- "%s: Value not in range: idle-scan-period[%llu] > 1000\n",
+ "%s: Value not in range: idle-scan-interval[%llu] > 1000\n",
arg0,
- nDPId_options.idle_scan_period);
+ nDPId_options.flow_scan_interval);
retval = 1;
}
- if (nDPId_options.tcp_max_post_end_flow_time > nDPId_options.tcp_max_idle_time)
+ if (nDPId_options.flow_scan_interval >= nDPId_options.generic_max_idle_time)
{
fprintf(stderr,
- "%s: Value not in range: tcp-max-post-end-flow-time[%llu] < tcp-max-idle-time[%llu]\n",
+ "%s: Value not in range: flow-scan-interval[%llu] < generic-max-idle-time[%llu]\n",
arg0,
- nDPId_options.tcp_max_post_end_flow_time,
+ nDPId_options.flow_scan_interval,
+ nDPId_options.generic_max_idle_time);
+ retval = 1;
+ }
+ if (nDPId_options.flow_scan_interval >= nDPId_options.icmp_max_idle_time)
+ {
+ fprintf(stderr,
+ "%s: Value not in range: flow-scan-interval[%llu] < icmp-max-idle-time[%llu]\n",
+ arg0,
+ nDPId_options.flow_scan_interval,
+ nDPId_options.icmp_max_idle_time);
+ retval = 1;
+ }
+ if (nDPId_options.flow_scan_interval >= nDPId_options.tcp_max_idle_time)
+ {
+ fprintf(stderr,
+ "%s: Value not in range: flow-scan-interval[%llu] < generic-max-idle-time[%llu]\n",
+ arg0,
+ nDPId_options.flow_scan_interval,
nDPId_options.tcp_max_idle_time);
retval = 1;
}
+ if (nDPId_options.flow_scan_interval >= nDPId_options.udp_max_idle_time)
+ {
+ fprintf(stderr,
+ "%s: Value not in range:flow-scan-interval[%llu] < udp-max-idle-time[%llu]\n",
+ arg0,
+ nDPId_options.flow_scan_interval,
+ nDPId_options.udp_max_idle_time);
+ retval = 1;
+ }
if (nDPId_options.process_internal_initial_direction != 0 && nDPId_options.process_external_initial_direction != 0)
{
fprintf(stderr,
@@ -3961,11 +4306,17 @@ static int validate_options(char const * const arg0)
if (nDPId_options.max_packets_per_flow_to_process < 1 || nDPId_options.max_packets_per_flow_to_process > 65535)
{
fprintf(stderr,
- "%s: Value not in range: 1 =< max_packets_per_flow_to_process[%llu] =< 65535\n",
+ "%s: Value not in range: 1 =< max-packets-per-flow-to-process[%llu] =< 65535\n",
arg0,
nDPId_options.max_packets_per_flow_to_process);
retval = 1;
}
+ if (nDPId_options.max_packets_per_flow_to_send > 30)
+ {
+ fprintf(stderr,
+ "%s: Higher values of max-packets-per-flow-to-send may cause superfluous network usage.\n",
+ arg0);
+ }
return retval;
}