aboutsummaryrefslogtreecommitdiff
path: root/nDPId.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-09-17 18:27:17 +0200
committerToni Uhlig <matzeton@googlemail.com>2022-09-17 18:27:17 +0200
commitefaa76e9786b64af644c338e47497e9aa18eb41e (patch)
treecb74fb381811ed56859cc809faaab5fdb3fd0e40 /nDPId.c
parentb3e9af495c79f6c8c68c0eb36df67dfa5ede4e16 (diff)
Provide thread sync via locking on architectures that do not support Compare&Swap.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r--nDPId.c156
1 files changed, 102 insertions, 54 deletions
diff --git a/nDPId.c b/nDPId.c
index 44753fb1a..7840e1afa 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -53,10 +53,6 @@
#error "nDPI >= 4.4.0 or API version >= 6336 required"
#endif
-#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8)
-#error "Compare and Swap aka __sync_fetch_and_add not available on your platform!"
-#endif
-
#if nDPId_MAX_READER_THREADS <= 0
#error "Invalid value for nDPId_MAX_READER_THREADS"
#endif
@@ -66,6 +62,56 @@
#error "Invalid value for nDPId_FLOW_SCAN_INTERVAL"
#endif
+/* MIPS* does not support Compare and Swap. Use traditional locking as fallback. */
+#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8)
+#define MT_VALUE(name, type) \
+ struct name \
+ { \
+ volatile uint64_t var; \
+ pthread_mutex_t var_mutex; \
+ } name
+#define MT_INIT(value) \
+ { \
+ value, PTHREAD_MUTEX_INITIALIZER \
+ }
+#define MT_INIT2(name, value) \
+ do \
+ { \
+ name.var = value; \
+ pthread_mutex_init(&name.var_mutex, NULL); \
+ } while (0)
+static inline uint64_t mt_pt_get_and_add(volatile uint64_t * value, uint64_t add, pthread_mutex_t * mutex)
+{
+ uint64_t result;
+ pthread_mutex_lock(mutex);
+ result = *value;
+ *value += add;
+ pthread_mutex_unlock(mutex);
+ return result;
+}
+#define MT_GET_AND_ADD(name, value) mt_pt_get_and_add(&name.var, value, &name.var_mutex)
+static inline uint64_t mt_pt_get_and_sub(volatile uint64_t * value, uint64_t sub, pthread_mutex_t * mutex)
+{
+ uint64_t result;
+ pthread_mutex_lock(mutex);
+ result = *value;
+ *value -= sub;
+ pthread_mutex_unlock(mutex);
+ return result;
+}
+#define MT_GET_AND_SUB(name, value) mt_pt_get_and_sub(&name.var, value, &name.var_mutex)
+#else
+#define MT_VALUE(name, type) volatile type name
+#define MT_INIT(value) value
+#define MT_INIT2(name, value) \
+ do \
+ { \
+ name = value; \
+ } while (0)
+#define MT_GET_AND_ADD(name, value) __sync_fetch_and_add(&name, value)
+#define MT_GET_AND_SUB(name, value) __sync_fetch_and_sub(&name, value)
+#endif
+
enum nDPId_l3_type
{
L3_IP,
@@ -200,7 +246,7 @@ struct nDPId_workflow
{
pcap_t * pcap_handle;
- uint8_t error_or_eof;
+ MT_VALUE(error_or_eof, uint8_t);
uint8_t is_pcap_file;
uint8_t max_flow_to_track_reached : 1;
@@ -375,20 +421,20 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = {
static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {};
static struct nDPIsrvd_address collector_address;
-static volatile int nDPId_main_thread_shutdown = 0;
-static volatile uint64_t global_flow_id = 1;
+static MT_VALUE(nDPId_main_thread_shutdown, int) = MT_INIT(0);
+static MT_VALUE(global_flow_id, uint64_t) = MT_INIT(1);
static int ip4_interface_avail = 0, ip6_interface_avail = 0;
#ifdef ENABLE_MEMORY_PROFILING
-static volatile uint64_t ndpi_memory_alloc_count = 0;
-static volatile uint64_t ndpi_memory_alloc_bytes = 0;
-static volatile uint64_t ndpi_memory_free_count = 0;
-static volatile uint64_t ndpi_memory_free_bytes = 0;
+static MT_VALUE(ndpi_memory_alloc_count, uint64_t) = MT_INIT(0);
+static MT_VALUE(ndpi_memory_alloc_bytes, uint64_t) = MT_INIT(0);
+static MT_VALUE(ndpi_memory_free_count, uint64_t) = MT_INIT(0);
+static MT_VALUE(ndpi_memory_free_bytes, uint64_t) = MT_INIT(0);
#ifdef ENABLE_ZLIB
-static volatile uint64_t zlib_compressions = 0;
-static volatile uint64_t zlib_decompressions = 0;
-static volatile uint64_t zlib_compression_diff = 0;
-static volatile uint64_t zlib_compression_bytes = 0;
+static MT_VALUE(zlib_compressions, uint64_t) = MT_INIT(0);
+static MT_VALUE(zlib_decompressions, uint64_t) = MT_INIT(0);
+static MT_VALUE(zlib_compression_diff, uint64_t) = MT_INIT(0);
+static MT_VALUE(zlib_compression_bytes, uint64_t) = MT_INIT(0);
#endif
#endif
@@ -595,9 +641,9 @@ static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstL
{
ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING
- __sync_fetch_and_add(&zlib_compressions, 1);
- __sync_fetch_and_add(&zlib_compression_diff, srcLen - ret);
- __sync_fetch_and_add(&zlib_compression_bytes, ret);
+ MT_GET_AND_ADD(zlib_compressions, 1);
+ MT_GET_AND_ADD(zlib_compression_diff, srcLen - ret);
+ MT_GET_AND_ADD(zlib_compression_bytes, ret);
#endif
}
else
@@ -639,8 +685,8 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
{
ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING
- __sync_fetch_and_add(&zlib_decompressions, 1);
- __sync_fetch_and_sub(&zlib_compression_diff, ret - srcLen);
+ MT_GET_AND_ADD(zlib_decompressions, 1);
+ MT_GET_AND_SUB(zlib_compression_diff, ret - srcLen);
#endif
}
else
@@ -1070,8 +1116,8 @@ static void * ndpi_malloc_wrapper(size_t const size)
}
*(uint64_t *)p = size;
- __sync_fetch_and_add(&ndpi_memory_alloc_count, 1);
- __sync_fetch_and_add(&ndpi_memory_alloc_bytes, size);
+ MT_GET_AND_ADD(ndpi_memory_alloc_count, 1);
+ MT_GET_AND_ADD(ndpi_memory_alloc_bytes, size);
return (uint8_t *)p + sizeof(uint64_t);
}
@@ -1080,8 +1126,8 @@ static void ndpi_free_wrapper(void * const freeable)
{
void * p = (uint8_t *)freeable - sizeof(uint64_t);
- __sync_fetch_and_add(&ndpi_memory_free_count, 1);
- __sync_fetch_and_add(&ndpi_memory_free_bytes, *(uint64_t *)p);
+ MT_GET_AND_ADD(ndpi_memory_free_count, 1);
+ MT_GET_AND_ADD(ndpi_memory_free_bytes, *(uint64_t *)p);
free(p);
}
@@ -1090,10 +1136,10 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr
{
if (reader_thread->array_index == 0)
{
- uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0);
- uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0);
- uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0);
- uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0);
+ uint64_t alloc_count = MT_GET_AND_ADD(ndpi_memory_alloc_count, 0);
+ uint64_t free_count = MT_GET_AND_ADD(ndpi_memory_free_count, 0);
+ uint64_t alloc_bytes = MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0);
+ uint64_t free_bytes = MT_GET_AND_ADD(ndpi_memory_free_bytes, 0);
logger(0,
"MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in "
@@ -1106,10 +1152,10 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr
(long long unsigned int)(alloc_count - free_count),
(long long unsigned int)(alloc_bytes - free_bytes));
#ifdef ENABLE_ZLIB
- uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0);
- uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0);
- uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0);
- uint64_t zlib_bytes_total = __sync_fetch_and_add(&zlib_compression_bytes, 0);
+ uint64_t zlib_compression_count = MT_GET_AND_ADD(zlib_compressions, 0);
+ uint64_t zlib_decompression_count = MT_GET_AND_ADD(zlib_decompressions, 0);
+ uint64_t zlib_bytes_diff = MT_GET_AND_ADD(zlib_compression_diff, 0);
+ uint64_t zlib_bytes_total = MT_GET_AND_ADD(zlib_compression_bytes, 0);
logger(0,
"MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu "
@@ -1142,6 +1188,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
return NULL;
}
+ MT_INIT2(workflow->error_or_eof, 0);
+
errno = 0;
if (access(file_or_device, R_OK) != 0 && errno == ENOENT)
{
@@ -2999,7 +3047,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
return 1;
}
- struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset];
+ struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset];
*ip_offset = sizeof(struct ndpi_chdlc);
*layer3_type = ntohs(chdlc->proto_code);
break;
@@ -3021,7 +3069,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
if (packet[0] == 0x0f || packet[0] == 0x8f)
{
- struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset];
+ struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset];
*ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */
*layer3_type = ntohs(chdlc->proto_code);
}
@@ -3059,7 +3107,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
}
struct ndpi_radiotap_header const * const radiotap =
- (struct ndpi_radiotap_header const * const)&packet[eth_offset];
+ (struct ndpi_radiotap_header const * const) & packet[eth_offset];
uint16_t radio_len = radiotap->len;
/* Check Bad FCS presence */
@@ -3808,7 +3856,7 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->flow_allocation_already_failed = 0;
workflow->total_active_flows++;
- flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1);
+ flow_to_process->flow_extended.flow_id = MT_GET_AND_ADD(global_flow_id, 1);
if (alloc_detection_data(flow_to_process) != 0)
{
@@ -4120,10 +4168,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
{
case PCAP_ERROR:
logger(1, "Error while reading pcap file: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
case PCAP_ERROR_BREAK:
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
default:
return;
@@ -4136,7 +4184,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0)
{
logger(1, "pthread_sigmask: %s", strerror(errno));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
}
@@ -4147,7 +4195,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (signal_fd < 0)
{
logger(1, "signalfd: %s", strerror(errno));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
}
@@ -4155,7 +4203,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (pcap_fd < 0)
{
logger(1, "%s", "Got an invalid PCAP fd");
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
}
@@ -4163,7 +4211,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (epoll_fd < 0)
{
logger(1, "Got an invalid epoll fd: %s", strerror(errno));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
}
@@ -4174,14 +4222,14 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0)
{
logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
}
event.data.fd = signal_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0)
{
logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
}
@@ -4190,7 +4238,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
int const timeout_ms = 1000; /* TODO: Configurable? */
int nready;
struct timeval tval_before_epoll, tval_after_epoll;
- while (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
+ while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
{
get_current_time(&tval_before_epoll);
errno = 0;
@@ -4202,7 +4250,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
continue;
}
logger(1, "Epoll returned error: %s", strerror(errno));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
break;
}
@@ -4224,7 +4272,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
if ((events[i].events & EPOLLERR) != 0)
{
logger(1, "%s", "Epoll error event");
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
}
if (events[i].data.fd == signal_fd)
@@ -4267,10 +4315,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
logger(1,
"Error while reading from pcap device: '%s'",
pcap_geterr(reader_thread->workflow->pcap_handle));
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
break;
case PCAP_ERROR_BREAK:
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return;
default:
break;
@@ -4316,7 +4364,7 @@ static void * processing_thread(void * const ndpi_thread_arg)
run_pcap_loop(reader_thread);
set_collector_block(reader_thread);
- __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
+ MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
return NULL;
}
@@ -4324,7 +4372,7 @@ static int processing_threads_error_or_eof(void)
{
for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
{
- if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0)
+ if (MT_GET_AND_ADD(reader_threads[i].workflow->error_or_eof, 0) == 0)
{
return 0;
}
@@ -4557,9 +4605,9 @@ static void sighandler(int signum)
{
(void)signum;
- if (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0)
+ if (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0)
{
- __sync_fetch_and_add(&nDPId_main_thread_shutdown, 1);
+ MT_GET_AND_ADD(nDPId_main_thread_shutdown, 1);
}
}
@@ -5096,7 +5144,7 @@ int main(int argc, char ** argv)
signal(SIGTERM, sighandler);
signal(SIGPIPE, SIG_IGN);
- while (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
+ while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
{
sleep(1);
}