diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-12-15 23:25:32 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-01-20 00:50:38 +0100 |
commit | 9e07a57566cc45bf92a845d8cee968d72e0f314e (patch) | |
tree | 8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /nDPId.c | |
parent | a35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff) |
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare
- nDPIsrvd: fixed caching issue (finally)
- added tiny c example (can be used to check flow manager sanity)
- c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow`
- README.md update: added example JSON sequence
- nDPId: added new flow event `update`, which is necessary for correct
timeout handling (and other future use cases)
- nDPIsrvd.h and nDPIsrvd.py: switched to an instance-based flow manager
(an instance consists of an alias/source tuple)
- every flow related event **must** now serialize `alias`, `source`,
`flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout
handling and verification process work correctly
- nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation
- nDPIsrvd.py: removed PcapPacket class (unused)
- py-flow-dashboard and py-flow-multiprocess: fixed race condition
- py-flow-info: print a status bar with potentially useful information
- nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`)
to a generic flow event timestamp `ts_msec`
- nDPId-test: added additional checks
- nDPId: increased ICMP flow timeout
- nDPId: using event based i/o if capturing packets from a device
- nDPIsrvd: fixed memory leak on shutdown if remote descriptors
were still connected
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 769 |
1 files changed, 560 insertions, 209 deletions
@@ -14,6 +14,7 @@ #include <stdarg.h> #include <stdio.h> #include <stdlib.h> +#include <sys/epoll.h> #include <sys/ioctl.h> #include <sys/un.h> #ifndef NO_MAIN @@ -39,10 +40,15 @@ #error "Compare and Swap aka __sync_fetch_and_add not available on your platform!" #endif -#if nDPId_MAX_READER_THREADS < 0 +#if nDPId_MAX_READER_THREADS <= 0 #error "Invalid value for nDPId_MAX_READER_THREADS" #endif +#if nDPId_FLOW_SCAN_INTERVAL > nDPId_GENERIC_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_ICMP_IDLE_TIME || \ + nDPId_FLOW_SCAN_INTERVAL > nDPId_TCP_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_UDP_IDLE_TIME +#error "Invalid value for nDPId_FLOW_SCAN_INTERVAL" +#endif + enum nDPId_l3_type { L3_IP, @@ -67,10 +73,10 @@ union nDPId_ip enum nDPId_flow_type { - FT_UNKNOWN = 0, - FT_SKIPPED, - FT_FINISHED, - FT_INFO + FT_UNKNOWN = 0, // should never happen, bug otherwise + FT_SKIPPED, // flow should not be processed, see command line args -I and -E + FT_FINISHED, // detection done and detection data free'd + FT_INFO // detection in progress, detection data allocated }; /* @@ -100,13 +106,14 @@ struct nDPId_flow_extended { struct nDPId_flow_basic flow_basic; - uint32_t flow_id; + unsigned int long long flow_id; uint16_t min_l4_payload_len; uint16_t max_l4_payload_len; unsigned long long int packets_processed; uint64_t first_seen; + uint64_t last_flow_update; unsigned long long int total_l4_payload_len; }; @@ -157,14 +164,19 @@ struct nDPId_workflow { pcap_t * pcap_handle; - int error_or_eof; - int reserved_00; + uint16_t error_or_eof; + uint16_t is_pcap_file; unsigned long long int packets_captured; unsigned long long int packets_processed; unsigned long long int total_skipped_flows; unsigned long long int total_l4_data_len; - unsigned long long int detected_flow_protocols; + + unsigned long long int total_not_detected_flows; + unsigned long long int total_guessed_flows; + unsigned long long int total_detected_flows; + unsigned long long int total_flow_detection_updates; + 
unsigned long long int total_flow_updates; #ifdef ENABLE_MEMORY_PROFILING uint64_t last_memory_usage_log_time; @@ -172,7 +184,7 @@ struct nDPId_workflow #ifdef ENABLE_ZLIB uint64_t last_compression_scan_time; #endif - uint64_t last_idle_scan_time; + uint64_t last_scan_time; uint64_t last_time; void ** ndpi_flows_active; @@ -185,6 +197,8 @@ struct nDPId_workflow unsigned long long int cur_idle_flows; unsigned long long int total_idle_flows; + unsigned long long int total_events_serialized; + ndpi_serializer ndpi_serializer; struct ndpi_detection_module_struct * ndpi_struct; }; @@ -202,8 +216,11 @@ enum packet_event { PACKET_EVENT_INVALID = 0, - PACKET_EVENT_PAYLOAD, - PACKET_EVENT_PAYLOAD_FLOW, + PACKET_EVENT_PAYLOAD, // A single packet that does not belong to a flow for whatever reasons. + // E.g. it could be malformed and thus no flow handling is done. + // There may be additional use-cases in the future. + PACKET_EVENT_PAYLOAD_FLOW, // Special case; A packet event that belongs to a flow but does not include all + // information a flow event requires. PACKET_EVENT_COUNT }; @@ -215,6 +232,7 @@ enum flow_event FLOW_EVENT_NEW, FLOW_EVENT_END, FLOW_EVENT_IDLE, + FLOW_EVENT_UPDATE, // Inform distributor applications about flows with a long lifetime. 
FLOW_EVENT_GUESSED, FLOW_EVENT_DETECTED, @@ -267,6 +285,7 @@ static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT [FLOW_EVENT_NEW] = "new", [FLOW_EVENT_END] = "end", [FLOW_EVENT_IDLE] = "idle", + [FLOW_EVENT_UPDATE] = "update", [FLOW_EVENT_GUESSED] = "guessed", [FLOW_EVENT_DETECTED] = "detected", [FLOW_EVENT_DETECTION_UPDATE] = "detection-update", @@ -345,13 +364,13 @@ static struct unsigned long long int tick_resolution; unsigned long long int reader_thread_count; #ifdef ENABLE_MEMORY_PROFILING - unsigned long long int memory_profiling_print_every; + unsigned long long int memory_profiling_log_interval; #endif #ifdef ENABLE_ZLIB - unsigned long long int compression_scan_period; + unsigned long long int compression_scan_interval; unsigned long long int compression_flow_inactivity; #endif - unsigned long long int idle_scan_period; + unsigned long long int flow_scan_interval; unsigned long long int generic_max_idle_time; unsigned long long int icmp_max_idle_time; unsigned long long int udp_max_idle_time; @@ -367,13 +386,13 @@ static struct .tick_resolution = nDPId_TICK_RESOLUTION, .reader_thread_count = nDPId_MAX_READER_THREADS / 2, #ifdef ENABLE_MEMORY_PROFILING - .memory_profiling_print_every = nDPId_LOG_MEMORY_USAGE_EVERY, + .memory_profiling_log_interval = nDPId_MEMORY_PROFILING_LOG_INTERVAL, #endif #ifdef ENABLE_ZLIB - .compression_scan_period = nDPId_COMPRESSION_SCAN_PERIOD, + .compression_scan_interval = nDPId_COMPRESSION_SCAN_INTERVAL, .compression_flow_inactivity = nDPId_COMPRESSION_FLOW_INACTIVITY, #endif - .idle_scan_period = nDPId_IDLE_SCAN_PERIOD, + .flow_scan_interval = nDPId_FLOW_SCAN_INTERVAL, .generic_max_idle_time = nDPId_GENERIC_IDLE_TIME, .icmp_max_idle_time = nDPId_ICMP_IDLE_TIME, .udp_max_idle_time = nDPId_UDP_IDLE_TIME, @@ -388,14 +407,14 @@ enum nDPId_subopts MAX_IDLE_FLOWS_PER_THREAD, TICK_RESOLUTION, MAX_READER_THREADS, - IDLE_SCAN_PERIOD, #ifdef ENABLE_MEMORY_PROFILING - MEMORY_PROFILING_PRINT_EVERY, + 
MEMORY_PROFILING_LOG_INTERVAL, #endif #ifdef ENABLE_ZLIB - COMPRESSION_SCAN_PERIOD, + COMPRESSION_SCAN_INTERVAL, COMPRESSION_FLOW_INACTIVITY, #endif + FLOW_SCAN_INTVERAL, GENERIC_MAX_IDLE_TIME, ICMP_MAX_IDLE_TIME, UDP_MAX_IDLE_TIME, @@ -409,13 +428,13 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th [TICK_RESOLUTION] = "tick-resolution", [MAX_READER_THREADS] = "max-reader-threads", #ifdef ENABLE_MEMORY_PROFILING - [MEMORY_PROFILING_PRINT_EVERY] = "memory-profiling-print-every", + [MEMORY_PROFILING_LOG_INTERVAL] = "memory-profiling-log-interval", #endif #ifdef ENABLE_ZLIB - [COMPRESSION_SCAN_PERIOD] = "compression-scan-period", + [COMPRESSION_SCAN_INTERVAL] = "compression-scan-interval", [COMPRESSION_FLOW_INACTIVITY] = "compression-flow-activity", #endif - [IDLE_SCAN_PERIOD] = "idle-scan-period", + [FLOW_SCAN_INTVERAL] = "flow-scan-interval", [GENERIC_MAX_IDLE_TIME] = "generic-max-idle-time", [ICMP_MAX_IDLE_TIME] = "icmp-max-idle-time", [UDP_MAX_IDLE_TIME] = "udp-max-idle-time", @@ -425,11 +444,15 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th [MAX_PACKETS_PER_FLOW_TO_PROCESS] = "max-packets-per-flow-to-process", NULL}; +static int processing_threads_error_or_eof(void); static void free_workflow(struct nDPId_workflow ** const workflow); static void serialize_and_send(struct nDPId_reader_thread * const reader_thread); static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, - struct nDPId_flow_info * const flow, + struct nDPId_flow_extended * const flow_ext, enum flow_event event); +static void jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow_info * const flow_info, + enum flow_event event); #ifdef ENABLE_ZLIB static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen) @@ -624,7 +647,7 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de if (ret <= 0) { 
syslog(LOG_DAEMON | LOG_ERR, - "zLib compression failed for flow %u with error code: %d", + "zLib compression failed for flow %llu with error code: %d", flow_info->flow_extended.flow_id, ret); } @@ -639,7 +662,7 @@ static void check_for_compressable_flows(struct nDPId_reader_thread * const read { struct nDPId_workflow * const workflow = reader_thread->workflow; - if (workflow->last_compression_scan_time + nDPId_options.compression_scan_period < workflow->last_time) + if (workflow->last_compression_scan_time + nDPId_options.compression_scan_interval < workflow->last_time) { for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index) { @@ -1053,13 +1076,13 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F #if 1 written = snprintf(output + output_used, BUFSIZ - output_used, - "%u,", + "%llu,", flow_finished->flow_info.flow_extended.flow_id); #else written = snprintf(output + output_used, BUFSIZ - output_used, - "[%u, %u, %llu],", + "[%llu, %u, %llu],", flow_finished->flow_info.flow_extended.flow_id, flow_finished->flow_info.flow_extended.flow_basic.l4_protocol, (unsigned long long int const)flow_finished->flow_info.flow_extended.flow_basic.last_seen); @@ -1072,11 +1095,12 @@ static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_F struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info const *)flow_basic; #if 1 - written = snprintf(output + output_used, BUFSIZ - output_used, "%u,", flow_info->flow_extended.flow_id); + written = + snprintf(output + output_used, BUFSIZ - output_used, "%llu,", flow_info->flow_extended.flow_id); #else written = snprintf(output + output_used, BUFSIZ - output_used, - "[%u, %u, %llu],", + "[%llu, %u, %llu],", flow_info->flow_extended.flow_id, flow_info->flow_extended.flow_basic.l4_protocol, (unsigned long long int const)flow_info->flow_extended.flow_basic.last_seen); @@ -1152,7 +1176,6 @@ static void log_flows(struct 
nDPId_reader_thread const * const reader_thread) static struct nDPId_workflow * init_workflow(char const * const file_or_device) { - int pcap_argument_is_file = 0; char pcap_error_buffer[PCAP_ERRBUF_SIZE]; struct nDPId_workflow * workflow; @@ -1178,19 +1201,26 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) { workflow->pcap_handle = pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO, pcap_error_buffer); - pcap_argument_is_file = 1; + workflow->is_pcap_file = 1; } if (workflow->pcap_handle == NULL) { syslog(LOG_DAEMON | LOG_ERR, - (pcap_argument_is_file == 0 ? "pcap_open_live: %.*s" : "pcap_open_offline_with_tstamp_precision: %.*s"), + (workflow->is_pcap_file == 0 ? "pcap_open_live: %.*s" : "pcap_open_offline_with_tstamp_precision: %.*s"), (int)PCAP_ERRBUF_SIZE, pcap_error_buffer); free_workflow(&workflow); return NULL; } + if (workflow->is_pcap_file == 0 && pcap_setnonblock(workflow->pcap_handle, 1, pcap_error_buffer) == PCAP_ERROR) + { + syslog(LOG_DAEMON | LOG_ERR, "pcap_setnonblock: %.*s", (int)PCAP_ERRBUF_SIZE, pcap_error_buffer); + free_workflow(&workflow); + return NULL; + } + if (nDPId_options.bpf_str != NULL) { struct bpf_program fp; @@ -1422,27 +1452,6 @@ static int setup_reader_threads(void) return 0; } -static int ip_tuples_equal(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B) -{ - // generate a warning if the enum changes - switch (A->l3_type) - { - case L3_IP: - case L3_IP6: - break; - } - if (A->l3_type == L3_IP && B->l3_type == L3_IP6) - { - return A->src.v4.ip == B->src.v4.ip && A->dst.v4.ip == B->dst.v4.ip; - } - else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6) - { - return A->src.v6.ip[0] == B->src.v6.ip[0] && A->src.v6.ip[1] == B->src.v6.ip[1] && - A->dst.v6.ip[0] == B->dst.v6.ip[0] && A->dst.v6.ip[1] == B->dst.v6.ip[1]; - } - return 0; -} - static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const 
* const B) { // generate a warning if the enum changes @@ -1452,38 +1461,63 @@ static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDP case L3_IP6: break; } - if (A->l3_type == L3_IP && B->l3_type == L3_IP6) + + if (A->l3_type == L3_IP && B->l3_type == L3_IP) { - if (A->src.v4.ip < B->src.v4.ip || A->dst.v4.ip < B->dst.v4.ip) + if (A->src.v4.ip < B->src.v4.ip) { return -1; } - if (A->src.v4.ip > B->src.v4.ip || A->dst.v4.ip > B->dst.v4.ip) + if (A->src.v4.ip > B->src.v4.ip) + { + return 1; + } + if (A->dst.v4.ip < B->dst.v4.ip) + { + return -1; + } + if (A->dst.v4.ip > B->dst.v4.ip) { return 1; } } else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6) { - if ((A->src.v6.ip[0] < B->src.v6.ip[0] && A->src.v6.ip[1] < B->src.v6.ip[1]) || - (A->dst.v6.ip[0] < B->dst.v6.ip[0] && A->dst.v6.ip[1] < B->dst.v6.ip[1])) + if (A->src.v6.ip[0] < B->src.v6.ip[0] && A->src.v6.ip[1] < B->src.v6.ip[1]) + { + return -1; + } + if (A->src.v6.ip[0] > B->src.v6.ip[0] && A->src.v6.ip[1] > B->src.v6.ip[1]) + { + return 1; + } + if (A->dst.v6.ip[0] < B->dst.v6.ip[0] && A->dst.v6.ip[1] < B->dst.v6.ip[1]) { return -1; } - if ((A->src.v6.ip[0] > B->src.v6.ip[0] && A->src.v6.ip[1] > B->src.v6.ip[1]) || - (A->dst.v6.ip[0] > B->dst.v6.ip[0] && A->dst.v6.ip[1] > B->dst.v6.ip[1])) + if (A->dst.v6.ip[0] > B->dst.v6.ip[0] && A->dst.v6.ip[1] > B->dst.v6.ip[1]) { return 1; } } - if (A->src_port < B->src_port || A->dst_port < B->dst_port) + + if (A->src_port < B->src_port) + { + return -1; + } + if (A->src_port > B->src_port) + { + return 1; + } + if (A->dst_port < B->dst_port) { return -1; } - else if (A->src_port > B->src_port || A->dst_port > B->dst_port) + if (A->dst_port > B->dst_port) { return 1; } + return 0; } @@ -1506,9 +1540,25 @@ static uint64_t get_l4_protocol_idle_time(uint8_t l4_protocol) static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow, struct nDPId_flow_basic const * const flow_basic) { - return flow_basic->last_seen + 
get_l4_protocol_idle_time(flow_basic->l4_protocol) < workflow->last_time || + uint64_t itime = + get_l4_protocol_idle_time(flow_basic->l4_protocol) - (flow_basic->last_seen % nDPId_options.flow_scan_interval); + + return (flow_basic->last_seen + itime <= workflow->last_time) || (flow_basic->tcp_fin_rst_seen == 1 && - flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time < workflow->last_time); + flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time <= workflow->last_time); +} + +static int is_flow_update_required(struct nDPId_workflow const * const workflow, + struct nDPId_flow_extended const * const flow_ext) +{ + uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol) / 2; + + if (is_l4_protocol_timed_out(workflow, &flow_ext->flow_basic) != 0) + { + return 0; + } + + return flow_ext->last_flow_update + itime <= workflow->last_time; } static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) @@ -1572,12 +1622,6 @@ static int ndpi_workflow_node_cmp(void const * const A, void const * const B) return 1; } - if (ip_tuples_equal(flow_basic_a, flow_basic_b) != 0 && flow_basic_a->src_port == flow_basic_b->src_port && - flow_basic_a->dst_port == flow_basic_b->dst_port) - { - return 0; - } - return ip_tuples_compare(flow_basic_a, flow_basic_b); } @@ -1602,11 +1646,11 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, if (flow_basic->tcp_fin_rst_seen != 0) { - jsonize_flow_event(reader_thread, &flow_finished->flow_info, FLOW_EVENT_END); + jsonize_flow_event(reader_thread, &flow_finished->flow_info.flow_extended, FLOW_EVENT_END); } else { - jsonize_flow_event(reader_thread, &flow_finished->flow_info, FLOW_EVENT_IDLE); + jsonize_flow_event(reader_thread, &flow_finished->flow_info.flow_extended, FLOW_EVENT_IDLE); } break; } @@ -1644,20 +1688,22 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, if (protocol_was_guessed != 
0) { - jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_GUESSED); + workflow->total_guessed_flows++; + jsonize_flow_detection_event(reader_thread, flow_info, FLOW_EVENT_GUESSED); } else { - jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_NOT_DETECTED); + workflow->total_not_detected_flows++; + jsonize_flow_detection_event(reader_thread, flow_info, FLOW_EVENT_NOT_DETECTED); } } if (flow_basic->tcp_fin_rst_seen != 0) { - jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_END); + jsonize_flow_event(reader_thread, &flow_info->flow_extended, FLOW_EVENT_END); } else { - jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_IDLE); + jsonize_flow_event(reader_thread, &flow_info->flow_extended, FLOW_EVENT_IDLE); } break; } @@ -1673,15 +1719,58 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa { struct nDPId_workflow * const workflow = reader_thread->workflow; - if (workflow->last_idle_scan_time + nDPId_options.idle_scan_period < workflow->last_time) + for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow); + process_idle_flow(reader_thread, idle_scan_index); + } +} + +static void ndpi_flow_update_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)user_data; + struct nDPId_workflow * const workflow = reader_thread->workflow; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + + if (workflow == NULL || flow_basic == NULL) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) { - for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) + switch (flow_basic->type) { - ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow); - 
process_idle_flow(reader_thread, idle_scan_index); + case FT_UNKNOWN: + case FT_SKIPPED: + break; + + case FT_FINISHED: + case FT_INFO: + { + struct nDPId_flow_extended * const flow_ext = (struct nDPId_flow_extended *)flow_basic; + + if (is_flow_update_required(workflow, flow_ext) != 0) + { + workflow->total_flow_updates++; + jsonize_flow_event(reader_thread, flow_ext, FLOW_EVENT_UPDATE); + flow_ext->last_flow_update = workflow->last_time; + } + break; + } } + } +} - workflow->last_idle_scan_time = workflow->last_time; +static void check_for_flow_updates(struct nDPId_reader_thread * const reader_thread) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + for (size_t update_scan_index = 0; update_scan_index < workflow->max_active_flows; ++update_scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[update_scan_index], ndpi_flow_update_scan_walker, reader_thread); } } @@ -1782,42 +1871,55 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu if (event == DAEMON_EVENT_INIT) { - ndpi_serialize_string_int64(&workflow->ndpi_serializer, - "max-flows-per-thread", - nDPId_options.max_flows_per_thread); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, - "max-idle-flows-per-thread", - nDPId_options.max_idle_flows_per_thread); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, - "reader-thread-count", - nDPId_options.reader_thread_count); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, "idle-scan-period", nDPId_options.idle_scan_period); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, - "generic-max-idle-time", - nDPId_options.generic_max_idle_time); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, "icmp-max-idle-time", nDPId_options.icmp_max_idle_time); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time); - 
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "tcp-max-idle-time", nDPId_options.tcp_max_idle_time); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, - "tcp-max-post-end-flow-time", - nDPId_options.tcp_max_post_end_flow_time); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, - "max-packets-per-flow-to-send", - nDPId_options.max_packets_per_flow_to_send); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, - "max-packets-per-flow-to-process", - nDPId_options.max_packets_per_flow_to_process); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-flows-per-thread", + nDPId_options.max_flows_per_thread); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-idle-flows-per-thread", + nDPId_options.max_idle_flows_per_thread); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "reader-thread-count", + nDPId_options.reader_thread_count); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow-scan-interval", + nDPId_options.flow_scan_interval); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "generic-max-idle-time", + nDPId_options.generic_max_idle_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "icmp-max-idle-time", + nDPId_options.icmp_max_idle_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "tcp-max-idle-time", + nDPId_options.tcp_max_idle_time + nDPId_options.tcp_max_post_end_flow_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-packets-per-flow-to-send", + nDPId_options.max_packets_per_flow_to_send); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-packets-per-flow-to-process", + nDPId_options.max_packets_per_flow_to_process); + } + else if (event == DAEMON_EVENT_SHUTDOWN) + { + 
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-events-serialized", + workflow->total_events_serialized + + 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */); } serialize_and_send(reader_thread); } static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_flow_extended const * const flow_ext) { - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packets_processed", flow_ext->packets_processed); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow_ext->first_seen); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_idle_time", + get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol)); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_min_l4_payload_len", flow_ext->min_l4_payload_len); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_max_l4_payload_len", flow_ext->max_l4_payload_len); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_tot_l4_payload_len", flow_ext->total_l4_payload_len); @@ -1827,6 +1929,7 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl ? 
flow_ext->total_l4_payload_len / flow_ext->packets_processed : 0)); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time); } static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread) @@ -1890,6 +1993,7 @@ static void send_to_json_sink(struct nDPId_reader_thread * const reader_thread, json_str_len + 1, (int)json_str_len, json_str); + if (s_ret < 0 || s_ret > (int)sizeof(newline_json_str)) { syslog(LOG_DAEMON | LOG_ERR, @@ -1972,12 +2076,14 @@ static void serialize_and_send(struct nDPId_reader_thread * const reader_thread) } else { + reader_thread->workflow->total_events_serialized++; send_to_json_sink(reader_thread, json_str, json_str_len); } ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer); } /* Slightly modified code from: https://en.wikibooks.org/wiki/Algorithm_Implementation/Miscellaneous/Base64 */ +static char const * const base64_ret_strings[] = {"Success", "Buffer too small"}; static int base64encode(uint8_t const * const data_buf, size_t dataLength, char * const result, @@ -2109,8 +2215,6 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa { return; } - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed); } ndpi_serialize_string_int32(&workflow->ndpi_serializer, "packet_event_id", event); @@ -2125,34 +2229,53 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa jsonize_basic(reader_thread); + if (event == PACKET_EVENT_PAYLOAD_FLOW) + { + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed); + 
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow_ext->flow_basic.last_seen); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_idle_time", + get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol)); + } + char base64_data[NETWORK_BUFFER_MAX_SIZE]; size_t base64_data_len = sizeof(base64_data); int base64_retval = base64encode(packet, header->caplen, base64_data, &base64_data_len); ndpi_serialize_string_boolean(&workflow->ndpi_serializer, "pkt_oversize", base64_data_len > sizeof(base64_data)); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pkt_ts_sec", header->ts.tv_sec); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pkt_ts_usec", header->ts.tv_usec); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_caplen", header->caplen); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_type", pkt_type); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l3_offset", pkt_l3_offset); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->len); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time); - if (base64_retval == 0 && base64_data_len > 0 && - ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0) + if (base64_retval == 0 && base64_data_len > 0) + { + if (ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %d] JSON serializing base64 packet buffer failed", + reader_thread->workflow->packets_captured, + reader_thread->array_index); + } + } + else { syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %d] JSON serializing base64 packet buffer failed", + "[%8llu, %d] Base64 encoding failed with: %s.", 
reader_thread->workflow->packets_captured, - reader_thread->array_index); + reader_thread->array_index, + base64_ret_strings[base64_retval]); } serialize_and_send(reader_thread); } -/* I decided against ndpi_flow2json as does not fulfill my needs. */ +/* I decided against ndpi_flow2json as it does not fulfill my needs. */ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, - struct nDPId_flow_info * const flow, + struct nDPId_flow_extended * const flow_ext, enum flow_event event) { struct nDPId_workflow * const workflow = reader_thread->workflow; @@ -2168,8 +2291,8 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); } jsonize_basic(reader_thread); - jsonize_flow(workflow, &flow->flow_extended); - jsonize_l3_l4(workflow, &flow->flow_extended.flow_basic); + jsonize_flow(workflow, flow_ext); + jsonize_l3_l4(workflow, &flow_ext->flow_basic); switch (event) { @@ -2180,6 +2303,7 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_NEW: case FLOW_EVENT_END: case FLOW_EVENT_IDLE: + case FLOW_EVENT_UPDATE: ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_datalink", pcap_datalink(reader_thread->workflow->pcap_handle)); @@ -2190,29 +2314,79 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, case FLOW_EVENT_NOT_DETECTED: case FLOW_EVENT_GUESSED: + case FLOW_EVENT_DETECTED: + case FLOW_EVENT_DETECTION_UPDATE: + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %4llu] internal error / invalid function call", + workflow->packets_captured, + flow_ext->flow_id); + break; + } + + serialize_and_send(reader_thread); +} + +static void jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow_info * const flow_info, + enum flow_event event) +{ + struct nDPId_workflow * const workflow = 
reader_thread->workflow; + char const ev[] = "flow_event_name"; + + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_event_id", event); + if (event > FLOW_EVENT_INVALID && event < FLOW_EVENT_COUNT) + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[event]); + } + else + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); + } + jsonize_basic(reader_thread); + jsonize_flow(workflow, &flow_info->flow_extended); + jsonize_l3_l4(workflow, &flow_info->flow_extended.flow_basic); + + switch (event) + { + case FLOW_EVENT_INVALID: + case FLOW_EVENT_COUNT: + break; + + case FLOW_EVENT_NEW: + case FLOW_EVENT_END: + case FLOW_EVENT_IDLE: + case FLOW_EVENT_UPDATE: + syslog(LOG_DAEMON | LOG_ERR, + "[%8llu, %4llu] internal error / invalid function call", + workflow->packets_captured, + flow_info->flow_extended.flow_id); + break; + + case FLOW_EVENT_NOT_DETECTED: + case FLOW_EVENT_GUESSED: if (ndpi_dpi2json(workflow->ndpi_struct, - &flow->detection_data->flow, - flow->detection_data->guessed_l7_protocol, + &flow_info->detection_data->flow, + flow_info->detection_data->guessed_l7_protocol, &workflow->ndpi_serializer) != 0) { syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %4u] ndpi_dpi2json failed for not-detected/guessed flow", + "[%8llu, %4llu] ndpi_dpi2json failed for not-detected/guessed flow", workflow->packets_captured, - flow->flow_extended.flow_id); + flow_info->flow_extended.flow_id); } break; case FLOW_EVENT_DETECTED: case FLOW_EVENT_DETECTION_UPDATE: if (ndpi_dpi2json(workflow->ndpi_struct, - &flow->detection_data->flow, - flow->detection_data->detected_l7_protocol, + &flow_info->detection_data->flow, + flow_info->detection_data->detected_l7_protocol, &workflow->ndpi_serializer) != 0) { syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %4u] ndpi_dpi2json failed for detected/detection-update flow", + "[%8llu, %4llu] ndpi_dpi2json failed for detected/detection-update flow", 
workflow->packets_captured, - flow->flow_extended.flow_id); + flow_info->flow_extended.flow_id); } break; } @@ -2465,16 +2639,20 @@ static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const * hash += ndpi_flow->dst->detected_protocol_bitmask.fds_bits[i]; } - /* FIXME: Adding the first character and the length of the host/server name is not stable, but seems to work. */ - hash += ndpi_flow->host_server_name[0]; - hash += strnlen((const char *)ndpi_flow->host_server_name, sizeof(ndpi_flow->host_server_name)); + size_t host_server_name_len = + strnlen((const char *)ndpi_flow->host_server_name, sizeof(ndpi_flow->host_server_name)); + hash += host_server_name_len; + hash += murmur3_32((uint8_t const *)&ndpi_flow->host_server_name, + sizeof(ndpi_flow->host_server_name), + nDPId_FLOW_STRUCT_SEED); return hash; } +/* Some constants stolen from ndpiReader. */ #define SNAP 0xaa /* mask for FCF */ -#define WIFI_DATA 0x2 /* 0000 0010 */ +#define WIFI_DATA 0x2 #define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */ #define FCF_TO_DS(fc) ((fc)&0x0100) #define FCF_FROM_DS(fc) ((fc)&0x0200) @@ -2731,6 +2909,26 @@ static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const work return flow_basic; } +static void do_periodically_work(struct nDPId_reader_thread * const reader_thread) +{ + if (reader_thread->workflow->last_scan_time + nDPId_options.flow_scan_interval <= + reader_thread->workflow->last_time) + { + check_for_idle_flows(reader_thread); + check_for_flow_updates(reader_thread); + reader_thread->workflow->last_scan_time = reader_thread->workflow->last_time; + } +#ifdef ENABLE_MEMORY_PROFILING + if (reader_thread->workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_log_interval <= + reader_thread->workflow->last_time) + { + log_memory_usage(reader_thread); + log_flows(reader_thread); + reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_time; + } +#endif +} + static void 
ndpi_process_packet(uint8_t * const args, struct pcap_pkthdr const * const header, uint8_t const * const packet) @@ -2776,17 +2974,12 @@ static void ndpi_process_packet(uint8_t * const args, workflow->packets_captured++; time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_options.tick_resolution + header->ts.tv_usec / (1000000 / nDPId_options.tick_resolution); - workflow->last_time = time_ms; - - check_for_idle_flows(reader_thread); -#ifdef ENABLE_MEMORY_PROFILING - if (workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_print_every < workflow->last_time) + if (workflow->last_time < time_ms) { - log_memory_usage(reader_thread); - log_flows(reader_thread); - workflow->last_memory_usage_log_time = workflow->last_time; + workflow->last_time = time_ms; } -#endif + + do_periodically_work(reader_thread); if (process_datalink_layer(reader_thread, header, packet, &ip_offset, &type) != 0) { @@ -2949,6 +3142,11 @@ static void ndpi_process_packet(uint8_t * const args, flow_basic.src_port = ntohs(udp->source); flow_basic.dst_port = ntohs(udp->dest); } + else + { + /* Use layer4 length returned from libnDPI. */ + l4_payload_len = l4_len; + } /* distribute flows to threads while keeping stability (same flow goes always to same thread) */ thread_index += (flow_basic.src_port < flow_basic.dst_port ? flow_basic.dst_port : flow_basic.src_port); @@ -2957,8 +3155,6 @@ static void ndpi_process_packet(uint8_t * const args, { return; } - workflow->packets_processed++; - workflow->total_l4_data_len += l4_len; /* calculate flow hash for btree find, search(insert) */ switch (flow_basic.l3_type) @@ -3142,8 +3338,8 @@ static void ndpi_process_packet(uint8_t * const args, struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result; /* Update last seen timestamp for timeout handling. 
*/ - flow_basic_to_process->last_seen = time_ms; - /* TCP-FIN: indicates that at least one side wants to end the connection (timeout handling as well) */ + flow_basic_to_process->last_seen = workflow->last_time; + /* TCP-FIN: indicates that at least one side wants to end the connection. */ if (flow_basic.tcp_fin_rst_seen != 0) { flow_basic_to_process->tcp_fin_rst_seen = 1; @@ -3152,11 +3348,10 @@ static void ndpi_process_packet(uint8_t * const args, switch (flow_basic_to_process->type) { case FT_UNKNOWN: - return; case FT_SKIPPED: return; + case FT_FINISHED: - return; case FT_INFO: break; } @@ -3169,7 +3364,7 @@ static void ndpi_process_packet(uint8_t * const args, if (ret <= 0) { syslog(LOG_DAEMON | LOG_ERR, - "zLib decompression failed for existing flow %u with error code: %d", + "zLib decompression failed for existing flow %llu with error code: %d", flow_to_process->flow_extended.flow_id, ret); return; @@ -3177,23 +3372,35 @@ static void ndpi_process_packet(uint8_t * const args, } #endif - if (direction_changed != 0) + if (flow_to_process->detection_data != NULL) { - ndpi_src = &flow_to_process->detection_data->dst; - ndpi_dst = &flow_to_process->detection_data->src; + if (direction_changed != 0) + { + ndpi_src = &flow_to_process->detection_data->dst; + ndpi_dst = &flow_to_process->detection_data->src; + } + else + { + ndpi_src = &flow_to_process->detection_data->src; + ndpi_dst = &flow_to_process->detection_data->dst; + } } else { - ndpi_src = &flow_to_process->detection_data->src; - ndpi_dst = &flow_to_process->detection_data->dst; + ndpi_src = NULL; + ndpi_dst = NULL; } } flow_to_process->flow_extended.packets_processed++; flow_to_process->flow_extended.total_l4_payload_len += l4_payload_len; + workflow->packets_processed++; + workflow->total_l4_data_len += l4_payload_len; + if (flow_to_process->flow_extended.first_seen == 0) { - flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen = time_ms; + 
flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen = + flow_to_process->flow_extended.last_flow_update = workflow->last_time; } if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len) { @@ -3204,11 +3411,17 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process->flow_extended.min_l4_payload_len = l4_payload_len; } + if (flow_to_process->flow_extended.flow_basic.type != FT_INFO) + { + /* Only FT_INFO goes through the whole detection process. */ + return; + } + if (is_new_flow != 0) { flow_to_process->flow_extended.max_l4_payload_len = l4_payload_len; flow_to_process->flow_extended.min_l4_payload_len = l4_payload_len; - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_NEW); + jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_NEW); } jsonize_packet_event(reader_thread, @@ -3225,7 +3438,8 @@ static void ndpi_process_packet(uint8_t * const args, { if (flow_to_process->detection_completed != 0) { - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); + reader_thread->workflow->total_flow_detection_updates++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); } else { @@ -3235,11 +3449,13 @@ static void ndpi_process_packet(uint8_t * const args, workflow->ndpi_struct, &flow_to_process->detection_data->flow, 1, &protocol_was_guessed); if (protocol_was_guessed != 0) { - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED); + workflow->total_guessed_flows++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED); } else { - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_NOT_DETECTED); + reader_thread->workflow->total_not_detected_flows++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_NOT_DETECTED); } } } @@ -3249,7 +3465,7 @@ static void ndpi_process_packet(uint8_t * const args, 
&flow_to_process->detection_data->flow, ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6, ip_size, - time_ms, + workflow->last_time, ndpi_src, ndpi_dst); @@ -3257,8 +3473,8 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process->detection_completed == 0) { flow_to_process->detection_completed = 1; - workflow->detected_flow_protocols++; - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED); + workflow->total_detected_flows++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED); flow_to_process->detection_data->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow); } @@ -3267,7 +3483,8 @@ static void ndpi_process_packet(uint8_t * const args, uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->detection_data->flow); if (hash != flow_to_process->detection_data->last_ndpi_flow_struct_hash) { - jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); + workflow->total_flow_detection_updates++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); flow_to_process->detection_data->last_ndpi_flow_struct_hash = hash; } } @@ -3285,19 +3502,93 @@ static void ndpi_process_packet(uint8_t * const args, #endif } -static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread) +static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) { if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) { - - if (pcap_loop(reader_thread->workflow->pcap_handle, -1, &ndpi_process_packet, (uint8_t *)reader_thread) == - PCAP_ERROR) + if (reader_thread->workflow->is_pcap_file != 0) + { + switch (pcap_loop(reader_thread->workflow->pcap_handle, -1, &ndpi_process_packet, (uint8_t *)reader_thread)) + { + case PCAP_ERROR: + syslog(LOG_DAEMON | LOG_ERR, + "Error while reading pcap file: '%s'", + pcap_geterr(reader_thread->workflow->pcap_handle)); + 
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + case PCAP_ERROR_BREAK: + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + default: + return; + } + } + else { + int pcap_fd = pcap_get_selectable_fd(reader_thread->workflow->pcap_handle); + if (pcap_fd < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "%s", "Got an invalid PCAP fd"); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + } - syslog(LOG_DAEMON | LOG_ERR, - "Error while reading pcap file: '%s'", - pcap_geterr(reader_thread->workflow->pcap_handle)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + int epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (epoll_fd < 0) + { + syslog(LOG_DAEMON | LOG_ERR, "Got an invalid epoll fd: %s", strerror(errno)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + } + + struct epoll_event event = {}; + event.events = EPOLLIN; + event.data.fd = pcap_fd; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0) + { + syslog(LOG_DAEMON | LOG_ERR, + "Could not add pcap fd %d to epoll fd %d: %s", + epoll_fd, + pcap_fd, + strerror(errno)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + } + + struct epoll_event events[32]; + size_t const events_size = sizeof(events) / sizeof(events[0]); + int const timeout_ms = 1000; /* TODO: Configurable? 
*/ + int nready; + while (nDPId_main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0) + { + nready = epoll_wait(epoll_fd, events, events_size, timeout_ms); + + if (nready == 0) + { + reader_thread->workflow->last_time += timeout_ms; + + do_periodically_work(reader_thread); + } + + for (int i = 0; i < nready; ++i) + { + switch (pcap_dispatch( + reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) + { + case PCAP_ERROR: + syslog(LOG_DAEMON | LOG_ERR, + "Error while reading from pcap device: '%s'", + pcap_geterr(reader_thread->workflow->pcap_handle)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + break; + case PCAP_ERROR_BREAK: + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + default: + break; + } + } + } } } } @@ -3446,6 +3737,16 @@ static void process_remaining_flows(void) { for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { + if (fcntl(reader_threads[i].json_sockfd, + F_SETFL, + fcntl(reader_threads[i].json_sockfd, F_GETFL, 0) & ~O_NONBLOCK) == -1) + { + syslog(LOG_DAEMON | LOG_ERR, + "Could not set JSON fd %d to blocking mode for shutdown: %s", + reader_threads[i].json_sockfd, + strerror(errno)); + } + for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows; ++idle_scan_index) { @@ -3456,8 +3757,6 @@ static void process_remaining_flows(void) } jsonize_daemon(&reader_threads[i], DAEMON_EVENT_SHUTDOWN); - close(reader_threads[i].json_sockfd); - reader_threads[i].json_sockfd = -1; } } @@ -3468,7 +3767,11 @@ static int stop_reader_threads(void) unsigned long long int total_flows_skipped = 0; unsigned long long int total_flows_captured = 0; unsigned long long int total_flows_idle = 0; + unsigned long long int total_not_detected = 0; + unsigned long long int total_flows_guessed = 0; unsigned long long int total_flows_detected = 0; + unsigned long long int total_flow_detection_updates = 0; + 
unsigned long long int total_flow_updates = 0; for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { @@ -3505,27 +3808,41 @@ static int stop_reader_threads(void) total_flows_skipped += reader_threads[i].workflow->total_skipped_flows; total_flows_captured += reader_threads[i].workflow->total_active_flows; total_flows_idle += reader_threads[i].workflow->total_idle_flows; - total_flows_detected += reader_threads[i].workflow->detected_flow_protocols; + total_not_detected += reader_threads[i].workflow->total_not_detected_flows; + total_flows_guessed += reader_threads[i].workflow->total_guessed_flows; + total_flows_detected += reader_threads[i].workflow->total_detected_flows; + total_flow_detection_updates += reader_threads[i].workflow->total_flow_detection_updates; + total_flow_updates += reader_threads[i].workflow->total_flow_updates; printf( - "Stopping Thread %d, processed %10llu packets, %12llu bytes, skipped flows: %8llu, processed flows: %8llu, " - "idle flows: %8llu, detected flows: %8llu\n", + "Stopping Thread %2d, processed %llu packets, %llu bytes\n" + "\tskipped flows.....: %8llu, processed flows: %8llu, idle flows....: %8llu\n" + "\tnot detected flows: %8llu, guessed flows..: %8llu, detected flows: %8llu\n" + "\tdetection updates.: %8llu, updated flows..: %8llu\n", reader_threads[i].array_index, reader_threads[i].workflow->packets_processed, reader_threads[i].workflow->total_l4_data_len, reader_threads[i].workflow->total_skipped_flows, reader_threads[i].workflow->total_active_flows, reader_threads[i].workflow->total_idle_flows, - reader_threads[i].workflow->detected_flow_protocols); + reader_threads[i].workflow->total_not_detected_flows, + reader_threads[i].workflow->total_guessed_flows, + reader_threads[i].workflow->total_detected_flows, + reader_threads[i].workflow->total_flow_detection_updates, + reader_threads[i].workflow->total_flow_updates); } /* total packets captured: same value for all threads as packet2thread distribution 
happens later */ - printf("Total packets captured.: %llu\n", reader_threads[0].workflow->packets_captured); - printf("Total packets processed: %llu\n", total_packets_processed); - printf("Total layer4 data size.: %llu\n", total_l4_data_len); - printf("Total flows ignopred...: %llu\n", total_flows_skipped); - printf("Total flows processed..: %llu\n", total_flows_captured); - printf("Total flows timed out..: %llu\n", total_flows_idle); - printf("Total flows detected...: %llu\n", total_flows_detected); + printf("Total packets captured.......: %llu\n", reader_threads[0].workflow->packets_captured); + printf("Total packets processed......: %llu\n", total_packets_processed); + printf("Total layer4 data size.......: %llu\n", total_l4_data_len); + printf("Total flows ignopred.........: %llu\n", total_flows_skipped); + printf("Total flows processed........: %llu\n", total_flows_captured); + printf("Total flows timed out........: %llu\n", total_flows_idle); + printf("Total flows detected.........: %llu\n", total_flows_detected); + printf("Total flows guessed..........: %llu\n", total_flows_guessed); + printf("Total flows not detected.....: %llu\n", total_not_detected); + printf("Total flow updates...........: %llu\n", total_flow_updates); + printf("Total flow detections updates: %llu\n", total_flow_detection_updates); return 0; } @@ -3579,22 +3896,22 @@ static void print_subopt_usage(void) case MAX_READER_THREADS: fprintf(stderr, "%llu\n", nDPId_options.reader_thread_count); break; - case IDLE_SCAN_PERIOD: - fprintf(stderr, "%llu\n", nDPId_options.idle_scan_period); - break; #ifdef ENABLE_MEMORY_PROFILING - case MEMORY_PROFILING_PRINT_EVERY: - fprintf(stderr, "%llu\n", nDPId_options.memory_profiling_print_every); + case MEMORY_PROFILING_LOG_INTERVAL: + fprintf(stderr, "%llu\n", nDPId_options.memory_profiling_log_interval); break; #endif #ifdef ENABLE_ZLIB - case COMPRESSION_SCAN_PERIOD: - fprintf(stderr, "%llu\n", nDPId_options.compression_scan_period); + case 
COMPRESSION_SCAN_INTERVAL: + fprintf(stderr, "%llu\n", nDPId_options.compression_scan_interval); break; case COMPRESSION_FLOW_INACTIVITY: fprintf(stderr, "%llu\n", nDPId_options.compression_flow_inactivity); break; #endif + case FLOW_SCAN_INTVERAL: + fprintf(stderr, "%llu\n", nDPId_options.flow_scan_interval); + break; case GENERIC_MAX_IDLE_TIME: fprintf(stderr, "%llu\n", nDPId_options.generic_max_idle_time); break; @@ -3786,22 +4103,22 @@ static int nDPId_parse_options(int argc, char ** argv) case MAX_READER_THREADS: nDPId_options.reader_thread_count = value_llu; break; - case IDLE_SCAN_PERIOD: - nDPId_options.idle_scan_period = value_llu; - break; #ifdef ENABLE_MEMORY_PROFILING - case MEMORY_PROFILING_PRINT_EVERY: - nDPId_options.memory_profiling_print_every = value_llu; + case MEMORY_PROFILING_LOG_INTERVAL: + nDPId_options.memory_profiling_log_interval = value_llu; break; #endif #ifdef ENABLE_ZLIB - case COMPRESSION_SCAN_PERIOD: - nDPId_options.compression_scan_period = value_llu; + case COMPRESSION_SCAN_INTERVAL: + nDPId_options.compression_scan_interval = value_llu; break; case COMPRESSION_FLOW_INACTIVITY: nDPId_options.compression_flow_inactivity = value_llu; break; #endif + case FLOW_SCAN_INTVERAL: + nDPId_options.flow_scan_interval = value_llu; + break; case GENERIC_MAX_IDLE_TIME: nDPId_options.generic_max_idle_time = value_llu; break; @@ -3856,10 +4173,11 @@ static int validate_options(char const * const arg0) #ifdef ENABLE_ZLIB if (nDPId_options.enable_zlib_compression != 0) { - if (nDPId_options.compression_flow_inactivity < 10000 || nDPId_options.compression_scan_period < 10000) + if (nDPId_options.compression_flow_inactivity < 10000 || nDPId_options.compression_scan_interval < 10000) { fprintf(stderr, - "%s: Setting compression-scan-period / compression-flow-activity to values lower than 10000 is not " + "%s: Setting compression-scan-interval / compression-flow-activity to values lower than 10000 is " + "not " "recommended.\n" "%s: Your CPU usage may 
increase heavily.\n", arg0, @@ -3927,23 +4245,50 @@ static int validate_options(char const * const arg0) nDPId_MAX_READER_THREADS); retval = 1; } - if (nDPId_options.idle_scan_period < 1000) + if (nDPId_options.flow_scan_interval < 1000) { fprintf(stderr, - "%s: Value not in range: idle-scan-period[%llu] > 1000\n", + "%s: Value not in range: idle-scan-interval[%llu] > 1000\n", arg0, - nDPId_options.idle_scan_period); + nDPId_options.flow_scan_interval); retval = 1; } - if (nDPId_options.tcp_max_post_end_flow_time > nDPId_options.tcp_max_idle_time) + if (nDPId_options.flow_scan_interval >= nDPId_options.generic_max_idle_time) { fprintf(stderr, - "%s: Value not in range: tcp-max-post-end-flow-time[%llu] < tcp-max-idle-time[%llu]\n", + "%s: Value not in range: flow-scan-interval[%llu] < generic-max-idle-time[%llu]\n", arg0, - nDPId_options.tcp_max_post_end_flow_time, + nDPId_options.flow_scan_interval, + nDPId_options.generic_max_idle_time); + retval = 1; + } + if (nDPId_options.flow_scan_interval >= nDPId_options.icmp_max_idle_time) + { + fprintf(stderr, + "%s: Value not in range: flow-scan-interval[%llu] < icmp-max-idle-time[%llu]\n", + arg0, + nDPId_options.flow_scan_interval, + nDPId_options.icmp_max_idle_time); + retval = 1; + } + if (nDPId_options.flow_scan_interval >= nDPId_options.tcp_max_idle_time) + { + fprintf(stderr, + "%s: Value not in range: flow-scan-interval[%llu] < generic-max-idle-time[%llu]\n", + arg0, + nDPId_options.flow_scan_interval, nDPId_options.tcp_max_idle_time); retval = 1; } + if (nDPId_options.flow_scan_interval >= nDPId_options.udp_max_idle_time) + { + fprintf(stderr, + "%s: Value not in range:flow-scan-interval[%llu] < udp-max-idle-time[%llu]\n", + arg0, + nDPId_options.flow_scan_interval, + nDPId_options.udp_max_idle_time); + retval = 1; + } if (nDPId_options.process_internal_initial_direction != 0 && nDPId_options.process_external_initial_direction != 0) { fprintf(stderr, @@ -3961,11 +4306,17 @@ static int validate_options(char const 
* const arg0) if (nDPId_options.max_packets_per_flow_to_process < 1 || nDPId_options.max_packets_per_flow_to_process > 65535) { fprintf(stderr, - "%s: Value not in range: 1 =< max_packets_per_flow_to_process[%llu] =< 65535\n", + "%s: Value not in range: 1 =< max-packets-per-flow-to-process[%llu] =< 65535\n", arg0, nDPId_options.max_packets_per_flow_to_process); retval = 1; } + if (nDPId_options.max_packets_per_flow_to_send > 30) + { + fprintf(stderr, + "%s: Higher values of max-packets-per-flow-to-send may cause superfluous network usage.\n", + arg0); + } return retval; } |