summaryrefslogtreecommitdiff
path: root/nDPId-test.c
diff options
context:
space:
mode:
Diffstat (limited to 'nDPId-test.c')
-rw-r--r--nDPId-test.c192
1 files changed, 106 insertions, 86 deletions
diff --git a/nDPId-test.c b/nDPId-test.c
index fb21bc65c..48c0e2b70 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -292,14 +292,12 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
{
int nDPIsrvd_distributor_disconnects = 0;
int const nDPIsrvd_distributor_expected_disconnects = 5;
- int epollfd;
+ struct nio io;
struct remote_desc * mock_json_desc = NULL;
struct remote_desc * mock_test_desc = NULL;
struct remote_desc * mock_buff_desc = NULL;
struct remote_desc * mock_null_desc = NULL;
struct remote_desc * mock_arpa_desc = NULL;
- struct epoll_event events[32];
- size_t const events_size = sizeof(events) / sizeof(events[0]);
logger(0, "nDPIsrvd thread started, init..");
@@ -308,11 +306,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
logger(1, "nDPIsrvd block signals failed: %s", strerror(errno));
}
- errno = 0;
- epollfd = create_evq();
- if (epollfd < 0)
+ nio_init(&io);
+
+#ifdef ENABLE_EPOLL
+ if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
+#else
+ if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)
+#endif
{
- logger(1, "nDPIsrvd epollfd invalid: %d", epollfd);
+ logger(1, "%s", "Error creating nDPIsrvd poll/epoll event I/O");
THREAD_ERROR_GOTO(arg);
}
@@ -356,9 +358,9 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
mock_arpa_desc->event_distributor_in.peer.sin_port = 0;
errno = 0;
- if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 ||
- add_in_event(epollfd, mock_buff_desc) != 0 || add_in_event(epollfd, mock_null_desc) != 0 ||
- add_in_event(epollfd, mock_arpa_desc) != 0)
+ if (add_in_event(&io, mock_json_desc) != 0 || add_in_event(&io, mock_test_desc) != 0 ||
+ add_in_event(&io, mock_buff_desc) != 0 || add_in_event(&io, mock_null_desc) != 0 ||
+ add_in_event(&io, mock_arpa_desc) != 0)
{
logger(1, "%s", "nDPIsrvd add input event failed");
THREAD_ERROR_GOTO(arg);
@@ -370,28 +372,25 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects)
{
- errno = 0;
- int nready = epoll_wait(epollfd, events, events_size, -1);
- if (nready < 0 && errno != EINTR)
+ if (nio_run(&io, -1) != NIO_SUCCESS)
{
- logger(1, "%s", "nDPIsrvd epoll wait failed.");
+ logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno));
THREAD_ERROR_GOTO(arg);
}
- else if (errno == EINTR)
- {
- continue;
- }
+
+ int nready = nio_get_nready(&io);
for (int i = 0; i < nready; i++)
{
- if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc ||
- events[i].data.ptr == mock_buff_desc || events[i].data.ptr == mock_null_desc ||
- events[i].data.ptr == mock_arpa_desc)
+ struct remote_desc * remote = (struct remote_desc *)nio_get_ptr(&io, i);
+
+ if (remote == mock_json_desc || remote == mock_test_desc ||
+ remote == mock_buff_desc || remote == mock_null_desc ||
+ remote == mock_arpa_desc)
{
- if ((events[i].events & EPOLLHUP) != 0 || (events[i].events & EPOLLERR) != 0)
+ if (nio_has_error(&io, i) == NIO_SUCCESS)
{
char const * remote_desc_name;
- struct remote_desc * remote = (struct remote_desc *)events[i].data.ptr;
if (remote == mock_json_desc)
{
remote_desc_name = "Mock JSON";
@@ -405,7 +404,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
drain_write_buffers_blocking(mock_null_desc);
if (mock_arpa_desc->fd >= 0)
drain_write_buffers_blocking(mock_arpa_desc);
- } while (handle_data_event(epollfd, &events[i]) == 0);
+ } while (handle_data_event(&io, i) == 0);
}
else if (remote == mock_test_desc)
{
@@ -433,16 +432,22 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
remote_desc_name,
nDPIsrvd_distributor_disconnects,
nDPIsrvd_distributor_expected_disconnects);
- free_remote(epollfd, remote);
+ free_remote(&io, remote);
}
else
{
- if (handle_data_event(epollfd, &events[i]) != 0)
+ if (handle_data_event(&io, i) != 0)
{
- if (mock_arpa_desc == events[i].data.ptr)
+ if (mock_arpa_desc == remote)
{
// arpa mock does not care about shutdown events
- disconnect_client(epollfd, mock_arpa_desc);
+ free_remote(&io, mock_arpa_desc);
+ nDPIsrvd_distributor_disconnects++;
+ logger(1,
+ "nDPIsrvd distributor '%s' connection closed (%d/%d)",
+ "Mock ARPA",
+ nDPIsrvd_distributor_disconnects,
+ nDPIsrvd_distributor_expected_disconnects);
continue;
}
logger(1, "%s", "nDPIsrvd data event handler failed");
@@ -453,17 +458,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
else
{
logger(1,
- "nDPIsrvd epoll returned unexpected event data: %d (%p)",
- events[i].data.fd,
- events[i].data.ptr);
+ "nDPIsrvd epoll returned unexpected event data: %p", remote);
THREAD_ERROR_GOTO(arg);
}
}
}
error:
- free_remotes(epollfd);
- close(epollfd);
+ free_remotes(&io);
+ nio_free(&io);
logger(0, "%s", "nDPIsrvd worker thread exits..");
return NULL;
@@ -919,10 +922,8 @@ static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_so
static void * distributor_client_mainloop_thread(void * const arg)
{
- int dis_epollfd = create_evq();
- int signalfd = setup_signalfd(dis_epollfd);
- struct epoll_event events[32];
- size_t const events_size = sizeof(events) / sizeof(events[0]);
+ struct nio io;
+ int signalfd = -1;
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
struct thread_return_value * const trv = &drv->thread_return_value;
struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data),
@@ -947,9 +948,29 @@ static void * distributor_client_mainloop_thread(void * const arg)
logger(0, "Distributor thread started, init..");
+ nio_init(&io);
+
+#ifdef ENABLE_EPOLL
+ if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
+#else
+ if (nio_use_poll(&io, 32) != NIO_SUCCESS)
+#endif
+ {
+ logger(1, "%s", "Error creating Distributor poll/epoll event I/O");
+ THREAD_ERROR_GOTO(trv);
+ }
+
+ signalfd = setup_signalfd(&io);
+ if (signalfd < 0)
+ {
+ logger(1, "Distributor signal fd setup failed: %s", strerror(errno));
+ THREAD_ERROR_GOTO(trv);
+ }
+
if (thread_block_signals() != 0)
{
logger(1, "Distributor block signals failed: %s", strerror(errno));
+ THREAD_ERROR_GOTO(trv);
}
errno = 0;
@@ -962,31 +983,26 @@ static void * distributor_client_mainloop_thread(void * const arg)
mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ];
mock_null->fd = mock_nullfds[PIPE_NULL_READ];
- if (dis_epollfd < 0 || signalfd < 0)
- {
- THREAD_ERROR_GOTO(trv);
- }
-
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0)
+ if (add_in_event_fd(&io, mock_testfds[PIPE_TEST_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]) != 0)
+ if (add_in_event_fd(&io, mock_bufffds[PIPE_BUFFER_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0)
+ if (add_in_event_fd(&io, mock_nullfds[PIPE_NULL_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0)
+ if (add_in_event_fd(&io, mock_arpafds[PIPE_ARPA_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
@@ -1006,31 +1022,30 @@ static void * distributor_client_mainloop_thread(void * const arg)
while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0)
{
- int nready = epoll_wait(dis_epollfd, events, events_size, -1);
- if (nready < 0 && errno != EINTR)
+ if (nio_run(&io, -1) != NIO_SUCCESS)
{
- logger(1, "%s", "Distributor epoll wait failed.");
+ logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno));
THREAD_ERROR_GOTO(trv);
}
- else if (nready < 0 && errno == EINTR)
- {
- continue;
- }
+
+ int nready = nio_get_nready(&io);
for (int i = 0; i < nready; i++)
{
- if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0)
+ int fd = nio_get_fd(&io, i);
+
+ if (nio_has_input(&io, i) != NIO_SUCCESS && nio_has_error(&io, i) != NIO_SUCCESS)
{
- logger(1, "Invalid epoll event received: %d", events[i].events & (~EPOLLIN & ~EPOLLHUP));
+ logger(1, "Invalid epoll event received for fd: %d", fd);
THREAD_ERROR_GOTO(trv);
}
- if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0)
+ if (nio_has_error(&io, i) == NIO_SUCCESS)
{
- logger(1, "Distributor disconnected: %d", events[i].data.fd);
- del_event(dis_epollfd, events[i].data.fd);
+ logger(1, "Distributor disconnected: %d", fd);
+ del_event(&io, fd);
}
- if (events[i].data.fd == mock_testfds[PIPE_TEST_READ])
+ if (fd == mock_testfds[PIPE_TEST_READ])
{
switch (nDPIsrvd_read(mock_sock))
{
@@ -1065,7 +1080,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
}
- else if (events[i].data.fd == mock_bufffds[PIPE_BUFFER_READ])
+ else if (fd == mock_bufffds[PIPE_BUFFER_READ])
{
switch (nDPIsrvd_read(mock_buff))
{
@@ -1100,7 +1115,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
}
- else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ])
+ else if (fd == mock_nullfds[PIPE_NULL_READ])
{
switch (nDPIsrvd_read(mock_null))
{
@@ -1129,7 +1144,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
}
- else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ])
+ else if (fd == mock_arpafds[PIPE_ARPA_READ])
{
char buf[NETWORK_BUFFER_MAX_SIZE];
ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf));
@@ -1144,7 +1159,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
* I am just here to trigger some IP code paths.
*/
}
- else if (events[i].data.fd == signalfd)
+ else if (fd == signalfd)
{
struct signalfd_siginfo fdsi;
ssize_t s;
@@ -1166,9 +1181,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
else
{
logger(1,
- "Distributor epoll returned unexpected event data: %d (%p)",
- events[i].data.fd,
- events[i].data.ptr);
+ "Distributor epoll returned unexpected event data: %p", nio_get_ptr(&io, i));
THREAD_ERROR_GOTO(trv);
}
}
@@ -1238,17 +1251,17 @@ static void * distributor_client_mainloop_thread(void * const arg)
}
error:
- del_event(dis_epollfd, signalfd);
- del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
- del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]);
- del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
- del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
+ del_event(&io, signalfd);
+ del_event(&io, mock_testfds[PIPE_TEST_READ]);
+ del_event(&io, mock_bufffds[PIPE_BUFFER_READ]);
+ del_event(&io, mock_nullfds[PIPE_NULL_READ]);
+ del_event(&io, mock_arpafds[PIPE_ARPA_READ]);
close(mock_testfds[PIPE_TEST_READ]);
close(mock_bufffds[PIPE_BUFFER_READ]);
close(mock_nullfds[PIPE_NULL_READ]);
close(mock_arpafds[PIPE_ARPA_READ]);
- close(dis_epollfd);
close(signalfd);
+ nio_free(&io);
nDPIsrvd_socket_free(&mock_sock);
nDPIsrvd_socket_free(&mock_buff);
@@ -1501,9 +1514,9 @@ static int nio_selftest()
#endif
#ifdef ENABLE_EPOLL
- if (nio_use_epoll(&io, 5) != NIO_ERROR_SUCCESS)
+ if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
#else
- if (nio_use_poll(&io, 3) != NIO_ERROR_SUCCESS)
+ if (nio_use_poll(&io, 32) != NIO_SUCCESS)
#endif
{
logger(1, "%s", "Could not use poll/epoll for nio");
@@ -1518,8 +1531,8 @@ static int nio_selftest()
goto error;
}
- if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_ERROR_SUCCESS ||
- nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_ERROR_SUCCESS)
+ if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_SUCCESS ||
+ nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_SUCCESS)
{
logger(1, "%s", "Could not add pipe fds to nio");
goto error;
@@ -1535,26 +1548,26 @@ static int nio_selftest()
size_t const wlen = strnlen(wbuf, sizeof(wbuf));
write(pipefds[1], wbuf, wlen);
- if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS)
+ if (nio_run(&io, 1000) != NIO_SUCCESS)
{
logger(1, "%s", "Event notification failed");
goto error;
}
- if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS)
+ if (nio_can_output(&io, 0) != NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (write) can not output");
goto error;
}
- if (nio_has_input(&io, 1) != NIO_ERROR_SUCCESS)
+ if (nio_has_input(&io, 1) != NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (read) has no input");
goto error;
}
- if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) != NIO_ERROR_SUCCESS ||
- nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS)
+ if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) != NIO_SUCCESS ||
+ nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS)
{
logger(1, "%s", "Event validation failed");
goto error;
@@ -1567,31 +1580,38 @@ static int nio_selftest()
goto error;
}
- if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS)
+ if (nio_run(&io, 1000) != NIO_SUCCESS)
{
logger(1, "%s", "Event notification failed");
goto error;
}
- if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS)
+ if (nio_can_output(&io, 0) != NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (write) can not output");
goto error;
}
- if (nio_has_input(&io, 1) == NIO_ERROR_SUCCESS)
+ if (nio_has_input(&io, 1) == NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (read) has input");
goto error;
}
- if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) == NIO_ERROR_SUCCESS ||
- nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS)
+ if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) == NIO_SUCCESS ||
+ nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS)
{
logger(1, "%s", "Event validation failed");
goto error;
}
+ if (nio_del_fd(&io, pipefds[0]) != NIO_SUCCESS
+ || nio_del_fd(&io, pipefds[1]) != NIO_SUCCESS)
+ {
+ logger(1, "%s", "Event delete failed");
+ goto error;
+ }
+
nio_free(&io);
return 0;
error: