summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-01-27 17:10:06 +0100
committerToni Uhlig <matzeton@googlemail.com>2021-01-27 17:19:29 +0100
commit9564b0ce2c239b02235414d908829fff9c684a8c (patch)
tree7ed1623ef3335e79b558211b9f740460ed47646c
parent102b61175ccb8a8e16444b25b319b8e3806eba1c (diff)
Increased JSON buffer size to 12288 (libnDPI serializes more and more information).
* Making Compare&Fetch mandatory. * Added some more Compare&Fetch to prevent TSAN complaining about data races. Fixed possible but more ore less harmless data races during shutdown process. * Shrink SIGNAL handler to a minimum. SYSV Signal handling and MT-safety is awkward. Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r--Makefile9
-rw-r--r--config.h2
-rw-r--r--dependencies/nDPIsrvd.py2
-rw-r--r--examples/c-captured/c-captured.c16
-rw-r--r--examples/go-dashboard/main.go2
-rw-r--r--nDPId.c124
6 files changed, 113 insertions, 42 deletions
diff --git a/Makefile b/Makefile
index e4cc0ea85..d3b70d805 100644
--- a/Makefile
+++ b/Makefile
@@ -39,6 +39,10 @@ endif
endif # PKG_CONFIG_BIN
+ifeq ($(ENABLE_MEMORY_PROFILING),yes)
+PROJECT_CFLAGS += -DENABLE_MEMORY_PROFILING=1
+endif
+
ifeq ($(ENABLE_DEBUG),yes)
PROJECT_CFLAGS += -O0 -g3 -fno-omit-frame-pointer -fno-inline
endif
@@ -120,6 +124,11 @@ else
@echo 'NDPI_WITH_PCRE = no'
endif
endif # PKG_CONFIG_BIN
+ifeq ($(ENABLE_MEMORY_PROFILING),yes)
+ @echo 'ENABLE_MEMORY_PROFILING = yes'
+else
+ @echo 'ENABLE_MEMORY_PROFILING = no'
+endif
ifeq ($(ENABLE_DEBUG),yes)
@echo 'ENABLE_DEBUG = yes'
else
diff --git a/config.h b/config.h
index c345f15cf..75fcf0e16 100644
--- a/config.h
+++ b/config.h
@@ -11,7 +11,7 @@
* NOTE: Buffer size needs to keep in sync with other implementations
* e.g. dependencies/nDPIsrvd.py
*/
-#define NETWORK_BUFFER_MAX_SIZE 9728 /* 8192 + 1024 + 512 */
+#define NETWORK_BUFFER_MAX_SIZE 12288 /* 8192 + 4096 */
/* nDPId default config options */
#define nDPId_PIDFILE "/tmp/ndpid.pid"
diff --git a/dependencies/nDPIsrvd.py b/dependencies/nDPIsrvd.py
index d39858ede..573792bba 100644
--- a/dependencies/nDPIsrvd.py
+++ b/dependencies/nDPIsrvd.py
@@ -20,7 +20,7 @@ DEFAULT_PORT = 7000
DEFAULT_UNIX = '/tmp/ndpid-distributor.sock'
NETWORK_BUFFER_MIN_SIZE = 5
-NETWORK_BUFFER_MAX_SIZE = 9728 # Please keep this value in sync with the one in config.h
+NETWORK_BUFFER_MAX_SIZE = 12288 # Please keep this value in sync with the one in config.h
PKT_TYPE_ETH_IP4 = 0x0800
PKT_TYPE_ETH_IP6 = 0x86DD
diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c
index a7370ccbd..b75b4e463 100644
--- a/examples/c-captured/c-captured.c
+++ b/examples/c-captured/c-captured.c
@@ -93,7 +93,8 @@ static char * generate_pcap_filename(struct nDPIsrvd_flow const * const flow,
{
if (flow_user->guessed != 0 || flow_user->detected == 0)
{
- int ret = snprintf(dest, size, "flow-%s-%s.pcap", (flow_user->guessed != 0 ? "guessed" : "undetected"), flow->id);
+ int ret =
+ snprintf(dest, size, "flow-%s-%s.pcap", (flow_user->guessed != 0 ? "guessed" : "undetected"), flow->id);
if (ret <= 0 || (size_t)ret > size)
{
return NULL;
@@ -220,16 +221,19 @@ enum nDPIsrvd_callback_return nDPIsrvd_json_callback(struct nDPIsrvd_socket * co
utarray_push_back(flow_user->packets, &cb_user_data->tmp.pkt);
}
flow_user->pkt_datalink = cb_user_data->tmp.pkt_datalink;
- } else {
- if (cb_user_data->tmp.guessed != 0) {
+ }
+ else
+ {
+ if (cb_user_data->tmp.guessed != 0)
+ {
flow_user->guessed = cb_user_data->tmp.guessed;
}
- if (cb_user_data->tmp.detected != 0) {
+ if (cb_user_data->tmp.detected != 0)
+ {
flow_user->detected = cb_user_data->tmp.detected;
}
}
- if (cb_user_data->tmp.flow_end_or_idle == 1 &&
- (flow_user->guessed != 0 || flow_user->detected == 0))
+ if (cb_user_data->tmp.flow_end_or_idle == 1 && (flow_user->guessed != 0 || flow_user->detected == 0))
{
if (flow_user->packets != NULL)
{
diff --git a/examples/go-dashboard/main.go b/examples/go-dashboard/main.go
index efd01974b..f1acfebb3 100644
--- a/examples/go-dashboard/main.go
+++ b/examples/go-dashboard/main.go
@@ -19,7 +19,7 @@ var (
InfoLogger *log.Logger
ErrorLogger *log.Logger
- NETWORK_BUFFER_MAX_SIZE uint16 = 9216
+ NETWORK_BUFFER_MAX_SIZE uint16 = 12288
nDPIsrvd_JSON_BYTES uint16 = 4
)
diff --git a/nDPId.c b/nDPId.c
index 433ebd65b..41a7f1442 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -26,6 +26,10 @@
#error "nDPI >= 3.3.0 requiired"
#endif
+#ifndef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
+#error "Compare and Fetch aka __sync_fetch_and_add not available on your platform!"
+#endif
+
enum nDPId_l3_type
{
L3_IP,
@@ -113,9 +117,7 @@ struct nDPId_workflow
{
pcap_t * pcap_handle;
- uint8_t error_or_eof : 1;
- uint8_t reserved_00 : 7;
- uint8_t reserved_01[3];
+ int error_or_eof;
unsigned long long int packets_captured;
unsigned long long int packets_processed;
@@ -251,7 +253,14 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = {
static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {};
int main_thread_shutdown = 0;
-static uint32_t global_flow_id = 0;
+static uint64_t global_flow_id = 0;
+
+#ifdef ENABLE_MEMORY_PROFILING
+static uint64_t ndpi_memory_alloc_count = 0;
+static uint64_t ndpi_memory_alloc_bytes = 0;
+static uint64_t ndpi_memory_free_count = 0;
+static uint64_t ndpi_memory_free_bytes = 0;
+#endif
static char * pcap_file_or_interface = NULL;
static union nDPId_ip pcap_dev_ip = {};
@@ -457,12 +466,48 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev)
return 1;
}
+#ifdef ENABLE_MEMORY_PROFILING
+static void * ndpi_malloc_wrapper(size_t const size)
+{
+ void * p = malloc(sizeof(uint64_t) + size);
+
+ if (p == NULL)
+ {
+ return NULL;
+ }
+ *(uint64_t *)p = size;
+
+ __sync_fetch_and_add(&ndpi_memory_alloc_count, 1);
+ __sync_fetch_and_add(&ndpi_memory_alloc_bytes, size);
+
+ return (uint8_t *)p + sizeof(uint64_t);
+}
+
+static void ndpi_free_wrapper(void * const freeable)
+{
+ void * p = (uint8_t *)freeable - sizeof(uint64_t);
+
+ __sync_fetch_and_add(&ndpi_memory_free_count, 1);
+ __sync_fetch_and_add(&ndpi_memory_free_bytes, *(uint64_t *)p);
+
+ free(p);
+}
+#endif
+
static struct nDPId_workflow * init_workflow(char const * const file_or_device)
{
int pcap_argument_is_file = 0;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
- struct nDPId_workflow * workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow));
+ struct nDPId_workflow * workflow;
+#ifdef ENABLE_MEMORY_PROFILING
+ set_ndpi_malloc(ndpi_malloc_wrapper);
+ set_ndpi_free(ndpi_free_wrapper);
+ set_ndpi_flow_malloc(NULL);
+ set_ndpi_flow_free(NULL);
+#endif
+
+ workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow));
if (workflow == NULL)
{
return NULL;
@@ -855,6 +900,27 @@ static void check_for_idle_flows(struct nDPId_reader_thread * const reader_threa
if (workflow->last_idle_scan_time + idle_scan_period < workflow->last_time)
{
+#ifdef ENABLE_MEMORY_PROFILING
+ if (reader_thread->array_index == 0)
+ {
+ uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0);
+ uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0);
+ uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0);
+ uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0);
+
+ syslog(LOG_DAEMON,
+ "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
+ "use, "
+ "%llu bytes in use",
+ alloc_count,
+ free_count,
+ alloc_bytes,
+ free_bytes,
+ alloc_count - free_count,
+ alloc_bytes - free_bytes);
+ }
+#endif
+
for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index)
{
ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
@@ -2080,12 +2146,7 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->cur_active_flows++;
workflow->total_active_flows++;
-#ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4
flow_to_process->flow_id = __sync_fetch_and_add(&global_flow_id, 1);
-#else
-#warning "Compare and Fetch aka __sync_fetch_and_add not available on your platform/compiler, do not trust any flow_id!"
- flow_to_process->flow_id = global_flow_id++;
-#endif
memset(&flow_to_process->ndpi_flow,
0,
@@ -2223,7 +2284,7 @@ static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread
syslog(LOG_DAEMON | LOG_ERR,
"Error while reading pcap file: '%s'",
pcap_geterr(reader_thread->workflow->pcap_handle));
- reader_thread->workflow->error_or_eof = 1;
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
}
}
}
@@ -2259,7 +2320,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
run_pcap_loop(reader_thread);
fcntl(reader_thread->json_sockfd, F_SETFL, fcntl(reader_thread->json_sockfd, F_GETFL, 0) & ~O_NONBLOCK);
- reader_thread->workflow->error_or_eof = 1;
+ __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return NULL;
}
@@ -2267,7 +2328,7 @@ static int processing_threads_error_or_eof(void)
{
for (unsigned long long int i = 0; i < reader_thread_count; ++i)
{
- if (reader_threads[i].workflow->error_or_eof == 0)
+ if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0)
{
return 0;
}
@@ -2460,21 +2521,11 @@ static void free_reader_threads(void)
static void sighandler(int signum)
{
- syslog(LOG_DAEMON | LOG_NOTICE, "Received SIGNAL %d", signum);
+ (void)signum;
- if (main_thread_shutdown == 0)
- {
- syslog(LOG_DAEMON | LOG_NOTICE, "Stopping reader threads.");
- main_thread_shutdown = 1;
- if (stop_reader_threads() != 0)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Failed to stop reader threads!");
- exit(EXIT_FAILURE);
- }
- }
- else
+ if (__sync_fetch_and_add(&main_thread_shutdown, 0) == 0)
{
- syslog(LOG_DAEMON | LOG_NOTICE, "Reader threads are already shutting down, please be patient.");
+ __sync_fetch_and_add(&main_thread_shutdown, 1);
}
}
@@ -2687,18 +2738,22 @@ static int validate_options(char const * const arg0)
{
int retval = 0;
- if (instance_alias == NULL) {
+ if (instance_alias == NULL)
+ {
char hname[256];
errno = 0;
- if (gethostname(hname, sizeof(hname)) != 0) {
+ if (gethostname(hname, sizeof(hname)) != 0)
+ {
fprintf(stderr, "%s: Could not retrieve your hostname: %s\n", arg0, strerror(errno));
retval = 1;
- } else {
+ }
+ else
+ {
instance_alias = strdup(hname);
- fprintf(stderr,
- "%s: No instance alias given, using your hostname '%s'\n", arg0, instance_alias);
- if (instance_alias == NULL) {
+ fprintf(stderr, "%s: No instance alias given, using your hostname '%s'\n", arg0, instance_alias);
+ if (instance_alias == NULL)
+ {
retval = 1;
}
}
@@ -2807,6 +2862,9 @@ int main(int argc, char ** argv)
}
openlog("nDPId", LOG_CONS | LOG_PERROR, LOG_DAEMON);
+#ifdef ENABLE_MEMORY_PROFILING
+ syslog(LOG_DAEMON, "size/processed-flow: %zu bytes\n", sizeof(struct nDPId_flow_info));
+#endif
if (setup_reader_threads() != 0)
{
@@ -2829,7 +2887,7 @@ int main(int argc, char ** argv)
sleep(1);
}
- if (main_thread_shutdown == 0 && stop_reader_threads() != 0)
+ if (main_thread_shutdown == 1 && stop_reader_threads() != 0)
{
syslog(LOG_DAEMON | LOG_ERR, "stop_reader_threads");
return 1;