aboutsummaryrefslogtreecommitdiff
path: root/dependencies/nDPIsrvd.h
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-12-15 23:25:32 +0100
committerToni Uhlig <matzeton@googlemail.com>2022-01-20 00:50:38 +0100
commit9e07a57566cc45bf92a845d8cee968d72e0f314e (patch)
tree8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /dependencies/nDPIsrvd.h
parenta35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff)
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare - nDPIsrvd: fixed caching issue (finally) - added tiny c example (can be used to check flow manager sanity) - c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow` - README.md update: added example JSON sequence - nDPId: added new flow event `update` necessary for correct timeout handling (and other future use-cases) - nDPIsrvd.h and nDPIsrvd.py: switched to an instance (consists of an alias/source tuple) based flow manager - every flow related event **must** now serialize `alias`, `source`, `flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout handling and verification process work correctly - nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation - nDPIsrvd.py: removed PcapPacket class (unused) - py-flow-dashboard and py-flow-multiprocess: fixed race condition - py-flow-info: print statusbar with probably useful information - nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`) to a generic flow event timestamp `ts_msec` - nDPId-test: added additional checks - nDPId: increased ICMP flow timeout - nDPId: using event based i/o if capturing packets from a device - nDPIsrvd: fixed memory leak on shutdown if remote descriptors were still connected Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'dependencies/nDPIsrvd.h')
-rw-r--r--dependencies/nDPIsrvd.h457
1 files changed, 351 insertions, 106 deletions
diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h
index 33dd1a217..535941aa1 100644
--- a/dependencies/nDPIsrvd.h
+++ b/dependencies/nDPIsrvd.h
@@ -12,14 +12,20 @@
#include <sys/un.h>
#include <unistd.h>
+#ifndef JSMN_PARENT_LINKS
+#define JSMN_PARENT_LINKS 1
+#endif
+
#include "config.h"
#include "jsmn.h"
#include "utarray.h"
#include "uthash.h"
+#ifdef ENABLE_MEMORY_PROFILING
+#include <stdarg.h>
+#endif
+
#define nDPIsrvd_MAX_JSON_TOKENS 128
-#define nDPIsrvd_FLOW_KEY_TOKENS 3
-#define nDPIsrvd_FLOW_KEY_STRLEN 24
#define nDPIsrvd_JSON_KEY_STRLEN 32
#define nDPIsrvd_STRLEN_SZ(s) (sizeof(s) / sizeof(s[0]) - sizeof(s[0]))
@@ -32,7 +38,7 @@
#define TOKEN_VALUE_TO_ULL(token, value) token_value_to_ull(token, value)
#define FIRST_ENUM_VALUE 1
-#define LAST_ENUM_VALUE CONVERSION_LAST_ENUM_VALUE
+#define LAST_ENUM_VALUE CLEANUP_REASON_LAST_ENUM_VALUE
enum nDPIsrvd_connect_return
{
@@ -87,41 +93,68 @@ enum nDPIsrvd_conversion_return
CONVERSION_LAST_ENUM_VALUE
};
-typedef unsigned long long int nDPIsrvd_ull;
-typedef nDPIsrvd_ull * nDPIsrvd_ull_ptr;
-
-struct nDPIsrvd_flow_key
+enum nDPIsrvd_cleanup_reason
{
- char key[nDPIsrvd_FLOW_KEY_STRLEN];
+ CLEANUP_REASON_DAEMON_INIT = CONVERSION_LAST_ENUM_VALUE, // can happen if kill -SIGKILL $(pidof nDPId) or restart
+ // after SIGSEGV
+ CLEANUP_REASON_DAEMON_SHUTDOWN, // graceful shutdown e.g. kill -SIGTERM $(pidof nDPId)
+ CLEANUP_REASON_FLOW_END,
+ CLEANUP_REASON_FLOW_IDLE,
+ CLEANUP_REASON_FLOW_TIMEOUT,
+ CLEANUP_REASON_APP_SHUTDOWN,
+
+ CLEANUP_REASON_LAST_ENUM_VALUE
};
+typedef unsigned long long int nDPIsrvd_ull;
+typedef nDPIsrvd_ull * nDPIsrvd_ull_ptr;
+typedef int nDPIsrvd_hashkey;
+
struct nDPIsrvd_flow
{
- struct nDPIsrvd_flow_key flow_key;
+ nDPIsrvd_hashkey flow_key;
nDPIsrvd_ull id_as_ull;
+ nDPIsrvd_ull last_seen;
+ nDPIsrvd_ull idle_time;
UT_hash_handle hh;
uint8_t flow_user_data[0];
};
+struct nDPIsrvd_instance
+{
+ nDPIsrvd_hashkey alias_source_key;
+ nDPIsrvd_ull most_recent_flow_time;
+ struct nDPIsrvd_flow * flow_table;
+ UT_hash_handle hh;
+};
+
struct nDPIsrvd_json_token
{
char key[nDPIsrvd_JSON_KEY_STRLEN];
- UT_hash_handle hh;
char const * value;
int key_length;
int value_length;
int token_index;
+ UT_hash_handle hh;
};
struct nDPIsrvd_socket;
+static inline void * nDPIsrvd_calloc(size_t const n, size_t const size);
+static inline void * nDPIsrvd_malloc(size_t const size);
+static inline void nDPIsrvd_free(void * const freeable);
#ifdef ENABLE_MEMORY_PROFILING
static inline void * nDPIsrvd_uthash_malloc(size_t const size);
static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size);
+extern void nDPIsrvd_memprof_log(char const * const format, ...);
#endif
typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_flow * const flow);
-typedef void (*flow_end_callback)(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const flow);
+typedef void (*flow_cleanup_callback)(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_flow * const flow,
+ enum nDPIsrvd_cleanup_reason reason);
struct nDPIsrvd_address
{
@@ -162,9 +195,9 @@ struct nDPIsrvd_socket
struct nDPIsrvd_address address;
size_t flow_user_data_size;
- struct nDPIsrvd_flow * flow_table;
+ struct nDPIsrvd_instance * instance_table;
json_callback json_callback;
- flow_end_callback flow_end_callback;
+ flow_cleanup_callback flow_cleanup_callback;
struct nDPIsrvd_buffer buffer;
struct nDPIsrvd_jsmn jsmn;
@@ -180,7 +213,7 @@ struct nDPIsrvd_socket
uint8_t global_user_data[0];
};
-static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock);
+static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock);
/* Slightly modified code: https://en.wikibooks.org/wiki/Algorithm_Implementation/Miscellaneous/Base64 */
#define WHITESPACE 64
@@ -286,6 +319,13 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value)
"CONVERSION_NOT_A_NUMBER",
"CONVERSION_RANGE_EXCEEDED",
+ "CLEANUP_REASON_DAEMON_INIT",
+ "CLEANUP_REASON_DAEMON_SHUTDOWN",
+ "CLEANUP_REASON_FLOW_END",
+ "CLEANUP_REASON_FLOW_IDLE",
+ "CLEANUP_REASON_FLOW_TIMEOUT",
+ "CLEANUP_REASON_APP_SHUTDOWN",
+
[LAST_ENUM_VALUE] = "LAST_ENUM_VALUE"};
if (enum_value < FIRST_ENUM_VALUE || enum_value >= LAST_ENUM_VALUE)
@@ -298,12 +338,12 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value)
static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, size_t buffer_size)
{
- if (buffer->ptr.raw != NULL && buffer->max != buffer_size)
+ if (buffer->ptr.raw != NULL)
{
return 1; /* Do not fail and realloc()? */
}
- buffer->ptr.raw = (uint8_t *)malloc(buffer_size);
+ buffer->ptr.raw = (uint8_t *)nDPIsrvd_malloc(buffer_size);
if (buffer->ptr.raw == NULL)
{
return 1;
@@ -320,17 +360,17 @@ static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, si
static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer)
{
- free(buffer->ptr.raw);
+ nDPIsrvd_free(buffer->ptr.raw);
buffer->ptr.raw = NULL;
}
-static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_size,
- size_t flow_user_data_size,
- json_callback json_cb,
- flow_end_callback flow_end_cb)
+static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size,
+ size_t flow_user_data_size,
+ json_callback json_cb,
+ flow_cleanup_callback flow_cleanup_callback_cb)
{
static const UT_icd packet_data_icd = {sizeof(struct nDPIsrvd_json_token), NULL, NULL, NULL};
- struct nDPIsrvd_socket * sock = (struct nDPIsrvd_socket *)malloc(sizeof(*sock) + global_user_data_size);
+ struct nDPIsrvd_socket * sock = (struct nDPIsrvd_socket *)nDPIsrvd_calloc(1, sizeof(*sock) + global_user_data_size);
if (json_cb == NULL)
{
@@ -339,8 +379,6 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz
if (sock != NULL)
{
- memset(sock, 0, sizeof(*sock));
-
sock->fd = -1;
if (nDPIsrvd_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0)
{
@@ -350,7 +388,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz
sock->flow_user_data_size = flow_user_data_size;
sock->json_callback = json_cb;
- sock->flow_end_callback = flow_end_cb;
+ sock->flow_cleanup_callback = flow_cleanup_callback_cb;
utarray_new(sock->json.tokens, &packet_data_icd);
if (sock->json.tokens == NULL)
@@ -365,14 +403,49 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz
return sock;
error:
nDPIsrvd_buffer_free(&sock->buffer);
- nDPIsrvd_free(&sock);
+ nDPIsrvd_socket_free(&sock);
return NULL;
}
-static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock)
+static inline void nDPIsrvd_cleanup_flow(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_flow * const flow,
+ enum nDPIsrvd_cleanup_reason reason)
+{
+ if (sock->flow_cleanup_callback != NULL)
+ {
+ sock->flow_cleanup_callback(sock, instance, flow, reason);
+ }
+ HASH_DEL(instance->flow_table, flow);
+ nDPIsrvd_free(flow);
+}
+
+static inline void nDPIsrvd_cleanup_instance(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ enum nDPIsrvd_cleanup_reason reason)
{
struct nDPIsrvd_flow * current_flow;
struct nDPIsrvd_flow * ftmp;
+
+ if (instance != NULL)
+ {
+ if (instance->flow_table != NULL)
+ {
+ HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
+ {
+ nDPIsrvd_cleanup_flow(sock, instance, current_flow, reason);
+ }
+ instance->flow_table = NULL;
+ }
+ HASH_DEL(sock->instance_table, instance);
+ nDPIsrvd_free(instance);
+ }
+}
+
+static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock)
+{
+ struct nDPIsrvd_instance * current_instance;
+ struct nDPIsrvd_instance * itmp;
struct nDPIsrvd_json_token * current_json_token;
struct nDPIsrvd_json_token * jtmp;
@@ -395,22 +468,13 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock)
utarray_free((*sock)->json.tokens);
}
- if ((*sock)->flow_table != NULL)
+ HASH_ITER(hh, (*sock)->instance_table, current_instance, itmp)
{
- HASH_ITER(hh, (*sock)->flow_table, current_flow, ftmp)
- {
- if ((*sock)->flow_end_callback != NULL)
- {
- (*sock)->flow_end_callback(*sock, current_flow);
- }
- HASH_DEL((*sock)->flow_table, current_flow);
- free(current_flow);
- }
- (*sock)->flow_table = NULL;
+ nDPIsrvd_cleanup_instance(*sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN);
}
-
+ (*sock)->instance_table = NULL;
nDPIsrvd_buffer_free(&(*sock)->buffer);
- free(*sock);
+ nDPIsrvd_free(*sock);
*sock = NULL;
}
@@ -418,8 +482,8 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock)
static inline int nDPIsrvd_setup_address(struct nDPIsrvd_address * const address, char const * const destination)
{
size_t len = strlen(destination);
- char * first_colon = strchr(destination, ':');
- char * last_colon = strrchr(destination, ':');
+ char const * first_colon = strchr(destination, ':');
+ char const * last_colon = strrchr(destination, ':');
memset(address, 0, sizeof(*address));
@@ -505,7 +569,9 @@ static inline enum nDPIsrvd_connect_return nDPIsrvd_connect(struct nDPIsrvd_sock
static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock)
{
- ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used);
+ ssize_t bytes_read = read(sock->fd,
+ sock->buffer.ptr.raw + sock->buffer.used,
+ sock->buffer.max - sock->buffer.used);
if (bytes_read == 0)
{
@@ -663,88 +729,191 @@ static inline enum nDPIsrvd_conversion_return token_value_to_ull(struct nDPIsrvd
return str_value_to_ull(token->value, value);
}
-static inline int nDPIsrvd_build_flow_key(struct nDPIsrvd_flow_key * const key,
- struct nDPIsrvd_json_token const * const tokens[nDPIsrvd_FLOW_KEY_TOKENS])
+static nDPIsrvd_hashkey nDPIsrvd_build_key(char const * str, int len)
+{
+ uint32_t hash = 5381;
+ uint32_t c;
+
+ while (len-- > 0 && (c = *str++) != 0)
+ {
+ hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
+ }
+
+ return hash;
+}
+
+static inline int nDPIsrvd_build_instance_key(struct nDPIsrvd_json_token const * const alias,
+ struct nDPIsrvd_json_token const * const source,
+ nDPIsrvd_hashkey * const alias_source_key)
{
- if (tokens[0] == NULL || tokens[1] == NULL || tokens[2] == NULL)
+ if (alias == NULL || source == NULL)
{
return 1;
}
- if (snprintf(key->key,
- nDPIsrvd_FLOW_KEY_STRLEN,
- "%.*s-%.*s-%.*s",
- tokens[0]->value_length,
- tokens[0]->value,
- tokens[1]->value_length,
- tokens[1]->value,
- tokens[2]->value_length,
- tokens[2]->value) <= 0)
+ *alias_source_key = nDPIsrvd_build_key(alias->value, alias->value_length);
+ *alias_source_key ^= nDPIsrvd_build_key(source->value, source->value_length);
+
+ return 0;
+}
+
+static inline int nDPIsrvd_build_flow_key(struct nDPIsrvd_json_token const * const flow_id_token,
+ nDPIsrvd_hashkey * const flow_key)
+{
+ if (flow_id_token == NULL)
{
return 1;
}
+ *flow_key = nDPIsrvd_build_key(flow_id_token->value, flow_id_token->value_length);
+
return 0;
}
-static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock,
- struct nDPIsrvd_json_token const * const flow_id)
+static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_json_token const * const alias,
+ struct nDPIsrvd_json_token const * const source)
{
- struct nDPIsrvd_json_token const * const tokens[nDPIsrvd_FLOW_KEY_TOKENS] = {
- flow_id,
- TOKEN_GET_SZ(sock, "alias"),
- TOKEN_GET_SZ(sock, "source"),
- };
- struct nDPIsrvd_flow_key key = {};
+ struct nDPIsrvd_instance * instance;
+ nDPIsrvd_hashkey alias_source_key;
- if (nDPIsrvd_build_flow_key(&key, tokens) != 0)
+ if (nDPIsrvd_build_instance_key(alias, source, &alias_source_key) != 0)
{
return NULL;
}
- struct nDPIsrvd_flow * flow = NULL;
- HASH_FIND(hh, sock->flow_table, &key, sizeof(key), flow);
+ HASH_FIND_INT(sock->instance_table, &alias_source_key, instance);
+
+ if (instance == NULL)
+ {
+ instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance));
+ if (instance == NULL)
+ {
+ return NULL;
+ }
+
+ instance->alias_source_key = alias_source_key;
+ HASH_ADD_INT(sock->instance_table, alias_source_key, instance);
+#ifdef ENABLE_MEMORY_PROFILING
+ nDPIsrvd_memprof_log("Instance alias \"%.*s\" with source \"%.*s\" added: %zu bytes.",
+ alias->value_length,
+ alias->value,
+ source->value_length,
+ source->value,
+ sizeof(*instance));
+#endif
+ }
+
+ return instance;
+}
+
+static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance ** const instance)
+{
+ struct nDPIsrvd_flow * flow;
+ struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "alias"),
+ TOKEN_GET_SZ(sock, "source"),
+ TOKEN_GET_SZ(sock, "flow_id"),
+ TOKEN_GET_SZ(sock, "ts_msec"),
+ TOKEN_GET_SZ(sock, "flow_last_seen"),
+ TOKEN_GET_SZ(sock, "flow_idle_time")};
+ nDPIsrvd_hashkey flow_key;
+
+ *instance = nDPIsrvd_get_instance(sock, tokens[0], tokens[1]);
+ if (*instance == NULL || nDPIsrvd_build_flow_key(tokens[2], &flow_key) != 0)
+ {
+ return NULL;
+ }
+
+ HASH_FIND_INT((*instance)->flow_table, &flow_key, flow);
if (flow == NULL)
{
- flow = (struct nDPIsrvd_flow *)calloc(1, sizeof(*flow) + sock->flow_user_data_size);
+ flow = (struct nDPIsrvd_flow *)nDPIsrvd_calloc(1, sizeof(*flow) + sock->flow_user_data_size);
if (flow == NULL)
{
return NULL;
}
- TOKEN_VALUE_TO_ULL(tokens[0], &flow->id_as_ull);
- memcpy(flow->flow_key.key, key.key, nDPIsrvd_FLOW_KEY_STRLEN);
- HASH_ADD(hh, sock->flow_table, flow_key, sizeof(flow->flow_key), flow);
+ flow->flow_key = flow_key;
+ TOKEN_VALUE_TO_ULL(tokens[2], &flow->id_as_ull);
+ HASH_ADD_INT((*instance)->flow_table, flow_key, flow);
#ifdef ENABLE_MEMORY_PROFILING
- printf("Flow %llu added: %zu bytes.\n", flow->id_as_ull, sizeof(*flow) + sock->flow_user_data_size);
+ nDPIsrvd_memprof_log("Flow %llu added: %zu bytes.", flow->id_as_ull, sizeof(*flow) + sock->flow_user_data_size);
#endif
}
+ if (tokens[3] != NULL)
+ {
+ nDPIsrvd_ull ts_msec;
+ TOKEN_VALUE_TO_ULL(tokens[3], &ts_msec);
+ if (ts_msec > (*instance)->most_recent_flow_time)
+ {
+ (*instance)->most_recent_flow_time = ts_msec;
+ }
+ }
+
+ if (tokens[4] != NULL)
+ {
+ nDPIsrvd_ull flow_last_seen;
+ TOKEN_VALUE_TO_ULL(tokens[4], &flow_last_seen);
+ flow->last_seen = flow_last_seen;
+ }
+
+ if (tokens[5] != NULL)
+ {
+ nDPIsrvd_ull flow_idle_time = 0;
+ TOKEN_VALUE_TO_ULL(tokens[5], &flow_idle_time);
+ flow->idle_time = flow_idle_time;
+ }
+
return flow;
}
static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_flow * const current_flow)
{
- if (current_flow == NULL)
+ if (instance == NULL || current_flow == NULL)
{
- return 1;
+ return 0;
}
- struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
+ struct nDPIsrvd_json_token const * const daemon_event_name = TOKEN_GET_SZ(sock, "daemon_event_name");
+ if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "init") != 0)
+ {
+ nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_INIT);
+ }
+ if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "shutdown") != 0)
+ {
+ nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_SHUTDOWN);
+ }
- if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0 || TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0)
+ struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
+ int is_idle_flow;
+ if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle")) != 0 ||
+ TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0)
{
- if (sock->flow_end_callback != NULL)
- {
- sock->flow_end_callback(sock, current_flow);
- }
- HASH_DEL(sock->flow_table, current_flow);
#ifdef ENABLE_MEMORY_PROFILING
- printf("Flow %llu deleted: %zu bytes.\n", current_flow->id_as_ull, sizeof(*current_flow) + sock->flow_user_data_size);
+ nDPIsrvd_memprof_log("Flow %llu deleted: %zu bytes.",
+ current_flow->id_as_ull,
+ sizeof(*current_flow) + sock->flow_user_data_size);
#endif
- free(current_flow);
+ nDPIsrvd_cleanup_flow(sock,
+ instance,
+ current_flow,
+ (is_idle_flow != 0 ? CLEANUP_REASON_FLOW_IDLE : CLEANUP_REASON_FLOW_END));
+ }
+ else if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time)
+ {
+#ifdef ENABLE_MEMORY_PROFILING
+ nDPIsrvd_memprof_log("Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: [%llu]",
+ current_flow->id_as_ull,
+ sizeof(*current_flow) + sock->flow_user_data_size,
+ current_flow->last_seen, current_flow->idle_time, instance->most_recent_flow_time,
+ instance->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time));
+#endif
+ nDPIsrvd_cleanup_flow(sock, instance, current_flow, CLEANUP_REASON_FLOW_TIMEOUT);
}
return 0;
@@ -813,9 +982,9 @@ static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer)
static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock)
{
- enum nDPIsrvd_parse_return ret;
+ enum nDPIsrvd_parse_return ret = PARSE_OK;
- while ((ret = nDPIsrvd_parse_line(&sock->buffer, &sock->jsmn)) == PARSE_OK)
+ while (ret == PARSE_OK && (ret = nDPIsrvd_parse_line(&sock->buffer, &sock->jsmn)) == PARSE_OK)
{
char const * key = NULL;
int key_length = 0;
@@ -852,8 +1021,8 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock
{
struct nDPIsrvd_json_token jt = {.value = jsmn_token_get(sock, current_token),
.value_length = jsmn_token_size(sock, current_token),
- .hh = {},
- .token_index = current_token - 1};
+ .token_index = current_token - 1,
+ .hh = {}};
if (key == NULL || key_length > nDPIsrvd_JSON_KEY_STRLEN ||
utarray_len(sock->json.tokens) == nDPIsrvd_MAX_JSON_TOKENS)
@@ -875,21 +1044,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock
}
}
- struct nDPIsrvd_json_token const * const flow_id = TOKEN_GET_SZ(sock, "flow_id");
+ struct nDPIsrvd_instance * instance = NULL;
struct nDPIsrvd_flow * flow = NULL;
- if (flow_id != NULL)
- {
- flow = nDPIsrvd_get_flow(sock, flow_id);
- if (flow == NULL)
- {
- ret = PARSE_FLOW_MGMT_ERROR;
- }
- }
- if (ret == PARSE_OK && sock->json_callback(sock, flow) != CALLBACK_OK)
+ flow = nDPIsrvd_get_flow(sock, &instance);
+ if (ret == PARSE_OK && sock->json_callback(sock, instance, flow) != CALLBACK_OK)
{
ret = PARSE_JSON_CALLBACK_ERROR;
}
- if (flow_id != NULL && nDPIsrvd_check_flow_end(sock, flow) != 0)
+ if (nDPIsrvd_check_flow_end(sock, instance, flow) != 0)
{
ret = PARSE_FLOW_MGMT_ERROR;
}
@@ -912,6 +1074,55 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock
return ret;
}
+static inline void * nDPIsrvd_calloc(size_t const n, size_t const size)
+{
+ void * p = nDPIsrvd_malloc(n * size);
+
+ if (p == NULL)
+ {
+ return NULL;
+ }
+ memset(p, 0, n * size);
+
+ return p;
+}
+
+static inline void * nDPIsrvd_malloc(size_t const size)
+{
+ void * p = malloc(sizeof(uint64_t) + size);
+
+ if (p == NULL)
+ {
+ return NULL;
+ }
+
+ *(uint64_t *)p = size;
+#ifdef ENABLE_MEMORY_PROFILING
+ nDPIsrvd_memprof_log("malloc(%zu)", size);
+#endif
+
+ return (uint8_t *)p + sizeof(uint64_t);
+}
+
+static inline void nDPIsrvd_free(void * const freeable)
+{
+ void * p;
+
+ if (freeable == NULL)
+ {
+ return;
+ }
+
+ p = (uint8_t *)freeable - sizeof(uint64_t);
+
+#ifdef ENABLE_MEMORY_PROFILING
+ size_t size = *(uint64_t *)p;
+ nDPIsrvd_memprof_log("free(%zu)", size);
+#endif
+
+ free(p);
+}
+
#ifdef ENABLE_MEMORY_PROFILING
static inline void * nDPIsrvd_uthash_malloc(size_t const size)
{
@@ -921,28 +1132,62 @@ static inline void * nDPIsrvd_uthash_malloc(size_t const size)
{
return NULL;
}
- printf("malloc(%zu)\n", size);
+ nDPIsrvd_memprof_log("uthash malloc(%zu)", size);
return p;
}
static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size)
{
- printf("free(%zu)\n", size);
+ nDPIsrvd_memprof_log("uthash free(%zu)", size);
free(freeable);
}
#endif
-static inline void nDPIsrvd_write_flow_info(int outfd, struct nDPIsrvd_socket const * const sock, void (*write_cb)(int outfd, struct nDPIsrvd_flow *))
+static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instance,
+ void (*verify_cb)(struct nDPIsrvd_flow const *, void * user_data),
+ void * user_data)
{
- struct nDPIsrvd_flow * current_flow;
- struct nDPIsrvd_flow * ftmp;
+ int retval = 0;
+ struct nDPIsrvd_flow const * current_flow;
+ struct nDPIsrvd_flow const * ftmp;
- HASH_ITER(hh, sock->flow_table, current_flow, ftmp)
+ HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
{
- dprintf(outfd, "[Flow %4llu]", current_flow->id_as_ull);
- write_cb(outfd, current_flow);
- dprintf(outfd, "%c", '\n');
+ if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time)
+ {
+ if (verify_cb != NULL)
+ {
+ verify_cb(current_flow, user_data);
+ }
+ retval = 1;
+ }
+ }
+
+ return retval;
+}
+
+static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock,
+ void (*info_cb)(struct nDPIsrvd_flow const *, void * user_data),
+ void * user_data)
+{
+ struct nDPIsrvd_instance const * current_instance;
+ struct nDPIsrvd_instance const * itmp;
+ struct nDPIsrvd_flow const * current_flow;
+ struct nDPIsrvd_flow const * ftmp;
+
+ if (sock->instance_table != NULL)
+ {
+ HASH_ITER(hh, sock->instance_table, current_instance, itmp)
+ {
+ if (current_instance->flow_table != NULL)
+ {
+ HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
+ {
+ info_cb(current_flow, user_data);
+ }
+ }
+ }
}
}