diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-12-15 23:25:32 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-01-20 00:50:38 +0100 |
commit | 9e07a57566cc45bf92a845d8cee968d72e0f314e (patch) | |
tree | 8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /nDPIsrvd.c | |
parent | a35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff) |
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare
- nDPIsrvd: fixed caching issue (finally)
- added tiny c example (can be used to check flow manager sanity)
- c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow`
- README.md update: added example JSON sequence
- nDPId: added new flow event `update` necessary for correct
timeout handling (and other future use-cases)
- nDPIsrvd.h and nDPIsrvd.py: switched to an instance
(consists of an alias/source tuple) based flow manager
- every flow related event **must** now serialize `alias`, `source`,
`flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout
handling and verification process work correctly
- nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation
- nDPIsrvd.py: removed PcapPacket class (unused)
- py-flow-dashboard and py-flow-multiprocess: fixed race condition
- py-flow-info: print statusbar with probably useful information
- nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`)
to a generic flow event timestamp `ts_msec`
- nDPId-test: added additional checks
- nDPId: increased ICMP flow timeout
- nDPId: using event based i/o if capturing packets from a device
- nDPIsrvd: fixed memory leak on shutdown if remote descriptors
were still connected
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'nDPIsrvd.c')
-rw-r--r-- | nDPIsrvd.c | 98 |
1 files changed, 77 insertions, 21 deletions
diff --git a/nDPIsrvd.c b/nDPIsrvd.c index b9e71183f..4e48c9ca1 100644 --- a/nDPIsrvd.c +++ b/nDPIsrvd.c @@ -96,8 +96,8 @@ static void nDPIsrvd_buffer_array_copy(void * dst, const void * src) } buf_dst->json_string_start = buf_src->json_string_start; - buf_dst->json_string_length = buf_src->used; - buf_dst->json_string = buf_dst->ptr.text + buf_dst->json_string_start; + buf_dst->json_string_length = buf_src->json_string_length; + buf_dst->json_string = buf_src->json_string; buf_dst->used = buf_src->used; memcpy(buf_dst->ptr.raw, buf_src->ptr.raw, buf_src->used); } @@ -114,7 +114,20 @@ static const UT_icd nDPIsrvd_buffer_array_icd = {sizeof(struct nDPIsrvd_buffer), nDPIsrvd_buffer_array_copy, nDPIsrvd_buffer_array_dtor}; -static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, size_t siz) +#ifndef NO_MAIN +#ifdef ENABLE_MEMORY_PROFILING +void nDPIsrvd_memprof_log(char const * const format, ...) +{ + va_list ap; + + va_start(ap, format); + vsyslog(LOG_DAEMON, format, ap); + va_end(ap); +} +#endif +#endif + +static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, nDPIsrvd_ull json_string_length) { struct nDPIsrvd_buffer buf_src = {}; @@ -141,9 +154,8 @@ static int add_to_cache(struct remote_desc * const remote, uint8_t * const buf, } buf_src.ptr.raw = buf; - buf_src.json_string_length = buf_src.used = siz; + buf_src.used = buf_src.max = buf_src.json_string_length = json_string_length; utarray_push_back(remote->buf_cache, &buf_src); - remote->buf.used = 0; return 0; } @@ -206,11 +218,9 @@ static int drain_main_buffer(struct remote_desc * const remote) remote->buf.used); #endif memmove(remote->buf.ptr.raw, remote->buf.ptr.raw + bytes_written, remote->buf.used - bytes_written); - remote->buf.used -= bytes_written; - return 0; } - remote->buf.used = 0; + remote->buf.used -= bytes_written; return 0; } @@ -397,8 +407,7 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in { remotes.desc_used++; utarray_new(remotes.desc[i].buf_cache, &nDPIsrvd_buffer_array_icd); - if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || - remotes.desc[i].buf_cache == NULL) + if (nDPIsrvd_buffer_init(&remotes.desc[i].buf, max_buffer_size) != 0 || remotes.desc[i].buf_cache == NULL) { return NULL; } @@ -411,6 +420,19 @@ static struct remote_desc * get_unused_remote_descriptor(enum sock_type type, in return NULL; } +static void free_remote_descriptor_data(void) +{ + for (size_t i = 0; i < remotes.desc_size; ++i) + { + if (remotes.desc[i].buf_cache != NULL) + { + utarray_free(remotes.desc[i].buf_cache); + remotes.desc[i].buf_cache = NULL; + } + nDPIsrvd_buffer_free(&remotes.desc[i].buf); + } +} + static int add_event(int epollfd, int events, int fd, void * ptr) { struct epoll_event event = {}; @@ -511,7 +533,7 @@ static int nDPIsrvd_parse_options(int argc, char ** argv) fprintf(stderr, "Usage: %s [-l] [-c path-to-unix-sock] [-d] [-p pidfile]\n" "\t[-s path-to-unix-socket|distributor-host:port] [-u user] [-g group]\n" - "\t[-C cache-array-length] [-D]\n" + "\t[-C max-buffered-collector-json-lines] [-D]\n" "\t[-v] [-h]\n", argv[0]); return 1; @@ -647,7 +669,7 @@ static int new_connection(int epollfd, int eventfd) { if (errno == EAFNOSUPPORT) { - syslog(LOG_DAEMON | LOG_ERR, "New distributor connection."); + syslog(LOG_DAEMON | LOG_ERR, "%s", "New distributor connection."); } else { @@ -738,6 +760,15 @@ static int handle_collector_protocol(int epollfd, struct remote_desc * const cur return 1; } + if (json_str_start - current->buf.ptr.text != NETWORK_BUFFER_LENGTH_DIGITS) + { + syslog(LOG_DAEMON | LOG_ERR, + "BUG: Invalid collector protocol data received. Expected protocol preamble of size %u bytes, got %ld " + "bytes", + NETWORK_BUFFER_LENGTH_DIGITS, + (long int)(json_str_start - current->buf.ptr.text)); + } + if (current->event_json.json_bytes > current->buf.max) { syslog(LOG_DAEMON | LOG_ERR, @@ -824,7 +855,8 @@ static int handle_incoming_data(int epollfd, struct remote_desc * const current) syslog(LOG_DAEMON, "Buffer capacity threshold (%zu bytes) reached, caching JSON strings.", remotes.desc[i].buf.used); #endif errno = 0; - if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0 && errno != EEXIST /* required for nDPId-test */) + if (add_out_event(epollfd, remotes.desc[i].fd, &remotes.desc[i]) != 0 && + errno != EEXIST /* required for nDPId-test */) { syslog(LOG_DAEMON | LOG_ERR, "%s: %s", "Could not add event, disconnecting", strerror(errno)); disconnect_client(epollfd, &remotes.desc[i]); @@ -867,18 +899,19 @@ static int handle_data_event(int epollfd, struct epoll_event * const event) if ((event->events & EPOLLIN) == 0 && (event->events & EPOLLOUT) == 0) { + syslog(LOG_DAEMON | LOG_ERR, "Can not handle event mask: %d", event->events); return 1; } if (current == NULL) { - syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid"); + syslog(LOG_DAEMON | LOG_ERR, "%s", "Remote descriptor got from event data invalid."); return 1; } if (current->fd < 0) { - syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd); + syslog(LOG_DAEMON | LOG_ERR, "File descriptor `%d' got from event data invalid.", current->fd); return 1; } @@ -937,16 +970,37 @@ static int mainloop(int epollfd) for (int i = 0; i < nready; i++) { - if (events[i].events & EPOLLERR) + if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) { - syslog(LOG_DAEMON | LOG_ERR, - "Epoll event error: %s", - (errno != 0 ? strerror(errno) : "Client disconnected")); if (events[i].data.fd != json_sockfd && events[i].data.fd != serv_sockfd) { - struct remote_desc * current = (struct remote_desc *)events[i].data.ptr; + struct remote_desc * const current = (struct remote_desc *)events[i].data.ptr; + switch (current->sock_type) + { + case JSON_SOCK: + syslog(LOG_DAEMON | LOG_ERR, "Collector disconnected: %d", current->fd); + break; + case SERV_SOCK: + if (current->event_serv.peer_addr[0] == '\0') + { + syslog(LOG_DAEMON | LOG_ERR, "%s", "Distributor disconnected"); + } + else + { + syslog(LOG_DAEMON | LOG_ERR, + "Distributor disconnected: %.*s:%u", + (int)sizeof(current->event_serv.peer_addr), + current->event_serv.peer_addr, + current->event_serv.peer.sin_port); + } + break; + } disconnect_client(epollfd, current); } + else + { + syslog(LOG_DAEMON | LOG_ERR, "Epoll event error: %s", (errno != 0 ? strerror(errno) : "unknown")); + } continue; } @@ -992,6 +1046,8 @@ static int mainloop(int epollfd) close(signalfd); + free_remote_descriptor_data(); + return 0; } @@ -1037,7 +1093,7 @@ static int setup_remote_descriptors(size_t max_descriptors) { remotes.desc_used = 0; remotes.desc_size = max_descriptors; - remotes.desc = (struct remote_desc *)calloc(remotes.desc_size, sizeof(*remotes.desc)); + remotes.desc = (struct remote_desc *)nDPIsrvd_calloc(remotes.desc_size, sizeof(*remotes.desc)); if (remotes.desc == NULL) { return -1; |