diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-09-17 18:27:17 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-09-17 18:27:17 +0200 |
commit | efaa76e9786b64af644c338e47497e9aa18eb41e (patch) | |
tree | cb74fb381811ed56859cc809faaab5fdb3fd0e40 /nDPId.c | |
parent | b3e9af495c79f6c8c68c0eb36df67dfa5ede4e16 (diff) |
Provide thread sync via locking on architectures that do not support Compare&Swap.
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 156 |
1 files changed, 102 insertions, 54 deletions
@@ -53,10 +53,6 @@ #error "nDPI >= 4.4.0 or API version >= 6336 required" #endif -#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8) -#error "Compare and Swap aka __sync_fetch_and_add not available on your platform!" -#endif - #if nDPId_MAX_READER_THREADS <= 0 #error "Invalid value for nDPId_MAX_READER_THREADS" #endif @@ -66,6 +62,56 @@ #error "Invalid value for nDPId_FLOW_SCAN_INTERVAL" #endif +/* MIPS* does not support Compare and Swap. Use traditional locking as fallback. */ +#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8) +#define MT_VALUE(name, type) \ + struct name \ + { \ + volatile uint64_t var; \ + pthread_mutex_t var_mutex; \ + } name +#define MT_INIT(value) \ + { \ + value, PTHREAD_MUTEX_INITIALIZER \ + } +#define MT_INIT2(name, value) \ + do \ + { \ + name.var = value; \ + pthread_mutex_init(&name.var_mutex, NULL); \ + } while (0) +static inline uint64_t mt_pt_get_and_add(volatile uint64_t * value, uint64_t add, pthread_mutex_t * mutex) +{ + uint64_t result; + pthread_mutex_lock(mutex); + result = *value; + *value += add; + pthread_mutex_unlock(mutex); + return result; +} +#define MT_GET_AND_ADD(name, value) mt_pt_get_and_add(&name.var, value, &name.var_mutex) +static inline uint64_t mt_pt_get_and_sub(volatile uint64_t * value, uint64_t sub, pthread_mutex_t * mutex) +{ + uint64_t result; + pthread_mutex_lock(mutex); + result = *value; + *value -= sub; + pthread_mutex_unlock(mutex); + return result; +} +#define MT_GET_AND_SUB(name, value) mt_pt_get_and_sub(&name.var, value, &name.var_mutex) +#else +#define MT_VALUE(name, type) volatile type name +#define MT_INIT(value) value +#define MT_INIT2(name, value) \ + do \ + { \ + name = value; \ + } while (0) +#define MT_GET_AND_ADD(name, value) __sync_fetch_and_add(&name, value) +#define MT_GET_AND_SUB(name, value) __sync_fetch_and_sub(&name, value) +#endif + enum nDPId_l3_type { L3_IP, @@ -200,7 +246,7 @@ struct nDPId_workflow { pcap_t * pcap_handle; - uint8_t error_or_eof; + MT_VALUE(error_or_eof, uint8_t); uint8_t is_pcap_file; uint8_t max_flow_to_track_reached : 1; @@ -375,20 +421,20 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = { static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; static struct nDPIsrvd_address collector_address; -static volatile int nDPId_main_thread_shutdown = 0; -static volatile uint64_t global_flow_id = 1; +static MT_VALUE(nDPId_main_thread_shutdown, int) = MT_INIT(0); +static MT_VALUE(global_flow_id, uint64_t) = MT_INIT(1); static int ip4_interface_avail = 0, ip6_interface_avail = 0; #ifdef ENABLE_MEMORY_PROFILING -static volatile uint64_t ndpi_memory_alloc_count = 0; -static volatile uint64_t ndpi_memory_alloc_bytes = 0; -static volatile uint64_t ndpi_memory_free_count = 0; -static volatile uint64_t ndpi_memory_free_bytes = 0; +static MT_VALUE(ndpi_memory_alloc_count, uint64_t) = MT_INIT(0); +static MT_VALUE(ndpi_memory_alloc_bytes, uint64_t) = MT_INIT(0); +static MT_VALUE(ndpi_memory_free_count, uint64_t) = MT_INIT(0); +static MT_VALUE(ndpi_memory_free_bytes, uint64_t) = MT_INIT(0); #ifdef ENABLE_ZLIB -static volatile uint64_t zlib_compressions = 0; -static volatile uint64_t zlib_decompressions = 0; -static volatile uint64_t zlib_compression_diff = 0; -static volatile uint64_t zlib_compression_bytes = 0; +static MT_VALUE(zlib_compressions, uint64_t) = MT_INIT(0); +static MT_VALUE(zlib_decompressions, uint64_t) = MT_INIT(0); +static MT_VALUE(zlib_compression_diff, uint64_t) = MT_INIT(0); +static MT_VALUE(zlib_compression_bytes, uint64_t) = MT_INIT(0); #endif #endif @@ -595,9 +641,9 @@ static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstL { ret = strm.total_out; #ifdef ENABLE_MEMORY_PROFILING - __sync_fetch_and_add(&zlib_compressions, 1); - __sync_fetch_and_add(&zlib_compression_diff, srcLen - ret); - __sync_fetch_and_add(&zlib_compression_bytes, ret); + MT_GET_AND_ADD(zlib_compressions, 1); + MT_GET_AND_ADD(zlib_compression_diff, srcLen - ret); + MT_GET_AND_ADD(zlib_compression_bytes, ret); #endif } else @@ -639,8 +685,8 @@ static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen) { ret = strm.total_out; #ifdef ENABLE_MEMORY_PROFILING - __sync_fetch_and_add(&zlib_decompressions, 1); - __sync_fetch_and_sub(&zlib_compression_diff, ret - srcLen); + MT_GET_AND_ADD(zlib_decompressions, 1); + MT_GET_AND_SUB(zlib_compression_diff, ret - srcLen); #endif } else @@ -1070,8 +1116,8 @@ static void * ndpi_malloc_wrapper(size_t const size) } *(uint64_t *)p = size; - __sync_fetch_and_add(&ndpi_memory_alloc_count, 1); - __sync_fetch_and_add(&ndpi_memory_alloc_bytes, size); + MT_GET_AND_ADD(ndpi_memory_alloc_count, 1); + MT_GET_AND_ADD(ndpi_memory_alloc_bytes, size); return (uint8_t *)p + sizeof(uint64_t); } @@ -1080,8 +1126,8 @@ static void ndpi_free_wrapper(void * const freeable) { void * p = (uint8_t *)freeable - sizeof(uint64_t); - __sync_fetch_and_add(&ndpi_memory_free_count, 1); - __sync_fetch_and_add(&ndpi_memory_free_bytes, *(uint64_t *)p); + MT_GET_AND_ADD(ndpi_memory_free_count, 1); + MT_GET_AND_ADD(ndpi_memory_free_bytes, *(uint64_t *)p); free(p); } @@ -1090,10 +1136,10 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr { if (reader_thread->array_index == 0) { - uint64_t alloc_count = __sync_fetch_and_add(&ndpi_memory_alloc_count, 0); - uint64_t free_count = __sync_fetch_and_add(&ndpi_memory_free_count, 0); - uint64_t alloc_bytes = __sync_fetch_and_add(&ndpi_memory_alloc_bytes, 0); - uint64_t free_bytes = __sync_fetch_and_add(&ndpi_memory_free_bytes, 0); + uint64_t alloc_count = MT_GET_AND_ADD(ndpi_memory_alloc_count, 0); + uint64_t free_count = MT_GET_AND_ADD(ndpi_memory_free_count, 0); + uint64_t alloc_bytes = MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0); + uint64_t free_bytes = MT_GET_AND_ADD(ndpi_memory_free_bytes, 0); logger(0, "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in " @@ -1106,10 +1152,10 @@ static void log_memory_usage(struct nDPId_reader_thread const * const reader_thr (long long unsigned int)(alloc_count - free_count), (long long unsigned int)(alloc_bytes - free_bytes)); #ifdef ENABLE_ZLIB - uint64_t zlib_compression_count = __sync_fetch_and_add(&zlib_compressions, 0); - uint64_t zlib_decompression_count = __sync_fetch_and_add(&zlib_decompressions, 0); - uint64_t zlib_bytes_diff = __sync_fetch_and_add(&zlib_compression_diff, 0); - uint64_t zlib_bytes_total = __sync_fetch_and_add(&zlib_compression_bytes, 0); + uint64_t zlib_compression_count = MT_GET_AND_ADD(zlib_compressions, 0); + uint64_t zlib_decompression_count = MT_GET_AND_ADD(zlib_decompressions, 0); + uint64_t zlib_bytes_diff = MT_GET_AND_ADD(zlib_compression_diff, 0); + uint64_t zlib_bytes_total = MT_GET_AND_ADD(zlib_compression_bytes, 0); logger(0, "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu " @@ -1142,6 +1188,8 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) return NULL; } + MT_INIT2(workflow->error_or_eof, 0); + errno = 0; if (access(file_or_device, R_OK) != 0 && errno == ENOENT) { @@ -2999,7 +3047,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; } - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); *layer3_type = ntohs(chdlc->proto_code); break; @@ -3021,7 +3069,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (packet[0] == 0x0f || packet[0] == 0x8f) { - struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const) & packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *layer3_type = ntohs(chdlc->proto_code); } @@ -3059,7 +3107,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre } struct ndpi_radiotap_header const * const radiotap = - (struct ndpi_radiotap_header const * const)&packet[eth_offset]; + (struct ndpi_radiotap_header const * const) & packet[eth_offset]; uint16_t radio_len = radiotap->len; /* Check Bad FCS presence */ @@ -3808,7 +3856,7 @@ static void ndpi_process_packet(uint8_t * const args, workflow->flow_allocation_already_failed = 0; workflow->total_active_flows++; - flow_to_process->flow_extended.flow_id = __sync_fetch_and_add(&global_flow_id, 1); + flow_to_process->flow_extended.flow_id = MT_GET_AND_ADD(global_flow_id, 1); if (alloc_detection_data(flow_to_process) != 0) { @@ -4120,10 +4168,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) { case PCAP_ERROR: logger(1, "Error while reading pcap file: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; case PCAP_ERROR_BREAK: - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; default: return; @@ -4136,7 +4184,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) { logger(1, "pthread_sigmask: %s", strerror(errno)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } @@ -4147,7 +4195,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) if (signal_fd < 0) { logger(1, "signalfd: %s", strerror(errno)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } @@ -4155,7 +4203,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) if (pcap_fd < 0) { logger(1, "%s", "Got an invalid PCAP fd"); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } @@ -4163,7 +4211,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) if (epoll_fd < 0) { logger(1, "Got an invalid epoll fd: %s", strerror(errno)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } @@ -4174,14 +4222,14 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0) { logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } event.data.fd = signal_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0) { logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } @@ -4190,7 +4238,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) int const timeout_ms = 1000; /* TODO: Configurable? */ int nready; struct timeval tval_before_epoll, tval_after_epoll; - while (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) + while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) { get_current_time(&tval_before_epoll); errno = 0; @@ -4202,7 +4250,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) continue; } logger(1, "Epoll returned error: %s", strerror(errno)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); break; } @@ -4224,7 +4272,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) if ((events[i].events & EPOLLERR) != 0) { logger(1, "%s", "Epoll error event"); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); } if (events[i].data.fd == signal_fd) @@ -4267,10 +4315,10 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) logger(1, "Error while reading from pcap device: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle)); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); break; case PCAP_ERROR_BREAK: - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; default: break; @@ -4316,7 +4364,7 @@ static void * processing_thread(void * const ndpi_thread_arg) run_pcap_loop(reader_thread); set_collector_block(reader_thread); - __sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return NULL; } @@ -4324,7 +4372,7 @@ static int processing_threads_error_or_eof(void) { for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { - if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0) + if (MT_GET_AND_ADD(reader_threads[i].workflow->error_or_eof, 0) == 0) { return 0; } @@ -4557,9 +4605,9 @@ static void sighandler(int signum) { (void)signum; - if (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0) + if (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0) { - __sync_fetch_and_add(&nDPId_main_thread_shutdown, 1); + MT_GET_AND_ADD(nDPId_main_thread_shutdown, 1); } } @@ -5096,7 +5144,7 @@ int main(int argc, char ** argv) signal(SIGTERM, sighandler); signal(SIGPIPE, SIG_IGN); - while (__sync_fetch_and_add(&nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) + while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) { sleep(1); } |