diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-09-13 20:33:15 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-09-13 22:05:08 +0200 |
commit | d4633c11927683865d8b7bec5e0e4162bae82a60 (patch) | |
tree | 12e0d78562254e297b7ef9c0f9d4cc3c8fa53874 /nDPId.c | |
parent | aca1615dc13bac949d507c493e9cef80fd2402ef (diff) |
New flow event: 'analysis'.
* The goal was to provide a separate event for extracted features that are not required
and only useful for a few (e.g. someone who wants to do ML).
* Increased network buffer size to 32kB (8192 * 4).
* Switched timestamp precision from ms to us for *ALL* timestamps.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 305 |
1 files changed, 264 insertions, 41 deletions
@@ -6,6 +6,7 @@ #include <net/if.h> #include <netinet/in.h> #include <ndpi_api.h> +#include <ndpi_classify.h> #include <ndpi_main.h> #include <ndpi_typedefs.h> #include <pcap/dlt.h> @@ -103,6 +104,13 @@ enum nDPId_flow_direction FD_COUNT }; +struct nDPId_flow_analysis +{ + struct ndpi_analyze_struct iat[FD_COUNT]; + struct ndpi_analyze_struct iat_flow; + struct ndpi_analyze_struct pktlen[FD_COUNT]; +}; + /* * Minimal per-flow information required for flow mgmt and timeout handling. */ @@ -120,7 +128,7 @@ struct nDPId_flow_basic uint8_t reserved_01[2]; uint16_t src_port; uint16_t dst_port; - uint64_t last_seen; + uint64_t last_pkt_time[FD_COUNT]; }; /* @@ -140,6 +148,7 @@ struct nDPId_flow_extended uint64_t first_seen; uint64_t last_flow_update; + struct nDPId_flow_analysis * flow_analysis; unsigned long long int total_l4_payload_len[FD_COUNT]; struct ndpi_proto detected_l7_protocol; }; @@ -269,13 +278,14 @@ enum flow_event FLOW_EVENT_INVALID = 0, FLOW_EVENT_NEW, - FLOW_EVENT_END, - FLOW_EVENT_IDLE, - FLOW_EVENT_UPDATE, // Inform distributor applications about flows with a long lifetime. + FLOW_EVENT_END, // TCP only: FIN/RST packet seen. + FLOW_EVENT_IDLE, // Flow timed out. + FLOW_EVENT_UPDATE, // Inform distributor applications about flows with a long lifetime. + FLOW_EVENT_ANALYSE, // Print information regarding a flow analysis, see `struct nDPId_flow_analysis'. FLOW_EVENT_GUESSED, FLOW_EVENT_DETECTED, - FLOW_EVENT_DETECTION_UPDATE, + FLOW_EVENT_DETECTION_UPDATE, // Some information in `struct ndpi_flow_struct' changed. 
FLOW_EVENT_NOT_DETECTED, FLOW_EVENT_COUNT @@ -329,6 +339,7 @@ static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT [FLOW_EVENT_END] = "end", [FLOW_EVENT_IDLE] = "idle", [FLOW_EVENT_UPDATE] = "update", + [FLOW_EVENT_ANALYSE] = "analyse", [FLOW_EVENT_GUESSED] = "guessed", [FLOW_EVENT_DETECTED] = "detected", [FLOW_EVENT_DETECTION_UPDATE] = "detection-update", @@ -402,6 +413,7 @@ static struct #ifdef ENABLE_ZLIB uint8_t enable_zlib_compression; #endif + uint8_t enable_data_analysis; /* subopts */ char * instance_alias; unsigned long long int max_flows_per_thread; @@ -424,6 +436,7 @@ static struct unsigned long long int tcp_max_post_end_flow_time; unsigned long long int max_packets_per_flow_to_send; unsigned long long int max_packets_per_flow_to_process; + unsigned long long int max_packets_per_flow_to_analyze; } nDPId_options = {.pidfile = nDPId_PIDFILE, .user = "nobody", .collector_address = COLLECTOR_UNIX_SOCKET, @@ -446,7 +459,8 @@ static struct .tcp_max_idle_time = nDPId_TCP_IDLE_TIME, .tcp_max_post_end_flow_time = nDPId_TCP_POST_END_FLOW_TIME, .max_packets_per_flow_to_send = nDPId_PACKETS_PER_FLOW_TO_SEND, - .max_packets_per_flow_to_process = nDPId_PACKETS_PER_FLOW_TO_PROCESS}; + .max_packets_per_flow_to_process = nDPId_PACKETS_PER_FLOW_TO_PROCESS, + .max_packets_per_flow_to_analyze = nDPId_PACKETS_PER_FLOW_TO_ANALYZE}; enum nDPId_subopts { @@ -470,6 +484,7 @@ enum nDPId_subopts TCP_MAX_POST_END_FLOW_TIME, MAX_PACKETS_PER_FLOW_TO_SEND, MAX_PACKETS_PER_FLOW_TO_PROCESS, + MAX_PACKETS_PER_FLOW_TO_ANALYZE, }; static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-thread", [MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread", @@ -491,6 +506,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th [TCP_MAX_POST_END_FLOW_TIME] = "tcp-max-post-end-flow-time", [MAX_PACKETS_PER_FLOW_TO_SEND] = "max-packets-per-flow-to-send", [MAX_PACKETS_PER_FLOW_TO_PROCESS] = 
"max-packets-per-flow-to-process", + [MAX_PACKETS_PER_FLOW_TO_ANALYZE] = "max-packets-per-flow-to-analyze", NULL}; static void sighandler(int signum); @@ -540,6 +556,21 @@ static int set_collector_block(struct nDPId_reader_thread * const reader_thread) return 0; } +static uint64_t get_last_pkt_time(struct nDPId_flow_basic const * const flow_basic) +{ + return ndpi_max(flow_basic->last_pkt_time[FD_SRC2DST], flow_basic->last_pkt_time[FD_DST2SRC]); +} + +static uint64_t timer_sub(uint64_t a, uint64_t b) +{ + if (b > a) + { + return 0; + } + + return a - b; +} + #ifdef ENABLE_ZLIB static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen) { @@ -713,7 +744,8 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de case FS_INFO: { - if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_thread_time) + if (get_last_pkt_time(flow_basic) + nDPId_options.compression_flow_inactivity < + workflow->last_thread_time) { struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; @@ -1227,6 +1259,19 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) return workflow; } +static void free_analysis_data(struct nDPId_flow_extended * const flow_ext) +{ + if (nDPId_options.enable_data_analysis != 0 && flow_ext->flow_analysis != NULL) + { + ndpi_free_data_analysis(&flow_ext->flow_analysis->iat[FD_SRC2DST], 0); + ndpi_free_data_analysis(&flow_ext->flow_analysis->iat[FD_DST2SRC], 0); + ndpi_free_data_analysis(&flow_ext->flow_analysis->iat_flow, 0); + ndpi_free_data_analysis(&flow_ext->flow_analysis->pktlen[FD_SRC2DST], 0); + ndpi_free_data_analysis(&flow_ext->flow_analysis->pktlen[FD_DST2SRC], 0); + ndpi_free(flow_ext->flow_analysis); + } +} + static void free_detection_data(struct nDPId_flow * const flow) { ndpi_free_flow_data(&flow->info.detection_data->flow); @@ -1245,13 +1290,43 @@ static int alloc_detection_data(struct nDPId_flow * const flow) 
memset(flow->info.detection_data, 0, sizeof(*flow->info.detection_data)); + if (nDPId_options.enable_data_analysis != 0) + { + flow->flow_extended.flow_analysis = + (struct nDPId_flow_analysis *)ndpi_malloc(sizeof(*flow->flow_extended.flow_analysis)); + if (flow->flow_extended.flow_analysis == NULL) + { + goto error; + } + + ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat[FD_SRC2DST], + nDPId_options.max_packets_per_flow_to_analyze); + ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat[FD_DST2SRC], + nDPId_options.max_packets_per_flow_to_analyze); + ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat_flow, + nDPId_options.max_packets_per_flow_to_analyze); + ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->pktlen[FD_SRC2DST], + nDPId_options.max_packets_per_flow_to_analyze); + ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->pktlen[FD_DST2SRC], + nDPId_options.max_packets_per_flow_to_analyze); + + if (flow->flow_extended.flow_analysis->iat[FD_SRC2DST].values == NULL || + flow->flow_extended.flow_analysis->iat[FD_DST2SRC].values == NULL || + flow->flow_extended.flow_analysis->iat_flow.values == NULL || + flow->flow_extended.flow_analysis->pktlen[FD_SRC2DST].values == NULL || + flow->flow_extended.flow_analysis->pktlen[FD_DST2SRC].values == NULL) + { + goto error; + } + } + return 0; error: free_detection_data(flow); return 1; } -static void ndpi_flow_info_freer(void * const node) +static void ndpi_flow_info_free(void * const node) { struct nDPId_flow_basic * const flow_basic = (struct nDPId_flow_basic *)node; @@ -1261,12 +1336,19 @@ static void ndpi_flow_info_freer(void * const node) case FS_COUNT: case FS_SKIPPED: + break; + case FS_FINISHED: + { + struct nDPId_flow_extended * const flow_ext = (struct nDPId_flow_extended *)flow_basic; + free_analysis_data(flow_ext); break; + } case FS_INFO: { struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; + free_analysis_data(&flow->flow_extended); 
free_detection_data(flow); break; } @@ -1295,7 +1377,7 @@ static void free_workflow(struct nDPId_workflow ** const workflow) } for (size_t i = 0; i < w->max_active_flows; i++) { - ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_freer); + ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_free); } ndpi_free(w->ndpi_flows_active); ndpi_free(w->ndpi_flows_idle); @@ -1485,7 +1567,7 @@ static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow { uint64_t itime = get_l4_protocol_idle_time(flow_basic->l4_protocol); - return flow_basic->tcp_fin_rst_seen == 1 || flow_basic->last_seen + itime <= workflow->last_thread_time; + return flow_basic->tcp_fin_rst_seen == 1 || get_last_pkt_time(flow_basic) + itime <= workflow->last_thread_time; } static int is_tcp_post_end(struct nDPId_workflow const * const workflow, @@ -1493,7 +1575,7 @@ static int is_tcp_post_end(struct nDPId_workflow const * const workflow, { return flow_basic->l4_protocol != IPPROTO_TCP || flow_basic->tcp_fin_rst_seen == 0 || (flow_basic->tcp_fin_rst_seen == 1 && - flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time <= workflow->last_thread_time); + get_last_pkt_time(flow_basic) + nDPId_options.tcp_max_post_end_flow_time <= workflow->last_thread_time); } static int is_flow_update_required(struct nDPId_workflow const * const workflow, @@ -1662,7 +1744,7 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, } ndpi_tdelete(flow_basic, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp); - ndpi_flow_info_freer(flow_basic); + ndpi_flow_info_free(flow_basic); workflow->cur_active_flows--; } } @@ -1924,7 +2006,7 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */); break; } - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, 
"global_ts_usec", workflow->last_global_time); serialize_and_send(reader_thread); } @@ -1941,7 +2023,12 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl "flow_dst_packets_processed", flow_ext->packets_processed[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow_ext->first_seen); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_idle_time", get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol)); @@ -1964,7 +2051,7 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl "flow_dst_tot_l4_payload_len", flow_ext->total_l4_payload_len[FD_DST2SRC]); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time); } static int connect_to_collector(struct nDPId_reader_thread * const reader_thread) @@ -2262,6 +2349,92 @@ static int base64encode(uint8_t const * const data_buf, return 0; /* indicate success */ } +static void jsonize_data_analysis(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow_extended const * const flow_ext) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + struct nDPId_flow_analysis * const analysis = (struct nDPId_flow_analysis *)flow_ext->flow_analysis; + + if (nDPId_options.enable_data_analysis != 0 && flow_ext->flow_analysis != NULL) + { + 
ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "data_analysis"); + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "iat"); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_min", ndpi_data_min(&analysis->iat_flow)); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "flow_avg", + ndpi_data_average(&analysis->iat_flow), + "%.1f"); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_max", ndpi_data_max(&analysis->iat_flow)); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "flow_stddev", + ndpi_data_stddev(&analysis->iat_flow), + "%.1f"); + + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "c_to_s_min", + ndpi_data_min(&analysis->iat[FD_SRC2DST])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "c_to_s_avg", + ndpi_data_average(&analysis->iat[FD_SRC2DST]), + "%.1f"); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "c_to_s_max", + ndpi_data_max(&analysis->iat[FD_SRC2DST])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "c_to_s_stddev", + ndpi_data_stddev(&analysis->iat[FD_SRC2DST]), + "%.1f"); + + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "s_to_c_min", + ndpi_data_min(&analysis->iat[FD_DST2SRC])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "s_to_c_avg", + ndpi_data_average(&analysis->iat[FD_DST2SRC]), + "%.1f"); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "s_to_c_max", + ndpi_data_max(&analysis->iat[FD_DST2SRC])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "s_to_c_stddev", + ndpi_data_stddev(&analysis->iat[FD_DST2SRC]), + "%.1f"); + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "pktlen"); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "c_to_s_min", + ndpi_data_min(&analysis->pktlen[FD_SRC2DST])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "c_to_s_avg", + 
ndpi_data_average(&analysis->pktlen[FD_SRC2DST]), + "%.1f"); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "c_to_s_max", + ndpi_data_max(&analysis->pktlen[FD_SRC2DST])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "c_to_s_stddev", + ndpi_data_stddev(&analysis->pktlen[FD_SRC2DST]), + "%.1f"); + + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "s_to_c_min", + ndpi_data_min(&analysis->pktlen[FD_DST2SRC])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "s_to_c_avg", + ndpi_data_average(&analysis->pktlen[FD_DST2SRC]), + "%.1f"); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "s_to_c_max", + ndpi_data_max(&analysis->pktlen[FD_DST2SRC])); + ndpi_serialize_string_float(&workflow->ndpi_serializer, + "s_to_c_stddev", + ndpi_data_stddev(&analysis->pktlen[FD_DST2SRC]), + "%.1f"); + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + } +} + static void jsonize_packet_event(struct nDPId_reader_thread * const reader_thread, struct pcap_pkthdr const * const header, uint8_t const * const packet, @@ -2310,7 +2483,12 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed[FD_SRC2DST] + flow_ext->packets_processed[FD_DST2SRC]); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_idle_time", get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol)); @@ -2327,7 +2505,7 @@ static void jsonize_packet_event(struct 
nDPId_reader_thread * const reader_threa ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->caplen); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time); if (base64_retval == 0 && base64_data_len > 0) { @@ -2381,6 +2559,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_END: case FLOW_EVENT_IDLE: case FLOW_EVENT_UPDATE: + case FLOW_EVENT_ANALYSE: ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_datalink", pcap_datalink(reader_thread->workflow->pcap_handle)); @@ -2388,6 +2567,10 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, "flow_max_packets", nDPId_options.max_packets_per_flow_to_send); + if (event == FLOW_EVENT_ANALYSE) + { + jsonize_data_analysis(reader_thread, flow_ext); + } if (flow_ext->flow_basic.state == FS_FINISHED) { struct nDPId_flow * const flow = (struct nDPId_flow *)flow_ext; @@ -2446,6 +2629,7 @@ static void jsonize_flow_detection_event(struct nDPId_reader_thread * const read case FLOW_EVENT_END: case FLOW_EVENT_IDLE: case FLOW_EVENT_UPDATE: + case FLOW_EVENT_ANALYSE: logger(1, "[%8llu, %4llu] internal error / invalid function call", workflow->packets_captured, @@ -2670,7 +2854,7 @@ __attribute__((format(printf, 3, 4))) static void jsonize_error_eventf(struct nD va_end(ap); } - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_usec", workflow->last_global_time); serialize_and_send(reader_thread); } @@ -3172,8 +3356,11 @@ static void ndpi_process_packet(uint8_t * const args, 
const struct ndpi_iphdr * ip; struct ndpi_ipv6hdr * ip6; + const struct ndpi_tcphdr * tcp = NULL; + + uint64_t time_us; + uint64_t last_pkt_time; - uint64_t time_ms; uint16_t ip_offset = 0; uint16_t ip_size; @@ -3196,11 +3383,14 @@ static void ndpi_process_packet(uint8_t * const args, } workflow->packets_captured++; - time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_options.tick_resolution + - header->ts.tv_usec / (1000000 / nDPId_options.tick_resolution); - if (workflow->last_global_time < time_ms) + time_us = ndpi_timeval_to_microseconds(header->ts); + if (workflow->last_global_time < time_us) { - workflow->last_global_time = time_ms; + workflow->last_global_time = time_us; + if (workflow->last_thread_time == 0) + { + workflow->last_thread_time = time_us; + } } do_periodically_work(reader_thread); @@ -3350,8 +3540,6 @@ static void ndpi_process_packet(uint8_t * const args, /* process layer4 e.g. TCP / UDP */ if (flow_basic.l4_protocol == IPPROTO_TCP) { - const struct ndpi_tcphdr * tcp; - if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) { if (distribute_single_packet(reader_thread) != 0) @@ -3428,9 +3616,9 @@ static void ndpi_process_packet(uint8_t * const args, return; } - if (workflow->last_thread_time < time_ms) + if (workflow->last_thread_time < time_us) { - workflow->last_thread_time = time_ms; + workflow->last_thread_time = time_us; } /* calculate flow hash for btree find, search(insert) */ @@ -3639,7 +3827,8 @@ static void ndpi_process_packet(uint8_t * const args, struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result; /* Update last seen timestamp for timeout handling. */ - flow_basic_to_process->last_seen = workflow->last_thread_time; + last_pkt_time = flow_basic_to_process->last_pkt_time[direction]; + flow_basic_to_process->last_pkt_time[direction] = workflow->last_thread_time; /* TCP-FIN/TCP-RST: indicates that at least one side wants to end the connection. 
*/ if (flow_basic.tcp_fin_rst_seen != 0) { @@ -3686,11 +3875,6 @@ static void ndpi_process_packet(uint8_t * const args, workflow->packets_processed++; workflow->total_l4_payload_len += l4_payload_len; - if (flow_to_process->flow_extended.first_seen == 0) - { - flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen = - flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time; - } if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len[direction]) { flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len; @@ -3702,11 +3886,37 @@ static void ndpi_process_packet(uint8_t * const args, if (is_new_flow != 0) { + flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_pkt_time[direction] = + flow_to_process->flow_extended.flow_basic.last_pkt_time[1 - direction] = + flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time; flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len; flow_to_process->flow_extended.min_l4_payload_len[direction] = l4_payload_len; jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_NEW); } + if (nDPId_options.enable_data_analysis != 0 && flow_to_process->flow_extended.flow_analysis != NULL && + flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + + flow_to_process->flow_extended.packets_processed[FD_DST2SRC] <= + nDPId_options.max_packets_per_flow_to_analyze) + { + uint64_t tdiff_us = timer_sub(workflow->last_thread_time, last_pkt_time); + + if (tdiff_us > 0) + { + ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->iat[direction], tdiff_us); + ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->iat_flow, tdiff_us); + } + + ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->pktlen[direction], header->caplen); + + if (flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + + 
flow_to_process->flow_extended.packets_processed[FD_DST2SRC] == + nDPId_options.max_packets_per_flow_to_analyze) + { + jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_ANALYSE); + } + } + jsonize_packet_event(reader_thread, header, packet, @@ -3844,7 +4054,7 @@ static void ndpi_log_flow_walker(void const * const A, ndpi_VISIT which, int dep { struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic; - uint64_t last_seen = flow->flow_extended.flow_basic.last_seen; + uint64_t last_seen = get_last_pkt_time(flow_basic); uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol); logger(0, "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " @@ -3864,7 +4074,7 @@ static void ndpi_log_flow_walker(void const * const A, ndpi_VISIT which, int dep { struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic; - uint64_t last_seen = flow->flow_extended.flow_basic.last_seen; + uint64_t last_seen = get_last_pkt_time(flow_basic); uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol); logger(0, "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " @@ -3999,11 +4209,11 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) { struct timeval tval_diff; get_current_time(&tval_after_epoll); - timersub(&tval_after_epoll, &tval_before_epoll, &tval_diff); - uint64_t tdiff_ms = tval_diff.tv_sec * 1000 + tval_diff.tv_usec / 1000; + ndpi_timer_sub(&tval_after_epoll, &tval_before_epoll, &tval_diff); + uint64_t tdiff_us = tval_diff.tv_sec * 1000 * 1000 + tval_diff.tv_usec; - reader_thread->workflow->last_global_time += tdiff_ms; - reader_thread->workflow->last_thread_time += tdiff_ms; + reader_thread->workflow->last_global_time += tdiff_us; + reader_thread->workflow->last_thread_time += tdiff_us; do_periodically_work(reader_thread); } @@ -4418,6 
+4628,9 @@ static void print_subopt_usage(void) case MAX_PACKETS_PER_FLOW_TO_PROCESS: fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_process); break; + case MAX_PACKETS_PER_FLOW_TO_ANALYZE: + fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_analyze); + break; } } else @@ -4442,7 +4655,9 @@ static int nDPId_parse_options(int argc, char ** argv) "[-u user] [-g group] " "[-P path] [-C path] [-J path]\n" "\t \t" - "[-a instance-alias] [-o subopt=value]\n" + "[-a instance-alias] [-A]\n" + "\t \t" + "[-o subopt=value]\n" "\t \t" "[-v] [-h]\n\n" "\t-i\tInterface or file from where to read packets from.\n" @@ -4469,6 +4684,7 @@ static int nDPId_parse_options(int argc, char ** argv) "\t \tThis value is required for correct flow handling of\n" "\t \tmultiple instances and should be unique.\n" "\t \tDefaults to your hostname.\n" + "\t-A\tEnable flow analysis aka feature extraction. Requires more memory and cpu usage.\n" #ifdef ENABLE_ZLIB "\t-z\tEnable flow memory zLib compression.\n" #endif @@ -4476,7 +4692,7 @@ static int nDPId_parse_options(int argc, char ** argv) "\t-v\tversion\n" "\t-h\tthis\n\n"; - while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:zo:vh")) != -1) + while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:Azo:vh")) != -1) { switch (opt) { @@ -4533,6 +4749,9 @@ static int nDPId_parse_options(int argc, char ** argv) case 'a': nDPId_options.instance_alias = strdup(optarg); break; + case 'A': + nDPId_options.enable_data_analysis = 1; + break; case 'z': #ifdef ENABLE_ZLIB nDPId_options.enable_zlib_compression = 1; @@ -4633,6 +4852,10 @@ static int nDPId_parse_options(int argc, char ** argv) break; case MAX_PACKETS_PER_FLOW_TO_PROCESS: nDPId_options.max_packets_per_flow_to_process = value_llu; + break; + case MAX_PACKETS_PER_FLOW_TO_ANALYZE: + nDPId_options.max_packets_per_flow_to_analyze = value_llu; + break; } } break; |