diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-04-08 20:33:25 +0200 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-04-09 00:18:35 +0200 |
commit | 0a7ad7a76ac34d7a0c7635591203de08979b60da (patch) | |
tree | 28b1afb918be5733b85501df4affbded3c4fe100 /nDPId-test.c | |
parent | e576162a43c78290961b0b6c8cd3e5cc2965316f (diff) |
nDPId-test: added JSON distribution + JSON parsing (Multithreaded design re-using most of nDPId/nDPIsrvd core)
* improved Makefile.old install targets
* splitted nDPIsrvd_parse into nDPIsrvd_parse_line and nDPIsrvd_parse_all for the sake of readability
* minor Python script improvments (check for nDPIsrvd.py on multiple locations, may be superseeded by setuptools in the future)
* some paths needs to be absolute (chdir() during daemonize) and therefor additional checks introduced
* test run script checks and fails if certain files are are missing (PCAP file <=> result output file)
* removed not very useful "internal format error" JSON serialization if a BUG for same exists
* fixed invalid l4 type statistics counters for nDPIsrvd-collectd
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId-test.c')
-rw-r--r-- | nDPId-test.c | 246 |
1 files changed, 175 insertions, 71 deletions
diff --git a/nDPId-test.c b/nDPId-test.c index afe77adf7..528109a2a 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -16,11 +16,29 @@ enum PIPE_COUNT = 2 }; -static int epollfd = -1; +struct thread_return_value +{ + int val; +}; + static int mock_pipefds[PIPE_COUNT] = {}; static int mock_servfds[PIPE_COUNT] = {}; static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; +#define MAX_REMOTE_DESCRIPTORS 2 + +#define THREAD_ERROR(thread_arg) \ + do \ + { \ + ((struct thread_return_value *)thread_arg)->val = 1; \ + } while (0); +#define THREAD_ERROR_GOTO(thread_arg) \ + do \ + { \ + THREAD_ERROR(thread_arg); \ + goto error; \ + } while (0); + void mock_syslog_stderr(int p, const char * format, ...) { va_list ap; @@ -47,98 +65,158 @@ static int setup_pipe(int pipefd[PIPE_COUNT]) static void * nDPIsrvd_mainloop_thread(void * const arg) { (void)arg; + int epollfd = create_evq(); struct remote_desc * mock_json_desc = NULL; struct remote_desc * mock_serv_desc = NULL; + struct epoll_event events[32]; + size_t const events_size = sizeof(events) / sizeof(events[0]); + + if (epollfd < 0) + { + THREAD_ERROR_GOTO(arg); + } mock_json_desc = get_unused_remote_descriptor(JSON_SOCK, mock_pipefds[PIPE_nDPIsrvd]); if (mock_json_desc == NULL) { - goto error; + THREAD_ERROR_GOTO(arg); } mock_serv_desc = get_unused_remote_descriptor(SERV_SOCK, mock_servfds[PIPE_WRITE]); if (mock_serv_desc == NULL) { - goto error; + THREAD_ERROR_GOTO(arg); } strncpy(mock_serv_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_serv_desc->event_serv.peer_addr)); mock_serv_desc->event_serv.peer.sin_port = 0; if (add_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0) { - goto error; + THREAD_ERROR_GOTO(arg); } if (add_event(epollfd, mock_servfds[PIPE_WRITE], mock_serv_desc) != 0) { - goto error; + THREAD_ERROR_GOTO(arg); } - if (mainloop(epollfd) != 0) + while (1) { - goto error; - } + int nready = epoll_wait(epollfd, events, events_size, -1); + + if (nready < 0) + { + THREAD_ERROR_GOTO(arg); + } - while (handle_incoming_data(epollfd, mock_json_desc) == 0) {} + for (int i = 0; i < nready; i++) + { + if (events[i].data.ptr == mock_json_desc) + { + if (handle_incoming_data_event(epollfd, &events[i]) != 0) + { + goto error; + } + } + else + { + THREAD_ERROR_GOTO(arg); + } + } + } error: + del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]); + del_event(epollfd, mock_servfds[PIPE_WRITE]); + close(mock_pipefds[PIPE_nDPIsrvd]); close(mock_servfds[PIPE_WRITE]); + close(epollfd); return NULL; } -static void * distributor_mainloop_thread(void * const arg) +static enum nDPIsrvd_parse_return parse_json_lines(struct io_buffer * const buffer) { - char buf[NETWORK_BUFFER_MAX_SIZE]; + struct nDPIsrvd_buffer buf = {}; + struct nDPIsrvd_jsmn jsmn = {}; + size_t const n = (buffer->used > sizeof(buf.raw) ? sizeof(buf.raw) : buffer->used); - (void)arg; + if (n > NETWORK_BUFFER_MAX_SIZE) + { + return PARSE_STRING_TOO_BIG; + } + + memcpy(buf.raw, buffer->ptr, n); + buf.used = buffer->used; + + enum nDPIsrvd_parse_return ret; + while ((ret = nDPIsrvd_parse_line(&buf, &jsmn)) == PARSE_OK) + { + if (jsmn.tokens_found == 0) + { + return PARSE_JSMN_ERROR; + } + nDPIsrvd_drain_buffer(&buf); + } + + memcpy(buffer->ptr, buf.raw, buf.used); + buffer->used = buf.used; - int dis_thread_shutdown = 0; + return ret; +} + +static void * distributor_client_mainloop_thread(void * const arg) +{ + struct io_buffer client_buffer = {.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE), + .max = NETWORK_BUFFER_MAX_SIZE, + .used = 0}; int dis_epollfd = create_evq(); int signalfd = setup_signalfd(dis_epollfd); - struct epoll_event events[32]; size_t const events_size = sizeof(events) / sizeof(events[0]); - if (dis_epollfd < 0) + if (client_buffer.ptr == NULL || dis_epollfd < 0 || signalfd < 0) { - goto error; + THREAD_ERROR_GOTO(arg); } if (add_event(dis_epollfd, mock_servfds[PIPE_READ], NULL) != 0) { - goto error; - } - if (signalfd < 0) - { - goto error; + THREAD_ERROR_GOTO(arg); } - while (dis_thread_shutdown == 0) + while (1) { int nready = epoll_wait(dis_epollfd, events, events_size, -1); for (int i = 0; i < nready; i++) { - if ((events[i].events & EPOLLERR) != 0) - { - dis_thread_shutdown = 1; - break; - } - if ((events[i].events & EPOLLIN) == 0) + if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0) { - dis_thread_shutdown = 1; - break; + THREAD_ERROR_GOTO(arg); } if (events[i].data.fd == mock_servfds[PIPE_READ]) { - ssize_t bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf)); - if (bytes_read <= 0) + ssize_t bytes_read = read(mock_servfds[PIPE_READ], + client_buffer.ptr + client_buffer.used, + client_buffer.max - client_buffer.used); + if (bytes_read == 0) { - dis_thread_shutdown = 1; - break; + goto error; + } + else if (bytes_read < 0) + { + THREAD_ERROR_GOTO(arg); + } + printf("%.*s", (int)bytes_read, client_buffer.ptr + client_buffer.used); + client_buffer.used += bytes_read; + + enum nDPIsrvd_parse_return parse_ret = parse_json_lines(&client_buffer); + if (parse_ret != PARSE_NEED_MORE_DATA) + { + fprintf(stderr, "JSON parsing failed: %s\n", nDPIsrvd_enum_to_string(parse_ret)); + THREAD_ERROR(arg); } - printf("%.*s", (int)bytes_read, buf); } else if (events[i].data.fd == signalfd) { @@ -148,45 +226,38 @@ static void * distributor_mainloop_thread(void * const arg) s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo)); if (s != sizeof(struct signalfd_siginfo)) { - dis_thread_shutdown = 1; - break; + THREAD_ERROR(arg); } if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT) { - dis_thread_shutdown = 1; - break; + fprintf(stderr, "Got signal %d, abort.\n", fdsi.ssi_signo); + THREAD_ERROR(arg); } } else { - dis_thread_shutdown = 1; - break; + THREAD_ERROR(arg); } } } - ssize_t bytes_read; - while ((bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf))) > 0) - { - printf("%.*s", (int)bytes_read, buf); - } error: del_event(dis_epollfd, signalfd); del_event(dis_epollfd, mock_servfds[PIPE_READ]); close(dis_epollfd); close(signalfd); + free(client_buffer.ptr); return NULL; } static void * nDPId_mainloop_thread(void * const arg) { - (void)arg; - if (setup_reader_threads() != 0) { - exit(EXIT_FAILURE); + THREAD_ERROR(arg); + return NULL; } /* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */ @@ -203,9 +274,33 @@ static void * nDPId_mainloop_thread(void * const arg) static void usage(char const * const arg0) { - printf("usage: %s [path-to-pcap-file]\n", arg0); + fprintf(stderr, "usage: %s [path-to-pcap-file]\n", arg0); +} + +static int thread_wait_for_termination(pthread_t thread, time_t wait_time_secs, struct thread_return_value * const trv) +{ + struct timespec ts; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) + { + return -1; + } + + ts.tv_sec += wait_time_secs; + int err = pthread_timedjoin_np(thread, (void **)&trv, &ts); + + switch (err) + { + case EBUSY: + return 0; + case ETIMEDOUT: + return 0; + } + + return 1; } +#define THREADS_RETURNED_ERROR() (nDPId_return.val != 0 || nDPIsrvd_return.val != 0 || distributor_return.val != 0) int main(int argc, char ** argv) { if (argc != 2) @@ -214,6 +309,11 @@ int main(int argc, char ** argv) return -1; } + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) + { + return -1; + } + nDPId_options.reader_thread_count = 1; /* Please do not change this! Generating meaningful pcap diff's relies on a single reader thread! */ nDPId_options.instance_alias = strdup("nDPId-test"); @@ -232,53 +332,57 @@ int main(int argc, char ** argv) json_sockfd = -1; serv_sockfd = -1; - if (setup_remote_descriptors(2) != 0) - { - return -1; - } - - epollfd = create_evq(); - if (epollfd < 0) + if (setup_remote_descriptors(MAX_REMOTE_DESCRIPTORS) != 0) { return -1; } pthread_t nDPId_thread; - if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, NULL) != 0) + struct thread_return_value nDPId_return = {}; + if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0) { return -1; } pthread_t nDPIsrvd_thread; - if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, NULL) != 0) + struct thread_return_value nDPIsrvd_return = {}; + if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, &nDPIsrvd_return) != 0) { return -1; } pthread_t distributor_thread; - if (pthread_create(&distributor_thread, NULL, distributor_mainloop_thread, NULL) != 0) + struct thread_return_value distributor_return = {}; + if (pthread_create(&distributor_thread, NULL, distributor_client_mainloop_thread, &distributor_return) != 0) { return -1; } - if (pthread_join(nDPId_thread, NULL) != 0) + /* Try to gracefully shutdown all threads. */ + + while (thread_wait_for_termination(distributor_thread, 1, &distributor_return) == 0) { - return -1; + if (THREADS_RETURNED_ERROR() != 0) + { + return -1; + } } - pthread_kill(nDPIsrvd_thread, SIGINT); - - if (pthread_join(nDPIsrvd_thread, NULL) != 0) + while (thread_wait_for_termination(nDPId_thread, 1, &nDPId_return) == 0) { - return -1; + if (THREADS_RETURNED_ERROR() != 0) + { + return -1; + } } - pthread_kill(distributor_thread, SIGINT); - - if (pthread_join(distributor_thread, NULL) != 0) + while (thread_wait_for_termination(nDPIsrvd_thread, 1, &nDPIsrvd_return) == 0) { - return -1; + if (THREADS_RETURNED_ERROR() != 0) + { + return -1; + } } - return 0; + return THREADS_RETURNED_ERROR(); } |