diff options
author | Toni <matzeton@googlemail.com> | 2023-11-06 12:38:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-06 12:38:15 +0100 |
commit | 1b679271693a17ce0b653b9ba45db77b731db42e (patch) | |
tree | 986a2fac5feeaae71e2c2bd4e771e31a7c966de6 /nDPIsrvd.c | |
parent | 17c21e1d27a90b394873a0e80e5d6992f4b985ee (diff) |
Event I/O abstraction layer. (#28)
* Finalize Event I/O abstraction layer.
* Fix possible fd leakage, Gitlab-CI build and error logging.
* Fixed possible uninitialized signalfd variable.
* Fixed possible memory leak.
* Fixed some SonarCloud complaints.
* Fixed nDPId-test nDPIsrvd-arpa-mockup stuck indefinitely.
* Add nDPId / nDPIsrvd command line option to use poll() on Linux instead of the default epoll().
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 265 |
1 files changed, 124 insertions, 141 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index 28a989fc0..a4d7457fe 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -9,7 +9,6 @@ #include <stdlib.h> #include <stdint.h> #include <string.h> -#include <sys/epoll.h> #include <sys/signalfd.h> #include <sys/socket.h> #include <sys/types.h> @@ -17,6 +16,7 @@ #include "config.h" #include "nDPIsrvd.h" +#include "nio.h" #include "utils.h" enum sock_type @@ -92,7 +92,10 @@ static struct struct cmdarg group; nDPIsrvd_ull max_remote_descriptors; nDPIsrvd_ull max_write_buffers; - int bufferbloat_fallback_to_blocking; + uint8_t bufferbloat_fallback_to_blocking; +#ifdef ENABLE_EPOLL + uint8_t use_poll; +#endif } nDPIsrvd_options = {.pidfile = CMDARG(nDPIsrvd_PIDFILE), .collector_un_sockpath = CMDARG(COLLECTOR_UNIX_SOCKET), .distributor_un_sockpath = CMDARG(DISTRIBUTOR_UNIX_SOCKET), @@ -109,11 +112,11 @@ static void logger_nDPIsrvd(struct remote_desc const * const remote, ...); static int fcntl_add_flags(int fd, int flags); static int fcntl_del_flags(int fd, int flags); -static int add_in_event_fd(int epollfd, int fd); -static int add_in_event(int epollfd, struct remote_desc * const remote); -static int del_event(int epollfd, int fd); -static int del_out_event(int epollfd, struct remote_desc * const remote); -static void disconnect_client(int epollfd, struct remote_desc * const current); +static int add_in_event_fd(struct nio * const io, int fd); +static int add_in_event(struct nio * const io, struct remote_desc * const remote); +static int del_event(struct nio * const io, int fd); +static int set_in_event(struct nio * const io, struct remote_desc * const remote); +static void disconnect_client(struct nio * const io, struct remote_desc * const current); static int drain_write_buffers_blocking(struct remote_desc * const remote); static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) @@ -414,7 +417,7 @@ static int drain_write_buffers_blocking(struct remote_desc * const remote) return retval; } -static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) +static int handle_outgoing_data(struct nio * const io, struct remote_desc * const remote) { UT_array * const additional_write_buffers = get_additional_write_buffers(remote); @@ -425,7 +428,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) if (drain_write_buffers(remote) != 0) { logger_nDPIsrvd(remote, "Could not drain buffers for", ": %s", strerror(errno)); - disconnect_client(epollfd, remote); + disconnect_client(io, remote); return -1; } if (utarray_len(additional_write_buffers) == 0) @@ -433,7 +436,7 @@ static int handle_outgoing_data(int epollfd, struct remote_desc * const remote) struct nDPIsrvd_write_buffer * const write_buffer = get_write_buffer(remote); if (write_buffer->buf.used == 0) { - return del_out_event(epollfd, remote); + return set_in_event(io, remote); } else { return drain_main_buffer(remote); } @@ -679,15 +682,17 @@ static struct remote_desc * get_remote_descriptor(enum sock_type type, int remot return NULL; } -static void free_remote(int epollfd, struct remote_desc * remote) +static void free_remote(struct nio * const io, struct remote_desc * remote) { if (remote->fd > -1) { errno = 0; - del_event(epollfd, remote->fd); - if (errno != 0) + if (del_event(io, remote->fd) != 0) { - logger_nDPIsrvd(remote, "Could not delete event from epoll for connection", ": %s", strerror(errno)); + logger_nDPIsrvd(remote, + "Could not delete event from queue for connection", + ": %s", + (errno != 0 ? strerror(errno) : "Internal Error")); } errno = 0; close(remote->fd); @@ -732,11 +737,11 @@ static void free_remote(int epollfd, struct remote_desc * remote) } } -static void free_remotes(int epollfd) +static void free_remotes(struct nio * const io) { for (size_t i = 0; i < remotes.desc_size; ++i) { - free_remote(epollfd, &remotes.desc[i]); + free_remote(io, &remotes.desc[i]); } nDPIsrvd_free(remotes.desc); remotes.desc = NULL; @@ -744,68 +749,34 @@ static void free_remotes(int epollfd) remotes.desc_size = 0; } -static int add_event(int epollfd, int events, int fd, void * ptr) +static int add_in_event_fd(struct nio * const io, int fd) { - int retval; - struct epoll_event event = {}; - - if (ptr != NULL) - { - event.data.ptr = ptr; - } - else - { - event.data.fd = fd; - } - event.events = events; - - while ((retval = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event)) != 0 && errno == EINTR) {} - return retval; + return nio_add_fd(io, fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS; } -static int add_in_event_fd(int epollfd, int fd) +static int add_in_event(struct nio * const io, struct remote_desc * const remote) { - return add_event(epollfd, EPOLLIN, fd, NULL); + return nio_add_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS; } -static int add_in_event(int epollfd, struct remote_desc * const remote) +static int set_out_event(struct nio * const io, struct remote_desc * const remote) { - return add_event(epollfd, EPOLLIN, remote->fd, remote); + return nio_mod_fd(io, remote->fd, NIO_EVENT_OUTPUT, remote) != NIO_SUCCESS; } -static int mod_event(int epollfd, int events, int fd, void * ptr) +static int set_in_event(struct nio * const io, struct remote_desc * const remote) { - int retval; - struct epoll_event event = {}; - - event.data.ptr = ptr; - event.events = events; - - while ((retval = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event)) != 0 && errno == EINTR) {} - return retval; + return nio_mod_fd(io, remote->fd, NIO_EVENT_INPUT, remote) != NIO_SUCCESS; } -static int add_out_event(int epollfd, struct remote_desc * const remote) +static int del_event(struct nio * const io, int fd) { - return mod_event(epollfd, EPOLLIN | EPOLLOUT, remote->fd, remote); + return nio_del_fd(io, fd) != NIO_SUCCESS; } -static int del_out_event(int epollfd, struct remote_desc * const remote) +static void disconnect_client(struct nio * const io, struct remote_desc * const remote) { - return mod_event(epollfd, EPOLLIN, remote->fd, remote); -} - -static int del_event(int epollfd, int fd) -{ - int retval; - - while ((retval = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) != 0 && errno == EINTR) {} - return retval; -} - -static void disconnect_client(int epollfd, struct remote_desc * const remote) -{ - free_remote(epollfd, remote); + free_remote(io, remote); } static int nDPIsrvd_parse_options(int argc, char ** argv) @@ -828,6 +799,13 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) case 'c': set_cmdarg(&nDPIsrvd_options.collector_un_sockpath, optarg); break; + case 'e': +#ifdef ENABLE_EPOLL + nDPIsrvd_options.use_poll = 1; +#else + logger_early(1, "%s", "nDPIsrvd was built w/o epoll() support, poll() is already the default"); +#endif + break; case 'd': daemonize_enable(); break; @@ -870,7 +848,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) default: fprintf(stderr, "%s\n", get_nDPId_version()); fprintf(stderr, - "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-d] [-p pidfile]\n" + "Usage: %s [-l] [-L logfile] [-c path-to-unix-sock] [-e] [-d] [-p pidfile]\n" "\t[-s path-to-distributor-unix-socket] [-S distributor-host:port]\n" "\t[-m max-remote-descriptors] [-u user] [-g group]\n" "\t[-C max-buffered-json-lines] [-D]\n" @@ -879,6 +857,8 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) "\t-L\tLog all messages to a log file.\n" "\t-c\tPath to a listening UNIX socket (nDPIsrvd Collector).\n" "\t \tDefault: %s\n" + "\t-e\tUse poll() instead of epoll().\n" + "\t \tDefault: epoll() on Linux, poll() otherwise\n" "\t-d\tFork into background after initialization.\n" "\t-p\tWrite the daemon PID to the given file path.\n" "\t \tDefault: %s\n" @@ -970,7 +950,7 @@ static struct remote_desc * accept_remote(int server_fd, return current; } -static int new_connection(int epollfd, int eventfd) +static int new_connection(struct nio * const io, int eventfd) { union { @@ -1112,7 +1092,7 @@ static int new_connection(int epollfd, int eventfd) if (fcntl_add_flags(current->fd, O_NONBLOCK) != 0) { logger(1, "Error setting fd flags to non-blocking mode: %s", strerror(errno)); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1123,18 +1103,19 @@ static int new_connection(int epollfd, int eventfd) /* shutdown reading end for distributor clients does not work due to epoll usage */ } - /* setup epoll event */ - if (add_in_event(epollfd, current) != 0) + /* setup event I/O */ + errno = 0; + if (add_in_event(io, current) != NIO_SUCCESS) { - logger(1, "Error adding input event to %d: %s", current->fd, strerror(errno)); - disconnect_client(epollfd, current); + logger(1, "Error adding input event to %d: %s", current->fd, (errno != 0 ? strerror(errno) : "Internal Error")); + disconnect_client(io, current); return 1; } return 0; } -static int handle_collector_protocol(int epollfd, struct remote_desc * const current) +static int handle_collector_protocol(struct nio * const io, struct remote_desc * const current) { struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); char * json_str_start = NULL; @@ -1150,7 +1131,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "BUG: Collector connection", "JSON invalid opening character: '%c'", json_read_buffer->buf.ptr.text[NETWORK_BUFFER_LENGTH_DIGITS]); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1161,7 +1142,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur if (errno == ERANGE) { logger_nDPIsrvd(current, "BUG: Collector connection", "JSON string length exceeds numceric limits"); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1172,7 +1153,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "missing JSON string length in protocol preamble: \"%.*s\"", NETWORK_BUFFER_LENGTH_DIGITS, json_read_buffer->buf.ptr.text); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1194,7 +1175,7 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur "JSON string too big: %llu > %zu", current->event_collector_un.json_bytes, json_read_buffer->buf.max); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1212,14 +1193,14 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur (int)current->event_collector_un.json_bytes > 512 ? 512 : (int)current->event_collector_un.json_bytes, json_read_buffer->buf.ptr.text); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } return 0; } -static int handle_incoming_data(int epollfd, struct remote_desc * const current) +static int handle_incoming_data(struct nio * const io, struct remote_desc * const current) { struct nDPIsrvd_json_buffer * const json_read_buffer = get_read_buffer(current); @@ -1235,7 +1216,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) { logger_nDPIsrvd(current, "Distributor connection", "closed"); } - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } @@ -1261,13 +1242,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (bytes_read < 0 || errno != 0) { logger_nDPIsrvd(current, "Could not read remote", ": %s", strerror(errno)); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } if (bytes_read == 0) { logger_nDPIsrvd(current, "Collector connection", "closed during read"); - disconnect_client(epollfd, current); + disconnect_client(io, current); return 1; } json_read_buffer->buf.used += bytes_read; @@ -1275,7 +1256,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) while (json_read_buffer->buf.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) { - if (handle_collector_protocol(epollfd, current) != 0) + if (handle_collector_protocol(io, current) != 0) { break; } @@ -1296,13 +1277,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (utarray_len(additional_write_buffers) == 0) { errno = 0; - if (add_out_event(epollfd, &remotes.desc[i]) != 0) + if (set_out_event(io, &remotes.desc[i]) != 0) { logger_nDPIsrvd(&remotes.desc[i], "Could not add event to", ", disconnecting: %s", - strerror(errno)); - disconnect_client(epollfd, &remotes.desc[i]); + (errno != 0 ? strerror(errno) : "Internal Error")); + disconnect_client(io, &remotes.desc[i]); continue; } } @@ -1310,7 +1291,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) json_read_buffer->buf.ptr.raw, current->event_collector_un.json_bytes) != 0) { - disconnect_client(epollfd, &remotes.desc[i]); + disconnect_client(io, &remotes.desc[i]); continue; } } @@ -1324,7 +1305,7 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) if (drain_main_buffer(&remotes.desc[i]) != 0) { - disconnect_client(epollfd, &remotes.desc[i]); + disconnect_client(io, &remotes.desc[i]); } } @@ -1338,13 +1319,13 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) return 0; } -static int handle_data_event(int epollfd, struct epoll_event * const event) +static int handle_data_event(struct nio * const io, int index) { - struct remote_desc * current = (struct remote_desc *)event->data.ptr; + struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, index); - if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0) + if (nio_has_input(io, index) != NIO_SUCCESS && nio_can_output(io, index) != NIO_SUCCESS) { - logger(1, "Can not handle event mask: %d", event->events); + logger(1, "%s", "Neither input nor output event set."); return 1; } @@ -1360,17 +1341,17 @@ static int handle_data_event(int epollfd, struct epoll_event * const event) return 1; } - if ((event->events & EPOLLIN) != 0) + if (nio_has_input(io, index) == NIO_SUCCESS) { - return handle_incoming_data(epollfd, current); + return handle_incoming_data(io, current); } else { - return handle_outgoing_data(epollfd, current); + return handle_outgoing_data(io, current); } } -static int setup_signalfd(int epollfd) +static int setup_signalfd(struct nio * const io) { sigset_t mask; int sfd; @@ -1390,7 +1371,7 @@ static int setup_signalfd(int epollfd) return -1; } - if (add_in_event_fd(epollfd, sfd) != 0) + if (add_in_event_fd(io, sfd) != 0) { return -1; } @@ -1403,24 +1384,28 @@ static int setup_signalfd(int epollfd) return sfd; } -static int mainloop(int epollfd) +static int mainloop(struct nio * const io) { - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); - int signalfd = setup_signalfd(epollfd); + int signalfd = setup_signalfd(io); while (nDPIsrvd_main_thread_shutdown == 0) { - int nready = epoll_wait(epollfd, events, events_size, 1000); + if (nio_run(io, 1000) != NIO_SUCCESS) + { + logger(1, "Event I/O returned error: %s", strerror(errno)); + } + + int nready = nio_get_nready(io); for (int i = 0; i < nready; i++) { - if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0) + int fd = nio_get_fd(io, i); + + if (nio_has_error(io, i) == NIO_SUCCESS) { - if (events[i].data.fd != collector_un_sockfd && events[i].data.fd != distributor_un_sockfd && - events[i].data.fd != distributor_in_sockfd) + if (fd != collector_un_sockfd && fd != distributor_un_sockfd && fd != distributor_in_sockfd) { - struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr; + struct remote_desc * const current = (struct remote_desc *)nio_get_ptr(io, i); switch (current->sock_type) { case COLLECTOR_UN: @@ -1431,25 +1416,24 @@ static int mainloop(int epollfd) logger_nDPIsrvd(current, "Distributor connection", "closed"); break; } - disconnect_client(epollfd, current); + disconnect_client(io, current); } else { - logger(1, "Epoll event error: %s", (errno != 0 ? strerror(errno) : "unknown")); + logger(1, "Event I/O error: %s", (errno != 0 ? strerror(errno) : "unknown")); } break; } - if (events[i].data.fd == collector_un_sockfd || events[i].data.fd == distributor_un_sockfd || - events[i].data.fd == distributor_in_sockfd) + if (fd == collector_un_sockfd || fd == distributor_un_sockfd || fd == distributor_in_sockfd) { /* New connection to collector / distributor. */ - if (new_connection(epollfd, events[i].data.fd) != 0) + if (new_connection(io, fd) != 0) { continue; } } - else if (events[i].data.fd == signalfd) + else if (fd == signalfd) { struct signalfd_siginfo fdsi; ssize_t s; @@ -1473,7 +1457,7 @@ static int mainloop(int epollfd) else { /* Incoming data / Outoing data ready to receive / send. */ - if (handle_data_event(epollfd, &events[i]) != 0) + if (handle_data_event(io, i) != 0) { /* do nothing */ } @@ -1481,57 +1465,57 @@ static int mainloop(int epollfd) } } - free_remotes(epollfd); + free_remotes(io); + nio_free(io); close(signalfd); return 0; } -static int create_evq(void) +static int setup_event_queue(struct nio * const io) { - return epoll_create1(EPOLL_CLOEXEC); -} - -static int setup_event_queue(void) -{ - int epollfd = create_evq(); - if (epollfd < 0) +#ifdef ENABLE_EPOLL + if ((nDPIsrvd_options.use_poll == 0 && nio_use_epoll(io, 32) != NIO_SUCCESS) + || (nDPIsrvd_options.use_poll != 0 && nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)) +#else + if (nio_use_poll(io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) +#endif { - logger(1, "Error creating epoll: %s", strerror(errno)); + logger(1, "%s", "Event I/O poll/epoll setup failed"); return -1; } - if (add_in_event_fd(epollfd, collector_un_sockfd) != 0) + errno = 0; + if (add_in_event_fd(io, collector_un_sockfd) != 0) { - logger(1, "Error adding collector UNIX socket fd to epoll: %s", strerror(errno)); + logger(1, + "Error adding collector UNIX socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); return -1; } - if (add_in_event_fd(epollfd, distributor_un_sockfd) != 0) + errno = 0; + if (add_in_event_fd(io, distributor_un_sockfd) != 0) { - logger(1, "Error adding distributor UNIX socket fd to epoll: %s", strerror(errno)); + logger(1, + "Error adding distributor UNIX socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); return -1; } if (distributor_in_sockfd >= 0) { - if (add_in_event_fd(epollfd, distributor_in_sockfd) != 0) + errno = 0; + if (add_in_event_fd(io, distributor_in_sockfd) != 0) { - logger(1, "Error adding distributor TCP/IP socket fd to epoll: %s", strerror(errno)); + logger(1, + "Error adding distributor TCP/IP socket fd to event I/O: %s", + (errno != 0 ? strerror(errno) : "Internal Error")); return -1; } } - return epollfd; -} - -static void close_event_queue(int epollfd) -{ - for (size_t i = 0; i < remotes.desc_size; ++i) - { - disconnect_client(epollfd, &remotes.desc[i]); - } - close(epollfd); + return 0; } static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors) @@ -1555,13 +1539,14 @@ static int setup_remote_descriptors(nDPIsrvd_ull max_remote_descriptors) int main(int argc, char ** argv) { int retval = 1; - int epollfd; + struct nio io; if (argc == 0) { return 1; } + nio_init(&io); init_logging("nDPIsrvd"); if (nDPIsrvd_parse_options(argc, argv) != 0) @@ -1677,14 +1662,12 @@ int main(int argc, char ** argv) signal(SIGTERM, SIG_IGN); signal(SIGQUIT, SIG_IGN); - epollfd = setup_event_queue(); - if (epollfd < 0) + if (setup_event_queue(&io) != 0) { goto error_unlink_sockets; } - retval = mainloop(epollfd); - close_event_queue(epollfd); + retval = mainloop(&io); error_unlink_sockets: if (unlink(get_cmdarg(&nDPIsrvd_options.collector_un_sockpath)) != 0) |