summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-03-06 17:31:26 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-03-06 17:38:05 +0100
commit46f68501d575431656b5254a4bda8acb2982ab77 (patch)
tree030c68ea408f61de131b93a51b1394648c4a7b85 /nDPId.c
parent9db048c9d93a00adf4b258d2341b24229d2a45a1 (diff)
Added daemon event: DAEMON_EVENT_STATUS (periodically send's daemon statistics.)
* Improved distributor timeout handling (per-thread). * flow-info.py / flow-dash.py: Distinguish between flow risk severities. * nDPId: Skip tag switch datalink packet dissection / processing. * nDPId: Fixed incorrect value for current active flows. * Improved JSON schema's. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c214
1 files changed, 152 insertions, 62 deletions
diff --git a/nDPId.c b/nDPId.c
index 682f0bae0..3c27af7d3 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -8,6 +8,7 @@
#include <ndpi_api.h>
#include <ndpi_main.h>
#include <ndpi_typedefs.h>
+#include <pcap/dlt.h>
#include <pcap/pcap.h>
#include <pthread.h>
#include <signal.h>
@@ -98,7 +99,8 @@ struct nDPId_flow_basic
uint8_t l4_protocol;
uint8_t tcp_fin_rst_seen : 1;
uint8_t tcp_is_midstream_flow : 1;
- uint8_t reserved_00 : 6;
+ uint8_t idled : 1;
+ uint8_t reserved_00 : 5;
uint8_t reserved_01[2];
uint16_t src_port;
uint16_t dst_port;
@@ -201,7 +203,9 @@ struct nDPId_workflow
uint64_t last_compression_scan_time;
#endif
uint64_t last_scan_time;
- uint64_t last_time;
+ uint64_t last_status_time;
+ uint64_t last_global_time;
+ uint64_t last_thread_time;
void ** ndpi_flows_active;
unsigned long long int max_active_flows;
@@ -290,6 +294,7 @@ enum daemon_event
DAEMON_EVENT_INIT,
DAEMON_EVENT_RECONNECT,
DAEMON_EVENT_SHUTDOWN,
+ DAEMON_EVENT_STATUS,
DAEMON_EVENT_COUNT
};
@@ -335,6 +340,7 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = {
[DAEMON_EVENT_INIT] = "init",
[DAEMON_EVENT_RECONNECT] = "reconnect",
[DAEMON_EVENT_SHUTDOWN] = "shutdown",
+ [DAEMON_EVENT_STATUS] = "status",
};
static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {};
@@ -382,6 +388,7 @@ static struct
unsigned long long int max_idle_flows_per_thread;
unsigned long long int tick_resolution;
unsigned long long int reader_thread_count;
+ unsigned long long int daemon_status_interval;
#ifdef ENABLE_MEMORY_PROFILING
unsigned long long int memory_profiling_log_interval;
#endif
@@ -404,6 +411,7 @@ static struct
.max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2,
.tick_resolution = nDPId_TICK_RESOLUTION,
.reader_thread_count = nDPId_MAX_READER_THREADS / 2,
+ .daemon_status_interval = nDPId_DAEMON_STATUS_INTERVAL,
#ifdef ENABLE_MEMORY_PROFILING
.memory_profiling_log_interval = nDPId_MEMORY_PROFILING_LOG_INTERVAL,
#endif
@@ -426,6 +434,7 @@ enum nDPId_subopts
MAX_IDLE_FLOWS_PER_THREAD,
TICK_RESOLUTION,
MAX_READER_THREADS,
+ DAEMON_STATUS_INTERVAL,
#ifdef ENABLE_MEMORY_PROFILING
MEMORY_PROFILING_LOG_INTERVAL,
#endif
@@ -446,6 +455,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread",
[TICK_RESOLUTION] = "tick-resolution",
[MAX_READER_THREADS] = "max-reader-threads",
+ [DAEMON_STATUS_INTERVAL] = "daemon-status-interval",
#ifdef ENABLE_MEMORY_PROFILING
[MEMORY_PROFILING_LOG_INTERVAL] = "memory-profiling-log-interval",
#endif
@@ -646,7 +656,7 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de
case FS_INFO:
{
- if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_time)
+ if (flow_basic->last_seen + nDPId_options.compression_flow_inactivity < workflow->last_thread_time)
{
struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
@@ -675,14 +685,14 @@ static void check_for_compressable_flows(struct nDPId_reader_thread * const read
{
struct nDPId_workflow * const workflow = reader_thread->workflow;
- if (workflow->last_compression_scan_time + nDPId_options.compression_scan_interval < workflow->last_time)
+ if (workflow->last_compression_scan_time + nDPId_options.compression_scan_interval < workflow->last_thread_time)
{
for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index)
{
ndpi_twalk(workflow->ndpi_flows_active[comp_scan_index], ndpi_comp_scan_walker, workflow);
}
- workflow->last_compression_scan_time = workflow->last_time;
+ workflow->last_compression_scan_time = workflow->last_thread_time;
}
}
#endif
@@ -1580,22 +1590,23 @@ static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow
uint64_t sdiff = flow_basic->last_seen % nDPId_options.flow_scan_interval;
uint64_t itime = get_l4_protocol_idle_time(flow_basic->l4_protocol) - sdiff;
- return (flow_basic->last_seen + itime <= workflow->last_time) ||
+ return (flow_basic->last_seen + itime <= workflow->last_thread_time) ||
(flow_basic->tcp_fin_rst_seen == 1 &&
- flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time - sdiff <= workflow->last_time);
+ flow_basic->last_seen + nDPId_options.tcp_max_post_end_flow_time - sdiff <= workflow->last_thread_time);
}
static int is_flow_update_required(struct nDPId_workflow const * const workflow,
struct nDPId_flow_extended const * const flow_ext)
{
- uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol) / 2;
+ uint64_t sdiff = flow_ext->flow_basic.last_seen % nDPId_options.flow_scan_interval;
+ uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol) - sdiff;
- if (is_l4_protocol_timed_out(workflow, &flow_ext->flow_basic) != 0)
+ if (flow_ext->flow_basic.idled != 0)
{
return 0;
}
- return flow_ext->last_flow_update + itime <= workflow->last_time;
+ return flow_ext->last_flow_update + itime <= workflow->last_thread_time;
}
static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
@@ -1619,6 +1630,7 @@ static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int de
{
if (is_l4_protocol_timed_out(workflow, flow_basic) != 0)
{
+ flow_basic->idled = 1;
workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic;
switch (flow_basic->state)
{
@@ -1799,7 +1811,7 @@ static void ndpi_flow_update_scan_walker(void const * const A, ndpi_VISIT which,
{
workflow->total_flow_updates++;
jsonize_flow_event(reader_thread, flow_ext, FLOW_EVENT_UPDATE);
- flow_ext->last_flow_update = workflow->last_time;
+ flow_ext->last_flow_update = workflow->last_thread_time;
}
break;
}
@@ -1900,6 +1912,11 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
char const ev[] = "daemon_event_name";
struct nDPId_workflow * const workflow = reader_thread->workflow;
+ if (event == DAEMON_EVENT_RECONNECT)
+ {
+ ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
+ }
+
ndpi_serialize_string_int32(&workflow->ndpi_serializer, "daemon_event_id", event);
if (event > DAEMON_EVENT_INVALID && event < DAEMON_EVENT_COUNT)
{
@@ -1912,45 +1929,82 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
jsonize_basic(reader_thread);
- if (event == DAEMON_EVENT_INIT)
- {
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "max-flows-per-thread",
- nDPId_options.max_flows_per_thread);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "max-idle-flows-per-thread",
- nDPId_options.max_idle_flows_per_thread);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "reader-thread-count",
- nDPId_options.reader_thread_count);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "flow-scan-interval",
- nDPId_options.flow_scan_interval);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "generic-max-idle-time",
- nDPId_options.generic_max_idle_time);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "icmp-max-idle-time",
- nDPId_options.icmp_max_idle_time);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "udp-max-idle-time", nDPId_options.udp_max_idle_time);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "tcp-max-idle-time",
- nDPId_options.tcp_max_idle_time + nDPId_options.tcp_max_post_end_flow_time);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "max-packets-per-flow-to-send",
- nDPId_options.max_packets_per_flow_to_send);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "max-packets-per-flow-to-process",
- nDPId_options.max_packets_per_flow_to_process);
- }
- else if (event == DAEMON_EVENT_SHUTDOWN)
+ switch (event)
{
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
- "total-events-serialized",
- workflow->total_events_serialized +
- 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */);
+ case DAEMON_EVENT_INVALID:
+ case DAEMON_EVENT_COUNT:
+ break;
+
+ case DAEMON_EVENT_INIT:
+ case DAEMON_EVENT_RECONNECT:
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-flows-per-thread",
+ nDPId_options.max_flows_per_thread);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-idle-flows-per-thread",
+ nDPId_options.max_idle_flows_per_thread);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "tick-resolution", nDPId_options.tick_resolution);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "reader-thread-count",
+ nDPId_options.reader_thread_count);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "flow-scan-interval",
+ nDPId_options.flow_scan_interval);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "generic-max-idle-time",
+ nDPId_options.generic_max_idle_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "icmp-max-idle-time",
+ nDPId_options.icmp_max_idle_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "udp-max-idle-time",
+ nDPId_options.udp_max_idle_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "tcp-max-idle-time",
+ nDPId_options.tcp_max_idle_time + nDPId_options.tcp_max_post_end_flow_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-packets-per-flow-to-send",
+ nDPId_options.max_packets_per_flow_to_send);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "max-packets-per-flow-to-process",
+ nDPId_options.max_packets_per_flow_to_process);
+ break;
+
+ case DAEMON_EVENT_STATUS:
+ case DAEMON_EVENT_SHUTDOWN:
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-captured", workflow->packets_captured);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-processed", workflow->packets_processed);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-skipped-flows",
+ workflow->total_skipped_flows);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-l4-data-len", workflow->total_l4_data_len);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-not-detected-flows",
+ workflow->total_not_detected_flows);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-guessed-flows",
+ workflow->total_guessed_flows);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-detected-flows",
+ workflow->total_detected_flows);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-detection-updates",
+ workflow->total_flow_detection_updates);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-updates", workflow->total_flow_updates);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "current-active-flows",
+ workflow->cur_active_flows);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-active-flows",
+ workflow->total_active_flows);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-idle-flows", workflow->total_idle_flows);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-events-serialized",
+ workflow->total_events_serialized +
+ 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */);
+ break;
}
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time);
serialize_and_send(reader_thread);
}
@@ -1975,7 +2029,7 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
? flow_ext->total_l4_payload_len / flow_ext->packets_processed
: 0));
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time);
}
static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread)
@@ -2301,7 +2355,7 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->caplen);
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "ts_msec", workflow->last_time);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_msec", workflow->last_thread_time);
if (base64_retval == 0 && base64_data_len > 0)
{
@@ -2633,6 +2687,7 @@ __attribute__((format(printf, 3, 4))) static void jsonize_basic_eventf(struct nD
va_end(ap);
}
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_msec", workflow->last_global_time);
serialize_and_send(reader_thread);
}
@@ -3012,6 +3067,11 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
*layer3_type = ETH_P_IPV6;
*ip_offset = 0;
break;
+ /* Switch tag datalinks are not supported for now. */
+ case DLT_DSA_TAG_DSA:
+ return 1;
+ case DLT_DSA_TAG_EDSA:
+ return 1;
default:
jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
jsonize_basic_eventf(
@@ -3061,25 +3121,32 @@ static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const work
return NULL;
}
+ workflow->cur_active_flows++;
return flow_basic;
}
static void do_periodically_work(struct nDPId_reader_thread * const reader_thread)
{
if (reader_thread->workflow->last_scan_time + nDPId_options.flow_scan_interval <=
- reader_thread->workflow->last_time)
+ reader_thread->workflow->last_global_time)
{
check_for_idle_flows(reader_thread);
check_for_flow_updates(reader_thread);
- reader_thread->workflow->last_scan_time = reader_thread->workflow->last_time;
+ reader_thread->workflow->last_scan_time = reader_thread->workflow->last_global_time;
+ }
+ if (reader_thread->workflow->last_status_time + nDPId_options.daemon_status_interval +
+ reader_thread->array_index * 1000 <= reader_thread->workflow->last_global_time)
+ {
+ jsonize_daemon(reader_thread, DAEMON_EVENT_STATUS);
+ reader_thread->workflow->last_status_time = reader_thread->workflow->last_global_time + reader_thread->array_index * 1000;
}
#ifdef ENABLE_MEMORY_PROFILING
if (reader_thread->workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_log_interval <=
- reader_thread->workflow->last_time)
+ reader_thread->workflow->last_global_time)
{
log_memory_usage(reader_thread);
log_flows(reader_thread);
- reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_time;
+ reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_global_time;
}
#endif
}
@@ -3126,9 +3193,9 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->packets_captured++;
time_ms = ((uint64_t)header->ts.tv_sec) * nDPId_options.tick_resolution +
header->ts.tv_usec / (1000000 / nDPId_options.tick_resolution);
- if (workflow->last_time < time_ms)
+ if (workflow->last_global_time < time_ms)
{
- workflow->last_time = time_ms;
+ workflow->last_global_time = time_ms;
}
do_periodically_work(reader_thread);
@@ -3310,6 +3377,11 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
+ if (workflow->last_thread_time < time_ms)
+ {
+ workflow->last_thread_time = time_ms;
+ }
+
/* calculate flow hash for btree find, search(insert) */
switch (flow_basic.l3_type)
{
@@ -3446,7 +3518,7 @@ static void ndpi_process_packet(uint8_t * const args,
MAX_FLOW_TO_TRACK,
"%s%llu %s%llu %s%llu %s%llu",
"current_active",
- workflow->max_active_flows,
+ workflow->cur_active_flows,
"current_idle",
workflow->cur_idle_flows,
"max_active",
@@ -3466,7 +3538,6 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- workflow->cur_active_flows++;
workflow->total_active_flows++;
flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1);
@@ -3487,7 +3558,7 @@ static void ndpi_process_packet(uint8_t * const args,
struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result;
/* Update last seen timestamp for timeout handling. */
- flow_basic_to_process->last_seen = workflow->last_time;
+ flow_basic_to_process->last_seen = workflow->last_thread_time;
/* TCP-FIN/TCP-RST: indicates that at least one side wants to end the connection. */
if (flow_basic.tcp_fin_rst_seen != 0)
{
@@ -3535,7 +3606,7 @@ static void ndpi_process_packet(uint8_t * const args,
if (flow_to_process->flow_extended.first_seen == 0)
{
flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_seen =
- flow_to_process->flow_extended.last_flow_update = workflow->last_time;
+ flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time;
}
if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len)
{
@@ -3600,7 +3671,7 @@ static void ndpi_process_packet(uint8_t * const args,
&flow_to_process->detection_data->flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size,
- workflow->last_time);
+ workflow->last_thread_time);
if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->flow_extended.detected_l7_protocol) != 0 &&
flow_to_process->detection_completed == 0)
@@ -3705,17 +3776,30 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
int nready;
while (nDPId_main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0)
{
+ errno = 0;
nready = epoll_wait(epoll_fd, events, events_size, timeout_ms);
+ if (errno != 0)
+ {
+ logger(1, "Epoll returned error: %s", strerror(errno));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ break;
+ }
if (nready == 0)
{
- reader_thread->workflow->last_time += timeout_ms;
+ reader_thread->workflow->last_thread_time += timeout_ms;
do_periodically_work(reader_thread);
}
for (int i = 0; i < nready; ++i)
{
+ if ((events[i].events & EPOLLERR) != 0)
+ {
+ logger(1, "%s", "Epoll error event");
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ }
+
switch (pcap_dispatch(
reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread))
{
@@ -4049,6 +4133,9 @@ static void print_subopt_usage(void)
case MAX_READER_THREADS:
fprintf(stderr, "%llu\n", nDPId_options.reader_thread_count);
break;
+ case DAEMON_STATUS_INTERVAL:
+ fprintf(stderr, "%llu\n", nDPId_options.daemon_status_interval);
+ break;
#ifdef ENABLE_MEMORY_PROFILING
case MEMORY_PROFILING_LOG_INTERVAL:
fprintf(stderr, "%llu\n", nDPId_options.memory_profiling_log_interval);
@@ -4254,6 +4341,9 @@ static int nDPId_parse_options(int argc, char ** argv)
case MAX_READER_THREADS:
nDPId_options.reader_thread_count = value_llu;
break;
+ case DAEMON_STATUS_INTERVAL:
+ nDPId_options.daemon_status_interval = value_llu;
+ break;
#ifdef ENABLE_MEMORY_PROFILING
case MEMORY_PROFILING_LOG_INTERVAL:
nDPId_options.memory_profiling_log_interval = value_llu;