diff options
author | Toni <matzeton@googlemail.com> | 2023-11-06 12:38:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-06 12:38:15 +0100 |
commit | 1b679271693a17ce0b653b9ba45db77b731db42e (patch) | |
tree | 986a2fac5feeaae71e2c2bd4e771e31a7c966de6 | |
parent | 17c21e1d27a90b394873a0e80e5d6992f4b985ee (diff) |
Event I/O abstraction layer. (#28)
* Finalize Event I/O abstraction layer.
* Fix possible fd leakage, Gitlab-CI build and error logging.
* Fixed possible uninitialized signalfd variable.
* Fixed possible memory leak.
* Fixed some SonarCloud complaints.
* Fixed nDPId-test nDPIsrvd-arpa-mockup stuck indefinitely.
* Add nDPId / nDPIsrvd command line option to use poll() on Linux instead of the default epoll().
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | .github/workflows/build.yml | 2 | ||||
-rw-r--r-- | .gitlab-ci.yml | 2 | ||||
-rw-r--r-- | CMakeLists.txt | 9 | ||||
-rw-r--r-- | nDPId-test.c | 192 | ||||
-rw-r--r-- | nDPId.c | 81 | ||||
-rw-r--r-- | nDPIsrvd.c | 265 | ||||
-rw-r--r-- | nio.c | 210 | ||||
-rw-r--r-- | nio.h | 28 |
8 files changed, 448 insertions, 341 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2f435b180..83b98ac50 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -147,7 +147,7 @@ jobs: - name: Build single nDPId executable (invoke CC directly) if: startsWith(matrix.coverage, '-DENABLE_COVERAGE=OFF') && startsWith(matrix.sanitizer, '-DENABLE_SANITIZER=ON') && startsWith(matrix.ndpid_gcrypt, '-DNDPI_WITH_GCRYPT=OFF') && startsWith(matrix.ndpid_zlib, '-DENABLE_ZLIB=ON') run: | - cc -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz + cc -fsanitize=address -fsanitize=undefined -fno-sanitize=alignment -fsanitize=enum -fsanitize=leak nDPId.c nio.c utils.c -I./build/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build/libnDPI/lib/libndpi.a -pthread -lm -lz - name: Test EXEC run: | ./build/nDPId-test diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 39a98b6af..85c840fed 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -67,7 +67,7 @@ build_and_test_static_libndpi: - > if ldd ./build-cmake-submodule/nDPId | grep -qoEi libndpi; then \ echo 'nDPId linked against a static libnDPI should not contain a shared linked libnDPI.' >&2; false; fi - - cc nDPId.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz + - cc nDPId.c nio.c utils.c -I./build-cmake-submodule/libnDPI/include/ndpi -I. -I./dependencies -I./dependencies/jsmn -I./dependencies/uthash/include -o /tmp/a.out -lpcap ./build-cmake-submodule/libnDPI/lib/libndpi.a -pthread -lm -lz artifacts: expire_in: 1 week paths: diff --git a/CMakeLists.txt b/CMakeLists.txt index 94c5ef24b..73d659a5d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,10 @@ if(HAS_EPOLL) option(FORCE_POLL "Force the use of poll() instead of epoll()." OFF) if(NOT FORCE_POLL) set(EPOLL_DEFS "-DENABLE_EPOLL=1") - set(EPOLL_SRCS "nio.c") + endif() +else() + if(BUILD_EXAMPLES) + message(FATAL_ERROR "Examples are using epoll event I/O. Without epoll available, you can not build/run those.") endif() endif() @@ -100,8 +103,8 @@ else() unset(NDPI_WITH_MAXMINDDB CACHE) endif() -add_executable(nDPId nDPId.c ${EPOLL_SRCS} utils.c) -add_executable(nDPIsrvd nDPIsrvd.c ${EPOLL_SRCS} utils.c) +add_executable(nDPId nDPId.c nio.c utils.c) +add_executable(nDPIsrvd nDPIsrvd.c nio.c utils.c) add_executable(nDPId-test nDPId-test.c) add_custom_target(dist) diff --git a/nDPId-test.c b/nDPId-test.c index fb21bc65c..48c0e2b70 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -292,14 +292,12 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) { int nDPIsrvd_distributor_disconnects = 0; int const nDPIsrvd_distributor_expected_disconnects = 5; - int epollfd; + struct nio io; struct remote_desc * mock_json_desc = NULL; struct remote_desc * mock_test_desc = NULL; struct remote_desc * mock_buff_desc = NULL; struct remote_desc * mock_null_desc = NULL; struct remote_desc * mock_arpa_desc = NULL; - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); logger(0, "nDPIsrvd thread started, init.."); @@ -308,11 +306,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) logger(1, "nDPIsrvd block signals failed: %s", strerror(errno)); } - errno = 0; - epollfd = create_evq(); - if (epollfd < 0) + nio_init(&io); + +#ifdef ENABLE_EPOLL + if (nio_use_epoll(&io, 32) != NIO_SUCCESS) +#else + if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) +#endif { - logger(1, "nDPIsrvd epollfd invalid: %d", epollfd); + logger(1, "%s", "Error creating nDPIsrvd poll/epoll event I/O"); THREAD_ERROR_GOTO(arg); } @@ -356,9 +358,9 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) mock_arpa_desc->event_distributor_in.peer.sin_port = 0; errno = 0; - if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 || - add_in_event(epollfd, mock_buff_desc) != 0 || add_in_event(epollfd, mock_null_desc) != 0 || - add_in_event(epollfd, mock_arpa_desc) != 0) + if (add_in_event(&io, mock_json_desc) != 0 || add_in_event(&io, mock_test_desc) != 0 || + add_in_event(&io, mock_buff_desc) != 0 || add_in_event(&io, mock_null_desc) != 0 || + add_in_event(&io, mock_arpa_desc) != 0) { logger(1, "%s", "nDPIsrvd add input event failed"); THREAD_ERROR_GOTO(arg); @@ -370,28 +372,25 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects) { - errno = 0; - int nready = epoll_wait(epollfd, events, events_size, -1); - if (nready < 0 && errno != EINTR) + if (nio_run(&io, -1) != NIO_SUCCESS) { - logger(1, "%s", "nDPIsrvd epoll wait failed."); + logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno)); THREAD_ERROR_GOTO(arg); } - else if (errno == EINTR) - { - continue; - } + + int nready = nio_get_nready(&io); for (int i = 0; i < nready; i++) { - if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc || - events[i].data.ptr == mock_buff_desc || events[i].data.ptr == mock_null_desc || - events[i].data.ptr == mock_arpa_desc) + struct remote_desc * remote = (struct remote_desc *)nio_get_ptr(&io, i); + + if (remote == mock_json_desc || remote == mock_test_desc || + remote == mock_buff_desc || remote == mock_null_desc || + remote == mock_arpa_desc) { - if ((events[i].events & EPOLLHUP) != 0 || (events[i].events & EPOLLERR) != 0) + if (nio_has_error(&io, i) == NIO_SUCCESS) { char const * remote_desc_name; - struct remote_desc * remote = (struct remote_desc *)events[i].data.ptr; if (remote == mock_json_desc) { remote_desc_name = "Mock JSON"; @@ -405,7 +404,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) drain_write_buffers_blocking(mock_null_desc); if (mock_arpa_desc->fd >= 0) drain_write_buffers_blocking(mock_arpa_desc); - } while (handle_data_event(epollfd, &events[i]) == 0); + } while (handle_data_event(&io, i) == 0); } else if (remote == mock_test_desc) { @@ -433,16 +432,22 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) remote_desc_name, nDPIsrvd_distributor_disconnects, nDPIsrvd_distributor_expected_disconnects); - free_remote(epollfd, remote); + free_remote(&io, remote); } else { - if (handle_data_event(epollfd, &events[i]) != 0) + if (handle_data_event(&io, i) != 0) { - if (mock_arpa_desc == events[i].data.ptr) + if (mock_arpa_desc == remote) { // arpa mock does not care about shutdown events - disconnect_client(epollfd, mock_arpa_desc); + free_remote(&io, mock_arpa_desc); + nDPIsrvd_distributor_disconnects++; + logger(1, + "nDPIsrvd distributor '%s' connection closed (%d/%d)", + "Mock ARPA", + nDPIsrvd_distributor_disconnects, + nDPIsrvd_distributor_expected_disconnects); continue; } logger(1, "%s", "nDPIsrvd data event handler failed"); @@ -453,17 +458,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) else { logger(1, - "nDPIsrvd epoll returned unexpected event data: %d (%p)", - events[i].data.fd, - events[i].data.ptr); + "nDPIsrvd epoll returned unexpected event data: %p", remote); THREAD_ERROR_GOTO(arg); } } } error: - free_remotes(epollfd); - close(epollfd); + free_remotes(&io); + nio_free(&io); logger(0, "%s", "nDPIsrvd worker thread exits.."); return NULL; @@ -919,10 +922,8 @@ static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_so static void * distributor_client_mainloop_thread(void * const arg) { - int dis_epollfd = create_evq(); - int signalfd = setup_signalfd(dis_epollfd); - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); + struct nio io; + int signalfd = -1; struct distributor_return_value * const drv = (struct distributor_return_value *)arg; struct thread_return_value * const trv = &drv->thread_return_value; struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data), @@ -947,9 +948,29 @@ static void * distributor_client_mainloop_thread(void * const arg) logger(0, "Distributor thread started, init.."); + nio_init(&io); + +#ifdef ENABLE_EPOLL + if (nio_use_epoll(&io, 32) != NIO_SUCCESS) +#else + if (nio_use_poll(&io, 32) != NIO_SUCCESS) +#endif + { + logger(1, "%s", "Error creating Distributor poll/epoll event I/O"); + THREAD_ERROR_GOTO(trv); + } + + signalfd = setup_signalfd(&io); + if (signalfd < 0) + { + logger(1, "Distributor signal fd setup failed: %s", strerror(errno)); + THREAD_ERROR_GOTO(trv); + } + if (thread_block_signals() != 0) { logger(1, "Distributor block signals failed: %s", strerror(errno)); + THREAD_ERROR_GOTO(trv); } errno = 0; @@ -962,31 +983,26 @@ static void * distributor_client_mainloop_thread(void * const arg) mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ]; mock_null->fd = mock_nullfds[PIPE_NULL_READ]; - if (dis_epollfd < 0 || signalfd < 0) - { - THREAD_ERROR_GOTO(trv); - } - errno = 0; - if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0) + if (add_in_event_fd(&io, mock_testfds[PIPE_TEST_READ]) != 0) { THREAD_ERROR_GOTO(trv); } errno = 0; - if (add_in_event_fd(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]) != 0) + if (add_in_event_fd(&io, mock_bufffds[PIPE_BUFFER_READ]) != 0) { THREAD_ERROR_GOTO(trv); } errno = 0; - if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0) + if (add_in_event_fd(&io, mock_nullfds[PIPE_NULL_READ]) != 0) { THREAD_ERROR_GOTO(trv); } errno = 0; - if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0) + if (add_in_event_fd(&io, mock_arpafds[PIPE_ARPA_READ]) != 0) { THREAD_ERROR_GOTO(trv); } @@ -1006,31 +1022,30 @@ static void * distributor_client_mainloop_thread(void * const arg) while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0) { - int nready = epoll_wait(dis_epollfd, events, events_size, -1); - if (nready < 0 && errno != EINTR) + if (nio_run(&io, -1) != NIO_SUCCESS) { - logger(1, "%s", "Distributor epoll wait failed."); + logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno)); THREAD_ERROR_GOTO(trv); } - else if (nready < 0 && errno == EINTR) - { - continue; - } + + int nready = nio_get_nready(&io); for (int i = 0; i < nready; i++) { - if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0) + int fd = nio_get_fd(&io, i); + + if (nio_has_input(&io, i) != NIO_SUCCESS && nio_has_error(&io, i) != NIO_SUCCESS) { - logger(1, "Invalid epoll event received: %d", events[i].events & (~EPOLLIN & ~EPOLLHUP)); + logger(1, "Invalid epoll event received for fd: %d", fd); THREAD_ERROR_GOTO(trv); } - if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0) + if (nio_has_error(&io, i) == NIO_SUCCESS) { - logger(1, "Distributor disconnected: %d", events[i].data.fd); - del_event(dis_epollfd, events[i].data.fd); + logger(1, "Distributor disconnected: %d", fd); + del_event(&io, fd); } - if (events[i].data.fd == mock_testfds[PIPE_TEST_READ]) + if (fd == mock_testfds[PIPE_TEST_READ]) { switch (nDPIsrvd_read(mock_sock)) { @@ -1065,7 +1080,7 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(trv); } } - else if (events[i].data.fd == mock_bufffds[PIPE_BUFFER_READ]) + else if (fd == mock_bufffds[PIPE_BUFFER_READ]) { switch (nDPIsrvd_read(mock_buff)) { @@ -1100,7 +1115,7 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(trv); } } - else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ]) + else if (fd == mock_nullfds[PIPE_NULL_READ]) { switch (nDPIsrvd_read(mock_null)) { @@ -1129,7 +1144,7 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(trv); } } - else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ]) + else if (fd == mock_arpafds[PIPE_ARPA_READ]) { char buf[NETWORK_BUFFER_MAX_SIZE]; ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf)); @@ -1144,7 +1159,7 @@ static void * distributor_client_mainloop_thread(void * const arg) * I am just here to trigger some IP code paths. */ } - else if (events[i].data.fd == signalfd) + else if (fd == signalfd) { struct signalfd_siginfo fdsi; ssize_t s; @@ -1166,9 +1181,7 @@ static void * distributor_client_mainloop_thread(void * const arg) else { logger(1, - "Distributor epoll returned unexpected event data: %d (%p)", - events[i].data.fd, - events[i].data.ptr); + "Distributor epoll returned unexpected event data: %p", nio_get_ptr(&io, i)); THREAD_ERROR_GOTO(trv); } } @@ -1238,17 +1251,17 @@ static void * distributor_client_mainloop_thread(void * const arg) } error: - del_event(dis_epollfd, signalfd); - del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]); - del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]); - del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]); - del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]); + del_event(&io, signalfd); + del_event(&io, mock_testfds[PIPE_TEST_READ]); + del_event(&io, mock_bufffds[PIPE_BUFFER_READ]); + del_event(&io, mock_nullfds[PIPE_NULL_READ]); + del_event(&io, mock_arpafds[PIPE_ARPA_READ]); close(mock_testfds[PIPE_TEST_READ]); close(mock_bufffds[PIPE_BUFFER_READ]); close(mock_nullfds[PIPE_NULL_READ]); close(mock_arpafds[PIPE_ARPA_READ]); - close(dis_epollfd); close(signalfd); + nio_free(&io); nDPIsrvd_socket_free(&mock_sock); nDPIsrvd_socket_free(&mock_buff); @@ -1501,9 +1514,9 @@ static int nio_selftest() #endif #ifdef ENABLE_EPOLL - if (nio_use_epoll(&io, 5) != NIO_ERROR_SUCCESS) + if (nio_use_epoll(&io, 32) != NIO_SUCCESS) #else - if (nio_use_poll(&io, 3) != NIO_ERROR_SUCCESS) + if (nio_use_poll(&io, 32) != NIO_SUCCESS) #endif { logger(1, "%s", "Could not use poll/epoll for nio"); @@ -1518,8 +1531,8 @@ static int nio_selftest() goto error; } - if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_ERROR_SUCCESS || - nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_ERROR_SUCCESS) + if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_SUCCESS || + nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) { logger(1, "%s", "Could not add pipe fds to nio"); goto error; @@ -1535,26 +1548,26 @@ static int nio_selftest() size_t const wlen = strnlen(wbuf, sizeof(wbuf)); write(pipefds[1], wbuf, wlen); - if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS) + if (nio_run(&io, 1000) != NIO_SUCCESS) { logger(1, "%s", "Event notification failed"); goto error; } - if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS) + if (nio_can_output(&io, 0) != NIO_SUCCESS) { logger(1, "%s", "Pipe fd (write) can not output"); goto error; } - if (nio_has_input(&io, 1) != NIO_ERROR_SUCCESS) + if (nio_has_input(&io, 1) != NIO_SUCCESS) { logger(1, "%s", "Pipe fd (read) has no input"); goto error; } - if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) != NIO_ERROR_SUCCESS || - nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS) + if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) != NIO_SUCCESS || + nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS) { logger(1, "%s", "Event validation failed"); goto error; @@ -1567,31 +1580,38 @@ static int nio_selftest() goto error; } - if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS) + if (nio_run(&io, 1000) != NIO_SUCCESS) { logger(1, "%s", "Event notification failed"); goto error; } - if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS) + if (nio_can_output(&io, 0) != NIO_SUCCESS) { logger(1, "%s", "Pipe fd (write) can not output"); goto error; } - if (nio_has_input(&io, 1) == NIO_ERROR_SUCCESS) + if (nio_has_input(&io, 1) == NIO_SUCCESS) { logger(1, "%s", "Pipe fd (read) has input"); goto error; } - if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) == NIO_ERROR_SUCCESS || - nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS) + if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) == NIO_SUCCESS || + nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS) { logger(1, "%s", "Event validation failed"); goto error; } + if (nio_del_fd(&io, pipefds[0]) != NIO_SUCCESS + || nio_del_fd(&io, pipefds[1]) != NIO_SUCCESS) + { + logger(1, "%s", "Event delete failed"); + goto error; + } + nio_free(&io); return 0; error: @@ -16,7 +16,6 @@ #include <stdarg.h> #include <stdio.h> #include <stdlib.h> -#include <sys/epoll.h> #include <sys/ioctl.h> #include <sys/signalfd.h> #include <sys/un.h> @@ -27,6 +26,7 @@ #include "config.h" #include "nDPIsrvd.h" +#include "nio.h" #include "utils.h" #ifndef UNIX_PATH_MAX @@ -467,6 +467,9 @@ static struct uint8_t enable_zlib_compression; #endif uint8_t enable_data_analysis; +#ifdef ENABLE_EPOLL + uint8_t use_poll; +#endif /* subopts */ unsigned long long int max_flows_per_thread; unsigned long long int max_idle_flows_per_thread; @@ -4405,53 +4408,57 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) return; } - int epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (epoll_fd < 0) + struct nio io; + nio_init(&io); +#ifdef ENABLE_EPOLL + if ((nDPId_options.use_poll == 0 && nio_use_epoll(&io, 32) != NIO_SUCCESS) + || (nDPId_options.use_poll != 0 && nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)) +#else + if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) +#endif { - logger(1, "Got an invalid epoll fd: %s", strerror(errno)); + logger(1, "%s", "Event I/O poll/epoll setup failed"); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); return; } - struct epoll_event event = {}; - event.events = EPOLLIN; - - event.data.fd = pcap_fd; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pcap_fd, &event) != 0) + errno = 0; + if (nio_add_fd(&io, pcap_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) { - logger(1, "Could not add pcap fd %d to epoll fd %d: %s", pcap_fd, epoll_fd, strerror(errno)); + logger(1, + "Could not add pcap fd to event queue: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); return; } - event.data.fd = signal_fd; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &event) != 0) + errno = 0; + if (nio_add_fd(&io, signal_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) { - logger(1, "Could not add signal fd %d to epoll fd %d: %s", signal_fd, epoll_fd, strerror(errno)); + logger(1, + "Could not add signal fd to event queue: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); return; } - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); int const timeout_ms = 1000; /* TODO: Configurable? */ - int nready; struct timeval tval_before_epoll, tval_after_epoll; while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) { get_current_time(&tval_before_epoll); errno = 0; - nready = epoll_wait(epoll_fd, events, events_size, timeout_ms); - if (errno != 0) + if (nio_run(&io, timeout_ms) != NIO_SUCCESS) { - if (errno == EINTR) - { - continue; - } - logger(1, "Epoll returned error: %s", strerror(errno)); + logger(1, "Event I/O returned error: %s", strerror(errno)); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); break; } + int nready = nio_get_nready(&io); + if (nready == 0) { struct timeval tval_diff; @@ -4467,13 +4474,15 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) for (int i = 0; i < nready; ++i) { - if ((events[i].events & EPOLLERR) != 0) + if (nio_has_error(&io, i) == NIO_SUCCESS) { - logger(1, "%s", "Epoll error event"); + logger(1, "%s", "Event I/O error"); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); } - if (events[i].data.fd == signal_fd) + int fd = nio_get_fd(&io, i); + + if (fd == signal_fd) { struct signalfd_siginfo fdsi; if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi)) @@ -4504,7 +4513,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame); } } - else if (events[i].data.fd == pcap_fd) + else if (fd == pcap_fd) { switch (pcap_dispatch( reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) @@ -4517,6 +4526,7 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) break; case PCAP_ERROR_BREAK: MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); return; default: break; @@ -4524,10 +4534,12 @@ static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) } else { - logger(1, "Unknown event data 0x%llx returned", (unsigned long long int)events[i].data.u64); + logger(1, "Unknown event descriptor or data returned: %p", nio_get_ptr(&io, i)); } } } + + nio_free(&io); } } } @@ -4900,7 +4912,7 @@ static void print_usage(char const * const arg0) "Usage: %s " "[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n" "\t \t" - "[-l] [-L logfile] [-c address] " + "[-l] [-L logfile] [-c address] [-e]" "[-d] [-p pidfile]\n" "\t \t" "[-u user] [-g group] " @@ -4921,6 +4933,8 @@ static void print_usage(char const * const arg0) "\t-L\tLog all messages to a log file.\n" "\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n" "\t \tDefault: %s\n" + "\t-e\tUse poll() instead of epoll().\n" + "\t \tDefault: epoll() on Linux, poll() otherwise\n" "\t-d\tFork into background after initialization.\n" "\t-p\tWrite the daemon PID to the given file path.\n" "\t \tDefault: %s\n" @@ -4982,7 +4996,7 @@ static int nDPId_parse_options(int argc, char ** argv) { int opt; - while ((opt = getopt(argc, argv, "i:IEB:lL:c:dp:u:g:P:C:J:S:a:Azo:vh")) != -1) + while ((opt = getopt(argc, argv, "i:IEB:lL:c:edp:u:g:P:C:J:S:a:Azo:vh")) != -1) { switch (opt) { @@ -5010,6 +5024,13 @@ static int nDPId_parse_options(int argc, char ** argv) case 'c': set_cmdarg(&nDPId_options.collector_address, optarg); break; + case 'e': +#ifdef ENABLE_EPOLL + nDPId_options.use_poll = 1; +#else + logger_early(1, "%s", "nDPId was built w/o epoll() support, poll() is already the default"); +#endif + break; case 'd': daemonize_enable(); break; diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 28a989fc0..a4d7457fe 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -9,7 +9,6 @@ #include <stdlib.h> #include <stdint.h> #include <string.h> -#include <sys/epoll.h> #include <sys/signalfd.h> #include <sys/socket.h> #include <sys/types.h> @@ -17,6 +16,7 @@ #include "config.h" #include "nDPIsrvd.h" +#include "nio.h" #include "utils.h" enum sock_type @@ -92,7 +92,10 @@ static struct struct cmdarg group; nDPIsrvd_ull max_remote_descriptors; nDPIsrvd_ull max_write_buffers; - int bufferbloat_fallback_to_blocking; + uint8_t bufferbloat_fallback_to_blocking; +#ifdef ENABLE_EPOLL + uint8_t use_poll; +#endif } nDPIsrvd_options = {.pidfile = CMDARG(nDPIsrvd_PIDFILE), .collector_un_sockpath = CMDARG(COLLECTOR_UNIX_SOCKET), .distributor_un_sockpath = CMDARG(DISTRIBUTOR_UNIX_SOCKET), @@ -109,11 +112,11 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote, ...); static int fcntl_add_flags(int fd, int flags); static int fcntl_del_flags(int fd, int flags); -static int add_in_event_fd(int epollfd, int fd); -static int add_in_event(int epollfd, struct remote_desc * const remote); -static int del_event(int epollfd, int fd); -static int del_out_event(int epollfd, struct remote_desc * const remote); -static void disconnect_client(int epollfd, struct remote_desc * const current); +static int add_in_event_fd(struct nio * const io, int fd); +static int add_in_event(struct nio * const io, struct remote_desc * const remote); +static int del_event(struct nio * const io, int fd); +static int set_in_event(struct nio * const io, struct remote_desc * const remote); +static void disconnect_client(struct nio * const io, struct remote_desc * const current); static int drain_write_buffers_blocking(struct remote_desc * const remote); static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) @@ -414,7 +417,7 @@ static int drain_write_buffers_blocking(struct remote_desc * const remote) return retval; } -static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) +static int handle_outgoing_data(struct nio * const io, struct remote_desc * const remote) { UT_array * const additional_write_buffers = get_additional_write_buffers(remote); @@ -425,7 +428,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) if (drain_write_buffers(remote) != 0) { logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno)); - disconnect_client(epollfd, remote); + disconnect_client(io, remote); return -1; } if (utarray_len(additional_write_buffers) == 0) @@ -433,7 +436,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote); if (write_buffer->buf.used == 0) { - return del_out_event(epollfd, remote); + return set_in_event(io, remote); } else { return drain_main_buffer(remote); } @@ -679,15 +682,17 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot return NULL; } -static void free_remote(int epollfd, struct remote_desc * remote) +static void free_remote(struct nio * const io, struct remote_desc * remote) { if (remote->fd > -1) { errno = 0; - del_event(epollfd, remote->fd); - if (errno != 0) + if (del_event(io, remote->fd) != 0) { - logger_nDPIsrvd(remote, "Could not delete event from epoll for connection", ": %s", strerror(errno)); + logger_nDPIsrvd(remote, + "Could not delete event from queue for connection", + ": %s", + (errno != 0 ? strerror(errno) : "Internal Error")); } errno = 0; close(remote->fd); @@ -732,11 +737,11 @@ static void free_remote(int epollfd, struct remote_desc * remote) } } -static void free_remotes(int epollfd) +static void free_remotes(struct nio * const io) { for (size_t i = 0; i < remotes.desc_size; ++i) { - free_remote(epollfd, &remotes.desc[i]); + free_remote(io, &remotes.desc[i]); } nDPIsrvd_free(remotes.desc); remotes.desc = NULL; @@ -744,68 +749,34 @@ static void free_remotes(int epollfd) remotes.desc_size = 0; } -static int add_event(int epollfd, int events, int fd, void * ptr) +static int add_in_event_fd(struct nio * const io, int fd) { - int retval; - struct epoll_event event = {}; - - if (ptr != NULL) - { - event.data.ptr = ptr; - } - else - { - event.data.fd = fd; - } - event.events = events; - - while ((retval = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event)) != 0 && errno == EINTR) {} - return retval; + return nio_add_fd(io, fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS; } -static int add_in_event_fd(int epollfd, int fd) +static int add_in_event(struct nio * const io, struct remote_desc * const remote) { - return add_event(epollfd, EPOLLIN, fd, NULL); + return nio_add_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS; } -static int add_in_event(int epollfd, struct remote_desc * const remote) +static int set_out_event(struct nio * const io, struct remote_desc * const remote) { - return add_event(epollfd, EPOLLIN, remote->fd, remote); + return nio_mod_fd(io, remote->fd, NIO_EVENT_OUTPUT, remote) != NIO_SUCCESS; } -static int mod_event(int epollfd, int events, int fd, void * ptr) +static int set_in_event(struct nio * const io, struct remote_desc * const remote) { - int retval; - struct epoll_event event = {}; - - event.data.ptr = ptr; - event.events = events; - - while ((retval = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event)) != 0 && errno == EINTR) {} - return retval; + return nio_mod_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS; } -static int add_out_event(int epollfd, struct remote_desc * const remote) +static int del_event(struct nio * const io, int fd) { - return mod_event(epollfd, EPOLLIN | EPOLLOUT, remote->fd, remote); + return nio_del_fd(io, fd) != NIO_SUCCESS; } -static int del_out_event(int epollfd, struct remote_desc * const remote) +static void disconnect_client(struct nio * const io, struct remote_desc * const remote) { - return mod_event(epollfd, EPOLLIN, remote->fd, remote); -} - -static int del_event(int epollfd, int fd) -{ - int retval; - - while ((retval = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) != 0 && errno == EINTR) {} - return retval; -} - -static void disconnect_client(int epollfd, struct remote_desc * const remote) -{ - free_remote(epollfd, remote); + free_remote(io, remote); } static int nDPIsrvd_parse_options(int argc, char ** argv) @@ -828,6 +799,13 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) case 'c': set_cmdarg(&nDPIsrvd_options.collector_un_sockpath, optarg); break; + case 'e': +#ifdef ENABLE_EPOLL + nDPIsrvd_options.use_poll = 1; +#else + logger_early(1, "%s", "nDPIsrvd was built w/o epoll() support, poll() is already the default"); +#endif + break; case 'd': daemonize_enable(); break; @@ -870,7 +848,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) default: fprintf(stderr, "%s\n", get_nDPId_version()); fprintf(stderr, - "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-d] [-p pidfile]\n" + "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-e] [-d] [-p pidfile]\n" "\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n" "\t[-m max-remote-descriptors] [-u user] [-g group]\n" "\t[-C max-buffered-json-lines] [-D]\n" @@ -879,6 +857,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) "\t-L\tLog all messages to a log file.\n" "\t-c\tPath to a listening UNIX socket (nDPIsrvd Collector).\n" "\t \tDefault: %s\n" + "\t-e\tUse poll() instead of epoll().\n" + "\t \tDefault: epoll() on Linux, poll() otherwise\n" "\t-d\tFork into background after initialization.\n" "\t-p\tWrite the daemon PID to the given file path.\n" "\t \tDefault: %s\n" @@ -970,7 +950,7 @@ static struct remote_desc * accept_remote(int server_fd, return current; } -static int new_connection(int epollfd, int eventfd) +static int new_connection(struct nio * const io, int eventfd) { union { @@ -1112,7 +1092,7 @@ static int new_connection(int epollfd, int eventfd) if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0) { logger(1, "Error setting fd flags to non-blocking mode: %s", strerror(errno)); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1123,18 +1103,19 @@ static int new_connection(int epollfd, int eventfd) /* shutdown reading end for distributor clients does not work due to epoll usage */ } - /* setup epoll event */ - if (add_in_event(epollfd, current) != 0) + /* setup event I/O */ + errno = 0; + if (add_in_event(io, current) != NIO_SUCCESS) { - logger(1, "Error adding input event to %d: %s", current->fd, strerror(errno)); - disconnect_client(epollfd, current); + logger(1, "Error adding input event to %d: %s", current->fd, (errno != 0 ? strerror(errno) : "Internal Error")); + disconnect_client(io, current); return 1; } return 0; } -static int handle_collector_protocol(int epollfd, struct remote_desc * const current) +static int handle_collector_protocol(struct nio * const io, struct remote_desc * const current) { struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); char * json_str_start = NULL; @@ -1150,7 +1131,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "BUG: Collector connection", "JSON invalid opening character: '%c'", json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1161,7 +1142,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur if (errno == ERANGE) { logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string length exceeds numceric limits"); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1172,7 +1153,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "missing JSON string length in protocol preamble: \"%.*s\"", NETWORK_BUFFER_LENGTH_DIGITS, json_read_buffer->buf.ptr.text); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1194,7 +1175,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "JSON string too big: %llu > %zu", current->event_collector_un.json_bytes, json_read_buffer->buf.max); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1212,14 +1193,14 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur (int)current->event_collector_un.json_bytes > 512 ? 512 : (int)current->event_collector_un.json_bytes, json_read_buffer->buf.ptr.text); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } return 0; } -static int handle_incoming_data(int epollfd, struct remote_desc * const current) +static int handle_incoming_data(struct nio * const io, struct remote_desc * const current) { struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); @@ -1235,7 +1216,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { logger_nDPIsrvd(current, "Distributor connection", "closed"); } - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1261,13 +1242,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (bytes_read < 0 || errno != 0) { logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } if (bytes_read == 0) { logger_nDPIsrvd(current, "Collector connection", "closed during read"); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } json_read_buffer->buf.used += bytes_read; @@ -1275,7 +1256,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) { - if (handle_collector_protocol(epollfd, current) != 0) + if (handle_collector_protocol(io, current) != 0) { break; } @@ -1296,13 +1277,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (utarray_len(additional_write_buffers) == 0) { errno = 0; - if (add_out_event(epollfd, &remotes.desc[i]) != 0) + if (set_out_event(io, &remotes.desc[i]) != 0) { logger_nDPIsrvd(&remotes.desc[i], "Could not add event to", ", disconnecting: %s", - strerror(errno)); - disconnect_client(epollfd, &remotes.desc[i]); + (errno != 0 ? strerror(errno) : "Internal Error")); + disconnect_client(io, &remotes.desc[i]); continue; } } @@ -1310,7 +1291,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) json_read_buffer->buf.ptr.raw, current->event_collector_un.json_bytes) != 0) { - disconnect_client(epollfd, &remotes.desc[i]); + disconnect_client(io, &remotes.desc[i]); continue; } } @@ -1324,7 +1305,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (drain_main_buffer(&remotes.desc[i]) != 0) { - disconnect_client(epollfd, &remotes.desc[i]); + disconnect_client(io, &remotes.desc[i]); } } @@ -1338,13 +1319,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) return 0; } -static int handle_data_event(int epollfd, struct epoll_event * const event) +static int handle_data_event(struct nio * const io, int index) { - struct remote_desc * current = (struct remote_desc *)event->data.ptr; + struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, index); - if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0) + if (nio_has_input(io, index) != NIO_SUCCESS && nio_can_output(io, index) != NIO_SUCCESS) { - logger(1, "Can not handle event mask: %d", event->events); + logger(1, "%s", "Neither input nor output event set."); return 1; } @@ -1360,17 +1341,17 @@ static int handle_data_event(int epollfd, struct epoll_event * const event) return 1; } - if ((event->events & EPOLLIN) != 0) + if (nio_has_input(io, index) == NIO_SUCCESS) { - return handle_incoming_data(epollfd, current); + return handle_incoming_data(io, current); } else { - return handle_outgoing_data(epollfd, current); + return handle_outgoing_data(io, current); } } -static int setup_signalfd(int epollfd) +static int setup_signalfd(struct nio * const io) { sigset_t mask; int sfd; @@ -1390,7 +1371,7 @@ static int setup_signalfd(int epollfd) return -1; } - if (add_in_event_fd(epollfd, sfd) != 0) + if (add_in_event_fd(io, sfd) != 0) { return -1; } @@ -1403,24 +1384,28 @@ static int setup_signalfd(int epollfd) return sfd; } -static int mainloop(int epollfd) +static int mainloop(struct nio * const io) { - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); - int signalfd = setup_signalfd(epollfd); + int signalfd = setup_signalfd(io); while (nDPIsrvd_main_thread_shutdown == 0) { - int nready = epoll_wait(epollfd, events, events_size, 1000); + if (nio_run(io, 1000) != NIO_SUCCESS) + { + logger(1, "Event I/O returned error: %s", strerror(errno)); + } + + int nready = nio_get_nready(io); for (int i = 0; i < nready; i++) { - if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0) + int fd = nio_get_fd(io, i); + + if (nio_has_error(io, i) == NIO_SUCCESS) { - if (events[i].data.fd != collector_un_sockfd && events[i].data.fd != distributor_un_sockfd && - events[i].data.fd != distributor_in_sockfd) + if (fd != collector_un_sockfd && fd != distributor_un_sockfd && fd != distributor_in_sockfd) { - struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr; + struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, i); switch (current->sock_type) { case COLLECTOR_UN: @@ -1431,25 +1416,24 @@ static int mainloop(int epollfd) logger_nDPIsrvd(current, "Distributor connection", "closed"); break; } - disconnect_client(epollfd, current); + disconnect_client(io, current); } else { - logger(1, "Epoll event error: %s", (errno != 0 ? strerror(errno) : "unknown")); + logger(1, "Event I/O error: %s", (errno != 0 ? strerror(errno) : "unknown")); } break; } - if (events[i].data.fd == collector_un_sockfd || events[i].data.fd == distributor_un_sockfd || - events[i].data.fd == distributor_in_sockfd) + if (fd == collector_un_sockfd || fd == distributor_un_sockfd || fd == distributor_in_sockfd) { /* New connection to collector / distributor. */ - if (new_connection(epollfd, events[i].data.fd) != 0) + if (new_connection(io, fd) != 0) { continue; } } - else if (events[i].data.fd == signalfd) + else if (fd == signalfd) { struct signalfd_siginfo fdsi; ssize_t s; @@ -1473,7 +1457,7 @@ static int mainloop(int epollfd) else { /* Incoming data / Outoing data ready to receive / send. */ - if (handle_data_event(epollfd, &events[i]) != 0) + if (handle_data_event(io, i) != 0) { /* do nothing */ } @@ -1481,57 +1465,57 @@ static int mainloop(int epollfd) } } - free_remotes(epollfd); + free_remotes(io); + nio_free(io); close(signalfd); return 0; } -static int create_evq(void) +static int setup_event_queue(struct nio * const io) { - return epoll_create1(EPOLL_CLOEXEC); -} - -static int setup_event_queue(void) -{ - int epollfd = create_evq(); - if (epollfd < 0) +#ifdef ENABLE_EPOLL + if ((nDPIsrvd_options.use_poll == 0 && nio_use_epoll(io, 32) != NIO_SUCCESS) + || (nDPIsrvd_options.use_poll != 0 && nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)) +#else + if (nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) +#endif { - logger(1, "Error creating epoll: %s", strerror(errno)); + logger(1, "%s", "Event I/O poll/epoll setup failed"); return -1; } - if (add_in_event_fd(epollfd, collector_un_sockfd) != 0) + errno = 0; + if (add_in_event_fd(io, collector_un_sockfd) != 0) { - logger(1, "Error adding collector UNIX socket fd to epoll: %s", strerror(errno)); + logger(1, + "Error adding collector UNIX socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); return -1; } - if (add_in_event_fd(epollfd, distributor_un_sockfd) != 0) + errno = 0; + if (add_in_event_fd(io, distributor_un_sockfd) != 0) { - logger(1, "Error adding distributor UNIX socket fd to epoll: %s", strerror(errno)); + logger(1, + "Error adding distributor UNIX socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); return -1; } if (distributor_in_sockfd >= 0) { - if (add_in_event_fd(epollfd, distributor_in_sockfd) != 0) + errno = 0; + if (add_in_event_fd(io, distributor_in_sockfd) != 0) { - logger(1, "Error adding distributor TCP/IP socket fd to epoll: %s", strerror(errno)); + logger(1, + "Error adding distributor TCP/IP socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); return -1; } } - return epollfd; -} - -static void close_event_queue(int epollfd) -{ - for (size_t i = 0; i < remotes.desc_size; ++i) - { - disconnect_client(epollfd, &remotes.desc[i]); - } - close(epollfd); + return 0; } static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors) @@ -1555,13 +1539,14 @@ static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors) int main(int argc, char ** argv) { int retval = 1; - int epollfd; + struct nio io; if (argc == 0) { return 1; } + nio_init(&io); init_logging("nDPIsrvd"); if (nDPIsrvd_parse_options(argc, argv) != 0) @@ -1677,14 +1662,12 @@ int main(int argc, char ** argv) signal(SIGTERM, SIG_IGN); signal(SIGQUIT, SIG_IGN); - epollfd = setup_event_queue(); - if (epollfd < 0) + if (setup_event_queue(&io) != 0) { goto error_unlink_sockets; } - retval = mainloop(epollfd); - close_event_queue(epollfd); + retval = mainloop(&io); error_unlink_sockets: if (unlink(get_cmdarg(&nDPIsrvd_options.collector_un_sockpath)) != 0) @@ -1,5 +1,6 @@ #include "nio.h" +#include <errno.h> #include <stdint.h> #include <stdlib.h> #ifdef ENABLE_EPOLL @@ -11,9 +12,9 @@ void nio_init(struct nio * io) { io->nready = -1; io->poll_max_fds = 0; - io->poll_cur_fds = 0; io->poll_fds = NULL; io->poll_ptrs = NULL; + io->poll_fds_set = NULL; io->epoll_fd = -1; io->max_events = 0; io->events = NULL; @@ -27,13 +28,15 @@ int nio_use_poll(struct nio * io, nfds_t max_fds) io->poll_max_fds = max_fds; io->poll_fds = (struct pollfd *)calloc(max_fds, sizeof(*io->poll_fds)); io->poll_ptrs = calloc(max_fds, sizeof(*io->poll_ptrs)); + io->poll_fds_set = calloc(max_fds, sizeof(*io->poll_fds_set)); for (size_t i = 0; i < max_fds; ++i) { io->poll_fds[i].fd = -1; } - return io->poll_fds == NULL || io->poll_ptrs == NULL; // return NIO_ERROR_INTERNAL on error + return io->poll_fds == NULL || io->poll_ptrs == NULL || io->poll_fds_set == NULL; // return NIO_ERROR_INTERNAL on + // error } int nio_use_epoll(struct nio * io, int max_events) @@ -46,7 +49,7 @@ int nio_use_epoll(struct nio * io, int max_events) io->max_events = max_events; io->events = calloc(max_events, sizeof(struct epoll_event)); - return io->events == NULL || io->epoll_fd < 0; + return io->events == NULL || io->epoll_fd < 0; // return NIO_ERROR_INTERNAL on error #else (void)io; (void)max_events; @@ -57,9 +60,13 @@ int nio_use_epoll(struct nio * io, int max_events) int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr) { + if (fd < 0) + return NIO_ERROR_INTERNAL; + #ifdef ENABLE_EPOLL if (io->epoll_fd >= 0) { + int rv; struct epoll_event event = {}; if (ptr == NULL) @@ -78,7 +85,11 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr) if (event.events == 0) return NIO_ERROR_INTERNAL; - return epoll_ctl(io->epoll_fd, EPOLL_CTL_ADD, fd, &event); + while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_ADD, fd, &event)) != 0 && errno == EINTR) + { + /* If epoll_ctl() was interrupted by the system, repeat. */ + } + return rv; } else #endif @@ -87,9 +98,6 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr) struct pollfd * unused_pollfd = NULL; void ** unused_ptr = NULL; - if (io->poll_cur_fds == io->poll_max_fds || fd < 0) - return NIO_ERROR_INTERNAL; - for (size_t i = 0; i < io->poll_max_fds; ++i) { if (io->poll_fds[i].fd < 0) @@ -112,9 +120,8 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr) unused_pollfd->fd = fd; *unused_ptr = ptr; - io->poll_cur_fds++; - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } return NIO_ERROR_INTERNAL; @@ -122,9 +129,13 @@ int nio_add_fd(struct nio * io, int fd, int event_flags, void * ptr) int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr) { + if (fd < 0) + return NIO_ERROR_INTERNAL; + #ifdef ENABLE_EPOLL if (io->epoll_fd >= 0) { + int rv; struct epoll_event event = {}; if (ptr == NULL) @@ -143,43 +154,43 @@ int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr) if (event.events == 0) return NIO_ERROR_INTERNAL; - return epoll_ctl(io->epoll_fd, EPOLL_CTL_MOD, fd, &event); + while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_MOD, fd, &event)) != 0 && errno == EINTR) + { + /* If epoll_ctl() was interrupted by the system, repeat. */ + } + return rv; } else #endif if (io->poll_max_fds > 0) { - struct pollfd * unused_pollfd = NULL; - void ** unused_ptr = NULL; - - if (io->poll_cur_fds == io->poll_max_fds || fd < 0) - return NIO_ERROR_INTERNAL; + struct pollfd * used_pollfd = NULL; + void ** used_ptr = NULL; for (size_t i = 0; i < io->poll_max_fds; ++i) { - if (io->poll_fds[i].fd < 0) + if (io->poll_fds[i].fd == fd) { - unused_pollfd = &io->poll_fds[i]; - unused_ptr = &io->poll_ptrs[i]; + used_pollfd = &io->poll_fds[i]; + used_ptr = &io->poll_ptrs[i]; break; } } - if (unused_pollfd == NULL) + if (used_pollfd == NULL) return NIO_ERROR_INTERNAL; - unused_pollfd->events = 0; + used_pollfd->events = 0; if ((event_flags & NIO_EVENT_INPUT) != 0) - unused_pollfd->events |= POLLIN; + used_pollfd->events |= POLLIN; if ((event_flags & NIO_EVENT_OUTPUT) != 0) - unused_pollfd->events |= POLLOUT; - if (unused_pollfd->events == 0) + used_pollfd->events |= POLLOUT; + if (used_pollfd->events == 0) return NIO_ERROR_INTERNAL; - unused_pollfd->fd = fd; - *unused_ptr = ptr; - io->poll_cur_fds++; + used_pollfd->fd = fd; + *used_ptr = ptr; - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } return NIO_ERROR_INTERNAL; @@ -187,38 +198,43 @@ int nio_mod_fd(struct nio * io, int fd, int event_flags, void * ptr) int nio_del_fd(struct nio * io, int fd) { + if (fd < 0) + return NIO_ERROR_INTERNAL; + #ifdef ENABLE_EPOLL if (io->epoll_fd >= 0) { - return epoll_ctl(io->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + int rv; + + while ((rv = epoll_ctl(io->epoll_fd, EPOLL_CTL_DEL, fd, NULL)) != 0 && errno == EINTR) + { + /* If epoll_ctl() was interrupted by the system, repeat. */ + } + return rv; } else #endif if (io->poll_max_fds > 0) { - struct pollfd * unused_pollfd = NULL; - void ** unused_ptr = NULL; - - if (io->poll_cur_fds == io->poll_max_fds || fd < 0) - return NIO_ERROR_INTERNAL; + struct pollfd * used_pollfd = NULL; + void ** used_ptr = NULL; for (size_t i = 0; i < io->poll_max_fds; ++i) { - if (io->poll_fds[i].fd < 0) + if (io->poll_fds[i].fd == fd) { - unused_pollfd = &io->poll_fds[i]; - unused_ptr = &io->poll_ptrs[i]; + used_pollfd = &io->poll_fds[i]; + used_ptr = &io->poll_ptrs[i]; break; } } - if (unused_pollfd == NULL) + if (used_pollfd == NULL) return NIO_ERROR_INTERNAL; - unused_pollfd->fd = -1; - *unused_ptr = NULL; - io->poll_cur_fds--; + used_pollfd->fd = -1; + *used_ptr = NULL; - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } return NIO_ERROR_INTERNAL; @@ -229,7 +245,11 @@ int nio_run(struct nio * io, int timeout) #ifdef ENABLE_EPOLL if (io->epoll_fd >= 0) { - io->nready = epoll_wait(io->epoll_fd, io->events, io->max_events, timeout); + do + { + io->nready = epoll_wait(io->epoll_fd, io->events, io->max_events, timeout); + } while (io->nready < 0 && errno == EINTR); + if (io->nready < 0) return NIO_ERROR_SYSTEM; } @@ -237,60 +257,73 @@ int nio_run(struct nio * io, int timeout) #endif if (io->poll_max_fds > 0) { - io->nready = poll(io->poll_fds, io->poll_max_fds, timeout); + do + { + io->nready = poll(io->poll_fds, io->poll_max_fds, timeout); + } while (io->nready < 0 && errno == EINTR); + if (io->nready < 0) return NIO_ERROR_SYSTEM; - else - io->nready = io->poll_max_fds; + + if (io->nready > 0) + { + for (nfds_t i = 0, j = 0; i < io->poll_max_fds; ++i) + { + if (io->poll_fds[i].fd >= 0 && io->poll_fds[i].revents != 0) + { + io->poll_fds_set[j++] = i; + } + } + } } - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } -int nio_check(struct nio * io, int index, int events) +int nio_check(struct nio * io, int index, int event_flags) { - if (index < 0 || index >= io->nready) + if (nio_is_valid(io, index) != NIO_SUCCESS) return NIO_ERROR_INTERNAL; #ifdef ENABLE_EPOLL - if (io->epoll_fd >= 0 && index < io->max_events) + if (io->epoll_fd >= 0) { uint32_t epoll_events = 0; - if ((events & NIO_EVENT_INPUT) != 0) + if ((event_flags & NIO_EVENT_INPUT) != 0) epoll_events |= EPOLLIN; - if ((events & NIO_EVENT_OUTPUT) != 0) + if ((event_flags & NIO_EVENT_OUTPUT) != 0) epoll_events |= EPOLLOUT; - if ((events & NIO_EVENT_ERROR) != 0) + if ((event_flags & NIO_EVENT_ERROR) != 0) epoll_events |= EPOLLERR | EPOLLHUP; if (epoll_events == 0) return NIO_ERROR_INTERNAL; - struct epoll_event * ee = (struct epoll_event *)io->events; - if ((ee[index].events & epoll_events) == 0) + struct epoll_event const * const events = (struct epoll_event *)io->events; + if ((events[index].events & epoll_events) == 0) return NIO_ERROR_INTERNAL; - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } else #endif - if (io->poll_max_fds > 0 && index < (int)io->poll_max_fds) + if (io->poll_max_fds > 0) { short int poll_events = 0; - if ((events & NIO_EVENT_INPUT) != 0) + if ((event_flags & NIO_EVENT_INPUT) != 0) poll_events |= POLLIN; - if ((events & NIO_EVENT_OUTPUT) != 0) + if ((event_flags & NIO_EVENT_OUTPUT) != 0) poll_events |= POLLOUT; - if ((events & NIO_EVENT_ERROR) != 0) + if ((event_flags & NIO_EVENT_ERROR) != 0) poll_events |= POLLERR | POLLHUP; if (poll_events == 0) return NIO_ERROR_INTERNAL; - if ((io->poll_fds[index].revents & poll_events) == 0) + if ((io->poll_fds[io->poll_fds_set[index]].revents & poll_events) == 0) return NIO_ERROR_INTERNAL; - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } return NIO_ERROR_INTERNAL; @@ -302,34 +335,62 @@ int nio_is_valid(struct nio * io, int index) return NIO_ERROR_INTERNAL; #ifdef ENABLE_EPOLL - if (io->epoll_fd >= 0 && index <= io->max_events) + if (io->epoll_fd >= 0) { - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } else #endif - if (io->poll_max_fds > 0 && index < (int)io->poll_max_fds) + if (io->poll_max_fds > 0 && io->poll_fds[io->poll_fds_set[index]].fd >= 0) { - if (io->poll_fds[index].revents != 0) - return NIO_ERROR_SUCCESS; + return NIO_SUCCESS; } return NIO_ERROR_INTERNAL; } -int nio_has_input(struct nio * io, int index) +int nio_get_fd(struct nio * io, int index) { - return nio_check(io, index, NIO_EVENT_INPUT); -} + if (nio_is_valid(io, index) != NIO_SUCCESS) + return -1; -int nio_can_output(struct nio * io, int index) -{ - return nio_check(io, index, NIO_EVENT_OUTPUT); +#ifdef ENABLE_EPOLL + if (io->epoll_fd >= 0) + { + struct epoll_event const * const events = (struct epoll_event *)io->events; + + return events[index].data.fd; + } + else +#endif + if (io->poll_max_fds > 0) + { + return io->poll_fds[io->poll_fds_set[index]].fd; + } + + return -1; } -int nio_has_error(struct nio * io, int index) +void * nio_get_ptr(struct nio * io, int index) { - return nio_check(io, index, NIO_EVENT_ERROR); + if (nio_is_valid(io, index) != NIO_SUCCESS) + return NULL; + +#ifdef ENABLE_EPOLL + if (io->epoll_fd >= 0) + { + struct epoll_event * const events = (struct epoll_event *)io->events; + + return events[index].data.ptr; + } + else +#endif + if (io->poll_max_fds > 0) + { + return io->poll_ptrs[io->poll_fds_set[index]]; + } + + return NULL; } void nio_free(struct nio * io) @@ -351,5 +412,6 @@ void nio_free(struct nio * io) #endif free(io->poll_fds); free(io->poll_ptrs); + free(io->poll_fds_set); free(io->events); } @@ -5,7 +5,7 @@ enum { - NIO_ERROR_SUCCESS = 0, + NIO_SUCCESS = 0, NIO_ERROR_INTERNAL = 1, NIO_ERROR_SYSTEM = -1 }; @@ -23,9 +23,9 @@ struct nio int nready; nfds_t poll_max_fds; - nfds_t poll_cur_fds; struct pollfd * poll_fds; void ** poll_ptrs; + nfds_t * poll_fds_set; int epoll_fd; int max_events; @@ -46,15 +46,33 @@ int nio_del_fd(struct nio * io, int fd); int nio_run(struct nio * io, int timeout); +static inline int nio_get_nready(struct nio const * const io) +{ + return io->nready; +} + int nio_check(struct nio * io, int index, int events); int nio_is_valid(struct nio * io, int index); -int nio_has_input(struct nio * io, int index); +int nio_get_fd(struct nio * io, int index); -int nio_can_output(struct nio * io, int index); +void * nio_get_ptr(struct nio * io, int index); + +static inline int nio_has_input(struct nio * io, int index) +{ + return nio_check(io, index, NIO_EVENT_INPUT); +} -int nio_has_error(struct nio * io, int index); +static inline int nio_can_output(struct nio * io, int index) +{ + return nio_check(io, index, NIO_EVENT_OUTPUT); +} + +static inline int nio_has_error(struct nio * io, int index) +{ + return nio_check(io, index, NIO_EVENT_ERROR); +} void nio_free(struct nio * io); |