summaryrefslogtreecommitdiff
path: root/nDPId-test.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2022-03-13 02:28:10 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-03-13 02:28:10 +0100
commited1647b9446f84d81d41e8e28ccf063eff97b2f7 (patch)
tree7f22929aca611955ea129dc0afee839bb63872bf /nDPId-test.c
parentdd35d9da3fd43f1091b8ec496ec25d72e54d8e22 (diff)
Disconnect nDPIsrvd clients immediately instead waiting for a failed write().
* nDPIsrvd: Collector/Distributor logging improved * nDPIsrvd: Command line option for max remote descriptors * nDPId: Stop spamming nDPIsrvd Collector with the same events over and over again * nDPId: Refactored some variable names and events Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId-test.c')
-rw-r--r--nDPId-test.c74
1 files changed, 61 insertions, 13 deletions
diff --git a/nDPId-test.c b/nDPId-test.c
index 99630f018..c28aa41e1 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -21,8 +21,11 @@ enum
PIPE_NULL_WRITE = 1, /* Distributor (data from nDPIsrvd) write */
PIPE_NULL_READ = 0, /* Distributor (print to stdout) read */
+ PIPE_ARPA_WRITE = 1, /* Distributor (data from nDPIsrvd) write */
+ PIPE_ARPA_READ = 0, /* Distributor (IP mockup) read */
+
PIPE_FDS = 2,
- MAX_REMOTE_DESCRIPTORS = 3 /* mock pipefd's + 2 * distributor pipefd's */
+ MAX_REMOTE_DESCRIPTORS = 4 /* mock pipefd's + 2 * distributor pipefd's */
};
struct thread_return_value
@@ -97,6 +100,7 @@ struct distributor_return_value
static int mock_pipefds[PIPE_FDS] = {};
static int mock_testfds[PIPE_FDS] = {};
static int mock_nullfds[PIPE_FDS] = {};
+static int mock_arpafds[PIPE_FDS] = {};
static pthread_mutex_t nDPId_start_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t nDPIsrvd_start_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t distributor_start_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -144,6 +148,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
struct remote_desc * mock_json_desc = NULL;
struct remote_desc * mock_test_desc = NULL;
struct remote_desc * mock_null_desc = NULL;
+ struct remote_desc * mock_arpa_desc = NULL;
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
@@ -153,31 +158,40 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(arg);
}
- mock_json_desc = get_unused_remote_descriptor(COLLECTOR_UN, mock_pipefds[PIPE_nDPIsrvd], NETWORK_BUFFER_MAX_SIZE);
+ mock_json_desc = get_remote_descriptor(COLLECTOR_UN, mock_pipefds[PIPE_nDPIsrvd], NETWORK_BUFFER_MAX_SIZE);
if (mock_json_desc == NULL)
{
logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (Collector)");
THREAD_ERROR_GOTO(arg);
}
- mock_test_desc =
- get_unused_remote_descriptor(DISTRIBUTOR_UN, mock_testfds[PIPE_TEST_WRITE], NETWORK_BUFFER_MAX_SIZE / 4);
+ mock_test_desc = get_remote_descriptor(DISTRIBUTOR_UN, mock_testfds[PIPE_TEST_WRITE], NETWORK_BUFFER_MAX_SIZE / 4);
if (mock_test_desc == NULL)
{
logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (TEST Distributor)");
THREAD_ERROR_GOTO(arg);
}
- mock_null_desc =
- get_unused_remote_descriptor(DISTRIBUTOR_UN, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE);
+ mock_null_desc = get_remote_descriptor(DISTRIBUTOR_UN, mock_nullfds[PIPE_NULL_WRITE], NETWORK_BUFFER_MAX_SIZE);
if (mock_null_desc == NULL)
{
logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (NULL Distributor)");
THREAD_ERROR_GOTO(arg);
}
+ mock_arpa_desc = get_remote_descriptor(DISTRIBUTOR_IN, mock_arpafds[PIPE_ARPA_WRITE], NETWORK_BUFFER_MAX_SIZE / 8);
+ if (mock_arpa_desc == NULL)
+ {
+ logger(1, "%s", "nDPIsrvd could not acquire remote descriptor (ARPA Distributor)");
+ THREAD_ERROR_GOTO(arg);
+ }
+ strncpy(mock_arpa_desc->event_distributor_in.peer_addr,
+ "arpa-mockup",
+ sizeof(mock_arpa_desc->event_distributor_in.peer_addr));
+ mock_arpa_desc->event_distributor_in.peer.sin_port = 0;
+
if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 ||
- add_in_event(epollfd, mock_null_desc) != 0)
+ add_in_event(epollfd, mock_null_desc) != 0 || add_in_event(epollfd, mock_arpa_desc) != 0)
{
logger(1, "%s", "nDPIsrvd add input event failed");
THREAD_ERROR_GOTO(arg);
@@ -197,7 +211,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
for (int i = 0; i < nready; i++)
{
if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc ||
- events[i].data.ptr == mock_null_desc)
+ events[i].data.ptr == mock_null_desc || events[i].data.ptr == mock_arpa_desc)
{
if (handle_data_event(epollfd, &events[i]) != 0)
{
@@ -224,13 +238,19 @@ error:
{
drain_cache_blocking(mock_null_desc);
}
+ if (mock_arpa_desc != NULL)
+ {
+ drain_cache_blocking(mock_arpa_desc);
+ }
del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]);
del_event(epollfd, mock_testfds[PIPE_TEST_WRITE]);
del_event(epollfd, mock_nullfds[PIPE_NULL_WRITE]);
+ del_event(epollfd, mock_arpafds[PIPE_ARPA_WRITE]);
close(mock_pipefds[PIPE_nDPIsrvd]);
close(mock_testfds[PIPE_TEST_WRITE]);
close(mock_nullfds[PIPE_NULL_WRITE]);
+ close(mock_arpafds[PIPE_ARPA_WRITE]);
close(epollfd);
return NULL;
@@ -469,7 +489,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
{
int dis_epollfd = create_evq();
int signalfd = setup_signalfd(dis_epollfd);
- int pipe_read_finished = 0, null_read_finished = 0;
+ int pipe_read_finished = 0, null_read_finished = 0, arpa_read_finished = 0;
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
@@ -502,12 +522,17 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
+ if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0)
+ {
+ THREAD_ERROR_GOTO(trv);
+ }
+
stats = (struct distributor_global_user_data *)mock_sock->global_user_data;
stats->json_string_len_min = (unsigned long long int)-1;
pthread_mutex_lock(&distributor_start_mutex);
- while (pipe_read_finished == 0 || null_read_finished == 0)
+ while (pipe_read_finished == 0 || null_read_finished == 0 || arpa_read_finished == 0)
{
int nready = epoll_wait(dis_epollfd, events, events_size, -1);
if (nready < 0 && errno != EINTR)
@@ -579,6 +604,27 @@ static void * distributor_client_mainloop_thread(void * const arg)
printf("%.*s", (int)bytes_read, buf);
}
+ else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ])
+ {
+ char buf[NETWORK_BUFFER_MAX_SIZE];
+ ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf));
+ if (bytes_read < 0)
+ {
+ logger(1, "Read fd returned an error: %s", strerror(errno));
+ THREAD_ERROR_GOTO(trv);
+ }
+ if (bytes_read == 0)
+ {
+ del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
+ arpa_read_finished = 1;
+ continue;
+ }
+
+ /*
+ * Nothing to do .. ?
+ * I am just here to trigger some IP code paths.
+ */
+ }
else if (events[i].data.fd == signalfd)
{
struct signalfd_siginfo fdsi;
@@ -627,6 +673,7 @@ error:
del_event(dis_epollfd, signalfd);
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
+ del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
close(dis_epollfd);
close(signalfd);
nDPIsrvd_socket_free(&mock_sock);
@@ -646,8 +693,8 @@ static void * nDPId_mainloop_thread(void * const arg)
}
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
- reader_threads[0].json_sockfd = mock_pipefds[PIPE_nDPId];
- reader_threads[0].json_sock_reconnect = 0;
+ reader_threads[0].collector_sockfd = mock_pipefds[PIPE_nDPId];
+ reader_threads[0].collector_sock_reconnect = 0;
pthread_mutex_lock(&nDPId_start_mutex);
@@ -758,7 +805,8 @@ int main(int argc, char ** argv)
return 1;
}
- if (setup_pipe(mock_pipefds) != 0 || setup_pipe(mock_testfds) != 0 || setup_pipe(mock_nullfds) != 0)
+ if (setup_pipe(mock_pipefds) != 0 || setup_pipe(mock_testfds) != 0 || setup_pipe(mock_nullfds) != 0 ||
+ setup_pipe(mock_arpafds) != 0)
{
return 1;
}