diff options
Diffstat (limited to 'nDPId-test.c')
-rw-r--r-- | nDPId-test.c | 192 |
1 files changed, 106 insertions, 86 deletions
diff --git a/nDPId-test.c b/nDPId-test.c index fb21bc65c..48c0e2b70 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -292,14 +292,12 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) { int nDPIsrvd_distributor_disconnects = 0; int const nDPIsrvd_distributor_expected_disconnects = 5; - int epollfd; + struct nio io; struct remote_desc * mock_json_desc = NULL; struct remote_desc * mock_test_desc = NULL; struct remote_desc * mock_buff_desc = NULL; struct remote_desc * mock_null_desc = NULL; struct remote_desc * mock_arpa_desc = NULL; - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); logger(0, "nDPIsrvd thread started, init.."); @@ -308,11 +306,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) logger(1, "nDPIsrvd block signals failed: %s", strerror(errno)); } - errno = 0; - epollfd = create_evq(); - if (epollfd < 0) + nio_init(&io); + +#ifdef ENABLE_EPOLL + if (nio_use_epoll(&io, 32) != NIO_SUCCESS) +#else + if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) +#endif { - logger(1, "nDPIsrvd epollfd invalid: %d", epollfd); + logger(1, "%s", "Error creating nDPIsrvd poll/epoll event I/O"); THREAD_ERROR_GOTO(arg); } @@ -356,9 +358,9 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) mock_arpa_desc->event_distributor_in.peer.sin_port = 0; errno = 0; - if (add_in_event(epollfd, mock_json_desc) != 0 || add_in_event(epollfd, mock_test_desc) != 0 || - add_in_event(epollfd, mock_buff_desc) != 0 || add_in_event(epollfd, mock_null_desc) != 0 || - add_in_event(epollfd, mock_arpa_desc) != 0) + if (add_in_event(&io, mock_json_desc) != 0 || add_in_event(&io, mock_test_desc) != 0 || + add_in_event(&io, mock_buff_desc) != 0 || add_in_event(&io, mock_null_desc) != 0 || + add_in_event(&io, mock_arpa_desc) != 0) { logger(1, "%s", "nDPIsrvd add input event failed"); THREAD_ERROR_GOTO(arg); @@ -370,28 +372,25 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) while (nDPIsrvd_distributor_disconnects < nDPIsrvd_distributor_expected_disconnects) { - errno = 0; - int nready = epoll_wait(epollfd, events, events_size, -1); - if (nready < 0 && errno != EINTR) + if (nio_run(&io, -1) != NIO_SUCCESS) { - logger(1, "%s", "nDPIsrvd epoll wait failed."); + logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno)); THREAD_ERROR_GOTO(arg); } - else if (errno == EINTR) - { - continue; - } + + int nready = nio_get_nready(&io); for (int i = 0; i < nready; i++) { - if (events[i].data.ptr == mock_json_desc || events[i].data.ptr == mock_test_desc || - events[i].data.ptr == mock_buff_desc || events[i].data.ptr == mock_null_desc || - events[i].data.ptr == mock_arpa_desc) + struct remote_desc * remote = (struct remote_desc *)nio_get_ptr(&io, i); + + if (remote == mock_json_desc || remote == mock_test_desc || + remote == mock_buff_desc || remote == mock_null_desc || + remote == mock_arpa_desc) { - if ((events[i].events & EPOLLHUP) != 0 || (events[i].events & EPOLLERR) != 0) + if (nio_has_error(&io, i) == NIO_SUCCESS) { char const * remote_desc_name; - struct remote_desc * remote = (struct remote_desc *)events[i].data.ptr; if (remote == mock_json_desc) { remote_desc_name = "Mock JSON"; @@ -405,7 +404,7 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) drain_write_buffers_blocking(mock_null_desc); if (mock_arpa_desc->fd >= 0) drain_write_buffers_blocking(mock_arpa_desc); - } while (handle_data_event(epollfd, &events[i]) == 0); + } while (handle_data_event(&io, i) == 0); } else if (remote == mock_test_desc) { @@ -433,16 +432,22 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) remote_desc_name, nDPIsrvd_distributor_disconnects, nDPIsrvd_distributor_expected_disconnects); - free_remote(epollfd, remote); + free_remote(&io, remote); } else { - if (handle_data_event(epollfd, &events[i]) != 0) + if (handle_data_event(&io, i) != 0) { - if (mock_arpa_desc == events[i].data.ptr) + if (mock_arpa_desc == remote) { // arpa mock does not care about shutdown events - disconnect_client(epollfd, mock_arpa_desc); + free_remote(&io, mock_arpa_desc); + nDPIsrvd_distributor_disconnects++; + logger(1, + "nDPIsrvd distributor '%s' connection closed (%d/%d)", + "Mock ARPA", + nDPIsrvd_distributor_disconnects, + nDPIsrvd_distributor_expected_disconnects); continue; } logger(1, "%s", "nDPIsrvd data event handler failed"); @@ -453,17 +458,15 @@ static void * nDPIsrvd_mainloop_thread(void * const arg) else { logger(1, - "nDPIsrvd epoll returned unexpected event data: %d (%p)", - events[i].data.fd, - events[i].data.ptr); + "nDPIsrvd epoll returned unexpected event data: %p", remote); THREAD_ERROR_GOTO(arg); } } } error: - free_remotes(epollfd); - close(epollfd); + free_remotes(&io); + nio_free(&io); logger(0, "%s", "nDPIsrvd worker thread exits.."); return NULL; @@ -919,10 +922,8 @@ static enum nDPIsrvd_callback_return distributor_json_printer(struct nDPIsrvd_so static void * distributor_client_mainloop_thread(void * const arg) { - int dis_epollfd = create_evq(); - int signalfd = setup_signalfd(dis_epollfd); - struct epoll_event events[32]; - size_t const events_size = sizeof(events) / sizeof(events[0]); + struct nio io; + int signalfd = -1; struct distributor_return_value * const drv = (struct distributor_return_value *)arg; struct thread_return_value * const trv = &drv->thread_return_value; struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data), @@ -947,9 +948,29 @@ static void * distributor_client_mainloop_thread(void * const arg) logger(0, "Distributor thread started, init.."); + nio_init(&io); + +#ifdef ENABLE_EPOLL + if (nio_use_epoll(&io, 32) != NIO_SUCCESS) +#else + if (nio_use_poll(&io, 32) != NIO_SUCCESS) +#endif + { + logger(1, "%s", "Error creating Distributor poll/epoll event I/O"); + THREAD_ERROR_GOTO(trv); + } + + signalfd = setup_signalfd(&io); + if (signalfd < 0) + { + logger(1, "Distributor signal fd setup failed: %s", strerror(errno)); + THREAD_ERROR_GOTO(trv); + } + if (thread_block_signals() != 0) { logger(1, "Distributor block signals failed: %s", strerror(errno)); + THREAD_ERROR_GOTO(trv); } errno = 0; @@ -962,31 +983,26 @@ static void * distributor_client_mainloop_thread(void * const arg) mock_buff->fd = mock_bufffds[PIPE_BUFFER_READ]; mock_null->fd = mock_nullfds[PIPE_NULL_READ]; - if (dis_epollfd < 0 || signalfd < 0) - { - THREAD_ERROR_GOTO(trv); - } - errno = 0; - if (add_in_event_fd(dis_epollfd, mock_testfds[PIPE_TEST_READ]) != 0) + if (add_in_event_fd(&io, mock_testfds[PIPE_TEST_READ]) != 0) { THREAD_ERROR_GOTO(trv); } errno = 0; - if (add_in_event_fd(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]) != 0) + if (add_in_event_fd(&io, mock_bufffds[PIPE_BUFFER_READ]) != 0) { THREAD_ERROR_GOTO(trv); } errno = 0; - if (add_in_event_fd(dis_epollfd, mock_nullfds[PIPE_NULL_READ]) != 0) + if (add_in_event_fd(&io, mock_nullfds[PIPE_NULL_READ]) != 0) { THREAD_ERROR_GOTO(trv); } errno = 0; - if (add_in_event_fd(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]) != 0) + if (add_in_event_fd(&io, mock_arpafds[PIPE_ARPA_READ]) != 0) { THREAD_ERROR_GOTO(trv); } @@ -1006,31 +1022,30 @@ static void * distributor_client_mainloop_thread(void * const arg) while (sock_stats->shutdown_events == 0 || buff_stats->shutdown_events == 0 || *mock_null_shutdown_events == 0) { - int nready = epoll_wait(dis_epollfd, events, events_size, -1); - if (nready < 0 && errno != EINTR) + if (nio_run(&io, -1) != NIO_SUCCESS) { - logger(1, "%s", "Distributor epoll wait failed."); + logger(1, "nDPIsrvd event I/O returned: %s", strerror(errno)); THREAD_ERROR_GOTO(trv); } - else if (nready < 0 && errno == EINTR) - { - continue; - } + + int nready = nio_get_nready(&io); for (int i = 0; i < nready; i++) { - if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0) + int fd = nio_get_fd(&io, i); + + if (nio_has_input(&io, i) != NIO_SUCCESS && nio_has_error(&io, i) != NIO_SUCCESS) { - logger(1, "Invalid epoll event received: %d", events[i].events & (~EPOLLIN & ~EPOLLHUP)); + logger(1, "Invalid epoll event received for fd: %d", fd); THREAD_ERROR_GOTO(trv); } - if ((events[i].events & EPOLLERR) != 0 || (events[i].events & EPOLLHUP) != 0) + if (nio_has_error(&io, i) == NIO_SUCCESS) { - logger(1, "Distributor disconnected: %d", events[i].data.fd); - del_event(dis_epollfd, events[i].data.fd); + logger(1, "Distributor disconnected: %d", fd); + del_event(&io, fd); } - if (events[i].data.fd == mock_testfds[PIPE_TEST_READ]) + if (fd == mock_testfds[PIPE_TEST_READ]) { switch (nDPIsrvd_read(mock_sock)) { @@ -1065,7 +1080,7 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(trv); } } - else if (events[i].data.fd == mock_bufffds[PIPE_BUFFER_READ]) + else if (fd == mock_bufffds[PIPE_BUFFER_READ]) { switch (nDPIsrvd_read(mock_buff)) { @@ -1100,7 +1115,7 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(trv); } } - else if (events[i].data.fd == mock_nullfds[PIPE_NULL_READ]) + else if (fd == mock_nullfds[PIPE_NULL_READ]) { switch (nDPIsrvd_read(mock_null)) { @@ -1129,7 +1144,7 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR_GOTO(trv); } } - else if (events[i].data.fd == mock_arpafds[PIPE_ARPA_READ]) + else if (fd == mock_arpafds[PIPE_ARPA_READ]) { char buf[NETWORK_BUFFER_MAX_SIZE]; ssize_t bytes_read = read(mock_arpafds[PIPE_ARPA_READ], buf, sizeof(buf)); @@ -1144,7 +1159,7 @@ static void * distributor_client_mainloop_thread(void * const arg) * I am just here to trigger some IP code paths. */ } - else if (events[i].data.fd == signalfd) + else if (fd == signalfd) { struct signalfd_siginfo fdsi; ssize_t s; @@ -1166,9 +1181,7 @@ static void * distributor_client_mainloop_thread(void * const arg) else { logger(1, - "Distributor epoll returned unexpected event data: %d (%p)", - events[i].data.fd, - events[i].data.ptr); + "Distributor epoll returned unexpected event data: %p", nio_get_ptr(&io, i)); THREAD_ERROR_GOTO(trv); } } @@ -1238,17 +1251,17 @@ static void * distributor_client_mainloop_thread(void * const arg) } error: - del_event(dis_epollfd, signalfd); - del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]); - del_event(dis_epollfd, mock_bufffds[PIPE_BUFFER_READ]); - del_event(dis_epollfd, mock_nullfds[PIPE_NULL_READ]); - del_event(dis_epollfd, mock_arpafds[PIPE_ARPA_READ]); + del_event(&io, signalfd); + del_event(&io, mock_testfds[PIPE_TEST_READ]); + del_event(&io, mock_bufffds[PIPE_BUFFER_READ]); + del_event(&io, mock_nullfds[PIPE_NULL_READ]); + del_event(&io, mock_arpafds[PIPE_ARPA_READ]); close(mock_testfds[PIPE_TEST_READ]); close(mock_bufffds[PIPE_BUFFER_READ]); close(mock_nullfds[PIPE_NULL_READ]); close(mock_arpafds[PIPE_ARPA_READ]); - close(dis_epollfd); close(signalfd); + nio_free(&io); nDPIsrvd_socket_free(&mock_sock); nDPIsrvd_socket_free(&mock_buff); @@ -1501,9 +1514,9 @@ static int nio_selftest() #endif #ifdef ENABLE_EPOLL - if (nio_use_epoll(&io, 5) != NIO_ERROR_SUCCESS) + if (nio_use_epoll(&io, 32) != NIO_SUCCESS) #else - if (nio_use_poll(&io, 3) != NIO_ERROR_SUCCESS) + if (nio_use_poll(&io, 32) != NIO_SUCCESS) #endif { logger(1, "%s", "Could not use poll/epoll for nio"); @@ -1518,8 +1531,8 @@ static int nio_selftest() goto error; } - if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_ERROR_SUCCESS || - nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_ERROR_SUCCESS) + if (nio_add_fd(&io, pipefds[1], NIO_EVENT_OUTPUT, NULL) != NIO_SUCCESS || + nio_add_fd(&io, pipefds[0], NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) { logger(1, "%s", "Could not add pipe fds to nio"); goto error; @@ -1535,26 +1548,26 @@ static int nio_selftest() size_t const wlen = strnlen(wbuf, sizeof(wbuf)); write(pipefds[1], wbuf, wlen); - if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS) + if (nio_run(&io, 1000) != NIO_SUCCESS) { logger(1, "%s", "Event notification failed"); goto error; } - if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS) + if (nio_can_output(&io, 0) != NIO_SUCCESS) { logger(1, "%s", "Pipe fd (write) can not output"); goto error; } - if (nio_has_input(&io, 1) != NIO_ERROR_SUCCESS) + if (nio_has_input(&io, 1) != NIO_SUCCESS) { logger(1, "%s", "Pipe fd (read) has no input"); goto error; } - if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) != NIO_ERROR_SUCCESS || - nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS) + if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) != NIO_SUCCESS || + nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS) { logger(1, "%s", "Event validation failed"); goto error; @@ -1567,31 +1580,38 @@ static int nio_selftest() goto error; } - if (nio_run(&io, 1000) != NIO_ERROR_SUCCESS) + if (nio_run(&io, 1000) != NIO_SUCCESS) { logger(1, "%s", "Event notification failed"); goto error; } - if (nio_can_output(&io, 0) != NIO_ERROR_SUCCESS) + if (nio_can_output(&io, 0) != NIO_SUCCESS) { logger(1, "%s", "Pipe fd (write) can not output"); goto error; } - if (nio_has_input(&io, 1) == NIO_ERROR_SUCCESS) + if (nio_has_input(&io, 1) == NIO_SUCCESS) { logger(1, "%s", "Pipe fd (read) has input"); goto error; } - if (nio_is_valid(&io, 0) != NIO_ERROR_SUCCESS || nio_is_valid(&io, 1) == NIO_ERROR_SUCCESS || - nio_has_error(&io, 0) == NIO_ERROR_SUCCESS || nio_has_error(&io, 1) == NIO_ERROR_SUCCESS) + if (nio_is_valid(&io, 0) != NIO_SUCCESS || nio_is_valid(&io, 1) == NIO_SUCCESS || + nio_has_error(&io, 0) == NIO_SUCCESS || nio_has_error(&io, 1) == NIO_SUCCESS) { logger(1, "%s", "Event validation failed"); goto error; } + if (nio_del_fd(&io, pipefds[0]) != NIO_SUCCESS + || nio_del_fd(&io, pipefds[1]) != NIO_SUCCESS) + { + logger(1, "%s", "Event delete failed"); + goto error; + } + nio_free(&io); return 0; error: |