diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-01-27 17:10:06 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-01-27 17:19:29 +0100 |
commit | 9564b0ce2c239b02235414d908829fff9c684a8c (patch) | |
tree | 7ed1623ef3335e79b558211b9f740460ed47646c | |
parent | 102b61175ccb8a8e16444b25b319b8e3806eba1c (diff) |
Increased JSON buffer size to 12288 (libnDPI serializes more and more information).
* Making Compare&Fetch mandatory.
* Added some more Compare&Fetch to prevent TSAN complaining about data races.
Fixed possible but more ore less harmless data races during shutdown process.
* Shrink SIGNAL handler to a minimum. SYSV Signal handling and MT-safety is awkward.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | Makefile | 9 | ||||
-rw-r--r-- | config.h | 2 | ||||
-rw-r--r-- | dependencies/nDPIsrvd.py | 2 | ||||
-rw-r--r-- | examples/c-captured/c-captured.c | 16 | ||||
-rw-r--r-- | examples/go-dashboard/main.go | 2 | ||||
-rw-r--r-- | nDPId.c | 124 |
6 files changed, 113 insertions, 42 deletions
@@ -39,6 +39,10 @@ endif endif # PKG_CONFIG_BIN +ifeq ($(ENABLE_MEMORY_PROFILING),yes) +PROJECT_CFLAGS += -DENABLE_MEMORY_PROFILING=1 +endif + ifeq ($(ENABLE_DEBUG),yes) PROJECT_CFLAGS += -O0 -g3 -fno-omit-frame-pointer -fno-inline endif @@ -120,6 +124,11 @@ else @echo 'NDPI_WITH_PCRE = no' endif endif # PKG_CONFIG_BIN +ifeq ($(ENABLE_MEMORY_PROFILING),yes) + @echo 'ENABLE_MEMORY_PROFILING = yes' +else + @echo 'ENABLE_MEMORY_PROFILING = no' +endif ifeq ($(ENABLE_DEBUG),yes) @echo 'ENABLE_DEBUG = yes' else @@ -11,7 +11,7 @@ * NOTE: Buffer size needs to keep in sync with other implementations * e.g. dependencies/nDPIsrvd.py */ -#define NETWORK_BUFFER_MAX_SIZE 9728 /* 8192 + 1024 + 512 */ +#define NETWORK_BUFFER_MAX_SIZE 12288 /* 8192 + 4096 */ /* nDPId default config options */ #define nDPId_PIDFILE "/tmp/ndpid.pid" diff --git a/dependencies/nDPIsrvd.py b/dependencies/nDPIsrvd.py index d39858ede..573792bba 100644 --- a/dependencies/nDPIsrvd.py +++ b/dependencies/nDPIsrvd.py @@ -20,7 +20,7 @@ DEFAULT_PORT = 7000 DEFAULT_UNIX = '/tmp/ndpid-distributor.sock' NETWORK_BUFFER_MIN_SIZE = 5 -NETWORK_BUFFER_MAX_SIZE = 9728 # Please keep this value in sync with the one in config.h +NETWORK_BUFFER_MAX_SIZE = 12288 # Please keep this value in sync with the one in config.h PKT_TYPE_ETH_IP4 = 0x0800 PKT_TYPE_ETH_IP6 = 0x86DD diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c index a7370ccbd..b75b4e463 100644 --- a/examples/c-captured/c-captured.c +++ b/examples/c-captured/c-captured.c @@ -93,7 +93,8 @@ static char * generate_pcap_filename(struct nDPIsrvd_flow const * const flow, { if (flow_user->guessed != 0 || flow_user->detected == 0) { - int ret = snprintf(dest, size, "flow-%s-%s.pcap", (flow_user->guessed != 0 ? "guessed" : "undetected"), flow->id); + int ret = + snprintf(dest, size, "flow-%s-%s.pcap", (flow_user->guessed != 0 ? "guessed" : "undetected"), flow->id); if (ret <= 0 || (size_t)ret > size) { return NULL; @@ -220,16 +221,19 @@ enum nDPIsrvd_callback_return nDPIsrvd_json_callback(struct nDPIsrvd_socket * co utarray_push_back(flow_user->packets, &cb_user_data->tmp.pkt); } flow_user->pkt_datalink = cb_user_data->tmp.pkt_datalink; - } else { - if (cb_user_data->tmp.guessed != 0) { + } + else + { + if (cb_user_data->tmp.guessed != 0) + { flow_user->guessed = cb_user_data->tmp.guessed; } - if (cb_user_data->tmp.detected != 0) { + if (cb_user_data->tmp.detected != 0) + { flow_user->detected = cb_user_data->tmp.detected; } } - if (cb_user_data->tmp.flow_end_or_idle == 1 && - (flow_user->guessed != 0 || flow_user->detected == 0)) + if (cb_user_data->tmp.flow_end_or_idle == 1 && (flow_user->guessed != 0 || flow_user->detected == 0)) { if (flow_user->packets != NULL) { diff --git a/examples/go-dashboard/main.go b/examples/go-dashboard/main.go index efd01974b..f1acfebb3 100644 --- a/examples/go-dashboard/main.go +++ b/examples/go-dashboard/main.go @@ -19,7 +19,7 @@ var ( InfoLogger *log.Logger ErrorLogger *log.Logger - NETWORK_BUFFER_MAX_SIZE uint16 = 9216 + NETWORK_BUFFER_MAX_SIZE uint16 = 12288 nDPIsrvd_JSON_BYTES uint16 = 4 ) @@ -26,6 +26,10 @@ #error "nDPI >= 3.3.0 requiired" #endif +#ifndef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_8 +#error "Compare and Fetch aka __sync_fetch_and_add not available on your platform!" +#endif + enum nDPId_l3_type { L3_IP, @@ -113,9 +117,7 @@ struct nDPId_workflow { pcap_t * pcap_handle; - uint8_t error_or_eof : 1; - uint8_t reserved_00 : 7; - uint8_t reserved_01[3]; + int error_or_eof; unsigned long long int packets_captured; unsigned long long int packets_processed; @@ -251,7 +253,14 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = { static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; int main_thread_shutdown = 0; -static uint32_t global_flow_id = 0; +static uint64_t global_flow_id = 0; + +#ifdef ENABLE_MEMORY_PROFILING +static uint64_t ndpi_memory_alloc_count = 0; +static uint64_t ndpi_memory_alloc_bytes = 0; +static uint64_t ndpi_memory_free_count = 0; +static uint64_t ndpi_memory_free_bytes = 0; +#endif static char * pcap_file_or_interface = NULL; static union nDPId_ip pcap_dev_ip = {}; @@ -457,12 +466,48 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) return 1; } +#ifdef ENABLE_MEMORY_PROFILING +static void * ndpi_malloc_wrapper(size_t const size) +{ + void * p = malloc(sizeof(uint64_t) + size); + + if (p == NULL) + { + return NULL; + } + *(uint64_t *)p = size; + + __sync_fetch_and_add(&ndpi_memory_alloc_count, 1); + __sync_fetch_and_add(&ndpi_memory_alloc_bytes, size); + + return (uint8_t *)p + sizeof(uint64_t); +} + +static void ndpi_free_wrapper(void * const freeable) +{ + void * p = (uint8_t *)freeable - sizeof(uint64_t); + + __sync_fetch_and_add(&ndpi_memory_free_count, 1); + __sync_fetch_and_add(&ndpi_memory_free_bytes, *(uint64_t *)p); + + free(p); +} +#endif + static struct nDPId_workflow * init_workflow(char const * const file_or_device) { int pcap_argument_is_file = 0; char pcap_error_buffer[PCAP_ERRBUF_SIZE]; - struct nDPId_workflow * workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow)); + struct nDPId_workflow * workflow; +#ifdef ENABLE_MEMORY_PROFILING + set_ndpi_malloc(ndpi_malloc_wrapper); + set_ndpi_free(ndpi_free_wrapper); + set_ndpi_flow_malloc(NULL); + set_ndpi_flow_free(NULL); +#endif + + workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow)); if (workflow == NULL) { return NULL; @@ -855,6 +900,27 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa if (workflow->last_idle_scan_time + idle_scan_period < workflow->last_time) { +#ifdef ENABLE_MEMORY_PROFILING + if (reader_thread->array_index == 0) + { + uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0); + uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0); + uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0); + uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0); + + syslog(LOG_DAEMON, + "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in " + "use, " + "%llu bytes in use", + alloc_count, + free_count, + alloc_bytes, + free_bytes, + alloc_count - free_count, + alloc_bytes - free_bytes); + } +#endif + for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) { ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow); @@ -2080,12 +2146,7 @@ static void ndpi_process_packet(uint8_t * const args, workflow->cur_active_flows++; workflow->total_active_flows++; -#ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4 flow_to_process->flow_id = __sync_fetch_and_add(&global_flow_id, 1); -#else -#warning "Compare and Fetch aka __sync_fetch_and_add not available on your platform/compiler, do not trust any flow_id!" - flow_to_process->flow_id = global_flow_id++; -#endif memset(&flow_to_process->ndpi_flow, 0, @@ -2223,7 +2284,7 @@ static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread syslog(LOG_DAEMON | LOG_ERR, "Error while reading pcap file: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle)); - reader_thread->workflow->error_or_eof = 1; + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); } } } @@ -2259,7 +2320,7 @@ static void * processing_thread(void * const ndpi_thread_arg) run_pcap_loop(reader_thread); fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK); - reader_thread->workflow->error_or_eof = 1; + __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); return NULL; } @@ -2267,7 +2328,7 @@ static int processing_threads_error_or_eof(void) { for (unsigned long long int i = 0; i < reader_thread_count; ++i) { - if (reader_threads[i].workflow->error_or_eof == 0) + if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0) { return 0; } @@ -2460,21 +2521,11 @@ static void free_reader_threads(void) static void sighandler(int signum) { - syslog(LOG_DAEMON | LOG_NOTICE, "Received SIGNAL %d", signum); + (void)signum; - if (main_thread_shutdown == 0) - { - syslog(LOG_DAEMON | LOG_NOTICE, "Stopping reader threads."); - main_thread_shutdown = 1; - if (stop_reader_threads() != 0) - { - syslog(LOG_DAEMON | LOG_ERR, "Failed to stop reader threads!"); - exit(EXIT_FAILURE); - } - } - else + if (__sync_fetch_and_add(&main_thread_shutdown, 0) == 0) { - syslog(LOG_DAEMON | LOG_NOTICE, "Reader threads are already shutting down, please be patient."); + __sync_fetch_and_add(&main_thread_shutdown, 1); } } @@ -2687,18 +2738,22 @@ static int validate_options(char const * const arg0) { int retval = 0; - if (instance_alias == NULL) { + if (instance_alias == NULL) + { char hname[256]; errno = 0; - if (gethostname(hname, sizeof(hname)) != 0) { + if (gethostname(hname, sizeof(hname)) != 0) + { fprintf(stderr, "%s: Could not retrieve your hostname: %s\n", arg0, strerror(errno)); retval = 1; - } else { + } + else + { instance_alias = strdup(hname); - fprintf(stderr, - "%s: No instance alias given, using your hostname '%s'\n", arg0, instance_alias); - if (instance_alias == NULL) { + fprintf(stderr, "%s: No instance alias given, using your hostname '%s'\n", arg0, instance_alias); + if (instance_alias == NULL) + { retval = 1; } } @@ -2807,6 +2862,9 @@ int main(int argc, char ** argv) } openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON); +#ifdef ENABLE_MEMORY_PROFILING + syslog(LOG_DAEMON, "size/processed-flow: %zu bytes\n", sizeof(struct nDPId_flow_info)); +#endif if (setup_reader_threads() != 0) { @@ -2829,7 +2887,7 @@ int main(int argc, char ** argv) sleep(1); } - if (main_thread_shutdown == 0 && stop_reader_threads() != 0) + if (main_thread_shutdown == 1 && stop_reader_threads() != 0) { syslog(LOG_DAEMON | LOG_ERR, "stop_reader_threads"); return 1; |