summaryrefslogtreecommitdiff
path: root/nDPId-test.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-04-08 20:33:25 +0200
committerToni Uhlig <matzeton@googlemail.com>2021-04-09 00:18:35 +0200
commit0a7ad7a76ac34d7a0c7635591203de08979b60da (patch)
tree28b1afb918be5733b85501df4affbded3c4fe100 /nDPId-test.c
parente576162a43c78290961b0b6c8cd3e5cc2965316f (diff)
nDPId-test: added JSON distribution + JSON parsing (Multithreaded design re-using most of nDPId/nDPIsrvd core)
* improved Makefile.old install targets * splitted nDPIsrvd_parse into nDPIsrvd_parse_line and nDPIsrvd_parse_all for the sake of readability * minor Python script improvments (check for nDPIsrvd.py on multiple locations, may be superseeded by setuptools in the future) * some paths needs to be absolute (chdir() during daemonize) and therefor additional checks introduced * test run script checks and fails if certain files are are missing (PCAP file <=> result output file) * removed not very useful "internal format error" JSON serialization if a BUG for same exists * fixed invalid l4 type statistics counters for nDPIsrvd-collectd Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPId-test.c')
-rw-r--r--nDPId-test.c246
1 files changed, 175 insertions, 71 deletions
diff --git a/nDPId-test.c b/nDPId-test.c
index afe77adf7..528109a2a 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -16,11 +16,29 @@ enum
PIPE_COUNT = 2
};
-static int epollfd = -1;
+struct thread_return_value
+{
+ int val;
+};
+
static int mock_pipefds[PIPE_COUNT] = {};
static int mock_servfds[PIPE_COUNT] = {};
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
+#define MAX_REMOTE_DESCRIPTORS 2
+
+#define THREAD_ERROR(thread_arg) \
+ do \
+ { \
+ ((struct thread_return_value *)thread_arg)->val = 1; \
+ } while (0);
+#define THREAD_ERROR_GOTO(thread_arg) \
+ do \
+ { \
+ THREAD_ERROR(thread_arg); \
+ goto error; \
+ } while (0);
+
void mock_syslog_stderr(int p, const char * format, ...)
{
va_list ap;
@@ -47,98 +65,158 @@ static int setup_pipe(int pipefd[PIPE_COUNT])
static void * nDPIsrvd_mainloop_thread(void * const arg)
{
(void)arg;
+ int epollfd = create_evq();
struct remote_desc * mock_json_desc = NULL;
struct remote_desc * mock_serv_desc = NULL;
+ struct epoll_event events[32];
+ size_t const events_size = sizeof(events) / sizeof(events[0]);
+
+ if (epollfd < 0)
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
mock_json_desc = get_unused_remote_descriptor(JSON_SOCK, mock_pipefds[PIPE_nDPIsrvd]);
if (mock_json_desc == NULL)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
mock_serv_desc = get_unused_remote_descriptor(SERV_SOCK, mock_servfds[PIPE_WRITE]);
if (mock_serv_desc == NULL)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
strncpy(mock_serv_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_serv_desc->event_serv.peer_addr));
mock_serv_desc->event_serv.peer.sin_port = 0;
if (add_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
if (add_event(epollfd, mock_servfds[PIPE_WRITE], mock_serv_desc) != 0)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
- if (mainloop(epollfd) != 0)
+ while (1)
{
- goto error;
- }
+ int nready = epoll_wait(epollfd, events, events_size, -1);
+
+ if (nready < 0)
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
- while (handle_incoming_data(epollfd, mock_json_desc) == 0) {}
+ for (int i = 0; i < nready; i++)
+ {
+ if (events[i].data.ptr == mock_json_desc)
+ {
+ if (handle_incoming_data_event(epollfd, &events[i]) != 0)
+ {
+ goto error;
+ }
+ }
+ else
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
+ }
+ }
error:
+ del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]);
+ del_event(epollfd, mock_servfds[PIPE_WRITE]);
+ close(mock_pipefds[PIPE_nDPIsrvd]);
close(mock_servfds[PIPE_WRITE]);
+ close(epollfd);
return NULL;
}
-static void * distributor_mainloop_thread(void * const arg)
+static enum nDPIsrvd_parse_return parse_json_lines(struct io_buffer * const buffer)
{
- char buf[NETWORK_BUFFER_MAX_SIZE];
+ struct nDPIsrvd_buffer buf = {};
+ struct nDPIsrvd_jsmn jsmn = {};
+ size_t const n = (buffer->used > sizeof(buf.raw) ? sizeof(buf.raw) : buffer->used);
- (void)arg;
+ if (n > NETWORK_BUFFER_MAX_SIZE)
+ {
+ return PARSE_STRING_TOO_BIG;
+ }
+
+ memcpy(buf.raw, buffer->ptr, n);
+ buf.used = buffer->used;
+
+ enum nDPIsrvd_parse_return ret;
+ while ((ret = nDPIsrvd_parse_line(&buf, &jsmn)) == PARSE_OK)
+ {
+ if (jsmn.tokens_found == 0)
+ {
+ return PARSE_JSMN_ERROR;
+ }
+ nDPIsrvd_drain_buffer(&buf);
+ }
+
+ memcpy(buffer->ptr, buf.raw, buf.used);
+ buffer->used = buf.used;
- int dis_thread_shutdown = 0;
+ return ret;
+}
+
+static void * distributor_client_mainloop_thread(void * const arg)
+{
+ struct io_buffer client_buffer = {.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE),
+ .max = NETWORK_BUFFER_MAX_SIZE,
+ .used = 0};
int dis_epollfd = create_evq();
int signalfd = setup_signalfd(dis_epollfd);
-
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
- if (dis_epollfd < 0)
+ if (client_buffer.ptr == NULL || dis_epollfd < 0 || signalfd < 0)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
if (add_event(dis_epollfd, mock_servfds[PIPE_READ], NULL) != 0)
{
- goto error;
- }
- if (signalfd < 0)
- {
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
- while (dis_thread_shutdown == 0)
+ while (1)
{
int nready = epoll_wait(dis_epollfd, events, events_size, -1);
for (int i = 0; i < nready; i++)
{
- if ((events[i].events & EPOLLERR) != 0)
- {
- dis_thread_shutdown = 1;
- break;
- }
- if ((events[i].events & EPOLLIN) == 0)
+ if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0)
{
- dis_thread_shutdown = 1;
- break;
+ THREAD_ERROR_GOTO(arg);
}
if (events[i].data.fd == mock_servfds[PIPE_READ])
{
- ssize_t bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf));
- if (bytes_read <= 0)
+ ssize_t bytes_read = read(mock_servfds[PIPE_READ],
+ client_buffer.ptr + client_buffer.used,
+ client_buffer.max - client_buffer.used);
+ if (bytes_read == 0)
{
- dis_thread_shutdown = 1;
- break;
+ goto error;
+ }
+ else if (bytes_read < 0)
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
+ printf("%.*s", (int)bytes_read, client_buffer.ptr + client_buffer.used);
+ client_buffer.used += bytes_read;
+
+ enum nDPIsrvd_parse_return parse_ret = parse_json_lines(&client_buffer);
+ if (parse_ret != PARSE_NEED_MORE_DATA)
+ {
+ fprintf(stderr, "JSON parsing failed: %s\n", nDPIsrvd_enum_to_string(parse_ret));
+ THREAD_ERROR(arg);
}
- printf("%.*s", (int)bytes_read, buf);
}
else if (events[i].data.fd == signalfd)
{
@@ -148,45 +226,38 @@ static void * distributor_mainloop_thread(void * const arg)
s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo));
if (s != sizeof(struct signalfd_siginfo))
{
- dis_thread_shutdown = 1;
- break;
+ THREAD_ERROR(arg);
}
if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
{
- dis_thread_shutdown = 1;
- break;
+ fprintf(stderr, "Got signal %d, abort.\n", fdsi.ssi_signo);
+ THREAD_ERROR(arg);
}
}
else
{
- dis_thread_shutdown = 1;
- break;
+ THREAD_ERROR(arg);
}
}
}
- ssize_t bytes_read;
- while ((bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf))) > 0)
- {
- printf("%.*s", (int)bytes_read, buf);
- }
error:
del_event(dis_epollfd, signalfd);
del_event(dis_epollfd, mock_servfds[PIPE_READ]);
close(dis_epollfd);
close(signalfd);
+ free(client_buffer.ptr);
return NULL;
}
static void * nDPId_mainloop_thread(void * const arg)
{
- (void)arg;
-
if (setup_reader_threads() != 0)
{
- exit(EXIT_FAILURE);
+ THREAD_ERROR(arg);
+ return NULL;
}
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
@@ -203,9 +274,33 @@ static void * nDPId_mainloop_thread(void * const arg)
static void usage(char const * const arg0)
{
- printf("usage: %s [path-to-pcap-file]\n", arg0);
+ fprintf(stderr, "usage: %s [path-to-pcap-file]\n", arg0);
+}
+
+static int thread_wait_for_termination(pthread_t thread, time_t wait_time_secs, struct thread_return_value * const trv)
+{
+ struct timespec ts;
+
+ if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
+ {
+ return -1;
+ }
+
+ ts.tv_sec += wait_time_secs;
+ int err = pthread_timedjoin_np(thread, (void **)&trv, &ts);
+
+ switch (err)
+ {
+ case EBUSY:
+ return 0;
+ case ETIMEDOUT:
+ return 0;
+ }
+
+ return 1;
}
+#define THREADS_RETURNED_ERROR() (nDPId_return.val != 0 || nDPIsrvd_return.val != 0 || distributor_return.val != 0)
int main(int argc, char ** argv)
{
if (argc != 2)
@@ -214,6 +309,11 @@ int main(int argc, char ** argv)
return -1;
}
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ {
+ return -1;
+ }
+
nDPId_options.reader_thread_count = 1; /* Please do not change this! Generating meaningful pcap diff's relies on a
single reader thread! */
nDPId_options.instance_alias = strdup("nDPId-test");
@@ -232,53 +332,57 @@ int main(int argc, char ** argv)
json_sockfd = -1;
serv_sockfd = -1;
- if (setup_remote_descriptors(2) != 0)
- {
- return -1;
- }
-
- epollfd = create_evq();
- if (epollfd < 0)
+ if (setup_remote_descriptors(MAX_REMOTE_DESCRIPTORS) != 0)
{
return -1;
}
pthread_t nDPId_thread;
- if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, NULL) != 0)
+ struct thread_return_value nDPId_return = {};
+ if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0)
{
return -1;
}
pthread_t nDPIsrvd_thread;
- if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, NULL) != 0)
+ struct thread_return_value nDPIsrvd_return = {};
+ if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, &nDPIsrvd_return) != 0)
{
return -1;
}
pthread_t distributor_thread;
- if (pthread_create(&distributor_thread, NULL, distributor_mainloop_thread, NULL) != 0)
+ struct thread_return_value distributor_return = {};
+ if (pthread_create(&distributor_thread, NULL, distributor_client_mainloop_thread, &distributor_return) != 0)
{
return -1;
}
- if (pthread_join(nDPId_thread, NULL) != 0)
+ /* Try to gracefully shutdown all threads. */
+
+ while (thread_wait_for_termination(distributor_thread, 1, &distributor_return) == 0)
{
- return -1;
+ if (THREADS_RETURNED_ERROR() != 0)
+ {
+ return -1;
+ }
}
- pthread_kill(nDPIsrvd_thread, SIGINT);
-
- if (pthread_join(nDPIsrvd_thread, NULL) != 0)
+ while (thread_wait_for_termination(nDPId_thread, 1, &nDPId_return) == 0)
{
- return -1;
+ if (THREADS_RETURNED_ERROR() != 0)
+ {
+ return -1;
+ }
}
- pthread_kill(distributor_thread, SIGINT);
-
- if (pthread_join(distributor_thread, NULL) != 0)
+ while (thread_wait_for_termination(nDPIsrvd_thread, 1, &nDPIsrvd_return) == 0)
{
- return -1;
+ if (THREADS_RETURNED_ERROR() != 0)
+ {
+ return -1;
+ }
}
- return 0;
+ return THREADS_RETURNED_ERROR();
}