diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2022-03-13 02:28:10 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-03-13 02:28:10 +0100 |
commit | ed1647b9446f84d81d41e8e28ccf063eff97b2f7 (patch) | |
tree | 7f22929aca611955ea129dc0afee839bb63872bf /nDPId-test.c | |
parent | dd35d9da3fd43f1091b8ec496ec25d72e54d8e22 (diff) |
Disconnect nDPIsrvd clients immediately instead waiting for a failed write().
* nDPIsrvd: Collector/Distributor logging improved
* nDPIsrvd: Command line option for max remote descriptors
* nDPId: Stop spamming nDPIsrvd Collector with the same events over and over again
* nDPId: Refactored some variable names and events
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId-test.c')
-rw-r--r-- | nDPId-test.c | 74 |
1 files changed, 61 insertions, 13 deletions
diff --git a/nDPId-test.c b/nDPId-test.c index 99630f018..c28aa41e1 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -21,8 +21,11 @@ enum PIPE_NULL_WRITE = 1, /* Distributor (data from nDPIsrvd) write */ PIPE_NULL_READ = 0, /* Distributor (print to stdout) read */ + PIPE_ARPA_WRITE = 1, /* Distributor (data from nDPIsrvd) write */ + PIPE_ARPA_READ = 0, /* Distributor (IP mockup) read */ + PIPE_FDS = 2, - MAX_REMOTE_DESCRIPTORS = 3 /* mock pipefd's + 2 * distributor pipefd's */ + MAX_REMOTE_DESCRIPTORS = 4 /* mock pipefd's + 2 * distributor pipefd's */ }; struct thread_return_value @@ -97,6 +100,7 @@ struct distributor_return_value static int mock_pipefds[PIPE_FDS] = {}; static int mock_testfds[PIPE_FDS] = {}; static int mock_nullfds[PIPE_FDS] = {}; +static int mock_arpafds[PIPE_FDS] = {}; static pthread_mutex_t nDPId_start_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t nDPIsrvd_start_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t distributor_start_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -144,6 +148,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) struct remote_desc * mock_json_desc = NULL; struct remote_desc * mock_test_desc = NULL; struct remote_desc * mock_null_desc = NULL; + struct remote_desc * mock_arpa_desc = NULL; struct epoll_event events[32]; size_t const events_size = sizeof(events) / sizeof(events[0]); @@ -153,31 +158,40 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(arg); } - mock_json_desc = get_unused_remote_descriptor(COLLECTOR_UN, mock_pipefds[PIPE_nDPIsrvd], NETWORK_BUFFER_MAX_SIZE); + mock_json_desc = get_remote_descriptor(COLLECTOR_UN, mock_pipefds[PIPE_nDPIsrvd], NETWORK_BUFFER_MAX_SIZE); if (mock_json_desc == NULL) { logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (Collector)"); THREAD_ERROR_GOTO(arg); } - mock_test_desc = - get_unused_remote_descriptor(DISTRIBUTOR_UN, mock_testfds[PIPE_TEST_WRITE], NETWORK_BUFFER_MAX_SIZE / 4); + mock_test_desc = get_remote_descriptor(DISTRIBUTOR_UN, mock_testfds[PIPE_TEST_WRITE], NETWORK_BUFFER_MAX_SIZE / 4); if (mock_test_desc == NULL) { logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (TEST Distributor)"); THREAD_ERROR_GOTO(arg); } - mock_null_desc = - get_unused_remote_descriptor(DISTRIBUTOR_UN, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE); + mock_null_desc = get_remote_descriptor(DISTRIBUTOR_UN, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE); if (mock_null_desc == NULL) { logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (NULL Distributor)"); THREAD_ERROR_GOTO(arg); } + mock_arpa_desc = get_remote_descriptor(DISTRIBUTOR_IN, mock_arpafds[PIPE_ARPA_WRITE], NETWORK_BUFFER_MAX_SIZE / 8); + if (mock_arpa_desc == NULL) + { + logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (ARPA Distributor)"); + THREAD_ERROR_GOTO(arg); + } + strncpy(mock_arpa_desc->event_distributor_in.peer_addr, + "arpa-mockup", + sizeof(mock_arpa_desc->event_distributor_in.peer_addr)); + mock_arpa_desc->event_distributor_in.peer.sin_port = 0; + if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 || - add_in_event(epollfd, mock_null_desc) != 0) + add_in_event(epollfd, mock_null_desc) != 0 || add_in_event(epollfd, mock_arpa_desc) != 0) { logger(1, "%s", "nDPIsrvd add input event failed"); THREAD_ERROR_GOTO(arg); @@ -197,7 +211,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) for (int i = 0; i < nready; i++) { if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc || - events[i].data.ptr == mock_null_desc) + events[i].data.ptr == mock_null_desc || events[i].data.ptr == mock_arpa_desc) { if (handle_data_event(epollfd, &events[i]) != 0) { @@ -224,13 +238,19 @@ error: { drain_cache_blocking(mock_null_desc); } + if (mock_arpa_desc != NULL) + { + drain_cache_blocking(mock_arpa_desc); + } del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]); del_event(epollfd, mock_testfds[PIPE_TEST_WRITE]); del_event(epollfd, mock_nullfds[PIPE_NULL_WRITE]); + del_event(epollfd, mock_arpafds[PIPE_ARPA_WRITE]); close(mock_pipefds[PIPE_nDPIsrvd]); close(mock_testfds[PIPE_TEST_WRITE]); close(mock_nullfds[PIPE_NULL_WRITE]); + close(mock_arpafds[PIPE_ARPA_WRITE]); close(epollfd); return NULL; @@ -469,7 +489,7 @@ static void * distributor_client_mainloop_thread(void * const arg) { int dis_epollfd = create_evq(); int signalfd = setup_signalfd(dis_epollfd); - int pipe_read_finished = 0, null_read_finished = 0; + int pipe_read_finished = 0, null_read_finished = 0, arpa_read_finished = 0; struct epoll_event events[32]; size_t const events_size = sizeof(events) / sizeof(events[0]); struct distributor_return_value * const drv = (struct distributor_return_value *)arg; @@ -502,12 +522,17 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(trv); } + if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0) + { + THREAD_ERROR_GOTO(trv); + } + stats = (struct distributor_global_user_data *)mock_sock->global_user_data; stats->json_string_len_min = (unsigned long long int)-1; pthread_mutex_lock(&distributor_start_mutex); - while (pipe_read_finished == 0 || null_read_finished == 0) + while (pipe_read_finished == 0 || null_read_finished == 0 || arpa_read_finished == 0) { int nready = epoll_wait(dis_epollfd, events, events_size, -1); if (nready < 0 && errno != EINTR) @@ -579,6 +604,27 @@ static void * distributor_client_mainloop_thread(void * const arg) printf("%.*s", (int)bytes_read, buf); } + else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ]) + { + char buf[NETWORK_BUFFER_MAX_SIZE]; + ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf)); + if (bytes_read < 0) + { + logger(1, "Read fd returned an error: %s", strerror(errno)); + THREAD_ERROR_GOTO(trv); + } + if (bytes_read == 0) + { + del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]); + arpa_read_finished = 1; + continue; + } + + /* + * Nothing to do .. ? + * I am just here to trigger some IP code paths. + */ + } else if (events[i].data.fd == signalfd) { struct signalfd_siginfo fdsi; @@ -627,6 +673,7 @@ error: del_event(dis_epollfd, signalfd); del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]); del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]); + del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]); close(dis_epollfd); close(signalfd); nDPIsrvd_socket_free(&mock_sock); @@ -646,8 +693,8 @@ static void * nDPId_mainloop_thread(void * const arg) } /* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */ - reader_threads[0].json_sockfd = mock_pipefds[PIPE_nDPId]; - reader_threads[0].json_sock_reconnect = 0; + reader_threads[0].collector_sockfd = mock_pipefds[PIPE_nDPId]; + reader_threads[0].collector_sock_reconnect = 0; pthread_mutex_lock(&nDPId_start_mutex); @@ -758,7 +805,8 @@ int main(int argc, char ** argv) return 1; } - if (setup_pipe(mock_pipefds) != 0 || setup_pipe(mock_testfds) != 0 || setup_pipe(mock_nullfds) != 0) + if (setup_pipe(mock_pipefds) != 0 || setup_pipe(mock_testfds) != 0 || setup_pipe(mock_nullfds) != 0 || + setup_pipe(mock_arpafds) != 0) { return 1; } |