diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-03-21 15:56:01 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-03-21 15:56:01 +0100 |
commit | c0b7bdacbc15c1cf5eaeb9faefc088aa698e94ba (patch) | |
tree | 90fcd8d0b791133082987af4aacbd24041e63bf3 /nDPId.c | |
parent | daaaa615197d8551457ecf926f6df30c6482a70a (diff) |
Reworked nDPIsrvd.h C-API.
* nDPIsrvd.h: Provide nDPId thread storage.
* nDPIsrvd.py: Fixed instance cleanup bug.
* nDPIsrvd.h: Support for instance/thread user data and cleanup callback.
* nDPIsrvd.h: Most recent flow time stored in thread ht instead of instance ht.
* nDPId: Moved flow logger out the memory profilier into SIGUSR1 signal handling.
* nDPId: Added signal fd to be usable within epoll's event handling (live-capture only!)
* nDPId: Added information about ZLib compressions to daemon status/shutdown events.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 410 |
1 files changed, 214 insertions, 196 deletions
@@ -17,6 +17,7 @@ #include <stdlib.h> #include <sys/epoll.h> #include <sys/ioctl.h> +#include <sys/signalfd.h> #include <sys/un.h> #include <unistd.h> #ifdef ENABLE_ZLIB @@ -214,6 +215,9 @@ struct nDPId_workflow #endif #ifdef ENABLE_ZLIB uint64_t last_compression_scan_time; + uint64_t total_compressions; + uint64_t total_compression_diff; + uint64_t current_compression_diff; #endif uint64_t last_scan_time; uint64_t last_status_time; @@ -486,6 +490,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th [MAX_PACKETS_PER_FLOW_TO_PROCESS] = "max-packets-per-flow-to-process", NULL}; +static void sighandler(int signum); static int processing_threads_error_or_eof(void); static void free_workflow(struct nDPId_workflow ** const workflow); static void serialize_and_send(struct nDPId_reader_thread * const reader_thread); @@ -687,6 +692,12 @@ static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int de flow_info->flow_extended.flow_id, ret); } + else + { + workflow->total_compressions++; + workflow->total_compression_diff += ret; + workflow->current_compression_diff += ret; + } } break; } @@ -1040,186 +1051,6 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr #endif } } - -struct log_flows_user_data -{ - struct nDPId_workflow * workflow; - unsigned long long int flows_ukn; - unsigned long long int flows_skp; - unsigned long long int flows_fin; - unsigned long long int flows_inf; - struct nDPId_flow_basic const * flows_active_fin[nDPId_MAX_FLOWS_PER_THREAD]; - struct nDPId_flow_basic const * flows_active_inf[nDPId_MAX_FLOWS_PER_THREAD]; -}; - -static void log_flows_flow_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) -{ - struct log_flows_user_data * const log_user_data = (struct log_flows_user_data *)user_data; - struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; - - (void)depth; - - if (log_user_data->workflow == NULL || flow_basic == NULL) - { - return; - } - - if (which == ndpi_preorder || which == ndpi_leaf) - { - switch (flow_basic->state) - { - case FS_UNKNOWN: - log_user_data->flows_ukn++; - break; - - case FS_COUNT: - break; - - case FS_SKIPPED: - log_user_data->flows_skp++; - break; - - case FS_FINISHED: - { - log_user_data->flows_fin++; - log_user_data->flows_active_fin[log_user_data->flows_fin - 1] = flow_basic; - break; - } - - case FS_INFO: - { - log_user_data->flows_inf++; - log_user_data->flows_active_inf[log_user_data->flows_inf - 1] = flow_basic; - break; - } - } - } -} - -static size_t log_flows_to_str(struct nDPId_flow_basic const * flows[nDPId_MAX_FLOWS_PER_THREAD], - unsigned long long int flows_used, - char output[BUFSIZ]) -{ - size_t output_used = 0; - int written; - - output[0] = '\0'; - - for (size_t flow_index = 0; flow_index < flows_used; ++flow_index) - { - struct nDPId_flow_basic const * const flow_basic = flows[flow_index]; - - written = -1; - - switch (flow_basic->state) - { - case FS_UNKNOWN: - case FS_COUNT: - - case FS_SKIPPED: - written = 0; - break; - - case FS_FINISHED: - { - struct nDPId_flow_finished const * const flow_finished = (struct nDPId_flow_finished const *)flow_basic; - -#if 1 - written = - snprintf(output + output_used, BUFSIZ - output_used, "%llu,", flow_finished->flow_extended.flow_id); -#else - written = snprintf(output + output_used, - BUFSIZ - output_used, - "[%llu, %u, %llu],", - flow_finished->flow_extended.flow_id, - flow_finished->flow_extended.flow_basic.l4_protocol, - (unsigned long long int const)flow_finished->flow_extended.flow_basic.last_seen); -#endif - break; - } - - case FS_INFO: - { - struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info const *)flow_basic; - -#if 1 - written = - snprintf(output + output_used, BUFSIZ - output_used, "%llu,", flow_info->flow_extended.flow_id); -#else - written = snprintf(output + output_used, - BUFSIZ - output_used, - "[%llu, %u, %llu],", - flow_info->flow_extended.flow_id, - flow_info->flow_extended.flow_basic.l4_protocol, - (unsigned long long int const)flow_info->flow_extended.flow_basic.last_seen); -#endif - break; - } - } - - if (written <= 0) - { - break; - } - else - { - output_used += written; - if (output_used >= BUFSIZ) - { - break; - } - } - } - - return (output_used > 0 ? output_used - 1 : 0); -} - -static void log_flows(struct nDPId_reader_thread const * const reader_thread) -{ - struct nDPId_workflow * const workflow = reader_thread->workflow; - struct log_flows_user_data log_user_data; - char flows_log_str[BUFSIZ]; - size_t flows_log_str_used; - - log_user_data.workflow = reader_thread->workflow; - log_user_data.flows_ukn = 0; - log_user_data.flows_skp = 0; - log_user_data.flows_fin = 0; - log_user_data.flows_inf = 0; - - for (size_t scan_index = 0; scan_index < workflow->max_active_flows; ++scan_index) - { - ndpi_twalk(workflow->ndpi_flows_active[scan_index], log_flows_flow_walker, &log_user_data); - } - - logger(0, - "MemoryProfiler flow stats: [thread: %zu][unknown: %llu][skipped: %llu][finished: %llu][info: %llu]", - reader_thread->array_index, - log_user_data.flows_ukn, - log_user_data.flows_skp, - log_user_data.flows_fin, - log_user_data.flows_inf); - - flows_log_str_used = log_flows_to_str(log_user_data.flows_active_fin, log_user_data.flows_fin, flows_log_str); - if (flows_log_str_used > 0) - { - logger(0, - "MemoryProfiler flows active (finished): [thread: %zu][%.*s]", - reader_thread->array_index, - (int)flows_log_str_used, - flows_log_str); - } - - flows_log_str_used = log_flows_to_str(log_user_data.flows_active_inf, log_user_data.flows_inf, flows_log_str); - if (flows_log_str_used > 0) - { - logger(0, - "MemoryProfiler flows active (info): [thread: %zu][%.*s]", - reader_thread->array_index, - (int)flows_log_str_used, - flows_log_str); - } -} #endif static struct nDPId_workflow * init_workflow(char const * const file_or_device) @@ -1728,9 +1559,11 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, #ifdef ENABLE_ZLIB if (nDPId_options.enable_zlib_compression != 0 && flow_info->detection_data_compressed_size > 0) { + workflow->current_compression_diff -= flow_info->detection_data_compressed_size; int ret = detection_data_inflate(flow_info); if (ret <= 0) { + workflow->current_compression_diff += flow_info->detection_data_compressed_size; logger(1, "zLib decompression failed with error code: %d", ret); return; } @@ -2014,6 +1847,26 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu "total-active-flows", workflow->total_active_flows); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-idle-flows", workflow->total_idle_flows); +#ifdef ENABLE_ZLIB + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-compressions", + workflow->total_compressions); + /* Compression diff's may very from run to run. Due to this, `nDPId-test' would be inconsistent. */ +#ifndef NO_MAIN + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-compression-diff", + workflow->total_compression_diff); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "current-compression-diff", + workflow->current_compression_diff); +#else + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compression-diff", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "current-compression-diff", 0); +#endif +#else + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compressions", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "current-compression-diff", 0); +#endif ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-events-serialized", workflow->total_events_serialized + @@ -3179,7 +3032,6 @@ static void do_periodically_work(struct nDPId_reader_thread * const reader_threa reader_thread->workflow->last_global_time) { log_memory_usage(reader_thread); - log_flows(reader_thread); reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_global_time; } #endif @@ -3696,9 +3548,11 @@ static void ndpi_process_packet(uint8_t * const args, #ifdef ENABLE_ZLIB if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->detection_data_compressed_size > 0) { + workflow->current_compression_diff -= flow_to_process->detection_data_compressed_size; int ret = detection_data_inflate(flow_to_process); if (ret <= 0) { + workflow->current_compression_diff += flow_to_process->detection_data_compressed_size; logger(1, "zLib decompression failed for existing flow %llu with error code: %d", flow_to_process->flow_extended.flow_id, @@ -3835,6 +3689,96 @@ static void ndpi_process_packet(uint8_t * const args, #endif } +static void get_current_time(struct timeval * const tval) +{ + gettimeofday(tval, NULL); +} + +static void log_flows_flow_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_reader_thread const * const reader_thread = (struct nDPId_reader_thread *)user_data; + struct nDPId_flow_basic const * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + (void)user_data; + + if (flow_basic == NULL) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) + { + switch (flow_basic->state) + { + case FS_UNKNOWN: + break; + + case FS_COUNT: + break; + + case FS_SKIPPED: + break; + + case FS_FINISHED: + { + struct nDPId_flow_finished const * const flow_fin = (struct nDPId_flow_finished *)flow_basic; + + uint64_t last_seen = flow_fin->flow_extended.flow_basic.last_seen; + uint64_t idle_time = get_l4_protocol_idle_time(flow_fin->flow_extended.flow_basic.l4_protocol); + logger(0, + "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " + "%7llu]", + reader_thread->array_index, + flow_fin->flow_extended.flow_id, + (unsigned long long int)last_seen, + (unsigned long long int)flow_fin->flow_extended.last_flow_update, + (unsigned long long int)idle_time, + (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time + ? last_seen + idle_time - reader_thread->workflow->last_thread_time + : 0)); + break; + } + + case FS_INFO: + { + struct nDPId_flow_info const * const flow_info = (struct nDPId_flow_info *)flow_basic; + + uint64_t last_seen = flow_info->flow_extended.flow_basic.last_seen; + uint64_t idle_time = get_l4_protocol_idle_time(flow_info->flow_extended.flow_basic.l4_protocol); + logger(0, + "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " + "%7llu]", + reader_thread->array_index, + flow_info->flow_extended.flow_id, + (unsigned long long int)last_seen, + (unsigned long long int)flow_info->flow_extended.last_flow_update, + (unsigned long long int)idle_time, + (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time + ? last_seen + idle_time - reader_thread->workflow->last_thread_time + : 0)); + break; + } + } + } +} + +static void log_all_flows(struct nDPId_reader_thread const * const reader_thread) +{ + struct nDPId_workflow const * const workflow = reader_thread->workflow; + + logger(0, + "[%2zu][last-global-time: %13llu][last-thread-time: %13llu][last-scan-time: %13llu]", + reader_thread->array_index, + (unsigned long long int)workflow->last_global_time, + (unsigned long long int)workflow->last_thread_time, + (unsigned long long int)workflow->last_scan_time); + for (size_t scan_index = 0; scan_index < workflow->max_active_flows; ++scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[scan_index], log_flows_flow_walker, (void *)reader_thread); + } +} + static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) { if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) @@ -3856,6 +3800,26 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) } else { + sigset_t thread_signal_set, old_signal_set; + sigfillset(&thread_signal_set); + if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) + { + logger(1, "pthread_sigmask: %s", strerror(errno)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + } + + sigaddset(&thread_signal_set, SIGINT); + sigaddset(&thread_signal_set, SIGTERM); + sigaddset(&thread_signal_set, SIGUSR1); + int signal_fd = signalfd(-1, &thread_signal_set, SFD_NONBLOCK | SFD_CLOEXEC); + if (signal_fd < 0) + { + logger(1, "signalfd: %s", strerror(errno)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + } + int pcap_fd = pcap_get_selectable_fd(reader_thread->workflow->pcap_handle); if (pcap_fd < 0) { @@ -3874,10 +3838,18 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) struct epoll_event event = {}; event.events = EPOLLIN; + event.data.fd = pcap_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0) { - logger(1, "Could not add pcap fd %d to epoll fd %d: %s", epoll_fd, pcap_fd, strerror(errno)); + logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + } + event.data.fd = signal_fd; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0) + { + logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno)); __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); return; } @@ -3886,8 +3858,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) size_t const events_size = sizeof(events) / sizeof(events[0]); int const timeout_ms = 1000; /* TODO: Configurable? */ int nready; + struct timeval tval_before_epoll, tval_after_epoll; while (nDPId_main_thread_shutdown == 0 && processing_threads_error_or_eof() == 0) { + get_current_time(&tval_before_epoll); errno = 0; nready = epoll_wait(epoll_fd, events, events_size, timeout_ms); if (errno != 0) @@ -3903,7 +3877,13 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) if (nready == 0) { - reader_thread->workflow->last_thread_time += timeout_ms; + struct timeval tval_diff; + get_current_time(&tval_after_epoll); + timersub(&tval_after_epoll, &tval_before_epoll, &tval_diff); + uint64_t tdiff_ms = tval_diff.tv_sec * 1000 + tval_diff.tv_usec / 1000; + + reader_thread->workflow->last_global_time += tdiff_ms; + reader_thread->workflow->last_thread_time += tdiff_ms; do_periodically_work(reader_thread); } @@ -3916,20 +3896,58 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); } - switch (pcap_dispatch( - reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) + if (events[i].data.fd == signal_fd) { - case PCAP_ERROR: - logger(1, - "Error while reading from pcap device: '%s'", - pcap_geterr(reader_thread->workflow->pcap_handle)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); - break; - case PCAP_ERROR_BREAK: - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); - return; - default: - break; + struct signalfd_siginfo fdsi; + if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi)) + { + if (errno != EAGAIN) + { + logger(1, "Could not read signal data from fd %d: %s", signal_fd, strerror(errno)); + } + } + else + { + char const * signame = "unknown"; + switch (fdsi.ssi_signo) + { + case SIGINT: + signame = "SIGINT"; + sighandler(SIGINT); + break; + case SIGTERM: + signame = "SIGTERM"; + sighandler(SIGTERM); + break; + case SIGUSR1: + signame = "SIGUSR1"; + log_all_flows(reader_thread); + break; + } + logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame); + } + } + else if (events[i].data.fd == pcap_fd) + { + switch (pcap_dispatch( + reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) + { + case PCAP_ERROR: + logger(1, + "Error while reading from pcap device: '%s'", + pcap_geterr(reader_thread->workflow->pcap_handle)); + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + break; + case PCAP_ERROR_BREAK: + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + return; + default: + break; + } + } + else + { + logger(1, "Unknown event data 0x%lx returned", events[i].data.u64); } } } |