summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToni <matzeton@googlemail.com>2023-11-06 12:38:15 +0100
committerGitHub <noreply@github.com>2023-11-06 12:38:15 +0100
commit1b679271693a17ce0b653b9ba45db77b731db42e (patch)
tree986a2fac5feeaae71e2c2bd4e771e31a7c966de6
parent17c21e1d27a90b394873a0e80e5d6992f4b985ee (diff)
Event I/O abstraction layer. (#28)
* Finalize Event I/O abstraction layer. * Fix possible fd leakage, Gitlab-CI build and error logging. * Fixed possible uninitialized signalfd variable. * Fixed possible memory leak. * Fixed some SonarCloud complaints. * Fixed nDPId-test nDPIsrvd-arpa-mockup stuck indefinitely. * Add nDPId / nDPIsrvd command line option to use poll() on Linux instead of the default epoll(). Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r--.github/workflows/build.yml2
-rw-r--r--.gitlab-ci.yml2
-rw-r--r--CMakeLists.txt9
-rw-r--r--nDPId-test.c192
-rw-r--r--nDPId.c81
-rw-r--r--nDPIsrvd.c265
-rw-r--r--nio.c210
-rw-r--r--nio.h28
8 files changed, 448 insertions, 341 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 2f435b180..83b98ac50 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -147,7 +147,7 @@ jobs:
- name: Build single nDPId executable (invoke CC directly)
if: startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON')
run: |
- cc -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz
+ cc -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c nio.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz
- name: Test EXEC
run: |
./build/nDPId-test
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 39a98b6af..85c840fed 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -67,7 +67,7 @@ build_and_test_static_libndpi:
- >
if ldd ./build-cmake-submodule/nDPId | grep -qoEi libndpi; then \
echo 'nDPId linked against a static libnDPI should not contain a shared linked libnDPI.' >&2; false; fi
- - cc nDPId.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz
+ - cc nDPId.c nio.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz
artifacts:
expire_in: 1 week
paths:
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 94c5ef24b..73d659a5d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,7 +38,10 @@ if(HAS_EPOLL)
option(FORCE_POLL "Force the use of poll() instead of epoll()." OFF)
if(NOT FORCE_POLL)
set(EPOLL_DEFS "-DENABLE_EPOLL=1")
- set(EPOLL_SRCS "nio.c")
+ endif()
+else()
+ if(BUILD_EXAMPLES)
+ message(FATAL_ERROR "Examples are using epoll event I/O. Without epoll available, you can not build/run those.")
endif()
endif()
@@ -100,8 +103,8 @@ else()
unset(NDPI_WITH_MAXMINDDB CACHE)
endif()
-add_executable(nDPId nDPId.c ${EPOLL_SRCS} utils.c)
-add_executable(nDPIsrvd nDPIsrvd.c ${EPOLL_SRCS} utils.c)
+add_executable(nDPId nDPId.c nio.c utils.c)
+add_executable(nDPIsrvd nDPIsrvd.c nio.c utils.c)
add_executable(nDPId-test nDPId-test.c)
add_custom_target(dist)
diff --git a/nDPId-test.c b/nDPId-test.c
index fb21bc65c..48c0e2b70 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -292,14 +292,12 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
{
int nDPIsrvd_distributor_disconnects = 0;
int const nDPIsrvd_distributor_expected_disconnects = 5;
- int epollfd;
+ struct nio io;
struct remote_desc * mock_json_desc = NULL;
struct remote_desc * mock_test_desc = NULL;
struct remote_desc * mock_buff_desc = NULL;
struct remote_desc * mock_null_desc = NULL;
struct remote_desc * mock_arpa_desc = NULL;
- struct epoll_event events[32];
- size_t const events_size = sizeof(events) / sizeof(events[0]);
logger(0, "nDPIsrvd thread started, init..");
@@ -308,11 +306,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
logger(1, "nDPIsrvd block signals failed: %s", strerror(errno));
}
- errno = 0;
- epollfd = create_evq();
- if (epollfd < 0)
+ nio_init(&io);
+
+#ifdef ENABLE_EPOLL
+ if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
+#else
+ if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)
+#endif
{
- logger(1, "nDPIsrvd epollfd invalid: %d", epollfd);
+ logger(1, "%s", "Error creating nDPIsrvd poll/epoll event I/O");
THREAD_ERROR_GOTO(arg);
}
@@ -356,9 +358,9 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
mock_arpa_desc->event_distributor_in.peer.sin_port = 0;
errno = 0;
- if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 ||
- add_in_event(epollfd, mock_buff_desc) != 0 || add_in_event(epollfd, mock_null_desc) != 0 ||
- add_in_event(epollfd, mock_arpa_desc) != 0)
+ if (add_in_event(&io, mock_json_desc) != 0 || add_in_event(&io, mock_test_desc) != 0 ||
+ add_in_event(&io, mock_buff_desc) != 0 || add_in_event(&io, mock_null_desc) != 0 ||
+ add_in_event(&io, mock_arpa_desc) != 0)
{
logger(1, "%s", "nDPIsrvd add input event failed");
THREAD_ERROR_GOTO(arg);
@@ -370,28 +372,25 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects)
{
- errno = 0;
- int nready = epoll_wait(epollfd, events, events_size, -1);
- if (nready < 0 && errno != EINTR)
+ if (nio_run(&io, -1) != NIO_SUCCESS)
{
- logger(1, "%s", "nDPIsrvd epoll wait failed.");
+ logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno));
THREAD_ERROR_GOTO(arg);
}
- else if (errno == EINTR)
- {
- continue;
- }
+
+ int nready = nio_get_nready(&io);
for (int i = 0; i < nready; i++)
{
- if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc ||
- events[i].data.ptr == mock_buff_desc || events[i].data.ptr == mock_null_desc ||
- events[i].data.ptr == mock_arpa_desc)
+ struct remote_desc * remote = (struct remote_desc *)nio_get_ptr(&io, i);
+
+ if (remote == mock_json_desc || remote == mock_test_desc ||
+ remote == mock_buff_desc || remote == mock_null_desc ||
+ remote == mock_arpa_desc)
{
- if ((events[i].events & EPOLLHUP) != 0 || (events[i].events & EPOLLERR) != 0)
+ if (nio_has_error(&io, i) == NIO_SUCCESS)
{
char const * remote_desc_name;
- struct remote_desc * remote = (struct remote_desc *)events[i].data.ptr;
if (remote == mock_json_desc)
{
remote_desc_name = "Mock JSON";
@@ -405,7 +404,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
drain_write_buffers_blocking(mock_null_desc);
if (mock_arpa_desc->fd >= 0)
drain_write_buffers_blocking(mock_arpa_desc);
- } while (handle_data_event(epollfd, &events[i]) == 0);
+ } while (handle_data_event(&io, i) == 0);
}
else if (remote == mock_test_desc)
{
@@ -433,16 +432,22 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
remote_desc_name,
nDPIsrvd_distributor_disconnects,
nDPIsrvd_distributor_expected_disconnects);
- free_remote(epollfd, remote);
+ free_remote(&io, remote);
}
else
{
- if (handle_data_event(epollfd, &events[i]) != 0)
+ if (handle_data_event(&io, i) != 0)
{
- if (mock_arpa_desc == events[i].data.ptr)
+ if (mock_arpa_desc == remote)
{
// arpa mock does not care about shutdown events
- disconnect_client(epollfd, mock_arpa_desc);
+ free_remote(&io, mock_arpa_desc);
+ nDPIsrvd_distributor_disconnects++;
+ logger(1,
+ "nDPIsrvd distributor '%s' connection closed (%d/%d)",
+ "Mock ARPA",
+ nDPIsrvd_distributor_disconnects,
+ nDPIsrvd_distributor_expected_disconnects);
continue;
}
logger(1, "%s", "nDPIsrvd data event handler failed");
@@ -453,17 +458,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg)
else
{
logger(1,
- "nDPIsrvd epoll returned unexpected event data: %d (%p)",
- events[i].data.fd,
- events[i].data.ptr);
+ "nDPIsrvd epoll returned unexpected event data: %p", remote);
THREAD_ERROR_GOTO(arg);
}
}
}
error:
- free_remotes(epollfd);
- close(epollfd);
+ free_remotes(&io);
+ nio_free(&io);
logger(0, "%s", "nDPIsrvd worker thread exits..");
return NULL;
@@ -919,10 +922,8 @@ static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_so
static void * distributor_client_mainloop_thread(void * const arg)
{
- int dis_epollfd = create_evq();
- int signalfd = setup_signalfd(dis_epollfd);
- struct epoll_event events[32];
- size_t const events_size = sizeof(events) / sizeof(events[0]);
+ struct nio io;
+ int signalfd = -1;
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
struct thread_return_value * const trv = &drv->thread_return_value;
struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data),
@@ -947,9 +948,29 @@ static void * distributor_client_mainloop_thread(void * const arg)
logger(0, "Distributor thread started, init..");
+ nio_init(&io);
+
+#ifdef ENABLE_EPOLL
+ if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
+#else
+ if (nio_use_poll(&io, 32) != NIO_SUCCESS)
+#endif
+ {
+ logger(1, "%s", "Error creating Distributor poll/epoll event I/O");
+ THREAD_ERROR_GOTO(trv);
+ }
+
+ signalfd = setup_signalfd(&io);
+ if (signalfd < 0)
+ {
+ logger(1, "Distributor signal fd setup failed: %s", strerror(errno));
+ THREAD_ERROR_GOTO(trv);
+ }
+
if (thread_block_signals() != 0)
{
logger(1, "Distributor block signals failed: %s", strerror(errno));
+ THREAD_ERROR_GOTO(trv);
}
errno = 0;
@@ -962,31 +983,26 @@ static void * distributor_client_mainloop_thread(void * const arg)
mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ];
mock_null->fd = mock_nullfds[PIPE_NULL_READ];
- if (dis_epollfd < 0 || signalfd < 0)
- {
- THREAD_ERROR_GOTO(trv);
- }
-
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0)
+ if (add_in_event_fd(&io, mock_testfds[PIPE_TEST_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]) != 0)
+ if (add_in_event_fd(&io, mock_bufffds[PIPE_BUFFER_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0)
+ if (add_in_event_fd(&io, mock_nullfds[PIPE_NULL_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
errno = 0;
- if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0)
+ if (add_in_event_fd(&io, mock_arpafds[PIPE_ARPA_READ]) != 0)
{
THREAD_ERROR_GOTO(trv);
}
@@ -1006,31 +1022,30 @@ static void * distributor_client_mainloop_thread(void * const arg)
while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0)
{
- int nready = epoll_wait(dis_epollfd, events, events_size, -1);
- if (nready < 0 && errno != EINTR)
+ if (nio_run(&io, -1) != NIO_SUCCESS)
{
- logger(1, "%s", "Distributor epoll wait failed.");
+ logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno));
THREAD_ERROR_GOTO(trv);
}
- else if (nready < 0 && errno == EINTR)
- {
- continue;
- }
+
+ int nready = nio_get_nready(&io);
for (int i = 0; i < nready; i++)
{
- if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0)
+ int fd = nio_get_fd(&io, i);
+
+ if (nio_has_input(&io, i) != NIO_SUCCESS && nio_has_error(&io, i) != NIO_SUCCESS)
{
- logger(1, "Invalid epoll event received: %d", events[i].events & (~EPOLLIN & ~EPOLLHUP));
+ logger(1, "Invalid epoll event received for fd: %d", fd);
THREAD_ERROR_GOTO(trv);
}
- if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0)
+ if (nio_has_error(&io, i) == NIO_SUCCESS)
{
- logger(1, "Distributor disconnected: %d", events[i].data.fd);
- del_event(dis_epollfd, events[i].data.fd);
+ logger(1, "Distributor disconnected: %d", fd);
+ del_event(&io, fd);
}
- if (events[i].data.fd == mock_testfds[PIPE_TEST_READ])
+ if (fd == mock_testfds[PIPE_TEST_READ])
{
switch (nDPIsrvd_read(mock_sock))
{
@@ -1065,7 +1080,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
}
- else if (events[i].data.fd == mock_bufffds[PIPE_BUFFER_READ])
+ else if (fd == mock_bufffds[PIPE_BUFFER_READ])
{
switch (nDPIsrvd_read(mock_buff))
{
@@ -1100,7 +1115,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
}
- else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ])
+ else if (fd == mock_nullfds[PIPE_NULL_READ])
{
switch (nDPIsrvd_read(mock_null))
{
@@ -1129,7 +1144,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR_GOTO(trv);
}
}
- else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ])
+ else if (fd == mock_arpafds[PIPE_ARPA_READ])
{
char buf[NETWORK_BUFFER_MAX_SIZE];
ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf));
@@ -1144,7 +1159,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
* I am just here to trigger some IP code paths.
*/
}
- else if (events[i].data.fd == signalfd)
+ else if (fd == signalfd)
{
struct signalfd_siginfo fdsi;
ssize_t s;
@@ -1166,9 +1181,7 @@ static void * distributor_client_mainloop_thread(void * const arg)
else
{
logger(1,
- "Distributor epoll returned unexpected event data: %d (%p)",
- events[i].data.fd,
- events[i].data.ptr);
+ "Distributor epoll returned unexpected event data: %p", nio_get_ptr(&io, i));
THREAD_ERROR_GOTO(trv);
}
}
@@ -1238,17 +1251,17 @@ static void * distributor_client_mainloop_thread(void * const arg)
}
error:
- del_event(dis_epollfd, signalfd);
- del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
- del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]);
- del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]);
- del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]);
+ del_event(&io, signalfd);
+ del_event(&io, mock_testfds[PIPE_TEST_READ]);
+ del_event(&io, mock_bufffds[PIPE_BUFFER_READ]);
+ del_event(&io, mock_nullfds[PIPE_NULL_READ]);
+ del_event(&io, mock_arpafds[PIPE_ARPA_READ]);
close(mock_testfds[PIPE_TEST_READ]);
close(mock_bufffds[PIPE_BUFFER_READ]);
close(mock_nullfds[PIPE_NULL_READ]);
close(mock_arpafds[PIPE_ARPA_READ]);
- close(dis_epollfd);
close(signalfd);
+ nio_free(&io);
nDPIsrvd_socket_free(&mock_sock);
nDPIsrvd_socket_free(&mock_buff);
@@ -1501,9 +1514,9 @@ static int nio_selftest()
#endif
#ifdef ENABLE_EPOLL
- if (nio_use_epoll(&io, 5) != NIO_ERROR_SUCCESS)
+ if (nio_use_epoll(&io, 32) != NIO_SUCCESS)
#else
- if (nio_use_poll(&io, 3) != NIO_ERROR_SUCCESS)
+ if (nio_use_poll(&io, 32) != NIO_SUCCESS)
#endif
{
logger(1, "%s", "Could not use poll/epoll for nio");
@@ -1518,8 +1531,8 @@ static int nio_selftest()
goto error;
}
- if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_ERROR_SUCCESS ||
- nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_ERROR_SUCCESS)
+ if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_SUCCESS ||
+ nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_SUCCESS)
{
logger(1, "%s", "Could not add pipe fds to nio");
goto error;
@@ -1535,26 +1548,26 @@ static int nio_selftest()
size_t const wlen = strnlen(wbuf, sizeof(wbuf));
write(pipefds[1], wbuf, wlen);
- if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS)
+ if (nio_run(&io, 1000) != NIO_SUCCESS)
{
logger(1, "%s", "Event notification failed");
goto error;
}
- if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS)
+ if (nio_can_output(&io, 0) != NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (write) can not output");
goto error;
}
- if (nio_has_input(&io, 1) != NIO_ERROR_SUCCESS)
+ if (nio_has_input(&io, 1) != NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (read) has no input");
goto error;
}
- if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) != NIO_ERROR_SUCCESS ||
- nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS)
+ if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) != NIO_SUCCESS ||
+ nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS)
{
logger(1, "%s", "Event validation failed");
goto error;
@@ -1567,31 +1580,38 @@ static int nio_selftest()
goto error;
}
- if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS)
+ if (nio_run(&io, 1000) != NIO_SUCCESS)
{
logger(1, "%s", "Event notification failed");
goto error;
}
- if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS)
+ if (nio_can_output(&io, 0) != NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (write) can not output");
goto error;
}
- if (nio_has_input(&io, 1) == NIO_ERROR_SUCCESS)
+ if (nio_has_input(&io, 1) == NIO_SUCCESS)
{
logger(1, "%s", "Pipe fd (read) has input");
goto error;
}
- if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) == NIO_ERROR_SUCCESS ||
- nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS)
+ if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) == NIO_SUCCESS ||
+ nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS)
{
logger(1, "%s", "Event validation failed");
goto error;
}
+ if (nio_del_fd(&io, pipefds[0]) != NIO_SUCCESS
+ || nio_del_fd(&io, pipefds[1]) != NIO_SUCCESS)
+ {
+ logger(1, "%s", "Event delete failed");
+ goto error;
+ }
+
nio_free(&io);
return 0;
error:
diff --git a/nDPId.c b/nDPId.c
index c411eaa27..9fada5e40 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -16,7 +16,6 @@
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
-#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/signalfd.h>
#include <sys/un.h>
@@ -27,6 +26,7 @@
#include "config.h"
#include "nDPIsrvd.h"
+#include "nio.h"
#include "utils.h"
#ifndef UNIX_PATH_MAX
@@ -467,6 +467,9 @@ static struct
uint8_t enable_zlib_compression;
#endif
uint8_t enable_data_analysis;
+#ifdef ENABLE_EPOLL
+ uint8_t use_poll;
+#endif
/* subopts */
unsigned long long int max_flows_per_thread;
unsigned long long int max_idle_flows_per_thread;
@@ -4405,53 +4408,57 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
return;
}
- int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- if (epoll_fd < 0)
+ struct nio io;
+ nio_init(&io);
+#ifdef ENABLE_EPOLL
+ if ((nDPId_options.use_poll == 0 && nio_use_epoll(&io, 32) != NIO_SUCCESS)
+ || (nDPId_options.use_poll != 0 && nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS))
+#else
+ if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)
+#endif
{
- logger(1, "Got an invalid epoll fd: %s", strerror(errno));
+ logger(1, "%s", "Event I/O poll/epoll setup failed");
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
+ nio_free(&io);
return;
}
- struct epoll_event event = {};
- event.events = EPOLLIN;
-
- event.data.fd = pcap_fd;
- if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0)
+ errno = 0;
+ if (nio_add_fd(&io, pcap_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS)
{
- logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno));
+ logger(1,
+ "Could not add pcap fd to event queue: %s",
+ (errno != 0 ? strerror(errno) : "Internal Error"));
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
+ nio_free(&io);
return;
}
- event.data.fd = signal_fd;
- if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0)
+ errno = 0;
+ if (nio_add_fd(&io, signal_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS)
{
- logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno));
+ logger(1,
+ "Could not add signal fd to event queue: %s",
+ (errno != 0 ? strerror(errno) : "Internal Error"));
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
+ nio_free(&io);
return;
}
- struct epoll_event events[32];
- size_t const events_size = sizeof(events) / sizeof(events[0]);
int const timeout_ms = 1000; /* TODO: Configurable? */
- int nready;
struct timeval tval_before_epoll, tval_after_epoll;
while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
{
get_current_time(&tval_before_epoll);
errno = 0;
- nready = epoll_wait(epoll_fd, events, events_size, timeout_ms);
- if (errno != 0)
+ if (nio_run(&io, timeout_ms) != NIO_SUCCESS)
{
- if (errno == EINTR)
- {
- continue;
- }
- logger(1, "Epoll returned error: %s", strerror(errno));
+ logger(1, "Event I/O returned error: %s", strerror(errno));
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
break;
}
+ int nready = nio_get_nready(&io);
+
if (nready == 0)
{
struct timeval tval_diff;
@@ -4467,13 +4474,15 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
for (int i = 0; i < nready; ++i)
{
- if ((events[i].events & EPOLLERR) != 0)
+ if (nio_has_error(&io, i) == NIO_SUCCESS)
{
- logger(1, "%s", "Epoll error event");
+ logger(1, "%s", "Event I/O error");
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
}
- if (events[i].data.fd == signal_fd)
+ int fd = nio_get_fd(&io, i);
+
+ if (fd == signal_fd)
{
struct signalfd_siginfo fdsi;
if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi))
@@ -4504,7 +4513,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame);
}
}
- else if (events[i].data.fd == pcap_fd)
+ else if (fd == pcap_fd)
{
switch (pcap_dispatch(
reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread))
@@ -4517,6 +4526,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
break;
case PCAP_ERROR_BREAK:
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
+ nio_free(&io);
return;
default:
break;
@@ -4524,10 +4534,12 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
}
else
{
- logger(1, "Unknown event data 0x%llx returned", (unsigned long long int)events[i].data.u64);
+ logger(1, "Unknown event descriptor or data returned: %p", nio_get_ptr(&io, i));
}
}
}
+
+ nio_free(&io);
}
}
}
@@ -4900,7 +4912,7 @@ static void print_usage(char const * const arg0)
"Usage: %s "
"[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n"
"\t \t"
- "[-l] [-L logfile] [-c address] "
+ "[-l] [-L logfile] [-c address] [-e]"
"[-d] [-p pidfile]\n"
"\t \t"
"[-u user] [-g group] "
@@ -4921,6 +4933,8 @@ static void print_usage(char const * const arg0)
"\t-L\tLog all messages to a log file.\n"
"\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n"
"\t \tDefault: %s\n"
+ "\t-e\tUse poll() instead of epoll().\n"
+ "\t \tDefault: epoll() on Linux, poll() otherwise\n"
"\t-d\tFork into background after initialization.\n"
"\t-p\tWrite the daemon PID to the given file path.\n"
"\t \tDefault: %s\n"
@@ -4982,7 +4996,7 @@ static int nDPId_parse_options(int argc, char ** argv)
{
int opt;
- while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:Azo:vh")) != -1)
+ while ((opt = getopt(argc, argv, "i:IEB:lL:c:edp:u:g:P:C:J:S:a:Azo:vh")) != -1)
{
switch (opt)
{
@@ -5010,6 +5024,13 @@ static int nDPId_parse_options(int argc, char ** argv)
case 'c':
set_cmdarg(&nDPId_options.collector_address, optarg);
break;
+ case 'e':
+#ifdef ENABLE_EPOLL
+ nDPId_options.use_poll = 1;
+#else
+ logger_early(1, "%s", "nDPId was built w/o epoll() support, poll() is already the default");
+#endif
+ break;
case 'd':
daemonize_enable();
break;
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 28a989fc0..a4d7457fe 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -9,7 +9,6 @@
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
-#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/types.h>
@@ -17,6 +16,7 @@
#include "config.h"
#include "nDPIsrvd.h"
+#include "nio.h"
#include "utils.h"
enum sock_type
@@ -92,7 +92,10 @@ static struct
struct cmdarg group;
nDPIsrvd_ull max_remote_descriptors;
nDPIsrvd_ull max_write_buffers;
- int bufferbloat_fallback_to_blocking;
+ uint8_t bufferbloat_fallback_to_blocking;
+#ifdef ENABLE_EPOLL
+ uint8_t use_poll;
+#endif
} nDPIsrvd_options = {.pidfile = CMDARG(nDPIsrvd_PIDFILE),
.collector_un_sockpath = CMDARG(COLLECTOR_UNIX_SOCKET),
.distributor_un_sockpath = CMDARG(DISTRIBUTOR_UNIX_SOCKET),
@@ -109,11 +112,11 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote,
...);
static int fcntl_add_flags(int fd, int flags);
static int fcntl_del_flags(int fd, int flags);
-static int add_in_event_fd(int epollfd, int fd);
-static int add_in_event(int epollfd, struct remote_desc * const remote);
-static int del_event(int epollfd, int fd);
-static int del_out_event(int epollfd, struct remote_desc * const remote);
-static void disconnect_client(int epollfd, struct remote_desc * const current);
+static int add_in_event_fd(struct nio * const io, int fd);
+static int add_in_event(struct nio * const io, struct remote_desc * const remote);
+static int del_event(struct nio * const io, int fd);
+static int set_in_event(struct nio * const io, struct remote_desc * const remote);
+static void disconnect_client(struct nio * const io, struct remote_desc * const current);
static int drain_write_buffers_blocking(struct remote_desc * const remote);
static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
@@ -414,7 +417,7 @@ static int drain_write_buffers_blocking(struct remote_desc * const remote)
return retval;
}
-static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
+static int handle_outgoing_data(struct nio * const io, struct remote_desc * const remote)
{
UT_array * const additional_write_buffers = get_additional_write_buffers(remote);
@@ -425,7 +428,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
if (drain_write_buffers(remote) != 0)
{
logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno));
- disconnect_client(epollfd, remote);
+ disconnect_client(io, remote);
return -1;
}
if (utarray_len(additional_write_buffers) == 0)
@@ -433,7 +436,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote)
struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote);
if (write_buffer->buf.used == 0) {
- return del_out_event(epollfd, remote);
+ return set_in_event(io, remote);
} else {
return drain_main_buffer(remote);
}
@@ -679,15 +682,17 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot
return NULL;
}
-static void free_remote(int epollfd, struct remote_desc * remote)
+static void free_remote(struct nio * const io, struct remote_desc * remote)
{
if (remote->fd > -1)
{
errno = 0;
- del_event(epollfd, remote->fd);
- if (errno != 0)
+ if (del_event(io, remote->fd) != 0)
{
- logger_nDPIsrvd(remote, "Could not delete event from epoll for connection", ": %s", strerror(errno));
+ logger_nDPIsrvd(remote,
+ "Could not delete event from queue for connection",
+ ": %s",
+ (errno != 0 ? strerror(errno) : "Internal Error"));
}
errno = 0;
close(remote->fd);
@@ -732,11 +737,11 @@ static void free_remote(int epollfd, struct remote_desc * remote)
}
}
-static void free_remotes(int epollfd)
+static void free_remotes(struct nio * const io)
{
for (size_t i = 0; i < remotes.desc_size; ++i)
{
- free_remote(epollfd, &remotes.desc[i]);
+ free_remote(io, &remotes.desc[i]);
}
nDPIsrvd_free(remotes.desc);
remotes.desc = NULL;
@@ -744,68 +749,34 @@ static void free_remotes(int epollfd)
remotes.desc_size = 0;
}
-static int add_event(int epollfd, int events, int fd, void * ptr)
+static int add_in_event_fd(struct nio * const io, int fd)
{
- int retval;
- struct epoll_event event = {};
-
- if (ptr != NULL)
- {
- event.data.ptr = ptr;
- }
- else
- {
- event.data.fd = fd;
- }
- event.events = events;
-
- while ((retval = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event)) != 0 && errno == EINTR) {}
- return retval;
+ return nio_add_fd(io, fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS;
}
-static int add_in_event_fd(int epollfd, int fd)
+static int add_in_event(struct nio * const io, struct remote_desc * const remote)
{
- return add_event(epollfd, EPOLLIN, fd, NULL);
+ return nio_add_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS;
}
-static int add_in_event(int epollfd, struct remote_desc * const remote)
+static int set_out_event(struct nio * const io, struct remote_desc * const remote)
{
- return add_event(epollfd, EPOLLIN, remote->fd, remote);
+ return nio_mod_fd(io, remote->fd, NIO_EVENT_OUTPUT, remote) != NIO_SUCCESS;
}
-static int mod_event(int epollfd, int events, int fd, void * ptr)
+static int set_in_event(struct nio * const io, struct remote_desc * const remote)
{
- int retval;
- struct epoll_event event = {};
-
- event.data.ptr = ptr;
- event.events = events;
-
- while ((retval = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event)) != 0 && errno == EINTR) {}
- return retval;
+ return nio_mod_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS;
}
-static int add_out_event(int epollfd, struct remote_desc * const remote)
+static int del_event(struct nio * const io, int fd)
{
- return mod_event(epollfd, EPOLLIN | EPOLLOUT, remote->fd, remote);
+ return nio_del_fd(io, fd) != NIO_SUCCESS;
}
-static int del_out_event(int epollfd, struct remote_desc * const remote)
+static void disconnect_client(struct nio * const io, struct remote_desc * const remote)
{
- return mod_event(epollfd, EPOLLIN, remote->fd, remote);
-}
-
-static int del_event(int epollfd, int fd)
-{
- int retval;
-
- while ((retval = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) != 0 && errno == EINTR) {}
- return retval;
-}
-
-static void disconnect_client(int epollfd, struct remote_desc * const remote)
-{
- free_remote(epollfd, remote);
+ free_remote(io, remote);
}
static int nDPIsrvd_parse_options(int argc, char ** argv)
@@ -828,6 +799,13 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
case 'c':
set_cmdarg(&nDPIsrvd_options.collector_un_sockpath, optarg);
break;
+ case 'e':
+#ifdef ENABLE_EPOLL
+ nDPIsrvd_options.use_poll = 1;
+#else
+ logger_early(1, "%s", "nDPIsrvd was built w/o epoll() support, poll() is already the default");
+#endif
+ break;
case 'd':
daemonize_enable();
break;
@@ -870,7 +848,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
default:
fprintf(stderr, "%s\n", get_nDPId_version());
fprintf(stderr,
- "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-d] [-p pidfile]\n"
+ "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-e] [-d] [-p pidfile]\n"
"\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n"
"\t[-m max-remote-descriptors] [-u user] [-g group]\n"
"\t[-C max-buffered-json-lines] [-D]\n"
@@ -879,6 +857,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
"\t-L\tLog all messages to a log file.\n"
"\t-c\tPath to a listening UNIX socket (nDPIsrvd Collector).\n"
"\t \tDefault: %s\n"
+ "\t-e\tUse poll() instead of epoll().\n"
+ "\t \tDefault: epoll() on Linux, poll() otherwise\n"
"\t-d\tFork into background after initialization.\n"
"\t-p\tWrite the daemon PID to the given file path.\n"
"\t \tDefault: %s\n"
@@ -970,7 +950,7 @@ static struct remote_desc * accept_remote(int server_fd,
return current;
}
-static int new_connection(int epollfd, int eventfd)
+static int new_connection(struct nio * const io, int eventfd)
{
union
{
@@ -1112,7 +1092,7 @@ static int new_connection(int epollfd, int eventfd)
if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0)
{
logger(1, "Error setting fd flags to non-blocking mode: %s", strerror(errno));
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
@@ -1123,18 +1103,19 @@ static int new_connection(int epollfd, int eventfd)
/* shutdown reading end for distributor clients does not work due to epoll usage */
}
- /* setup epoll event */
- if (add_in_event(epollfd, current) != 0)
+ /* setup event I/O */
+ errno = 0;
+ if (add_in_event(io, current) != NIO_SUCCESS)
{
- logger(1, "Error adding input event to %d: %s", current->fd, strerror(errno));
- disconnect_client(epollfd, current);
+ logger(1, "Error adding input event to %d: %s", current->fd, (errno != 0 ? strerror(errno) : "Internal Error"));
+ disconnect_client(io, current);
return 1;
}
return 0;
}
-static int handle_collector_protocol(int epollfd, struct remote_desc * const current)
+static int handle_collector_protocol(struct nio * const io, struct remote_desc * const current)
{
struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
char * json_str_start = NULL;
@@ -1150,7 +1131,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"BUG: Collector connection",
"JSON invalid opening character: '%c'",
json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]);
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
@@ -1161,7 +1142,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
if (errno == ERANGE)
{
logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string length exceeds numceric limits");
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
@@ -1172,7 +1153,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"missing JSON string length in protocol preamble: \"%.*s\"",
NETWORK_BUFFER_LENGTH_DIGITS,
json_read_buffer->buf.ptr.text);
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
@@ -1194,7 +1175,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
"JSON string too big: %llu > %zu",
current->event_collector_un.json_bytes,
json_read_buffer->buf.max);
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
@@ -1212,14 +1193,14 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
(int)current->event_collector_un.json_bytes > 512 ? 512
: (int)current->event_collector_un.json_bytes,
json_read_buffer->buf.ptr.text);
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
return 0;
}
-static int handle_incoming_data(int epollfd, struct remote_desc * const current)
+static int handle_incoming_data(struct nio * const io, struct remote_desc * const current)
{
struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current);
@@ -1235,7 +1216,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
{
logger_nDPIsrvd(current, "Distributor connection", "closed");
}
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
@@ -1261,13 +1242,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
if (bytes_read < 0 || errno != 0)
{
logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno));
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
if (bytes_read == 0)
{
logger_nDPIsrvd(current, "Collector connection", "closed during read");
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
return 1;
}
json_read_buffer->buf.used += bytes_read;
@@ -1275,7 +1256,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
{
- if (handle_collector_protocol(epollfd, current) != 0)
+ if (handle_collector_protocol(io, current) != 0)
{
break;
}
@@ -1296,13 +1277,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
if (utarray_len(additional_write_buffers) == 0)
{
errno = 0;
- if (add_out_event(epollfd, &remotes.desc[i]) != 0)
+ if (set_out_event(io, &remotes.desc[i]) != 0)
{
logger_nDPIsrvd(&remotes.desc[i],
"Could not add event to",
", disconnecting: %s",
- strerror(errno));
- disconnect_client(epollfd, &remotes.desc[i]);
+ (errno != 0 ? strerror(errno) : "Internal Error"));
+ disconnect_client(io, &remotes.desc[i]);
continue;
}
}
@@ -1310,7 +1291,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
json_read_buffer->buf.ptr.raw,
current->event_collector_un.json_bytes) != 0)
{
- disconnect_client(epollfd, &remotes.desc[i]);
+ disconnect_client(io, &remotes.desc[i]);
continue;
}
}
@@ -1324,7 +1305,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
if (drain_main_buffer(&remotes.desc[i]) != 0)
{
- disconnect_client(epollfd, &remotes.desc[i]);
+ disconnect_client(io, &remotes.desc[i]);
}
}
@@ -1338,13 +1319,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
return 0;
}
-static int handle_data_event(int epollfd, struct epoll_event * const event)
+static int handle_data_event(struct nio * const io, int index)
{
- struct remote_desc * current = (struct remote_desc *)event->data.ptr;
+ struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, index);
- if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0)
+ if (nio_has_input(io, index) != NIO_SUCCESS && nio_can_output(io, index) != NIO_SUCCESS)
{
- logger(1, "Can not handle event mask: %d", event->events);
+ logger(1, "%s", "Neither input nor output event set.");
return 1;
}
@@ -1360,17 +1341,17 @@ static int handle_data_event(int epollfd, struct epoll_event * const event)
return 1;
}
- if ((event->events & EPOLLIN) != 0)
+ if (nio_has_input(io, index) == NIO_SUCCESS)
{
- return handle_incoming_data(epollfd, current);
+ return handle_incoming_data(io, current);
}
else
{
- return handle_outgoing_data(epollfd, current);
+ return handle_outgoing_data(io, current);
}
}
-static int setup_signalfd(int epollfd)
+static int setup_signalfd(struct nio * const io)
{
sigset_t mask;
int sfd;
@@ -1390,7 +1371,7 @@ static int setup_signalfd(int epollfd)
return -1;
}
- if (add_in_event_fd(epollfd, sfd) != 0)
+ if (add_in_event_fd(io, sfd) != 0)
{
return -1;
}
@@ -1403,24 +1384,28 @@ static int setup_signalfd(int epollfd)
return sfd;
}
-static int mainloop(int epollfd)
+static int mainloop(struct nio * const io)
{
- struct epoll_event events[32];
- size_t const events_size = sizeof(events) / sizeof(events[0]);
- int signalfd = setup_signalfd(epollfd);
+ int signalfd = setup_signalfd(io);
while (nDPIsrvd_main_thread_shutdown == 0)
{
- int nready = epoll_wait(epollfd, events, events_size, 1000);
+ if (nio_run(io, 1000) != NIO_SUCCESS)
+ {
+ logger(1, "Event I/O returned error: %s", strerror(errno));
+ }
+
+ int nready = nio_get_nready(io);
for (int i = 0; i < nready; i++)
{
- if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0)
+ int fd = nio_get_fd(io, i);
+
+ if (nio_has_error(io, i) == NIO_SUCCESS)
{
- if (events[i].data.fd != collector_un_sockfd && events[i].data.fd != distributor_un_sockfd &&
- events[i].data.fd != distributor_in_sockfd)
+ if (fd != collector_un_sockfd && fd != distributor_un_sockfd && fd != distributor_in_sockfd)
{
- struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr;
+ struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, i);
switch (current->sock_type)
{
case COLLECTOR_UN:
@@ -1431,25 +1416,24 @@ static int mainloop(int epollfd)
logger_nDPIsrvd(current, "Distributor connection", "closed");
break;
}
- disconnect_client(epollfd, current);
+ disconnect_client(io, current);
}
else
{
- logger(1, "Epoll event error: %s", (errno != 0 ? strerror(errno) : "unknown"));
+ logger(1, "Event I/O error: %s", (errno != 0 ? strerror(errno) : "unknown"));
}
break;
}
- if (events[i].data.fd == collector_un_sockfd || events[i].data.fd == distributor_un_sockfd ||
- events[i].data.fd == distributor_in_sockfd)
+ if (fd == collector_un_sockfd || fd == distributor_un_sockfd || fd == distributor_in_sockfd)
{
/* New connection to collector / distributor. */
- if (new_connection(epollfd, events[i].data.fd) != 0)
+ if (new_connection(io, fd) != 0)
{
continue;
}
}
- else if (events[i].data.fd == signalfd)
+ else if (fd == signalfd)
{
struct signalfd_siginfo fdsi;
ssize_t s;
@@ -1473,7 +1457,7 @@ static int mainloop(int epollfd)
else
{
/* Incoming data / Outoing data ready to receive / send. */
- if (handle_data_event(epollfd, &events[i]) != 0)
+ if (handle_data_event(io, i) != 0)
{
/* do nothing */
}
@@ -1481,57 +1465,57 @@ static int mainloop(int epollfd)
}
}
- free_remotes(epollfd);
+ free_remotes(io);
+ nio_free(io);
close(signalfd);
return 0;
}
-static int create_evq(void)
+static int setup_event_queue(struct nio * const io)
{
- return epoll_create1(EPOLL_CLOEXEC);
-}
-
-static int setup_event_queue(void)
-{
- int epollfd = create_evq();
- if (epollfd < 0)
+#ifdef ENABLE_EPOLL
+ if ((nDPIsrvd_options.use_poll == 0 && nio_use_epoll(io, 32) != NIO_SUCCESS)
+ || (nDPIsrvd_options.use_poll != 0 && nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS))
+#else
+ if (nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)
+#endif
{
- logger(1, "Error creating epoll: %s", strerror(errno));
+ logger(1, "%s", "Event I/O poll/epoll setup failed");
return -1;
}
- if (add_in_event_fd(epollfd, collector_un_sockfd) != 0)
+ errno = 0;
+ if (add_in_event_fd(io, collector_un_sockfd) != 0)
{
- logger(1, "Error adding collector UNIX socket fd to epoll: %s", strerror(errno));
+ logger(1,
+ "Error adding collector UNIX socket fd to event I/O: %s",
+ (errno != 0 ? strerror(errno) : "Internal Error"));
return -1;
}
- if (add_in_event_fd(epollfd, distributor_un_sockfd) != 0)
+ errno = 0;
+ if (add_in_event_fd(io, distributor_un_sockfd) != 0)
{
- logger(1, "Error adding distributor UNIX socket fd to epoll: %s", strerror(errno));
+ logger(1,
+ "Error adding distributor UNIX socket fd to event I/O: %s",
+ (errno != 0 ? strerror(errno) : "Internal Error"));
return -1;
}
if (distributor_in_sockfd >= 0)
{
- if (add_in_event_fd(epollfd, distributor_in_sockfd) != 0)
+ errno = 0;
+ if (add_in_event_fd(io, distributor_in_sockfd) != 0)
{
- logger(1, "Error adding distributor TCP/IP socket fd to epoll: %s", strerror(errno));
+ logger(1,
+ "Error adding distributor TCP/IP socket fd to event I/O: %s",
+ (errno != 0 ? strerror(errno) : "Internal Error"));
return -1;
}
}
- return epollfd;
-}
-
-static void close_event_queue(int epollfd)
-{
- for (size_t i = 0; i < remotes.desc_size; ++i)
- {
- disconnect_client(epollfd, &remotes.desc[i]);
- }
- close(epollfd);
+ return 0;
}
static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors)
@@ -1555,13 +1539,14 @@ static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors)
int main(int argc, char ** argv)
{
int retval = 1;
- int epollfd;
+ struct nio io;
if (argc == 0)
{
return 1;
}
+ nio_init(&io);
init_logging("nDPIsrvd");
if (nDPIsrvd_parse_options(argc, argv) != 0)
@@ -1677,14 +1662,12 @@ int main(int argc, char ** argv)
signal(SIGTERM, SIG_IGN);
signal(SIGQUIT, SIG_IGN);
- epollfd = setup_event_queue();
- if (epollfd < 0)
+ if (setup_event_queue(&io) != 0)
{
goto error_unlink_sockets;
}
- retval = mainloop(epollfd);
- close_event_queue(epollfd);
+ retval = mainloop(&io);
error_unlink_sockets:
if (unlink(get_cmdarg(&nDPIsrvd_options.collector_un_sockpath)) != 0)
diff --git a/nio.c b/nio.c
index 0e1ea3671..babccb75e 100644
--- a/nio.c
+++ b/nio.c
@@ -1,5 +1,6 @@
#include "nio.h"
+#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#ifdef ENABLE_EPOLL
@@ -11,9 +12,9 @@ void nio_init(struct nio * io)
{
io->nready = -1;
io->poll_max_fds = 0;
- io->poll_cur_fds = 0;
io->poll_fds = NULL;
io->poll_ptrs = NULL;
+ io->poll_fds_set = NULL;
io->epoll_fd = -1;
io->max_events = 0;
io->events = NULL;
@@ -27,13 +28,15 @@ int nio_use_poll(struct nio * io, nfds_t max_fds)
io->poll_max_fds = max_fds;
io->poll_fds = (struct pollfd *)calloc(max_fds, sizeof(*io->poll_fds));
io->poll_ptrs = calloc(max_fds, sizeof(*io->poll_ptrs));
+ io->poll_fds_set = calloc(max_fds, sizeof(*io->poll_fds_set));
for (size_t i = 0; i < max_fds; ++i)
{
io->poll_fds[i].fd = -1;
}
- return io->poll_fds == NULL || io->poll_ptrs == NULL; // return NIO_ERROR_INTERNAL on error
+ return io->poll_fds == NULL || io->poll_ptrs == NULL || io->poll_fds_set == NULL; // return NIO_ERROR_INTERNAL on
+ // error
}
int nio_use_epoll(struct nio * io, int max_events)
@@ -46,7 +49,7 @@ int nio_use_epoll(struct nio * io, int max_events)
io->max_events = max_events;
io->events = calloc(max_events, sizeof(struct epoll_event));
- return io->events == NULL || io->epoll_fd < 0;
+ return io->events == NULL || io->epoll_fd < 0; // return NIO_ERROR_INTERNAL on error
#else
(void)io;
(void)max_events;
@@ -57,9 +60,13 @@ int nio_use_epoll(struct nio * io, int max_events)
int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
{
+ if (fd < 0)
+ return NIO_ERROR_INTERNAL;
+
#ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0)
{
+ int rv;
struct epoll_event event = {};
if (ptr == NULL)
@@ -78,7 +85,11 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
if (event.events == 0)
return NIO_ERROR_INTERNAL;
- return epoll_ctl(io->epoll_fd, EPOLL_CTL_ADD, fd, &event);
+ while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_ADD, fd, &event)) != 0 && errno == EINTR)
+ {
+ /* If epoll_ctl() was interrupted by the system, repeat. */
+ }
+ return rv;
}
else
#endif
@@ -87,9 +98,6 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
struct pollfd * unused_pollfd = NULL;
void ** unused_ptr = NULL;
- if (io->poll_cur_fds == io->poll_max_fds || fd < 0)
- return NIO_ERROR_INTERNAL;
-
for (size_t i = 0; i < io->poll_max_fds; ++i)
{
if (io->poll_fds[i].fd < 0)
@@ -112,9 +120,8 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
unused_pollfd->fd = fd;
*unused_ptr = ptr;
- io->poll_cur_fds++;
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
return NIO_ERROR_INTERNAL;
@@ -122,9 +129,13 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr)
int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr)
{
+ if (fd < 0)
+ return NIO_ERROR_INTERNAL;
+
#ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0)
{
+ int rv;
struct epoll_event event = {};
if (ptr == NULL)
@@ -143,43 +154,43 @@ int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr)
if (event.events == 0)
return NIO_ERROR_INTERNAL;
- return epoll_ctl(io->epoll_fd, EPOLL_CTL_MOD, fd, &event);
+ while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_MOD, fd, &event)) != 0 && errno == EINTR)
+ {
+ /* If epoll_ctl() was interrupted by the system, repeat. */
+ }
+ return rv;
}
else
#endif
if (io->poll_max_fds > 0)
{
- struct pollfd * unused_pollfd = NULL;
- void ** unused_ptr = NULL;
-
- if (io->poll_cur_fds == io->poll_max_fds || fd < 0)
- return NIO_ERROR_INTERNAL;
+ struct pollfd * used_pollfd = NULL;
+ void ** used_ptr = NULL;
for (size_t i = 0; i < io->poll_max_fds; ++i)
{
- if (io->poll_fds[i].fd < 0)
+ if (io->poll_fds[i].fd == fd)
{
- unused_pollfd = &io->poll_fds[i];
- unused_ptr = &io->poll_ptrs[i];
+ used_pollfd = &io->poll_fds[i];
+ used_ptr = &io->poll_ptrs[i];
break;
}
}
- if (unused_pollfd == NULL)
+ if (used_pollfd == NULL)
return NIO_ERROR_INTERNAL;
- unused_pollfd->events = 0;
+ used_pollfd->events = 0;
if ((event_flags & NIO_EVENT_INPUT) != 0)
- unused_pollfd->events |= POLLIN;
+ used_pollfd->events |= POLLIN;
if ((event_flags & NIO_EVENT_OUTPUT) != 0)
- unused_pollfd->events |= POLLOUT;
- if (unused_pollfd->events == 0)
+ used_pollfd->events |= POLLOUT;
+ if (used_pollfd->events == 0)
return NIO_ERROR_INTERNAL;
- unused_pollfd->fd = fd;
- *unused_ptr = ptr;
- io->poll_cur_fds++;
+ used_pollfd->fd = fd;
+ *used_ptr = ptr;
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
return NIO_ERROR_INTERNAL;
@@ -187,38 +198,43 @@ int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr)
int nio_del_fd(struct nio * io, int fd)
{
+ if (fd < 0)
+ return NIO_ERROR_INTERNAL;
+
#ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0)
{
- return epoll_ctl(io->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+ int rv;
+
+ while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_DEL, fd, NULL)) != 0 && errno == EINTR)
+ {
+ /* If epoll_ctl() was interrupted by the system, repeat. */
+ }
+ return rv;
}
else
#endif
if (io->poll_max_fds > 0)
{
- struct pollfd * unused_pollfd = NULL;
- void ** unused_ptr = NULL;
-
- if (io->poll_cur_fds == io->poll_max_fds || fd < 0)
- return NIO_ERROR_INTERNAL;
+ struct pollfd * used_pollfd = NULL;
+ void ** used_ptr = NULL;
for (size_t i = 0; i < io->poll_max_fds; ++i)
{
- if (io->poll_fds[i].fd < 0)
+ if (io->poll_fds[i].fd == fd)
{
- unused_pollfd = &io->poll_fds[i];
- unused_ptr = &io->poll_ptrs[i];
+ used_pollfd = &io->poll_fds[i];
+ used_ptr = &io->poll_ptrs[i];
break;
}
}
- if (unused_pollfd == NULL)
+ if (used_pollfd == NULL)
return NIO_ERROR_INTERNAL;
- unused_pollfd->fd = -1;
- *unused_ptr = NULL;
- io->poll_cur_fds--;
+ used_pollfd->fd = -1;
+ *used_ptr = NULL;
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
return NIO_ERROR_INTERNAL;
@@ -229,7 +245,11 @@ int nio_run(struct nio * io, int timeout)
#ifdef ENABLE_EPOLL
if (io->epoll_fd >= 0)
{
- io->nready = epoll_wait(io->epoll_fd, io->events, io->max_events, timeout);
+ do
+ {
+ io->nready = epoll_wait(io->epoll_fd, io->events, io->max_events, timeout);
+ } while (io->nready < 0 && errno == EINTR);
+
if (io->nready < 0)
return NIO_ERROR_SYSTEM;
}
@@ -237,60 +257,73 @@ int nio_run(struct nio * io, int timeout)
#endif
if (io->poll_max_fds > 0)
{
- io->nready = poll(io->poll_fds, io->poll_max_fds, timeout);
+ do
+ {
+ io->nready = poll(io->poll_fds, io->poll_max_fds, timeout);
+ } while (io->nready < 0 && errno == EINTR);
+
if (io->nready < 0)
return NIO_ERROR_SYSTEM;
- else
- io->nready = io->poll_max_fds;
+
+ if (io->nready > 0)
+ {
+ for (nfds_t i = 0, j = 0; i < io->poll_max_fds; ++i)
+ {
+ if (io->poll_fds[i].fd >= 0 && io->poll_fds[i].revents != 0)
+ {
+ io->poll_fds_set[j++] = i;
+ }
+ }
+ }
}
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
-int nio_check(struct nio * io, int index, int events)
+int nio_check(struct nio * io, int index, int event_flags)
{
- if (index < 0 || index >= io->nready)
+ if (nio_is_valid(io, index) != NIO_SUCCESS)
return NIO_ERROR_INTERNAL;
#ifdef ENABLE_EPOLL
- if (io->epoll_fd >= 0 && index < io->max_events)
+ if (io->epoll_fd >= 0)
{
uint32_t epoll_events = 0;
- if ((events & NIO_EVENT_INPUT) != 0)
+ if ((event_flags & NIO_EVENT_INPUT) != 0)
epoll_events |= EPOLLIN;
- if ((events & NIO_EVENT_OUTPUT) != 0)
+ if ((event_flags & NIO_EVENT_OUTPUT) != 0)
epoll_events |= EPOLLOUT;
- if ((events & NIO_EVENT_ERROR) != 0)
+ if ((event_flags & NIO_EVENT_ERROR) != 0)
epoll_events |= EPOLLERR | EPOLLHUP;
if (epoll_events == 0)
return NIO_ERROR_INTERNAL;
- struct epoll_event * ee = (struct epoll_event *)io->events;
- if ((ee[index].events & epoll_events) == 0)
+ struct epoll_event const * const events = (struct epoll_event *)io->events;
+ if ((events[index].events & epoll_events) == 0)
return NIO_ERROR_INTERNAL;
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
else
#endif
- if (io->poll_max_fds > 0 && index < (int)io->poll_max_fds)
+ if (io->poll_max_fds > 0)
{
short int poll_events = 0;
- if ((events & NIO_EVENT_INPUT) != 0)
+ if ((event_flags & NIO_EVENT_INPUT) != 0)
poll_events |= POLLIN;
- if ((events & NIO_EVENT_OUTPUT) != 0)
+ if ((event_flags & NIO_EVENT_OUTPUT) != 0)
poll_events |= POLLOUT;
- if ((events & NIO_EVENT_ERROR) != 0)
+ if ((event_flags & NIO_EVENT_ERROR) != 0)
poll_events |= POLLERR | POLLHUP;
if (poll_events == 0)
return NIO_ERROR_INTERNAL;
- if ((io->poll_fds[index].revents & poll_events) == 0)
+ if ((io->poll_fds[io->poll_fds_set[index]].revents & poll_events) == 0)
return NIO_ERROR_INTERNAL;
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
return NIO_ERROR_INTERNAL;
@@ -302,34 +335,62 @@ int nio_is_valid(struct nio * io, int index)
return NIO_ERROR_INTERNAL;
#ifdef ENABLE_EPOLL
- if (io->epoll_fd >= 0 && index <= io->max_events)
+ if (io->epoll_fd >= 0)
{
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
else
#endif
- if (io->poll_max_fds > 0 && index < (int)io->poll_max_fds)
+ if (io->poll_max_fds > 0 && io->poll_fds[io->poll_fds_set[index]].fd >= 0)
{
- if (io->poll_fds[index].revents != 0)
- return NIO_ERROR_SUCCESS;
+ return NIO_SUCCESS;
}
return NIO_ERROR_INTERNAL;
}
-int nio_has_input(struct nio * io, int index)
+int nio_get_fd(struct nio * io, int index)
{
- return nio_check(io, index, NIO_EVENT_INPUT);
-}
+ if (nio_is_valid(io, index) != NIO_SUCCESS)
+ return -1;
-int nio_can_output(struct nio * io, int index)
-{
- return nio_check(io, index, NIO_EVENT_OUTPUT);
+#ifdef ENABLE_EPOLL
+ if (io->epoll_fd >= 0)
+ {
+ struct epoll_event const * const events = (struct epoll_event *)io->events;
+
+ return events[index].data.fd;
+ }
+ else
+#endif
+ if (io->poll_max_fds > 0)
+ {
+ return io->poll_fds[io->poll_fds_set[index]].fd;
+ }
+
+ return -1;
}
-int nio_has_error(struct nio * io, int index)
+void * nio_get_ptr(struct nio * io, int index)
{
- return nio_check(io, index, NIO_EVENT_ERROR);
+ if (nio_is_valid(io, index) != NIO_SUCCESS)
+ return NULL;
+
+#ifdef ENABLE_EPOLL
+ if (io->epoll_fd >= 0)
+ {
+ struct epoll_event * const events = (struct epoll_event *)io->events;
+
+ return events[index].data.ptr;
+ }
+ else
+#endif
+ if (io->poll_max_fds > 0)
+ {
+ return io->poll_ptrs[io->poll_fds_set[index]];
+ }
+
+ return NULL;
}
void nio_free(struct nio * io)
@@ -351,5 +412,6 @@ void nio_free(struct nio * io)
#endif
free(io->poll_fds);
free(io->poll_ptrs);
+ free(io->poll_fds_set);
free(io->events);
}
diff --git a/nio.h b/nio.h
index d1f5add72..a0ac0b6b0 100644
--- a/nio.h
+++ b/nio.h
@@ -5,7 +5,7 @@
enum
{
- NIO_ERROR_SUCCESS = 0,
+ NIO_SUCCESS = 0,
NIO_ERROR_INTERNAL = 1,
NIO_ERROR_SYSTEM = -1
};
@@ -23,9 +23,9 @@ struct nio
int nready;
nfds_t poll_max_fds;
- nfds_t poll_cur_fds;
struct pollfd * poll_fds;
void ** poll_ptrs;
+ nfds_t * poll_fds_set;
int epoll_fd;
int max_events;
@@ -46,15 +46,33 @@ int nio_del_fd(struct nio * io, int fd);
int nio_run(struct nio * io, int timeout);
+static inline int nio_get_nready(struct nio const * const io)
+{
+ return io->nready;
+}
+
int nio_check(struct nio * io, int index, int events);
int nio_is_valid(struct nio * io, int index);
-int nio_has_input(struct nio * io, int index);
+int nio_get_fd(struct nio * io, int index);
-int nio_can_output(struct nio * io, int index);
+void * nio_get_ptr(struct nio * io, int index);
+
+static inline int nio_has_input(struct nio * io, int index)
+{
+ return nio_check(io, index, NIO_EVENT_INPUT);
+}
-int nio_has_error(struct nio * io, int index);
+static inline int nio_can_output(struct nio * io, int index)
+{
+ return nio_check(io, index, NIO_EVENT_OUTPUT);
+}
+
+static inline int nio_has_error(struct nio * io, int index)
+{
+ return nio_check(io, index, NIO_EVENT_ERROR);
+}
void nio_free(struct nio * io);