-rw-r--r--  nDPId-test.c | 279
-rw-r--r--  nDPId.c      |  11
-rw-r--r--  nDPIsrvd.c   |   2
3 files changed, 214 insertions, 78 deletions
diff --git a/nDPId-test.c b/nDPId-test.c
index b125d5e99..71f2c8cba 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -7,7 +7,7 @@
 static void nDPIsrvd_memprof_log(char const * const format, ...);
 static void nDPIsrvd_memprof_log_alloc(size_t alloc_size);
 static void nDPIsrvd_memprof_log_free(size_t free_size);
-//#define DO_MEMORY_LOGGING 1
+//#define VERBOSE_MEMORY_PROFILING 1
 #define NO_MAIN 1
 #include "utils.c"
 #include "nDPIsrvd.c"
@@ -99,6 +99,8 @@ struct distributor_global_user_data
     unsigned long long int flow_detection_update_count;
     unsigned long long int flow_update_count;

+    unsigned long long int shutdown_events;
+
     unsigned long long int json_string_len_min;
     unsigned long long int json_string_len_max;
     double json_string_len_avg;
@@ -132,17 +134,30 @@ struct distributor_return_value
     struct distributor_global_user_data stats;
 };

+#define TC_INIT(initial, wanted)                                                                    \
+    {                                                                                               \
+        .mutex = PTHREAD_MUTEX_INITIALIZER, .condition = PTHREAD_COND_INITIALIZER, .value = initial, \
+        .wanted_value = wanted                                                                      \
+    }
+struct thread_condition
+{
+    pthread_mutex_t mutex;
+    pthread_cond_t condition;
+    int value;
+    int wanted_value;
+};
+
 static int mock_pipefds[PIPE_FDS] = {};
 static int mock_testfds[PIPE_FDS] = {};
 static int mock_bufffds[PIPE_FDS] = {};
 static int mock_nullfds[PIPE_FDS] = {};
 static int mock_arpafds[PIPE_FDS] = {};
-static pthread_mutex_t nDPId_start_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t nDPIsrvd_start_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t distributor_start_mutex = PTHREAD_MUTEX_INITIALIZER;
-#ifdef DO_MEMORY_LOGGING
+static struct thread_condition start_condition = TC_INIT(3, 0);
+#ifdef VERBOSE_MEMORY_PROFILING
 static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
 #endif
+static pthread_mutex_t mem_mutex = PTHREAD_MUTEX_INITIALIZER; // required; memory wrappers are used from two threads
+                                                              // (distributor and nDPIsrvd)
 static unsigned long long int nDPIsrvd_alloc_count = 0;
 static unsigned long long int nDPIsrvd_alloc_bytes = 0;
 static unsigned long long int nDPIsrvd_free_count = 0;
@@ -162,7 +177,7 @@ static unsigned long long int nDPIsrvd_free_bytes = 0;

 static void nDPIsrvd_memprof_log(char const * const format, ...)
 {
-#ifdef DO_MEMORY_LOGGING
+#ifdef VERBOSE_MEMORY_PROFILING
     int logbuf_used, logbuf_used_tmp;
     char logbuf[BUFSIZ];
     va_list ap;
@@ -188,16 +203,60 @@ static void nDPIsrvd_memprof_log(char const * const format, ...)

 void nDPIsrvd_memprof_log_alloc(size_t alloc_size)
 {
+    unsigned long alloc_count;
+
+    // nDPIsrvd.h is used by client applications and nDPIsrvd (two threads!)
+    pthread_mutex_lock(&mem_mutex);
     nDPIsrvd_alloc_count++;
     nDPIsrvd_alloc_bytes += alloc_size;
-    // nDPIsrvd_memprof_log("nDPIsrvd.h: malloc #%llu, %llu bytes", nDPIsrvd_alloc_count, alloc_size);
+    alloc_count = nDPIsrvd_alloc_count;
+    pthread_mutex_unlock(&mem_mutex);
+    nDPIsrvd_memprof_log("nDPIsrvd.h: malloc #%llu, %llu bytes", alloc_count, alloc_size);
 }

 void nDPIsrvd_memprof_log_free(size_t free_size)
 {
+    unsigned long free_count;
+
+    // nDPIsrvd.h is used by client applications and nDPIsrvd (two threads!)
+    pthread_mutex_lock(&mem_mutex);
     nDPIsrvd_free_count++;
     nDPIsrvd_free_bytes += free_size;
-    // nDPIsrvd_memprof_log("nDPIsrvd.h: free #%llu, %llu bytes", nDPIsrvd_free_count, free_size);
+    free_count = nDPIsrvd_free_count;
+    pthread_mutex_unlock(&mem_mutex);
+    nDPIsrvd_memprof_log("nDPIsrvd.h: free #%llu, %llu bytes", free_count, free_size);
+}
+
+static int thread_wait(struct thread_condition * const tc)
+{
+    int ret = 0;
+
+    ret |= (pthread_mutex_lock(&tc->mutex) << 16);
+    while (tc->value > tc->wanted_value)
+    {
+        ret |= (pthread_cond_wait(&tc->condition, &tc->mutex) << 8);
+    }
+    ret |= (pthread_mutex_unlock(&tc->mutex));
+
+    return ret;
+}
+
+static int thread_signal(struct thread_condition * const tc)
+{
+    int ret = 0;
+
+    ret |= (pthread_mutex_lock(&tc->mutex) << 16);
+    if (tc->value > tc->wanted_value)
+    {
+        tc->value--;
+    }
+    if (tc->value == tc->wanted_value)
+    {
+        ret |= (pthread_cond_broadcast(&tc->condition) << 8);
+    }
+    ret |= (pthread_mutex_unlock(&tc->mutex));
+
+    return ret;
 }

 static int setup_pipe(int pipefd[PIPE_FDS])
@@ -207,12 +266,23 @@
     {
         return -1;
     }

+    if (fcntl_add_flags(pipefd[0], O_NONBLOCK) != 0)
+    {
+        return -1;
+    }
+
+    if (fcntl_add_flags(pipefd[1], O_NONBLOCK) != 0)
+    {
+        return -1;
+    }
+
     return 0;
 }

 static void * nDPIsrvd_mainloop_thread(void * const arg)
 {
-    int nDPIsrvd_shutdown = 0;
+    int nDPIsrvd_distributor_disconnects = 0;
+    int const nDPIsrvd_distributor_expected_disconnects = 5;
     int epollfd;
     struct remote_desc * mock_json_desc = NULL;
     struct remote_desc * mock_test_desc = NULL;
@@ -278,9 +348,10 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
         THREAD_ERROR_GOTO(arg);
     }

-    pthread_mutex_lock(&nDPIsrvd_start_mutex);
+    thread_signal(&start_condition);
+    thread_wait(&start_condition);

-    while (nDPIsrvd_shutdown == 0)
+    while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects)
     {
         errno = 0;
         int nready = epoll_wait(epollfd, events, events_size, -1);
@@ -303,6 +374,16 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
                 if (remote == mock_json_desc)
                 {
                     remote_desc_name = "Mock JSON";
+                    do {
+                        if (mock_test_desc->fd >= 0)
+                            drain_write_buffers_blocking(mock_test_desc);
+                        if (mock_buff_desc->fd >= 0)
+                            drain_write_buffers_blocking(mock_buff_desc);
+                        if (mock_null_desc->fd >= 0)
+                            drain_write_buffers_blocking(mock_null_desc);
+                        if (mock_arpa_desc->fd >= 0)
+                            drain_write_buffers_blocking(mock_arpa_desc);
+                    } while (handle_data_event(epollfd, &events[i]) == 0);
                 }
                 else if (remote == mock_test_desc)
                 {
@@ -324,14 +405,21 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
                 {
                     remote_desc_name = "UNKNOWN";
                 }
-                logger(1, "nDPIsrvd distributor '%s' connection closed", remote_desc_name);
-                handle_data_event(epollfd, &events[i]);
-                nDPIsrvd_shutdown++;
+                nDPIsrvd_distributor_disconnects++;
+                logger(1,
+                       "nDPIsrvd distributor '%s' connection closed (%d/%d)",
+                       remote_desc_name,
+                       nDPIsrvd_distributor_disconnects,
+                       nDPIsrvd_distributor_expected_disconnects);
+                free_remote(epollfd, remote);
             }
-            else if (handle_data_event(epollfd, &events[i]) != 0)
+            else
             {
-                logger(1, "nDPIsrvd data event handler failed for distributor %d", events[i].data.fd);
-                THREAD_ERROR_GOTO(arg);
+                if (handle_data_event(epollfd, &events[i]) != 0)
+                {
+                    logger(1, "%s", "nDPIsrvd data event handler failed");
+                    THREAD_ERROR_GOTO(arg);
+                }
             }
         }
         else
@@ -346,27 +434,10 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
     }

 error:
-    if (mock_test_desc != NULL)
-    {
-        drain_write_buffers_blocking(mock_test_desc);
-    }
-    if (mock_buff_desc != NULL)
-    {
-        drain_write_buffers_blocking(mock_buff_desc);
-    }
-    if (mock_null_desc != NULL)
-    {
-        drain_write_buffers_blocking(mock_null_desc);
-    }
-    if (mock_arpa_desc != NULL)
-    {
-        drain_write_buffers_blocking(mock_arpa_desc);
-    }
-
-    pthread_mutex_lock(&nDPIsrvd_start_mutex);

     free_remotes(epollfd);
     close(epollfd);

+    logger(0, "%s", "nDPIsrvd worker thread exits..");
     return NULL;
 }
@@ -504,8 +575,8 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
                 global_stats->total_events_serialized = nmb;
             }

-            pthread_mutex_unlock(&nDPIsrvd_start_mutex);
-            pthread_mutex_unlock(&nDPId_start_mutex);
+            logger(0, "%s", "Distributor received shutdown event..");
+            global_stats->shutdown_events++;
         }
     }
@@ -610,8 +681,7 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
     return CALLBACK_OK;

 callback_error:
-    pthread_mutex_unlock(&nDPIsrvd_start_mutex);
-    pthread_mutex_unlock(&nDPId_start_mutex);
+    logger(1, "%s", "Distributor error..");

     return CALLBACK_ERROR;
 }
@@ -719,11 +789,40 @@ static enum nDPIsrvd_callback_return distributor_json_mock_buff_callback(
     return distributor_json_callback(sock, instance, thread_data, flow);
 }

+static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_socket * const sock,
+                                                              struct nDPIsrvd_instance * const instance,
+                                                              struct nDPIsrvd_thread_data * const thread_data,
+                                                              struct nDPIsrvd_flow * const flow)
+{
+    (void)instance;
+    (void)thread_data;
+    (void)flow;
+
+    {
+        struct nDPIsrvd_json_token const * const daemon_event_name = TOKEN_GET_SZ(sock, "daemon_event_name");
+
+        if (daemon_event_name != NULL)
+        {
+            if (TOKEN_VALUE_EQUALS_SZ(sock, daemon_event_name, "shutdown") != 0)
+            {
+                logger(0, "%s", "Distributor received shutdown event..");
+                int * const mock_null_shutdown_events = (int *)sock->global_user_data;
+                (*mock_null_shutdown_events)++;
+            }
+        }
+    }
+
+    printf("%0" NETWORK_BUFFER_LENGTH_DIGITS_STR "llu%.*s",
+           sock->buffer.json_string_length - NETWORK_BUFFER_LENGTH_DIGITS,
+           nDPIsrvd_json_buffer_length(sock),
+           nDPIsrvd_json_buffer_string(sock));
+    return CALLBACK_OK;
+}
+
 static void * distributor_client_mainloop_thread(void * const arg)
 {
     int dis_epollfd = create_evq();
     int signalfd = setup_signalfd(dis_epollfd);
-    int pipe_read_finished = 0, buff_read_finished = 0, null_read_finished = 0, arpa_read_finished = 0;
     struct epoll_event events[32];
     size_t const events_size = sizeof(events) / sizeof(events[0]);
     struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
@@ -742,17 +841,21 @@ static void * distributor_client_mainloop_thread(void * const arg)
                                                               distributor_json_mock_buff_callback,
                                                               distributor_instance_cleanup_callback,
                                                               distributor_flow_cleanup_callback);
+    struct nDPIsrvd_socket * mock_null =
+        nDPIsrvd_socket_init(sizeof(int), 0, 0, 0, distributor_json_printer, NULL, NULL);
     struct distributor_global_user_data * sock_stats;
     struct distributor_global_user_data * buff_stats;
+    int * mock_null_shutdown_events;

     errno = 0;
-    if (mock_sock == NULL || mock_buff == NULL)
+    if (mock_sock == NULL || mock_buff == NULL || mock_null == NULL)
     {
         THREAD_ERROR_GOTO(trv);
     }

     mock_sock->fd = mock_testfds[PIPE_TEST_READ];
     mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ];
+    mock_null->fd = mock_nullfds[PIPE_NULL_READ];

     if (dis_epollfd < 0 || signalfd < 0)
     {
@@ -789,10 +892,13 @@ static void * distributor_client_mainloop_thread(void * const arg)
     buff_stats = (struct distributor_global_user_data *)mock_buff->global_user_data;
     buff_stats->json_string_len_min = (unsigned long long int)-1;
     buff_stats->options.do_hash_checks = 0;
+    mock_null_shutdown_events = (int *)mock_null->global_user_data;
+    *mock_null_shutdown_events = 0;

-    pthread_mutex_lock(&distributor_start_mutex);
+    thread_signal(&start_condition);
+    thread_wait(&start_condition);

-    while (pipe_read_finished == 0 || buff_read_finished == 0 || null_read_finished == 0 || arpa_read_finished == 0)
+    while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0)
     {
         int nready = epoll_wait(dis_epollfd, events, events_size, -1);
         if (nready < 0 && errno != EINTR)
@@ -811,6 +917,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
             if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0)
             {
                 logger(1, "Distributor disconnected: %d", events[i].data.fd);
+                del_event(dis_epollfd, events[i].data.fd);
             }

             if (events[i].data.fd == mock_testfds[PIPE_TEST_READ])
@@ -825,8 +932,6 @@ static void * distributor_client_mainloop_thread(void * const arg)
                         logger(1, "Read and verify fd returned an error: %s", strerror(errno));
                         THREAD_ERROR_GOTO(trv);
                     case READ_PEER_DISCONNECT:
-                        del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
-                        pipe_read_finished = 1;
                         break;
                 }

@@ -862,8 +967,6 @@ static void * distributor_client_mainloop_thread(void * const arg)
                         logger(1, "Read and verify fd returned an error: %s", strerror(errno));
                         THREAD_ERROR_GOTO(trv);
                     case READ_PEER_DISCONNECT:
-                        del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
-                        buff_read_finished = 1;
                         break;
                 }

@@ -889,21 +992,32 @@ static void * distributor_client_mainloop_thread(void * const arg)
             }
             else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ])
             {
-                /* Read all data from the pipe, but do nothing else. */
-                char buf[NETWORK_BUFFER_MAX_SIZE];
-                ssize_t bytes_read = read(mock_nullfds[PIPE_NULL_READ], buf, sizeof(buf));
-                if (bytes_read < 0)
+                switch (nDPIsrvd_read(mock_null))
                 {
-                    logger(1, "Read and print to stdout fd returned an error: %s", strerror(errno));
-                    THREAD_ERROR_GOTO(trv);
+                    case READ_OK:
+                        break;
+                    case READ_LAST_ENUM_VALUE:
+                    case READ_ERROR:
+                    case READ_TIMEOUT:
+                        logger(1, "Read and verify fd returned an error: %s", strerror(errno));
+                        THREAD_ERROR_GOTO(trv);
+                    case READ_PEER_DISCONNECT:
+                        break;
                 }
-                if (bytes_read == 0)
+
+                enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(mock_null);
+                if (parse_ret != PARSE_NEED_MORE_DATA)
                 {
-                    del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
-                    null_read_finished = 1;
+                    logger(1, "JSON parsing failed: %s", nDPIsrvd_enum_to_string(parse_ret));
+                    logger(1,
+                           "Problematic JSON string (buff sock, start: %zu, length: %llu, buffer usage: %zu): %.*s",
+                           mock_null->buffer.json_string_start,
+                           mock_null->buffer.json_string_length,
+                           mock_null->buffer.buf.used,
+                           (int)mock_null->buffer.json_string_length,
+                           mock_null->buffer.json_string);
+                    THREAD_ERROR_GOTO(trv);
                 }
-
-                printf("%.*s", (int)bytes_read, buf);
             }
             else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ])
             {
@@ -914,11 +1028,6 @@ static void * distributor_client_mainloop_thread(void * const arg)
                 {
                     logger(1, "Read fd returned an error: %s", strerror(errno));
                     THREAD_ERROR_GOTO(trv);
                 }
-                if (bytes_read == 0)
-                {
-                    del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
-                    arpa_read_finished = 1;
-                }

                 /*
                  * Nothing to do .. ?
@@ -963,7 +1072,9 @@ static void * distributor_client_mainloop_thread(void * const arg)
     {
         HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
         {
-            logger(1, "Active flow found during client distributor shutdown with id: %llu", current_flow->id_as_ull);
+            logger(1,
+                   "[Mock Sock] Active flow found during client distributor shutdown with id: %llu",
+                   current_flow->id_as_ull);
             errno = 0;
             THREAD_ERROR(trv);
         }
@@ -974,7 +1085,9 @@ static void * distributor_client_mainloop_thread(void * const arg)
     {
         HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
         {
-            logger(1, "Active flow found during client distributor shutdown with id: %llu", current_flow->id_as_ull);
+            logger(1,
+                   "[Mock Buff] Active flow found during client distributor shutdown with id: %llu",
+                   current_flow->id_as_ull);
             errno = 0;
             THREAD_ERROR(trv);
         }
@@ -1002,17 +1115,36 @@ static void * distributor_client_mainloop_thread(void * const arg)
     }

     drv->stats = *sock_stats;

+    if (sock_stats->shutdown_events != 1 || buff_stats->shutdown_events != 1 || *mock_null_shutdown_events != 1)
+    {
+        logger(1,
+               "Unexpected amount of shutdown events received, expected 1 per nDPIsrvd socket, got (Sock/Buff/NULL): "
+               "%llu/%llu/%d",
+               sock_stats->shutdown_events,
+               buff_stats->shutdown_events,
+               *mock_null_shutdown_events);
+        errno = 0;
+        THREAD_ERROR(trv);
+    }
+
 error:
     del_event(dis_epollfd, signalfd);
     del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
     del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]);
     del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
     del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
+    close(mock_testfds[PIPE_TEST_READ]);
+    close(mock_bufffds[PIPE_BUFFER_READ]);
+    close(mock_nullfds[PIPE_NULL_READ]);
+    close(mock_arpafds[PIPE_ARPA_READ]);
     close(dis_epollfd);
     close(signalfd);
+
     nDPIsrvd_socket_free(&mock_sock);
     nDPIsrvd_socket_free(&mock_buff);
+    nDPIsrvd_socket_free(&mock_null);
+    logger(0, "%s", "Distributor worker thread exits..");

     return NULL;
 }
@@ -1035,7 +1167,8 @@ static void * nDPId_mainloop_thread(void * const arg)
         goto error;
     }

-    pthread_mutex_lock(&nDPId_start_mutex);
+    thread_signal(&start_condition);
+    thread_wait(&start_condition);

     jsonize_daemon(&reader_threads[0], DAEMON_EVENT_INIT);
     /* restore SIGPIPE to the default handler (Termination) */
@@ -1073,10 +1206,10 @@ static void * nDPId_mainloop_thread(void * const arg)
     }

 error:
-    pthread_mutex_lock(&nDPId_start_mutex);
     free_reader_threads();

     close(mock_pipefds[PIPE_nDPId]);

+    logger(0, "%s", "nDPId worker thread exits..");
     return NULL;
 }
@@ -1169,11 +1302,6 @@ int main(int argc, char ** argv)
         return 1;
     }

-    /* Start processing after all threads started and initialized. */
-    pthread_mutex_lock(&nDPId_start_mutex);
-    pthread_mutex_lock(&nDPIsrvd_start_mutex);
-    pthread_mutex_lock(&distributor_start_mutex);
-
     pthread_t nDPId_thread;
     struct nDPId_return_value nDPId_return = {};
     if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0)
@@ -1195,10 +1323,6 @@ int main(int argc, char ** argv)
         return 1;
     }

-    pthread_mutex_unlock(&nDPIsrvd_start_mutex);
-    pthread_mutex_unlock(&distributor_start_mutex);
-    pthread_mutex_unlock(&nDPId_start_mutex);
-
     /* Try to gracefully shutdown all threads. */
     while (thread_wait_for_termination(distributor_thread, 1, &distributor_return.thread_return_value) == 0)
     {
@@ -1224,6 +1348,8 @@ int main(int argc, char ** argv)
         }
     }

+    logger(0, "%s", "All worker threads terminated..");
+
     if (THREADS_RETURNED_ERROR() != 0)
     {
         char const * which_thread = "Unknown";
@@ -1356,6 +1482,7 @@ int main(int argc, char ** argv)
         logger(1, "%s: %s", argv[0], "nDPIsrvd.h memory leak detected.");
         logger(1, "%s: Allocated / Free'd bytes: %llu / %llu", argv[0], nDPIsrvd_alloc_bytes, nDPIsrvd_free_bytes);
         logger(1, "%s: Allocated / Free'd count: %llu / %llu", argv[0], nDPIsrvd_alloc_count, nDPIsrvd_free_count);
+        return 1;
     }

     if (nDPId_return.cur_active_flows != 0 || nDPId_return.cur_idle_flows != 0)
diff --git a/nDPId.c b/nDPId.c
--- a/nDPId.c
+++ b/nDPId.c
@@ -2221,7 +2221,7 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
                          (int)json_str_len,
                          json_str);

-    if (s_ret < 0 || s_ret == (int)sizeof(newline_json_str))
+    if (s_ret < 0 || s_ret >= (int)sizeof(newline_json_str))
     {
         logger(1,
                "[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu",
@@ -2229,6 +2229,15 @@ static void send_to_collector(struct nDPId_reader_thread * const reader_thread,
                reader_thread->array_index,
                s_ret,
                sizeof(newline_json_str));
+        if (s_ret >= (int)sizeof(newline_json_str))
+        {
+            logger(1,
+                   "[%8llu, %zu] JSON string: %.*s...",
+                   workflow->packets_captured,
+                   reader_thread->array_index,
+                   ndpi_min(512, NETWORK_BUFFER_MAX_SIZE),
+                   newline_json_str);
+        }
         return;
     }
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index e57e2625a..6ac6894c9 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -1452,8 +1452,8 @@ static int mainloop(int epollfd)
         }
     }

-    close(signalfd);
     free_remotes(epollfd);
+    close(signalfd);

     return 0;
 }
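The nDPId-test.c changes above replace the three per-thread start mutexes (nDPId_start_mutex, nDPIsrvd_start_mutex, distributor_start_mutex) with a single counting condition, struct thread_condition, initialized via TC_INIT(3, 0). Each of the three worker threads calls thread_signal() once its setup is finished and then blocks in thread_wait() until the counter has been counted down to the wanted value by all threads, so processing begins only after every thread is initialized. The following standalone sketch shows the same countdown-barrier pattern in isolation; it is an illustration only, and the names worker() and WORKERS are hypothetical rather than taken from nDPId-test.c.

/*
 * Minimal sketch of the countdown start barrier built from a mutex,
 * a condition variable and a counter. Illustration only; worker() and
 * WORKERS are made-up names, not identifiers from the commit.
 */
#include <pthread.h>
#include <stdio.h>

#define WORKERS 3

struct thread_condition
{
    pthread_mutex_t mutex;
    pthread_cond_t condition;
    int value;        /* how many threads still have to check in */
    int wanted_value; /* 0: everyone has arrived */
};

static struct thread_condition start_condition = {.mutex = PTHREAD_MUTEX_INITIALIZER,
                                                  .condition = PTHREAD_COND_INITIALIZER,
                                                  .value = WORKERS,
                                                  .wanted_value = 0};

static void thread_signal(struct thread_condition * tc)
{
    pthread_mutex_lock(&tc->mutex);
    if (tc->value > tc->wanted_value)
    {
        tc->value--;
    }
    if (tc->value == tc->wanted_value)
    {
        pthread_cond_broadcast(&tc->condition); /* last thread wakes all waiters */
    }
    pthread_mutex_unlock(&tc->mutex);
}

static void thread_wait(struct thread_condition * tc)
{
    pthread_mutex_lock(&tc->mutex);
    while (tc->value > tc->wanted_value)
    {
        pthread_cond_wait(&tc->condition, &tc->mutex);
    }
    pthread_mutex_unlock(&tc->mutex);
}

static void * worker(void * arg)
{
    /* per-thread setup would go here */
    thread_signal(&start_condition); /* announce: setup done */
    thread_wait(&start_condition);   /* block until all threads checked in */
    printf("worker %ld starts processing\n", (long)arg);
    return NULL;
}

int main(void)
{
    pthread_t t[WORKERS];
    for (long i = 0; i < WORKERS; ++i)
    {
        pthread_create(&t[i], NULL, worker, (void *)i);
    }
    for (int i = 0; i < WORKERS; ++i)
    {
        pthread_join(t[i], NULL);
    }
    return 0;
}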
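The nDPId.c hunk tightens the truncation check after snprintf(): the old test 's_ret == (int)sizeof(newline_json_str)' only caught the exact-fit case, while snprintf() returns the length the formatted string would have had without truncation, so any return value greater than or equal to the buffer size means the output was cut off. A small standalone example of the corrected check follows; it is an illustration only, and buf and the message text are made up.

/*
 * Why ">= sizeof(buf)" instead of "== sizeof(buf)": snprintf() reports the
 * would-be length, which can exceed the buffer size. Illustration only.
 */
#include <stdio.h>

int main(void)
{
    char buf[8];
    int ret = snprintf(buf, sizeof(buf), "%s", "this string is longer than 8 bytes");

    if (ret < 0 || ret >= (int)sizeof(buf))
    {
        /* ret is 34 here, well past sizeof(buf); an "== sizeof(buf)" test would miss it */
        printf("truncated: wanted %d bytes, buffer holds %zu\n", ret, sizeof(buf));
    }
    return 0;
}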