summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-09-13 20:33:15 +0200
committerToni Uhlig <matzeton@googlemail.com>2022-09-13 22:05:08 +0200
commitd4633c11927683865d8b7bec5e0e4162bae82a60 (patch)
tree12e0d78562254e297b7ef9c0f9d4cc3c8fa53874 /nDPId.c
parentaca1615dc13bac949d507c493e9cef80fd2402ef (diff)
New flow event: 'analysis'.
* The goal was to provide a separate event for extracted feature that are not required and only useful for a few (e.g. someone who wants do ML). * Increased network buffer size to 32kB (8192 * 4). * Switched timestamp precision from ms to us for *ALL* timestamps. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c305
1 files changed, 264 insertions, 41 deletions
diff --git a/nDPId.c b/nDPId.c
index 59776f25f..8bd22cf0e 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -6,6 +6,7 @@
#include <net/if.h>
#include <netinet/in.h>
#include <ndpi_api.h>
+#include <ndpi_classify.h>
#include <ndpi_main.h>
#include <ndpi_typedefs.h>
#include <pcap/dlt.h>
@@ -103,6 +104,13 @@ enum nDPId_flow_direction
FD_COUNT
};
+struct nDPId_flow_analysis
+{
+ struct ndpi_analyze_struct iat[FD_COUNT];
+ struct ndpi_analyze_struct iat_flow;
+ struct ndpi_analyze_struct pktlen[FD_COUNT];
+};
+
/*
* Minimal per-flow information required for flow mgmt and timeout handling.
*/
@@ -120,7 +128,7 @@ struct nDPId_flow_basic
uint8_t reserved_01[2];
uint16_t src_port;
uint16_t dst_port;
- uint64_t last_seen;
+ uint64_t last_pkt_time[FD_COUNT];
};
/*
@@ -140,6 +148,7 @@ struct nDPId_flow_extended
uint64_t first_seen;
uint64_t last_flow_update;
+ struct nDPId_flow_analysis * flow_analysis;
unsigned long long int total_l4_payload_len[FD_COUNT];
struct ndpi_proto detected_l7_protocol;
};
@@ -269,13 +278,14 @@ enum flow_event
FLOW_EVENT_INVALID = 0,
FLOW_EVENT_NEW,
- FLOW_EVENT_END,
- FLOW_EVENT_IDLE,
- FLOW_EVENT_UPDATE, // Inform distributor applications about flows with a long lifetime.
+ FLOW_EVENT_END, // TCP only: FIN/RST packet seen.
+ FLOW_EVENT_IDLE, // Flow timed out.
+ FLOW_EVENT_UPDATE, // Inform distributor applications about flows with a long lifetime.
+ FLOW_EVENT_ANALYSE, // Print information regarding a flow analysis, see `struct nDPId_flow_analysis'.
FLOW_EVENT_GUESSED,
FLOW_EVENT_DETECTED,
- FLOW_EVENT_DETECTION_UPDATE,
+ FLOW_EVENT_DETECTION_UPDATE, // Some information in `struct ndpi_flow_struct' changed.
FLOW_EVENT_NOT_DETECTED,
FLOW_EVENT_COUNT
@@ -329,6 +339,7 @@ static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT
[FLOW_EVENT_END] = "end",
[FLOW_EVENT_IDLE] = "idle",
[FLOW_EVENT_UPDATE] = "update",
+ [FLOW_EVENT_ANALYSE] = "analyse",
[FLOW_EVENT_GUESSED] = "guessed",
[FLOW_EVENT_DETECTED] = "detected",
[FLOW_EVENT_DETECTION_UPDATE] = "detection-update",
@@ -402,6 +413,7 @@ static struct
#ifdef ENABLE_ZLIB
uint8_t enable_zlib_compression;
#endif
+ uint8_t enable_data_analysis;
/* subopts */
char * instance_alias;
unsigned long long int max_flows_per_thread;
@@ -424,6 +436,7 @@ static struct
unsigned long long int tcp_max_post_end_flow_time;
unsigned long long int max_packets_per_flow_to_send;
unsigned long long int max_packets_per_flow_to_process;
+ unsigned long long int max_packets_per_flow_to_analyze;
} nDPId_options = {.pidfile = nDPId_PIDFILE,
.user = "nobody",
.collector_address = COLLECTOR_UNIX_SOCKET,
@@ -446,7 +459,8 @@ static struct
.tcp_max_idle_time = nDPId_TCP_IDLE_TIME,
.tcp_max_post_end_flow_time = nDPId_TCP_POST_END_FLOW_TIME,
.max_packets_per_flow_to_send = nDPId_PACKETS_PER_FLOW_TO_SEND,
- .max_packets_per_flow_to_process = nDPId_PACKETS_PER_FLOW_TO_PROCESS};
+ .max_packets_per_flow_to_process = nDPId_PACKETS_PER_FLOW_TO_PROCESS,
+ .max_packets_per_flow_to_analyze = nDPId_PACKETS_PER_FLOW_TO_ANALYZE};
enum nDPId_subopts
{
@@ -470,6 +484,7 @@ enum nDPId_subopts
TCP_MAX_POST_END_FLOW_TIME,
MAX_PACKETS_PER_FLOW_TO_SEND,
MAX_PACKETS_PER_FLOW_TO_PROCESS,
+ MAX_PACKETS_PER_FLOW_TO_ANALYZE,
};
static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-thread",
[MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread",
@@ -491,6 +506,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[TCP_MAX_POST_END_FLOW_TIME] = "tcp-max-post-end-flow-time",
[MAX_PACKETS_PER_FLOW_TO_SEND] = "max-packets-per-flow-to-send",
[MAX_PACKETS_PER_FLOW_TO_PROCESS] = "max-packets-per-flow-to-process",
+ [MAX_PACKETS_PER_FLOW_TO_ANALYZE] = "max-packets-per-flow-to-analyze",
NULL};
static void sighandler(int signum);
@@ -540,6 +556,21 @@ static int set_collector_block(struct nDPId_reader_thread * const reader_thread)
return 0;
}
+static uint64_t get_last_pkt_time(struct nDPId_flow_basic const * const flow_basic)
+{
+ return ndpi_max(flow_basic->last_pkt_time[FD_SRC2DST], flow_basic->last_pkt_time[FD_DST2SRC]);
+}
+
+static uint64_t timer_sub(uint64_t a, uint64_t b)
+{
+ if (b > a)
+ {
+ return 0;
+ }
+
+ return a - b;
+}
+
#ifdef ENABLE_ZLIB
static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
{
@@ -713,7 +744,8 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de
case FS_INFO:
{
- if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_thread_time)
+ if (get_last_pkt_time(flow_basic) + nDPId_options.compression_flow_inactivity <
+ workflow->last_thread_time)
{
struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic;
@@ -1227,6 +1259,19 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
return workflow;
}
+static void free_analysis_data(struct nDPId_flow_extended * const flow_ext)
+{
+ if (nDPId_options.enable_data_analysis != 0 && flow_ext->flow_analysis != NULL)
+ {
+ ndpi_free_data_analysis(&flow_ext->flow_analysis->iat[FD_SRC2DST], 0);
+ ndpi_free_data_analysis(&flow_ext->flow_analysis->iat[FD_DST2SRC], 0);
+ ndpi_free_data_analysis(&flow_ext->flow_analysis->iat_flow, 0);
+ ndpi_free_data_analysis(&flow_ext->flow_analysis->pktlen[FD_SRC2DST], 0);
+ ndpi_free_data_analysis(&flow_ext->flow_analysis->pktlen[FD_DST2SRC], 0);
+ ndpi_free(flow_ext->flow_analysis);
+ }
+}
+
static void free_detection_data(struct nDPId_flow * const flow)
{
ndpi_free_flow_data(&flow->info.detection_data->flow);
@@ -1245,13 +1290,43 @@ static int alloc_detection_data(struct nDPId_flow * const flow)
memset(flow->info.detection_data, 0, sizeof(*flow->info.detection_data));
+ if (nDPId_options.enable_data_analysis != 0)
+ {
+ flow->flow_extended.flow_analysis =
+ (struct nDPId_flow_analysis *)ndpi_malloc(sizeof(*flow->flow_extended.flow_analysis));
+ if (flow->flow_extended.flow_analysis == NULL)
+ {
+ goto error;
+ }
+
+ ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat[FD_SRC2DST],
+ nDPId_options.max_packets_per_flow_to_analyze);
+ ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat[FD_DST2SRC],
+ nDPId_options.max_packets_per_flow_to_analyze);
+ ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat_flow,
+ nDPId_options.max_packets_per_flow_to_analyze);
+ ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->pktlen[FD_SRC2DST],
+ nDPId_options.max_packets_per_flow_to_analyze);
+ ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->pktlen[FD_DST2SRC],
+ nDPId_options.max_packets_per_flow_to_analyze);
+
+ if (flow->flow_extended.flow_analysis->iat[FD_SRC2DST].values == NULL ||
+ flow->flow_extended.flow_analysis->iat[FD_DST2SRC].values == NULL ||
+ flow->flow_extended.flow_analysis->iat_flow.values == NULL ||
+ flow->flow_extended.flow_analysis->pktlen[FD_SRC2DST].values == NULL ||
+ flow->flow_extended.flow_analysis->pktlen[FD_DST2SRC].values == NULL)
+ {
+ goto error;
+ }
+ }
+
return 0;
error:
free_detection_data(flow);
return 1;
}
-static void ndpi_flow_info_freer(void * const node)
+static void ndpi_flow_info_free(void * const node)
{
struct nDPId_flow_basic * const flow_basic = (struct nDPId_flow_basic *)node;
@@ -1261,12 +1336,19 @@ static void ndpi_flow_info_freer(void * const node)
case FS_COUNT:
case FS_SKIPPED:
+ break;
+
case FS_FINISHED:
+ {
+ struct nDPId_flow_extended * const flow_ext = (struct nDPId_flow_extended *)flow_basic;
+ free_analysis_data(flow_ext);
break;
+ }
case FS_INFO:
{
struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic;
+ free_analysis_data(&flow->flow_extended);
free_detection_data(flow);
break;
}
@@ -1295,7 +1377,7 @@ static void free_workflow(struct nDPId_workflow ** const workflow)
}
for (size_t i = 0; i < w->max_active_flows; i++)
{
- ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_freer);
+ ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_free);
}
ndpi_free(w->ndpi_flows_active);
ndpi_free(w->ndpi_flows_idle);
@@ -1485,7 +1567,7 @@ static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow
{
uint64_t itime = get_l4_protocol_idle_time(flow_basic->l4_protocol);
- return flow_basic->tcp_fin_rst_seen == 1 || flow_basic->last_seen + itime <= workflow->last_thread_time;
+ return flow_basic->tcp_fin_rst_seen == 1 || get_last_pkt_time(flow_basic) + itime <= workflow->last_thread_time;
}
static int is_tcp_post_end(struct nDPId_workflow const * const workflow,
@@ -1493,7 +1575,7 @@ static int is_tcp_post_end(struct nDPId_workflow const * const workflow,
{
return flow_basic->l4_protocol != IPPROTO_TCP || flow_basic->tcp_fin_rst_seen == 0 ||
(flow_basic->tcp_fin_rst_seen == 1 &&
- flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time <= workflow->last_thread_time);
+ get_last_pkt_time(flow_basic) + nDPId_options.tcp_max_post_end_flow_time <= workflow->last_thread_time);
}
static int is_flow_update_required(struct nDPId_workflow const * const workflow,
@@ -1662,7 +1744,7 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
}
ndpi_tdelete(flow_basic, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp);
- ndpi_flow_info_freer(flow_basic);
+ ndpi_flow_info_free(flow_basic);
workflow->cur_active_flows--;
}
}
@@ -1924,7 +2006,7 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
1 /* DAEMON_EVENT_SHUTDOWN is an event as well */);
break;
}
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_usec", workflow->last_global_time);
serialize_and_send(reader_thread);
}
@@ -1941,7 +2023,12 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
"flow_dst_packets_processed",
flow_ext->packets_processed[FD_DST2SRC]);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow_ext->first_seen);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow_src_last_pkt_time",
+ flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow_dst_last_pkt_time",
+ flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
"flow_idle_time",
get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol));
@@ -1964,7 +2051,7 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
"flow_dst_tot_l4_payload_len",
flow_ext->total_l4_payload_len[FD_DST2SRC]);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time);
}
static int connect_to_collector(struct nDPId_reader_thread * const reader_thread)
@@ -2262,6 +2349,92 @@ static int base64encode(uint8_t const * const data_buf,
return 0; /* indicate success */
}
+static void jsonize_data_analysis(struct nDPId_reader_thread * const reader_thread,
+ struct nDPId_flow_extended const * const flow_ext)
+{
+ struct nDPId_workflow * const workflow = reader_thread->workflow;
+ struct nDPId_flow_analysis * const analysis = (struct nDPId_flow_analysis *)flow_ext->flow_analysis;
+
+ if (nDPId_options.enable_data_analysis != 0 && flow_ext->flow_analysis != NULL)
+ {
+ ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "data_analysis");
+ ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "iat");
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_min", ndpi_data_min(&analysis->iat_flow));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "flow_avg",
+ ndpi_data_average(&analysis->iat_flow),
+ "%.1f");
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_max", ndpi_data_max(&analysis->iat_flow));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "flow_stddev",
+ ndpi_data_stddev(&analysis->iat_flow),
+ "%.1f");
+
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "c_to_s_min",
+ ndpi_data_min(&analysis->iat[FD_SRC2DST]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "c_to_s_avg",
+ ndpi_data_average(&analysis->iat[FD_SRC2DST]),
+ "%.1f");
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "c_to_s_max",
+ ndpi_data_max(&analysis->iat[FD_SRC2DST]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "c_to_s_stddev",
+ ndpi_data_stddev(&analysis->iat[FD_SRC2DST]),
+ "%.1f");
+
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "s_to_c_min",
+ ndpi_data_min(&analysis->iat[FD_DST2SRC]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "s_to_c_avg",
+ ndpi_data_average(&analysis->iat[FD_DST2SRC]),
+ "%.1f");
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "s_to_c_max",
+ ndpi_data_max(&analysis->iat[FD_DST2SRC]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "s_to_c_stddev",
+ ndpi_data_stddev(&analysis->iat[FD_DST2SRC]),
+ "%.1f");
+ ndpi_serialize_end_of_block(&workflow->ndpi_serializer);
+ ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "pktlen");
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "c_to_s_min",
+ ndpi_data_min(&analysis->pktlen[FD_SRC2DST]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "c_to_s_avg",
+ ndpi_data_average(&analysis->pktlen[FD_SRC2DST]),
+ "%.1f");
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "c_to_s_max",
+ ndpi_data_max(&analysis->pktlen[FD_SRC2DST]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "c_to_s_stddev",
+ ndpi_data_stddev(&analysis->pktlen[FD_SRC2DST]),
+ "%.1f");
+
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "s_to_c_min",
+ ndpi_data_min(&analysis->pktlen[FD_DST2SRC]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "s_to_c_avg",
+ ndpi_data_average(&analysis->pktlen[FD_DST2SRC]),
+ "%.1f");
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer,
+ "s_to_c_max",
+ ndpi_data_max(&analysis->pktlen[FD_DST2SRC]));
+ ndpi_serialize_string_float(&workflow->ndpi_serializer,
+ "s_to_c_stddev",
+ ndpi_data_stddev(&analysis->pktlen[FD_DST2SRC]),
+ "%.1f");
+ ndpi_serialize_end_of_block(&workflow->ndpi_serializer);
+ ndpi_serialize_end_of_block(&workflow->ndpi_serializer);
+ }
+}
+
static void jsonize_packet_event(struct nDPId_reader_thread * const reader_thread,
struct pcap_pkthdr const * const header,
uint8_t const * const packet,
@@ -2310,7 +2483,12 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
"flow_packet_id",
flow_ext->packets_processed[FD_SRC2DST] + flow_ext->packets_processed[FD_DST2SRC]);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow_src_last_pkt_time",
+ flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow_dst_last_pkt_time",
+ flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
"flow_idle_time",
get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol));
@@ -2327,7 +2505,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->caplen);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time);
if (base64_retval == 0 && base64_data_len > 0)
{
@@ -2381,6 +2559,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
case FLOW_EVENT_END:
case FLOW_EVENT_IDLE:
case FLOW_EVENT_UPDATE:
+ case FLOW_EVENT_ANALYSE:
ndpi_serialize_string_int32(&workflow->ndpi_serializer,
"flow_datalink",
pcap_datalink(reader_thread->workflow->pcap_handle));
@@ -2388,6 +2567,10 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
"flow_max_packets",
nDPId_options.max_packets_per_flow_to_send);
+ if (event == FLOW_EVENT_ANALYSE)
+ {
+ jsonize_data_analysis(reader_thread, flow_ext);
+ }
if (flow_ext->flow_basic.state == FS_FINISHED)
{
struct nDPId_flow * const flow = (struct nDPId_flow *)flow_ext;
@@ -2446,6 +2629,7 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read
case FLOW_EVENT_END:
case FLOW_EVENT_IDLE:
case FLOW_EVENT_UPDATE:
+ case FLOW_EVENT_ANALYSE:
logger(1,
"[%8llu, %4llu] internal error / invalid function call",
workflow->packets_captured,
@@ -2670,7 +2854,7 @@ __attribute__((format(printf, 3, 4))) static void jsonize_error_eventf(struct nD
va_end(ap);
}
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_usec", workflow->last_global_time);
serialize_and_send(reader_thread);
}
@@ -3172,8 +3356,11 @@ static void ndpi_process_packet(uint8_t * const args,
const struct ndpi_iphdr * ip;
struct ndpi_ipv6hdr * ip6;
+ const struct ndpi_tcphdr * tcp = NULL;
+
+ uint64_t time_us;
+ uint64_t last_pkt_time;
- uint64_t time_ms;
uint16_t ip_offset = 0;
uint16_t ip_size;
@@ -3196,11 +3383,14 @@ static void ndpi_process_packet(uint8_t * const args,
}
workflow->packets_captured++;
- time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_options.tick_resolution +
- header->ts.tv_usec / (1000000 / nDPId_options.tick_resolution);
- if (workflow->last_global_time < time_ms)
+ time_us = ndpi_timeval_to_microseconds(header->ts);
+ if (workflow->last_global_time < time_us)
{
- workflow->last_global_time = time_ms;
+ workflow->last_global_time = time_us;
+ if (workflow->last_thread_time == 0)
+ {
+ workflow->last_thread_time = time_us;
+ }
}
do_periodically_work(reader_thread);
@@ -3350,8 +3540,6 @@ static void ndpi_process_packet(uint8_t * const args,
/* process layer4 e.g. TCP / UDP */
if (flow_basic.l4_protocol == IPPROTO_TCP)
{
- const struct ndpi_tcphdr * tcp;
-
if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr))
{
if (distribute_single_packet(reader_thread) != 0)
@@ -3428,9 +3616,9 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- if (workflow->last_thread_time < time_ms)
+ if (workflow->last_thread_time < time_us)
{
- workflow->last_thread_time = time_ms;
+ workflow->last_thread_time = time_us;
}
/* calculate flow hash for btree find, search(insert) */
@@ -3639,7 +3827,8 @@ static void ndpi_process_packet(uint8_t * const args,
struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result;
/* Update last seen timestamp for timeout handling. */
- flow_basic_to_process->last_seen = workflow->last_thread_time;
+ last_pkt_time = flow_basic_to_process->last_pkt_time[direction];
+ flow_basic_to_process->last_pkt_time[direction] = workflow->last_thread_time;
/* TCP-FIN/TCP-RST: indicates that at least one side wants to end the connection. */
if (flow_basic.tcp_fin_rst_seen != 0)
{
@@ -3686,11 +3875,6 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->packets_processed++;
workflow->total_l4_payload_len += l4_payload_len;
- if (flow_to_process->flow_extended.first_seen == 0)
- {
- flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen =
- flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time;
- }
if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len[direction])
{
flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len;
@@ -3702,11 +3886,37 @@ static void ndpi_process_packet(uint8_t * const args,
if (is_new_flow != 0)
{
+ flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_pkt_time[direction] =
+ flow_to_process->flow_extended.flow_basic.last_pkt_time[1 - direction] =
+ flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time;
flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len;
flow_to_process->flow_extended.min_l4_payload_len[direction] = l4_payload_len;
jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_NEW);
}
+ if (nDPId_options.enable_data_analysis != 0 && flow_to_process->flow_extended.flow_analysis != NULL &&
+ flow_to_process->flow_extended.packets_processed[FD_SRC2DST] +
+ flow_to_process->flow_extended.packets_processed[FD_DST2SRC] <=
+ nDPId_options.max_packets_per_flow_to_analyze)
+ {
+ uint64_t tdiff_us = timer_sub(workflow->last_thread_time, last_pkt_time);
+
+ if (tdiff_us > 0)
+ {
+ ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->iat[direction], tdiff_us);
+ ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->iat_flow, tdiff_us);
+ }
+
+ ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->pktlen[direction], header->caplen);
+
+ if (flow_to_process->flow_extended.packets_processed[FD_SRC2DST] +
+ flow_to_process->flow_extended.packets_processed[FD_DST2SRC] ==
+ nDPId_options.max_packets_per_flow_to_analyze)
+ {
+ jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_ANALYSE);
+ }
+ }
+
jsonize_packet_event(reader_thread,
header,
packet,
@@ -3844,7 +4054,7 @@ static void ndpi_log_flow_walker(void const * const A, ndpi_VISIT which, int dep
{
struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic;
- uint64_t last_seen = flow->flow_extended.flow_basic.last_seen;
+ uint64_t last_seen = get_last_pkt_time(flow_basic);
uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol);
logger(0,
"[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: "
@@ -3864,7 +4074,7 @@ static void ndpi_log_flow_walker(void const * const A, ndpi_VISIT which, int dep
{
struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic;
- uint64_t last_seen = flow->flow_extended.flow_basic.last_seen;
+ uint64_t last_seen = get_last_pkt_time(flow_basic);
uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol);
logger(0,
"[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: "
@@ -3999,11 +4209,11 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
{
struct timeval tval_diff;
get_current_time(&tval_after_epoll);
- timersub(&tval_after_epoll, &tval_before_epoll, &tval_diff);
- uint64_t tdiff_ms = tval_diff.tv_sec * 1000 + tval_diff.tv_usec / 1000;
+ ndpi_timer_sub(&tval_after_epoll, &tval_before_epoll, &tval_diff);
+ uint64_t tdiff_us = tval_diff.tv_sec * 1000 * 1000 + tval_diff.tv_usec;
- reader_thread->workflow->last_global_time += tdiff_ms;
- reader_thread->workflow->last_thread_time += tdiff_ms;
+ reader_thread->workflow->last_global_time += tdiff_us;
+ reader_thread->workflow->last_thread_time += tdiff_us;
do_periodically_work(reader_thread);
}
@@ -4418,6 +4628,9 @@ static void print_subopt_usage(void)
case MAX_PACKETS_PER_FLOW_TO_PROCESS:
fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_process);
break;
+ case MAX_PACKETS_PER_FLOW_TO_ANALYZE:
+ fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_analyze);
+ break;
}
}
else
@@ -4442,7 +4655,9 @@ static int nDPId_parse_options(int argc, char ** argv)
"[-u user] [-g group] "
"[-P path] [-C path] [-J path]\n"
"\t \t"
- "[-a instance-alias] [-o subopt=value]\n"
+ "[-a instance-alias] [-A]\n"
+ "\t \t"
+ "[-o subopt=value]\n"
"\t \t"
"[-v] [-h]\n\n"
"\t-i\tInterface or file from where to read packets from.\n"
@@ -4469,6 +4684,7 @@ static int nDPId_parse_options(int argc, char ** argv)
"\t \tThis value is required for correct flow handling of\n"
"\t \tmultiple instances and should be unique.\n"
"\t \tDefaults to your hostname.\n"
+ "\t-A\tEnable flow analysis aka feature extraction. Requires more memory and cpu usage.\n"
#ifdef ENABLE_ZLIB
"\t-z\tEnable flow memory zLib compression.\n"
#endif
@@ -4476,7 +4692,7 @@ static int nDPId_parse_options(int argc, char ** argv)
"\t-v\tversion\n"
"\t-h\tthis\n\n";
- while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:zo:vh")) != -1)
+ while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:Azo:vh")) != -1)
{
switch (opt)
{
@@ -4533,6 +4749,9 @@ static int nDPId_parse_options(int argc, char ** argv)
case 'a':
nDPId_options.instance_alias = strdup(optarg);
break;
+ case 'A':
+ nDPId_options.enable_data_analysis = 1;
+ break;
case 'z':
#ifdef ENABLE_ZLIB
nDPId_options.enable_zlib_compression = 1;
@@ -4633,6 +4852,10 @@ static int nDPId_parse_options(int argc, char ** argv)
break;
case MAX_PACKETS_PER_FLOW_TO_PROCESS:
nDPId_options.max_packets_per_flow_to_process = value_llu;
+ break;
+ case MAX_PACKETS_PER_FLOW_TO_ANALYZE:
+ nDPId_options.max_packets_per_flow_to_analyze = value_llu;
+ break;
}
}
break;