diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-03-06 17:31:26 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-03-06 17:38:05 +0100 |
commit | 46f68501d575431656b5254a4bda8acb2982ab77 (patch) | |
tree | 030c68ea408f61de131b93a51b1394648c4a7b85 /nDPId.c | |
parent | 9db048c9d93a00adf4b258d2341b24229d2a45a1 (diff) |
Added daemon event: DAEMON_EVENT_STATUS (periodically send's daemon statistics.)
* Improved distributor timeout handling (per-thread).
* flow-info.py / flow-dash.py: Distinguish between flow risk severities.
* nDPId: Skip tag switch datalink packet dissection / processing.
* nDPId: Fixed incorrect value for current active flows.
* Improved JSON schema's.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 214 |
1 files changed, 152 insertions, 62 deletions
@@ -8,6 +8,7 @@ #include <ndpi_api.h> #include <ndpi_main.h> #include <ndpi_typedefs.h> +#include <pcap/dlt.h> #include <pcap/pcap.h> #include <pthread.h> #include <signal.h> @@ -98,7 +99,8 @@ struct nDPId_flow_basic uint8_t l4_protocol; uint8_t tcp_fin_rst_seen : 1; uint8_t tcp_is_midstream_flow : 1; - uint8_t reserved_00 : 6; + uint8_t idled : 1; + uint8_t reserved_00 : 5; uint8_t reserved_01[2]; uint16_t src_port; uint16_t dst_port; @@ -201,7 +203,9 @@ struct nDPId_workflow uint64_t last_compression_scan_time; #endif uint64_t last_scan_time; - uint64_t last_time; + uint64_t last_status_time; + uint64_t last_global_time; + uint64_t last_thread_time; void ** ndpi_flows_active; unsigned long long int max_active_flows; @@ -290,6 +294,7 @@ enum daemon_event DAEMON_EVENT_INIT, DAEMON_EVENT_RECONNECT, DAEMON_EVENT_SHUTDOWN, + DAEMON_EVENT_STATUS, DAEMON_EVENT_COUNT }; @@ -335,6 +340,7 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = { [DAEMON_EVENT_INIT] = "init", [DAEMON_EVENT_RECONNECT] = "reconnect", [DAEMON_EVENT_SHUTDOWN] = "shutdown", + [DAEMON_EVENT_STATUS] = "status", }; static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; @@ -382,6 +388,7 @@ static struct unsigned long long int max_idle_flows_per_thread; unsigned long long int tick_resolution; unsigned long long int reader_thread_count; + unsigned long long int daemon_status_interval; #ifdef ENABLE_MEMORY_PROFILING unsigned long long int memory_profiling_log_interval; #endif @@ -404,6 +411,7 @@ static struct .max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2, .tick_resolution = nDPId_TICK_RESOLUTION, .reader_thread_count = nDPId_MAX_READER_THREADS / 2, + .daemon_status_interval = nDPId_DAEMON_STATUS_INTERVAL, #ifdef ENABLE_MEMORY_PROFILING .memory_profiling_log_interval = nDPId_MEMORY_PROFILING_LOG_INTERVAL, #endif @@ -426,6 +434,7 @@ enum nDPId_subopts MAX_IDLE_FLOWS_PER_THREAD, TICK_RESOLUTION, MAX_READER_THREADS, + DAEMON_STATUS_INTERVAL, #ifdef ENABLE_MEMORY_PROFILING MEMORY_PROFILING_LOG_INTERVAL, #endif @@ -446,6 +455,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th [MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread", [TICK_RESOLUTION] = "tick-resolution", [MAX_READER_THREADS] = "max-reader-threads", + [DAEMON_STATUS_INTERVAL] = "daemon-status-interval", #ifdef ENABLE_MEMORY_PROFILING [MEMORY_PROFILING_LOG_INTERVAL] = "memory-profiling-log-interval", #endif @@ -646,7 +656,7 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de case FS_INFO: { - if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_time) + if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_thread_time) { struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic; @@ -675,14 +685,14 @@ static void check_for_compressable_flows(struct nDPId_reader_thread * const read { struct nDPId_workflow * const workflow = reader_thread->workflow; - if (workflow->last_compression_scan_time + nDPId_options.compression_scan_interval < workflow->last_time) + if (workflow->last_compression_scan_time + nDPId_options.compression_scan_interval < workflow->last_thread_time) { for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index) { ndpi_twalk(workflow->ndpi_flows_active[comp_scan_index], ndpi_comp_scan_walker, workflow); } - workflow->last_compression_scan_time = workflow->last_time; + workflow->last_compression_scan_time = workflow->last_thread_time; } } #endif @@ -1580,22 +1590,23 @@ static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow uint64_t sdiff = flow_basic->last_seen % nDPId_options.flow_scan_interval; uint64_t itime = get_l4_protocol_idle_time(flow_basic->l4_protocol) - sdiff; - return (flow_basic->last_seen + itime <= workflow->last_time) || + return (flow_basic->last_seen + itime <= workflow->last_thread_time) || (flow_basic->tcp_fin_rst_seen == 1 && - flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time - sdiff <= workflow->last_time); + flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time - sdiff <= workflow->last_thread_time); } static int is_flow_update_required(struct nDPId_workflow const * const workflow, struct nDPId_flow_extended const * const flow_ext) { - uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol) / 2; + uint64_t sdiff = flow_ext->flow_basic.last_seen % nDPId_options.flow_scan_interval; + uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol) - sdiff; - if (is_l4_protocol_timed_out(workflow, &flow_ext->flow_basic) != 0) + if (flow_ext->flow_basic.idled != 0) { return 0; } - return flow_ext->last_flow_update + itime <= workflow->last_time; + return flow_ext->last_flow_update + itime <= workflow->last_thread_time; } static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) @@ -1619,6 +1630,7 @@ static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int de { if (is_l4_protocol_timed_out(workflow, flow_basic) != 0) { + flow_basic->idled = 1; workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic; switch (flow_basic->state) { @@ -1799,7 +1811,7 @@ static void ndpi_flow_update_scan_walker(void const * const A, ndpi_VISIT which, { workflow->total_flow_updates++; jsonize_flow_event(reader_thread, flow_ext, FLOW_EVENT_UPDATE); - flow_ext->last_flow_update = workflow->last_time; + flow_ext->last_flow_update = workflow->last_thread_time; } break; } @@ -1900,6 +1912,11 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu char const ev[] = "daemon_event_name"; struct nDPId_workflow * const workflow = reader_thread->workflow; + if (event == DAEMON_EVENT_RECONNECT) + { + ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer); + } + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "daemon_event_id", event); if (event > DAEMON_EVENT_INVALID && event < DAEMON_EVENT_COUNT) { @@ -1912,45 +1929,82 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu jsonize_basic(reader_thread); - if (event == DAEMON_EVENT_INIT) - { - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "max-flows-per-thread", - nDPId_options.max_flows_per_thread); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "max-idle-flows-per-thread", - nDPId_options.max_idle_flows_per_thread); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "reader-thread-count", - nDPId_options.reader_thread_count); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "flow-scan-interval", - nDPId_options.flow_scan_interval); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "generic-max-idle-time", - nDPId_options.generic_max_idle_time); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "icmp-max-idle-time", - nDPId_options.icmp_max_idle_time); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "tcp-max-idle-time", - nDPId_options.tcp_max_idle_time + nDPId_options.tcp_max_post_end_flow_time); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "max-packets-per-flow-to-send", - nDPId_options.max_packets_per_flow_to_send); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "max-packets-per-flow-to-process", - nDPId_options.max_packets_per_flow_to_process); - } - else if (event == DAEMON_EVENT_SHUTDOWN) + switch (event) { - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, - "total-events-serialized", - workflow->total_events_serialized + - 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */); + case DAEMON_EVENT_INVALID: + case DAEMON_EVENT_COUNT: + break; + + case DAEMON_EVENT_INIT: + case DAEMON_EVENT_RECONNECT: + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-flows-per-thread", + nDPId_options.max_flows_per_thread); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-idle-flows-per-thread", + nDPId_options.max_idle_flows_per_thread); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "reader-thread-count", + nDPId_options.reader_thread_count); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow-scan-interval", + nDPId_options.flow_scan_interval); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "generic-max-idle-time", + nDPId_options.generic_max_idle_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "icmp-max-idle-time", + nDPId_options.icmp_max_idle_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "udp-max-idle-time", + nDPId_options.udp_max_idle_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "tcp-max-idle-time", + nDPId_options.tcp_max_idle_time + nDPId_options.tcp_max_post_end_flow_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-packets-per-flow-to-send", + nDPId_options.max_packets_per_flow_to_send); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-packets-per-flow-to-process", + nDPId_options.max_packets_per_flow_to_process); + break; + + case DAEMON_EVENT_STATUS: + case DAEMON_EVENT_SHUTDOWN: + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-captured", workflow->packets_captured); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-processed", workflow->packets_processed); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-skipped-flows", + workflow->total_skipped_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-l4-data-len", workflow->total_l4_data_len); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-not-detected-flows", + workflow->total_not_detected_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-guessed-flows", + workflow->total_guessed_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-detected-flows", + workflow->total_detected_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-detection-updates", + workflow->total_flow_detection_updates); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-updates", workflow->total_flow_updates); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "current-active-flows", + workflow->cur_active_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-active-flows", + workflow->total_active_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-idle-flows", workflow->total_idle_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-events-serialized", + workflow->total_events_serialized + + 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */); + break; } + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time); serialize_and_send(reader_thread); } @@ -1975,7 +2029,7 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl ? flow_ext->total_l4_payload_len / flow_ext->packets_processed : 0)); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time); } static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread) @@ -2301,7 +2355,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->caplen); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time); if (base64_retval == 0 && base64_data_len > 0) { @@ -2633,6 +2687,7 @@ __attribute__((format(printf, 3, 4))) static void jsonize_basic_eventf(struct nD va_end(ap); } + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time); serialize_and_send(reader_thread); } @@ -3012,6 +3067,11 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre *layer3_type = ETH_P_IPV6; *ip_offset = 0; break; + /* Switch tag datalinks are not supported for now. */ + case DLT_DSA_TAG_DSA: + return 1; + case DLT_DSA_TAG_EDSA: + return 1; default: jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf( @@ -3061,25 +3121,32 @@ static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const work return NULL; } + workflow->cur_active_flows++; return flow_basic; } static void do_periodically_work(struct nDPId_reader_thread * const reader_thread) { if (reader_thread->workflow->last_scan_time + nDPId_options.flow_scan_interval <= - reader_thread->workflow->last_time) + reader_thread->workflow->last_global_time) { check_for_idle_flows(reader_thread); check_for_flow_updates(reader_thread); - reader_thread->workflow->last_scan_time = reader_thread->workflow->last_time; + reader_thread->workflow->last_scan_time = reader_thread->workflow->last_global_time; + } + if (reader_thread->workflow->last_status_time + nDPId_options.daemon_status_interval + + reader_thread->array_index * 1000 <= reader_thread->workflow->last_global_time) + { + jsonize_daemon(reader_thread, DAEMON_EVENT_STATUS); + reader_thread->workflow->last_status_time = reader_thread->workflow->last_global_time + reader_thread->array_index * 1000; } #ifdef ENABLE_MEMORY_PROFILING if (reader_thread->workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_log_interval <= - reader_thread->workflow->last_time) + reader_thread->workflow->last_global_time) { log_memory_usage(reader_thread); log_flows(reader_thread); - reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_time; + reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_global_time; } #endif } @@ -3126,9 +3193,9 @@ static void ndpi_process_packet(uint8_t * const args, workflow->packets_captured++; time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_options.tick_resolution + header->ts.tv_usec / (1000000 / nDPId_options.tick_resolution); - if (workflow->last_time < time_ms) + if (workflow->last_global_time < time_ms) { - workflow->last_time = time_ms; + workflow->last_global_time = time_ms; } do_periodically_work(reader_thread); @@ -3310,6 +3377,11 @@ static void ndpi_process_packet(uint8_t * const args, return; } + if (workflow->last_thread_time < time_ms) + { + workflow->last_thread_time = time_ms; + } + /* calculate flow hash for btree find, search(insert) */ switch (flow_basic.l3_type) { @@ -3446,7 +3518,7 @@ static void ndpi_process_packet(uint8_t * const args, MAX_FLOW_TO_TRACK, "%s%llu %s%llu %s%llu %s%llu", "current_active", - workflow->max_active_flows, + workflow->cur_active_flows, "current_idle", workflow->cur_idle_flows, "max_active", @@ -3466,7 +3538,6 @@ static void ndpi_process_packet(uint8_t * const args, return; } - workflow->cur_active_flows++; workflow->total_active_flows++; flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1); @@ -3487,7 +3558,7 @@ static void ndpi_process_packet(uint8_t * const args, struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result; /* Update last seen timestamp for timeout handling. */ - flow_basic_to_process->last_seen = workflow->last_time; + flow_basic_to_process->last_seen = workflow->last_thread_time; /* TCP-FIN/TCP-RST: indicates that at least one side wants to end the connection. */ if (flow_basic.tcp_fin_rst_seen != 0) { @@ -3535,7 +3606,7 @@ static void ndpi_process_packet(uint8_t * const args, if (flow_to_process->flow_extended.first_seen == 0) { flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen = - flow_to_process->flow_extended.last_flow_update = workflow->last_time; + flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time; } if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len) { @@ -3600,7 +3671,7 @@ static void ndpi_process_packet(uint8_t * const args, &flow_to_process->detection_data->flow, ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6, ip_size, - workflow->last_time); + workflow->last_thread_time); if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->flow_extended.detected_l7_protocol) != 0 && flow_to_process->detection_completed == 0) @@ -3705,17 +3776,30 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) int nready; while (nDPId_main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0) { + errno = 0; nready = epoll_wait(epoll_fd, events, events_size, timeout_ms); + if (errno != 0) + { + logger(1, "Epoll returned error: %s", strerror(errno)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + break; + } if (nready == 0) { - reader_thread->workflow->last_time += timeout_ms; + reader_thread->workflow->last_thread_time += timeout_ms; do_periodically_work(reader_thread); } for (int i = 0; i < nready; ++i) { + if ((events[i].events & EPOLLERR) != 0) + { + logger(1, "%s", "Epoll error event"); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + } + switch (pcap_dispatch( reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) { @@ -4049,6 +4133,9 @@ static void print_subopt_usage(void) case MAX_READER_THREADS: fprintf(stderr, "%llu\n", nDPId_options.reader_thread_count); break; + case DAEMON_STATUS_INTERVAL: + fprintf(stderr, "%llu\n", nDPId_options.daemon_status_interval); + break; #ifdef ENABLE_MEMORY_PROFILING case MEMORY_PROFILING_LOG_INTERVAL: fprintf(stderr, "%llu\n", nDPId_options.memory_profiling_log_interval); @@ -4254,6 +4341,9 @@ static int nDPId_parse_options(int argc, char ** argv) case MAX_READER_THREADS: nDPId_options.reader_thread_count = value_llu; break; + case DAEMON_STATUS_INTERVAL: + nDPId_options.daemon_status_interval = value_llu; + break; #ifdef ENABLE_MEMORY_PROFILING case MEMORY_PROFILING_LOG_INTERVAL: nDPId_options.memory_profiling_log_interval = value_llu; |