summaryrefslogtreecommitdiff
path: root/nDPIsrvd.c
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-12-15 23:25:32 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-01-20 00:50:38 +0100
commit9e07a57566cc45bf92a845d8cee968d72e0f314e (patch)
tree8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /nDPIsrvd.c
parenta35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff)
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare - nDPIsrvd: fixed caching issue (finally) - added tiny c example (can be used to check flow manager sanity) - c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow` - README.md update: added example JSON sequence - nDPId: added new flow event `update` necessary for correct timeout handling (and other future use-cases) - nDPIsrvd.h and nDPIsrvd.py: switched to an instance (consists of an alias/source tuple) based flow manager - every flow related event **must** now serialize `alias`, `source`, `flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout handling and verification process work correctly - nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation - nDPIsrvd.py: removed PcapPacket class (unused) - py-flow-dashboard and py-flow-multiprocess: fixed race condition - py-flow-info: print statusbar with probably useful information - nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`) to a generic flow event timestamp `ts_msec` - nDPId-test: added additional checks - nDPId: increased ICMP flow timeout - nDPId: using event based i/o if capturing packets from a device - nDPIsrvd: fixed memory leak on shutdown if remote descriptors were still connected Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r--nDPIsrvd.c98
1 files changed, 77 insertions, 21 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index b9e71183f..4e48c9ca1 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -96,8 +96,8 @@ static void nDPIsrvd_buffer_array_copy(void * dst, const void * src)
}
buf_dst->json_string_start = buf_src->json_string_start;
- buf_dst->json_string_length = buf_src->used;
- buf_dst->json_string = buf_dst->ptr.text + buf_dst->json_string_start;
+ buf_dst->json_string_length = buf_src->json_string_length;
+ buf_dst->json_string = buf_src->json_string;
buf_dst->used = buf_src->used;
memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used);
}
@@ -114,7 +114,20 @@ static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer),
nDPIsrvd_buffer_array_copy,
nDPIsrvd_buffer_array_dtor};
-static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, size_t siz)
+#ifndef NO_MAIN
+#ifdef ENABLE_MEMORY_PROFILING
+void nDPIsrvd_memprof_log(char const * const format, ...)
+{
+ va_list ap;
+
+ va_start(ap, format);
+ vsyslog(LOG_DAEMON, format, ap);
+ va_end(ap);
+}
+#endif
+#endif
+
+static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, nDPIsrvd_ull json_string_length)
{
struct nDPIsrvd_buffer buf_src = {};
@@ -141,9 +154,8 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf,
}
buf_src.ptr.raw = buf;
- buf_src.json_string_length = buf_src.used = siz;
+ buf_src.used = buf_src.max = buf_src.json_string_length = json_string_length;
utarray_push_back(remote->buf_cache, &buf_src);
- remote->buf.used = 0;
return 0;
}
@@ -206,11 +218,9 @@ static int drain_main_buffer(struct remote_desc * const remote)
remote->buf.used);
#endif
memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written);
- remote->buf.used -= bytes_written;
- return 0;
}
- remote->buf.used = 0;
+ remote->buf.used -= bytes_written;
return 0;
}
@@ -397,8 +407,7 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in
{
remotes.desc_used++;
utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd);
- if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 ||
- remotes.desc[i].buf_cache == NULL)
+ if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL)
{
return NULL;
}
@@ -411,6 +420,19 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in
return NULL;
}
+static void free_remote_descriptor_data(void)
+{
+ for (size_t i = 0; i < remotes.desc_size; ++i)
+ {
+ if (remotes.desc[i].buf_cache != NULL)
+ {
+ utarray_free(remotes.desc[i].buf_cache);
+ remotes.desc[i].buf_cache = NULL;
+ }
+ nDPIsrvd_buffer_free(&remotes.desc[i].buf);
+ }
+}
+
static int add_event(int epollfd, int events, int fd, void * ptr)
{
struct epoll_event event = {};
@@ -511,7 +533,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
fprintf(stderr,
"Usage: %s [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n"
"\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n"
- "\t[-C cache-array-length] [-D]\n"
+ "\t[-C max-buffered-collector-json-lines] [-D]\n"
"\t[-v] [-h]\n",
argv[0]);
return 1;
@@ -647,7 +669,7 @@ static int new_connection(int epollfd, int eventfd)
{
if (errno == EAFNOSUPPORT)
{
- syslog(LOG_DAEMON | LOG_ERR, "New distributor connection.");
+ syslog(LOG_DAEMON | LOG_ERR, "%s", "New distributor connection.");
}
else
{
@@ -738,6 +760,15 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur
return 1;
}
+ if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS)
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: Invalid collector protocol data received. Expected protocol preamble of size %u bytes, got %ld "
+ "bytes",
+ NETWORK_BUFFER_LENGTH_DIGITS,
+ (long int)(json_str_start - current->buf.ptr.text));
+ }
+
if (current->event_json.json_bytes > current->buf.max)
{
syslog(LOG_DAEMON | LOG_ERR,
@@ -824,7 +855,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current)
syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching JSON strings.", remotes.desc[i].buf.used);
#endif
errno = 0;
- if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0 && errno != EEXIST /* required for nDPId-test */)
+ if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0 &&
+ errno != EEXIST /* required for nDPId-test */)
{
syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno));
disconnect_client(epollfd, &remotes.desc[i]);
@@ -867,18 +899,19 @@ static int handle_data_event(int epollfd, struct epoll_event * const event)
if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0)
{
+ syslog(LOG_DAEMON | LOG_ERR, "Can not handle event mask: %d", event->events);
return 1;
}
if (current == NULL)
{
- syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid");
+ syslog(LOG_DAEMON | LOG_ERR, "%s", "Remote descriptor got from event data invalid.");
return 1;
}
if (current->fd < 0)
{
- syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
+ syslog(LOG_DAEMON | LOG_ERR, "File descriptor `%d' got from event data invalid.", current->fd);
return 1;
}
@@ -937,16 +970,37 @@ static int mainloop(int epollfd)
for (int i = 0; i < nready; i++)
{
- if (events[i].events & EPOLLERR)
+ if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP)
{
- syslog(LOG_DAEMON | LOG_ERR,
- "Epoll event error: %s",
- (errno != 0 ? strerror(errno) : "Client disconnected"));
if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd)
{
- struct remote_desc * current = (struct remote_desc *)events[i].data.ptr;
+ struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr;
+ switch (current->sock_type)
+ {
+ case JSON_SOCK:
+ syslog(LOG_DAEMON | LOG_ERR, "Collector disconnected: %d", current->fd);
+ break;
+ case SERV_SOCK:
+ if (current->event_serv.peer_addr[0] == '\0')
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "%s", "Distributor disconnected");
+ }
+ else
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "Distributor disconnected: %.*s:%u",
+ (int)sizeof(current->event_serv.peer_addr),
+ current->event_serv.peer_addr,
+ current->event_serv.peer.sin_port);
+ }
+ break;
+ }
disconnect_client(epollfd, current);
}
+ else
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Epoll event error: %s", (errno != 0 ? strerror(errno) : "unknown"));
+ }
continue;
}
@@ -992,6 +1046,8 @@ static int mainloop(int epollfd)
close(signalfd);
+ free_remote_descriptor_data();
+
return 0;
}
@@ -1037,7 +1093,7 @@ static int setup_remote_descriptors(size_t max_descriptors)
{
remotes.desc_used = 0;
remotes.desc_size = max_descriptors;
- remotes.desc = (struct remote_desc *)calloc(remotes.desc_size, sizeof(*remotes.desc));
+ remotes.desc = (struct remote_desc *)nDPIsrvd_calloc(remotes.desc_size, sizeof(*remotes.desc));
if (remotes.desc == NULL)
{
return -1;