summaryrefslogtreecommitdiff
path: root/dependencies/nDPIsrvd.h
diff options
context:
space:
mode:
Diffstat (limited to 'dependencies/nDPIsrvd.h')
-rw-r--r--dependencies/nDPIsrvd.h288
1 files changed, 232 insertions, 56 deletions
diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h
index 1e732f5c6..44bd4703c 100644
--- a/dependencies/nDPIsrvd.h
+++ b/dependencies/nDPIsrvd.h
@@ -53,7 +53,7 @@ enum nDPIsrvd_read_return
{
READ_OK = CONNECT_LAST_ENUM_VALUE,
READ_PEER_DISCONNECT,
- READ_ERROR,
+ READ_ERROR, /* check for errno */
READ_LAST_ENUM_VALUE
};
@@ -118,18 +118,28 @@ struct nDPIsrvd_flow
{
nDPIsrvd_hashkey flow_key;
nDPIsrvd_ull id_as_ull;
+ nDPIsrvd_hashkey thread_id;
nDPIsrvd_ull last_seen;
nDPIsrvd_ull idle_time;
UT_hash_handle hh;
uint8_t flow_user_data[0];
};
+struct nDPIsrvd_thread_data
+{
+ nDPIsrvd_hashkey thread_key;
+ nDPIsrvd_ull most_recent_flow_time;
+ UT_hash_handle hh;
+ uint8_t thread_user_data[0];
+};
+
struct nDPIsrvd_instance
{
nDPIsrvd_hashkey alias_source_key;
- nDPIsrvd_ull most_recent_flow_time;
struct nDPIsrvd_flow * flow_table;
+ struct nDPIsrvd_thread_data * thread_data_table;
UT_hash_handle hh;
+ uint8_t instance_user_data[0];
};
struct nDPIsrvd_json_token
@@ -154,9 +164,14 @@ extern void nDPIsrvd_memprof_log(char const * const format, ...);
typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow);
+typedef void (*instance_cleanup_callback)(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ enum nDPIsrvd_cleanup_reason reason);
typedef void (*flow_cleanup_callback)(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow,
enum nDPIsrvd_cleanup_reason reason);
@@ -198,9 +213,12 @@ struct nDPIsrvd_socket
int fd;
struct nDPIsrvd_address address;
+ size_t instance_user_data_size;
+ size_t thread_user_data_size;
size_t flow_user_data_size;
struct nDPIsrvd_instance * instance_table;
json_callback json_callback;
+ instance_cleanup_callback instance_cleanup_callback;
flow_cleanup_callback flow_cleanup_callback;
struct nDPIsrvd_buffer buffer;
@@ -373,8 +391,11 @@ static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer)
}
static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size,
+ size_t instance_user_data_size,
+ size_t thread_user_data_size,
size_t flow_user_data_size,
json_callback json_cb,
+ instance_cleanup_callback instance_cleanup_cb,
flow_cleanup_callback flow_cleanup_callback_cb)
{
static const UT_icd packet_data_icd = {sizeof(struct nDPIsrvd_json_token), NULL, NULL, NULL};
@@ -393,9 +414,13 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d
goto error;
}
sock->address.raw.sa_family = -1;
+
+ sock->instance_user_data_size = instance_user_data_size;
+ sock->thread_user_data_size = thread_user_data_size;
sock->flow_user_data_size = flow_user_data_size;
sock->json_callback = json_cb;
+ sock->instance_cleanup_callback = instance_cleanup_cb;
sock->flow_cleanup_callback = flow_cleanup_callback_cb;
utarray_new(sock->json.tokens, &packet_data_icd);
@@ -417,34 +442,72 @@ error:
static inline void nDPIsrvd_cleanup_flow(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow,
enum nDPIsrvd_cleanup_reason reason)
{
if (sock->flow_cleanup_callback != NULL)
{
- sock->flow_cleanup_callback(sock, instance, flow, reason);
+ sock->flow_cleanup_callback(sock, instance, thread_data, flow, reason);
}
HASH_DEL(instance->flow_table, flow);
nDPIsrvd_free(flow);
}
+static inline void nDPIsrvd_cleanup_flows(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_thread_data * const thread_data,
+ enum nDPIsrvd_cleanup_reason reason)
+{
+ struct nDPIsrvd_flow * current_flow;
+ struct nDPIsrvd_flow * ftmp;
+
+ if (instance->flow_table != NULL)
+ {
+#ifdef ENABLE_MEMORY_PROFILING
+ nDPIsrvd_memprof_log("Cleaning up flows for instance 0x%x and thread %d.",
+ instance->alias_source_key,
+ thread_data->thread_key);
+#endif
+
+ HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
+ {
+ if (current_flow->thread_id == thread_data->thread_key)
+ {
+ nDPIsrvd_cleanup_flow(sock, instance, thread_data, current_flow, reason);
+ }
+ }
+ }
+}
+
static inline void nDPIsrvd_cleanup_instance(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
enum nDPIsrvd_cleanup_reason reason)
{
- struct nDPIsrvd_flow * current_flow;
- struct nDPIsrvd_flow * ftmp;
+ struct nDPIsrvd_thread_data * current_thread_data;
+ struct nDPIsrvd_thread_data * ttmp;
if (instance != NULL)
{
- if (instance->flow_table != NULL)
+#ifdef ENABLE_MEMORY_PROFILING
+ nDPIsrvd_memprof_log("Cleaning up instance 0x%x.", instance->alias_source_key);
+#endif
+ if (sock->instance_cleanup_callback != NULL)
+ {
+ sock->instance_cleanup_callback(sock, instance, reason);
+ }
+
+ if (instance->thread_data_table != NULL)
{
- HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
+ HASH_ITER(hh, instance->thread_data_table, current_thread_data, ttmp)
{
- nDPIsrvd_cleanup_flow(sock, instance, current_flow, reason);
+ nDPIsrvd_cleanup_flows(sock, instance, current_thread_data, reason);
+ HASH_DEL(instance->thread_data_table, current_thread_data);
+ nDPIsrvd_free(current_thread_data);
}
- instance->flow_table = NULL;
+ instance->thread_data_table = NULL;
}
+
HASH_DEL(sock->instance_table, instance);
nDPIsrvd_free(instance);
}
@@ -481,6 +544,7 @@ static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock)
nDPIsrvd_cleanup_instance(*sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN);
}
(*sock)->instance_table = NULL;
+
nDPIsrvd_buffer_free(&(*sock)->buffer);
nDPIsrvd_free(*sock);
@@ -582,9 +646,7 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c
return READ_OK;
}
- ssize_t bytes_read = read(sock->fd,
- sock->buffer.ptr.raw + sock->buffer.used,
- sock->buffer.max - sock->buffer.used);
+ ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used);
if (bytes_read == 0)
{
@@ -799,7 +861,7 @@ static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_s
if (instance == NULL)
{
- instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance));
+ instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance) + sock->instance_user_data_size);
if (instance == NULL)
{
return NULL;
@@ -820,24 +882,100 @@ static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_s
return instance;
}
+static inline struct nDPIsrvd_thread_data * nDPIsrvd_get_thread_data(
+ struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_json_token const * const thread_id_token,
+ struct nDPIsrvd_json_token const * const ts_msec_token)
+{
+ struct nDPIsrvd_thread_data * thread_data;
+ nDPIsrvd_hashkey thread_id;
+
+ if (thread_id_token == NULL)
+ {
+ return NULL;
+ }
+
+ {
+ nDPIsrvd_ull thread_key;
+ TOKEN_VALUE_TO_ULL(thread_id_token, &thread_key);
+ thread_id = thread_key;
+ }
+
+ HASH_FIND_INT(instance->thread_data_table, &thread_id, thread_data);
+
+ if (thread_data == NULL)
+ {
+ thread_data =
+ (struct nDPIsrvd_thread_data *)nDPIsrvd_calloc(1, sizeof(*thread_data) + sock->thread_user_data_size);
+ if (thread_data == NULL)
+ {
+ return NULL;
+ }
+
+ thread_data->thread_key = thread_id;
+ HASH_ADD_INT(instance->thread_data_table, thread_key, thread_data);
+#ifdef ENABLE_MEMORY_PROFILING
+ nDPIsrvd_memprof_log("Thread Data %d added: %zu bytes.",
+ thread_data->thread_key,
+ sizeof(*thread_data) + sock->thread_user_data_size);
+#endif
+ }
+
+ if (ts_msec_token != NULL)
+ {
+ nDPIsrvd_ull thread_ts_msec;
+ TOKEN_VALUE_TO_ULL(ts_msec_token, &thread_ts_msec);
+
+ if (thread_ts_msec > thread_data->most_recent_flow_time)
+ {
+ thread_data->most_recent_flow_time = thread_ts_msec;
+ }
+ }
+
+ return thread_data;
+}
+
static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock,
- struct nDPIsrvd_instance ** const instance)
+ struct nDPIsrvd_instance ** const instance,
+ struct nDPIsrvd_thread_data ** const thread_data)
{
struct nDPIsrvd_flow * flow;
struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "alias"),
TOKEN_GET_SZ(sock, "source"),
+ TOKEN_GET_SZ(sock, "thread_id"),
TOKEN_GET_SZ(sock, "flow_id"),
TOKEN_GET_SZ(sock, "thread_ts_msec"),
TOKEN_GET_SZ(sock, "flow_last_seen"),
TOKEN_GET_SZ(sock, "flow_idle_time")};
+ enum
+ {
+ TOKEN_ALIAS = 0,
+ TOKEN_SOURCE,
+ TOKEN_THREAD_ID,
+ TOKEN_FLOW_ID,
+ TOKEN_THREAD_TS_MSEC,
+ TOKEN_FLOW_LAST_SEEN,
+ TOKEN_FLOW_IDLE_TIME
+ };
nDPIsrvd_hashkey flow_key;
- *instance = nDPIsrvd_get_instance(sock, tokens[0], tokens[1]);
- if (*instance == NULL || nDPIsrvd_build_flow_key(tokens[2], &flow_key) != 0)
+ *instance = nDPIsrvd_get_instance(sock, tokens[TOKEN_ALIAS], tokens[TOKEN_SOURCE]);
+ if (*instance == NULL)
{
return NULL;
}
+ *thread_data = nDPIsrvd_get_thread_data(sock, *instance, tokens[TOKEN_THREAD_ID], tokens[TOKEN_THREAD_TS_MSEC]);
+ if (*thread_data == NULL)
+ {
+ return NULL;
+ }
+
+ if (nDPIsrvd_build_flow_key(tokens[TOKEN_FLOW_ID], &flow_key) != 0)
+ {
+ return NULL;
+ }
HASH_FIND_INT((*instance)->flow_table, &flow_key, flow);
if (flow == NULL)
@@ -849,34 +987,26 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
}
flow->flow_key = flow_key;
- TOKEN_VALUE_TO_ULL(tokens[2], &flow->id_as_ull);
+ flow->thread_id = (*thread_data)->thread_key;
+
+ TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_ID], &flow->id_as_ull);
HASH_ADD_INT((*instance)->flow_table, flow_key, flow);
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Flow %llu added: %zu bytes.", flow->id_as_ull, sizeof(*flow) + sock->flow_user_data_size);
#endif
}
- if (tokens[3] != NULL)
- {
- nDPIsrvd_ull thread_ts_msec;
- TOKEN_VALUE_TO_ULL(tokens[3], &thread_ts_msec);
- if (thread_ts_msec > (*instance)->most_recent_flow_time)
- {
- (*instance)->most_recent_flow_time = thread_ts_msec;
- }
- }
-
- if (tokens[4] != NULL)
+ if (tokens[TOKEN_FLOW_LAST_SEEN] != NULL)
{
nDPIsrvd_ull flow_last_seen;
- TOKEN_VALUE_TO_ULL(tokens[4], &flow_last_seen);
+ TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_LAST_SEEN], &flow_last_seen);
flow->last_seen = flow_last_seen;
}
- if (tokens[5] != NULL)
+ if (tokens[TOKEN_FLOW_IDLE_TIME] != NULL)
{
- nDPIsrvd_ull flow_idle_time = 0;
- TOKEN_VALUE_TO_ULL(tokens[5], &flow_idle_time);
+ nDPIsrvd_ull flow_idle_time;
+ TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_IDLE_TIME], &flow_idle_time);
flow->idle_time = flow_idle_time;
}
@@ -885,27 +1015,39 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const current_flow)
{
- if (instance == NULL || current_flow == NULL)
+ struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "daemon_event_name"),
+ TOKEN_GET_SZ(sock, "flow_event_name")};
+ enum
+ {
+ TOKEN_DAEMON_EVENT_NAME = 0,
+ TOKEN_FLOW_EVENT_NAME
+ };
+
+ if (instance == NULL)
{
return 0;
}
- struct nDPIsrvd_json_token const * const daemon_event_name = TOKEN_GET_SZ(sock, "daemon_event_name");
- if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "init") != 0)
+ if (TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_DAEMON_EVENT_NAME], "init") != 0)
{
- nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_INIT);
+ nDPIsrvd_cleanup_flows(sock, instance, thread_data, CLEANUP_REASON_DAEMON_INIT);
}
- if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "shutdown") != 0)
+ if (TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_DAEMON_EVENT_NAME], "shutdown") != 0)
{
- nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_SHUTDOWN);
+ nDPIsrvd_cleanup_flows(sock, instance, thread_data, CLEANUP_REASON_DAEMON_SHUTDOWN);
+ }
+
+ if (current_flow == NULL)
+ {
+ return 0;
}
- struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
int is_idle_flow;
- if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle")) != 0 ||
- TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0)
+ if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_FLOW_EVENT_NAME], "idle")) != 0 ||
+ TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_FLOW_EVENT_NAME], "end") != 0)
{
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Flow %llu deleted: %zu bytes.",
@@ -914,19 +1056,25 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
#endif
nDPIsrvd_cleanup_flow(sock,
instance,
+ thread_data,
current_flow,
(is_idle_flow != 0 ? CLEANUP_REASON_FLOW_IDLE : CLEANUP_REASON_FLOW_END));
}
- else if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time)
+ else if (thread_data != NULL &&
+ current_flow->last_seen + current_flow->idle_time < thread_data->most_recent_flow_time)
{
#ifdef ENABLE_MEMORY_PROFILING
- nDPIsrvd_memprof_log("Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: [%llu]",
- current_flow->id_as_ull,
- sizeof(*current_flow) + sock->flow_user_data_size,
- current_flow->last_seen, current_flow->idle_time, instance->most_recent_flow_time,
- instance->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time));
+ nDPIsrvd_memprof_log(
+ "Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: "
+ "[%llu]",
+ current_flow->id_as_ull,
+ sizeof(*current_flow) + sock->flow_user_data_size,
+ current_flow->last_seen,
+ current_flow->idle_time,
+ thread_data->most_recent_flow_time,
+ thread_data->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time));
#endif
- nDPIsrvd_cleanup_flow(sock, instance, current_flow, CLEANUP_REASON_FLOW_TIMEOUT);
+ nDPIsrvd_cleanup_flow(sock, instance, thread_data, current_flow, CLEANUP_REASON_FLOW_TIMEOUT);
}
return 0;
@@ -1068,13 +1216,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock
}
struct nDPIsrvd_instance * instance = NULL;
+ struct nDPIsrvd_thread_data * thread_data = NULL;
struct nDPIsrvd_flow * flow = NULL;
- flow = nDPIsrvd_get_flow(sock, &instance);
- if (ret == PARSE_OK && sock->json_callback(sock, instance, flow) != CALLBACK_OK)
+ flow = nDPIsrvd_get_flow(sock, &instance, &thread_data);
+ if (ret == PARSE_OK && sock->json_callback(sock, instance, thread_data, flow) != CALLBACK_OK)
{
ret = PARSE_JSON_CALLBACK_ERROR;
}
- if (nDPIsrvd_check_flow_end(sock, instance, flow) != 0)
+ if (nDPIsrvd_check_flow_end(sock, instance, thread_data, flow) != 0)
{
ret = PARSE_FLOW_MGMT_ERROR;
}
@@ -1168,7 +1317,9 @@ static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size
#endif
static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instance,
- void (*verify_cb)(struct nDPIsrvd_flow const *, void * user_data),
+ void (*verify_cb)(struct nDPIsrvd_thread_data const * const,
+ struct nDPIsrvd_flow const *,
+ void * user_data),
void * user_data)
{
int retval = 0;
@@ -1177,11 +1328,30 @@ static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instanc
HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
{
- if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time)
+ struct nDPIsrvd_thread_data * current_thread_data;
+
+ HASH_FIND_INT(instance->thread_data_table, &current_flow->thread_id, current_thread_data);
+ if (current_thread_data == NULL)
+ {
+ if (verify_cb != NULL)
+ {
+ verify_cb(current_thread_data, current_flow, user_data);
+ }
+ retval = 1;
+ }
+ else if (current_flow->thread_id != current_thread_data->thread_key)
+ {
+ if (verify_cb != NULL)
+ {
+ verify_cb(current_thread_data, current_flow, user_data);
+ }
+ retval = 1;
+ }
+ else if (current_flow->last_seen + current_flow->idle_time < current_thread_data->most_recent_flow_time)
{
if (verify_cb != NULL)
{
- verify_cb(current_flow, user_data);
+ verify_cb(current_thread_data, current_flow, user_data);
}
retval = 1;
}
@@ -1191,11 +1361,16 @@ static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instanc
}
static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock,
- void (*info_cb)(struct nDPIsrvd_flow const *, void * user_data),
+ void (*info_cb)(struct nDPIsrvd_socket const *,
+ struct nDPIsrvd_instance const *,
+ struct nDPIsrvd_thread_data const *,
+ struct nDPIsrvd_flow const *,
+ void *),
void * user_data)
{
struct nDPIsrvd_instance const * current_instance;
struct nDPIsrvd_instance const * itmp;
+ struct nDPIsrvd_thread_data * current_thread_data;
struct nDPIsrvd_flow const * current_flow;
struct nDPIsrvd_flow const * ftmp;
@@ -1207,7 +1382,8 @@ static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock,
{
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
{
- info_cb(current_flow, user_data);
+ HASH_FIND_INT(current_instance->thread_data_table, &current_flow->thread_id, current_thread_data);
+ info_cb(sock, current_instance, current_thread_data, current_flow, user_data);
}
}
}