summaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-03-21 15:56:01 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-03-21 15:56:01 +0100
commitc0b7bdacbc15c1cf5eaeb9faefc088aa698e94ba (patch)
tree90fcd8d0b791133082987af4aacbd24041e63bf3 /nDPId.c
parentdaaaa615197d8551457ecf926f6df30c6482a70a (diff)
Reworked nDPIsrvd.h C-API.
* nDPIsrvd.h: Provide nDPId thread storage. * nDPIsrvd.py: Fixed instance cleanup bug. * nDPIsrvd.h: Support for instance/thread user data and cleanup callback. * nDPIsrvd.h: Most recent flow time stored in thread ht instead of instance ht. * nDPId: Moved flow logger out the memory profilier into SIGUSR1 signal handling. * nDPId: Added signal fd to be usable within epoll's event handling (live-capture only!) * nDPId: Added information about ZLib compressions to daemon status/shutdown events. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c410
1 files changed, 214 insertions, 196 deletions
diff --git a/nDPId.c b/nDPId.c
index eb65aaeed..a3f6971ec 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -17,6 +17,7 @@
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
+#include <sys/signalfd.h>
#include <sys/un.h>
#include <unistd.h>
#ifdef ENABLE_ZLIB
@@ -214,6 +215,9 @@ struct nDPId_workflow
#endif
#ifdef ENABLE_ZLIB
uint64_t last_compression_scan_time;
+ uint64_t total_compressions;
+ uint64_t total_compression_diff;
+ uint64_t current_compression_diff;
#endif
uint64_t last_scan_time;
uint64_t last_status_time;
@@ -486,6 +490,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[MAX_PACKETS_PER_FLOW_TO_PROCESS] = "max-packets-per-flow-to-process",
NULL};
+static void sighandler(int signum);
static int processing_threads_error_or_eof(void);
static void free_workflow(struct nDPId_workflow ** const workflow);
static void serialize_and_send(struct nDPId_reader_thread * const reader_thread);
@@ -687,6 +692,12 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de
flow_info->flow_extended.flow_id,
ret);
}
+ else
+ {
+ workflow->total_compressions++;
+ workflow->total_compression_diff += ret;
+ workflow->current_compression_diff += ret;
+ }
}
break;
}
@@ -1040,186 +1051,6 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr
#endif
}
}
-
-struct log_flows_user_data
-{
- struct nDPId_workflow * workflow;
- unsigned long long int flows_ukn;
- unsigned long long int flows_skp;
- unsigned long long int flows_fin;
- unsigned long long int flows_inf;
- struct nDPId_flow_basic const * flows_active_fin[nDPId_MAX_FLOWS_PER_THREAD];
- struct nDPId_flow_basic const * flows_active_inf[nDPId_MAX_FLOWS_PER_THREAD];
-};
-
-static void log_flows_flow_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
-{
- struct log_flows_user_data * const log_user_data = (struct log_flows_user_data *)user_data;
- struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A;
-
- (void)depth;
-
- if (log_user_data->workflow == NULL || flow_basic == NULL)
- {
- return;
- }
-
- if (which == ndpi_preorder || which == ndpi_leaf)
- {
- switch (flow_basic->state)
- {
- case FS_UNKNOWN:
- log_user_data->flows_ukn++;
- break;
-
- case FS_COUNT:
- break;
-
- case FS_SKIPPED:
- log_user_data->flows_skp++;
- break;
-
- case FS_FINISHED:
- {
- log_user_data->flows_fin++;
- log_user_data->flows_active_fin[log_user_data->flows_fin - 1] = flow_basic;
- break;
- }
-
- case FS_INFO:
- {
- log_user_data->flows_inf++;
- log_user_data->flows_active_inf[log_user_data->flows_inf - 1] = flow_basic;
- break;
- }
- }
- }
-}
-
-static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_FLOWS_PER_THREAD],
- unsigned long long int flows_used,
- char output[BUFSIZ])
-{
- size_t output_used = 0;
- int written;
-
- output[0] = '\0';
-
- for (size_t flow_index = 0; flow_index < flows_used; ++flow_index)
- {
- struct nDPId_flow_basic const * const flow_basic = flows[flow_index];
-
- written = -1;
-
- switch (flow_basic->state)
- {
- case FS_UNKNOWN:
- case FS_COUNT:
-
- case FS_SKIPPED:
- written = 0;
- break;
-
- case FS_FINISHED:
- {
- struct nDPId_flow_finished const * const flow_finished = (struct nDPId_flow_finished const *)flow_basic;
-
-#if 1
- written =
- snprintf(output + output_used, BUFSIZ - output_used, "%llu,", flow_finished->flow_extended.flow_id);
-#else
- written = snprintf(output + output_used,
- BUFSIZ - output_used,
- "[%llu, %u, %llu],",
- flow_finished->flow_extended.flow_id,
- flow_finished->flow_extended.flow_basic.l4_protocol,
- (unsigned long long int const)flow_finished->flow_extended.flow_basic.last_seen);
-#endif
- break;
- }
-
- case FS_INFO:
- {
- struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info const *)flow_basic;
-
-#if 1
- written =
- snprintf(output + output_used, BUFSIZ - output_used, "%llu,", flow_info->flow_extended.flow_id);
-#else
- written = snprintf(output + output_used,
- BUFSIZ - output_used,
- "[%llu, %u, %llu],",
- flow_info->flow_extended.flow_id,
- flow_info->flow_extended.flow_basic.l4_protocol,
- (unsigned long long int const)flow_info->flow_extended.flow_basic.last_seen);
-#endif
- break;
- }
- }
-
- if (written <= 0)
- {
- break;
- }
- else
- {
- output_used += written;
- if (output_used >= BUFSIZ)
- {
- break;
- }
- }
- }
-
- return (output_used > 0 ? output_used - 1 : 0);
-}
-
-static void log_flows(struct nDPId_reader_thread const * const reader_thread)
-{
- struct nDPId_workflow * const workflow = reader_thread->workflow;
- struct log_flows_user_data log_user_data;
- char flows_log_str[BUFSIZ];
- size_t flows_log_str_used;
-
- log_user_data.workflow = reader_thread->workflow;
- log_user_data.flows_ukn = 0;
- log_user_data.flows_skp = 0;
- log_user_data.flows_fin = 0;
- log_user_data.flows_inf = 0;
-
- for (size_t scan_index = 0; scan_index < workflow->max_active_flows; ++scan_index)
- {
- ndpi_twalk(workflow->ndpi_flows_active[scan_index], log_flows_flow_walker, &log_user_data);
- }
-
- logger(0,
- "MemoryProfiler flow stats: [thread: %zu][unknown: %llu][skipped: %llu][finished: %llu][info: %llu]",
- reader_thread->array_index,
- log_user_data.flows_ukn,
- log_user_data.flows_skp,
- log_user_data.flows_fin,
- log_user_data.flows_inf);
-
- flows_log_str_used = log_flows_to_str(log_user_data.flows_active_fin, log_user_data.flows_fin, flows_log_str);
- if (flows_log_str_used > 0)
- {
- logger(0,
- "MemoryProfiler flows active (finished): [thread: %zu][%.*s]",
- reader_thread->array_index,
- (int)flows_log_str_used,
- flows_log_str);
- }
-
- flows_log_str_used = log_flows_to_str(log_user_data.flows_active_inf, log_user_data.flows_inf, flows_log_str);
- if (flows_log_str_used > 0)
- {
- logger(0,
- "MemoryProfiler flows active (info): [thread: %zu][%.*s]",
- reader_thread->array_index,
- (int)flows_log_str_used,
- flows_log_str);
- }
-}
#endif
static struct nDPId_workflow * init_workflow(char const * const file_or_device)
@@ -1728,9 +1559,11 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
#ifdef ENABLE_ZLIB
if (nDPId_options.enable_zlib_compression != 0 && flow_info->detection_data_compressed_size > 0)
{
+ workflow->current_compression_diff -= flow_info->detection_data_compressed_size;
int ret = detection_data_inflate(flow_info);
if (ret <= 0)
{
+ workflow->current_compression_diff += flow_info->detection_data_compressed_size;
logger(1, "zLib decompression failed with error code: %d", ret);
return;
}
@@ -2014,6 +1847,26 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
"total-active-flows",
workflow->total_active_flows);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-idle-flows", workflow->total_idle_flows);
+#ifdef ENABLE_ZLIB
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-compressions",
+ workflow->total_compressions);
+ /* Compression diff's may very from run to run. Due to this, `nDPId-test' would be inconsistent. */
+#ifndef NO_MAIN
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "total-compression-diff",
+ workflow->total_compression_diff);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
+ "current-compression-diff",
+ workflow->current_compression_diff);
+#else
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compression-diff", 0);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "current-compression-diff", 0);
+#endif
+#else
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compressions", 0);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "current-compression-diff", 0);
+#endif
ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
"total-events-serialized",
workflow->total_events_serialized +
@@ -3179,7 +3032,6 @@ static void do_periodically_work(struct nDPId_reader_thread * const reader_threa
reader_thread->workflow->last_global_time)
{
log_memory_usage(reader_thread);
- log_flows(reader_thread);
reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_global_time;
}
#endif
@@ -3696,9 +3548,11 @@ static void ndpi_process_packet(uint8_t * const args,
#ifdef ENABLE_ZLIB
if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->detection_data_compressed_size > 0)
{
+ workflow->current_compression_diff -= flow_to_process->detection_data_compressed_size;
int ret = detection_data_inflate(flow_to_process);
if (ret <= 0)
{
+ workflow->current_compression_diff += flow_to_process->detection_data_compressed_size;
logger(1,
"zLib decompression failed for existing flow %llu with error code: %d",
flow_to_process->flow_extended.flow_id,
@@ -3835,6 +3689,96 @@ static void ndpi_process_packet(uint8_t * const args,
#endif
}
+static void get_current_time(struct timeval * const tval)
+{
+ gettimeofday(tval, NULL);
+}
+
+static void log_flows_flow_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
+{
+ struct nDPId_reader_thread const * const reader_thread = (struct nDPId_reader_thread *)user_data;
+ struct nDPId_flow_basic const * const flow_basic = *(struct nDPId_flow_basic **)A;
+
+ (void)depth;
+ (void)user_data;
+
+ if (flow_basic == NULL)
+ {
+ return;
+ }
+
+ if (which == ndpi_preorder || which == ndpi_leaf)
+ {
+ switch (flow_basic->state)
+ {
+ case FS_UNKNOWN:
+ break;
+
+ case FS_COUNT:
+ break;
+
+ case FS_SKIPPED:
+ break;
+
+ case FS_FINISHED:
+ {
+ struct nDPId_flow_finished const * const flow_fin = (struct nDPId_flow_finished *)flow_basic;
+
+ uint64_t last_seen = flow_fin->flow_extended.flow_basic.last_seen;
+ uint64_t idle_time = get_l4_protocol_idle_time(flow_fin->flow_extended.flow_basic.l4_protocol);
+ logger(0,
+ "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: "
+ "%7llu]",
+ reader_thread->array_index,
+ flow_fin->flow_extended.flow_id,
+ (unsigned long long int)last_seen,
+ (unsigned long long int)flow_fin->flow_extended.last_flow_update,
+ (unsigned long long int)idle_time,
+ (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time
+ ? last_seen + idle_time - reader_thread->workflow->last_thread_time
+ : 0));
+ break;
+ }
+
+ case FS_INFO:
+ {
+ struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info *)flow_basic;
+
+ uint64_t last_seen = flow_info->flow_extended.flow_basic.last_seen;
+ uint64_t idle_time = get_l4_protocol_idle_time(flow_info->flow_extended.flow_basic.l4_protocol);
+ logger(0,
+ "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: "
+ "%7llu]",
+ reader_thread->array_index,
+ flow_info->flow_extended.flow_id,
+ (unsigned long long int)last_seen,
+ (unsigned long long int)flow_info->flow_extended.last_flow_update,
+ (unsigned long long int)idle_time,
+ (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time
+ ? last_seen + idle_time - reader_thread->workflow->last_thread_time
+ : 0));
+ break;
+ }
+ }
+ }
+}
+
+static void log_all_flows(struct nDPId_reader_thread const * const reader_thread)
+{
+ struct nDPId_workflow const * const workflow = reader_thread->workflow;
+
+ logger(0,
+ "[%2zu][last-global-time: %13llu][last-thread-time: %13llu][last-scan-time: %13llu]",
+ reader_thread->array_index,
+ (unsigned long long int)workflow->last_global_time,
+ (unsigned long long int)workflow->last_thread_time,
+ (unsigned long long int)workflow->last_scan_time);
+ for (size_t scan_index = 0; scan_index < workflow->max_active_flows; ++scan_index)
+ {
+ ndpi_twalk(workflow->ndpi_flows_active[scan_index], log_flows_flow_walker, (void *)reader_thread);
+ }
+}
+
static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
{
if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL)
@@ -3856,6 +3800,26 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
}
else
{
+ sigset_t thread_signal_set, old_signal_set;
+ sigfillset(&thread_signal_set);
+ if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0)
+ {
+ logger(1, "pthread_sigmask: %s", strerror(errno));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ }
+
+ sigaddset(&thread_signal_set, SIGINT);
+ sigaddset(&thread_signal_set, SIGTERM);
+ sigaddset(&thread_signal_set, SIGUSR1);
+ int signal_fd = signalfd(-1, &thread_signal_set, SFD_NONBLOCK | SFD_CLOEXEC);
+ if (signal_fd < 0)
+ {
+ logger(1, "signalfd: %s", strerror(errno));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ }
+
int pcap_fd = pcap_get_selectable_fd(reader_thread->workflow->pcap_handle);
if (pcap_fd < 0)
{
@@ -3874,10 +3838,18 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
struct epoll_event event = {};
event.events = EPOLLIN;
+
event.data.fd = pcap_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0)
{
- logger(1, "Could not add pcap fd %d to epoll fd %d: %s", epoll_fd, pcap_fd, strerror(errno));
+ logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ }
+ event.data.fd = signal_fd;
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0)
+ {
+ logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return;
}
@@ -3886,8 +3858,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
size_t const events_size = sizeof(events) / sizeof(events[0]);
int const timeout_ms = 1000; /* TODO: Configurable? */
int nready;
+ struct timeval tval_before_epoll, tval_after_epoll;
while (nDPId_main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0)
{
+ get_current_time(&tval_before_epoll);
errno = 0;
nready = epoll_wait(epoll_fd, events, events_size, timeout_ms);
if (errno != 0)
@@ -3903,7 +3877,13 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (nready == 0)
{
- reader_thread->workflow->last_thread_time += timeout_ms;
+ struct timeval tval_diff;
+ get_current_time(&tval_after_epoll);
+ timersub(&tval_after_epoll, &tval_before_epoll, &tval_diff);
+ uint64_t tdiff_ms = tval_diff.tv_sec * 1000 + tval_diff.tv_usec / 1000;
+
+ reader_thread->workflow->last_global_time += tdiff_ms;
+ reader_thread->workflow->last_thread_time += tdiff_ms;
do_periodically_work(reader_thread);
}
@@ -3916,20 +3896,58 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
}
- switch (pcap_dispatch(
- reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread))
+ if (events[i].data.fd == signal_fd)
{
- case PCAP_ERROR:
- logger(1,
- "Error while reading from pcap device: '%s'",
- pcap_geterr(reader_thread->workflow->pcap_handle));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
- break;
- case PCAP_ERROR_BREAK:
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
- return;
- default:
- break;
+ struct signalfd_siginfo fdsi;
+ if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi))
+ {
+ if (errno != EAGAIN)
+ {
+ logger(1, "Could not read signal data from fd %d: %s", signal_fd, strerror(errno));
+ }
+ }
+ else
+ {
+ char const * signame = "unknown";
+ switch (fdsi.ssi_signo)
+ {
+ case SIGINT:
+ signame = "SIGINT";
+ sighandler(SIGINT);
+ break;
+ case SIGTERM:
+ signame = "SIGTERM";
+ sighandler(SIGTERM);
+ break;
+ case SIGUSR1:
+ signame = "SIGUSR1";
+ log_all_flows(reader_thread);
+ break;
+ }
+ logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame);
+ }
+ }
+ else if (events[i].data.fd == pcap_fd)
+ {
+ switch (pcap_dispatch(
+ reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread))
+ {
+ case PCAP_ERROR:
+ logger(1,
+ "Error while reading from pcap device: '%s'",
+ pcap_geterr(reader_thread->workflow->pcap_handle));
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ break;
+ case PCAP_ERROR_BREAK:
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ return;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ logger(1, "Unknown event data 0x%lx returned", events[i].data.u64);
}
}
}