diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-12-15 23:25:32 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2022-01-20 00:50:38 +0100 |
commit | 9e07a57566cc45bf92a845d8cee968d72e0f314e (patch) | |
tree | 8f1a6bfd08bd68a5253fadf3a01beecda77b1c95 /dependencies/nDPIsrvd.h | |
parent | a35fc1d5ea8570609cc0c8cf6edadc81f8f5bb76 (diff) |
Major nDPId extension. Sorry for the huge commit.
- nDPId: fixed invalid IP4/IP6 tuple compare
- nDPIsrvd: fixed caching issue (finally)
- added tiny c example (can be used to check flow manager sanity)
- c-captured: use flow_last_seen timestamp from `struct nDPIsrvd_flow`
- README.md update: added example JSON sequence
- nDPId: added new flow event `update` necessary for correct
timeout handling (and other future use-cases)
- nDPIsrvd.h and nDPIsrvd.py: switched to an instance
(consists of an alias/source tuple) based flow manager
- every flow related event **must** now serialize `alias`, `source`,
`flow_id`, `flow_last_seen` and `flow_idle_time` to make the timeout
handling and verification process work correctly
- nDPIsrvd.h: ability to profile any dynamic memory (de-)allocation
- nDPIsrvd.py: removed PcapPacket class (unused)
- py-flow-dashboard and py-flow-multiprocess: fixed race condition
- py-flow-info: print statusbar with probably useful information
- nDPId/nDPIsrvd.h: switched from packet-flow only timestamps (`pkt_*sec`)
to a generic flow event timestamp `ts_msec`
- nDPId-test: added additional checks
- nDPId: increased ICMP flow timeout
- nDPId: using event based i/o if capturing packets from a device
- nDPIsrvd: fixed memory leak on shutdown if remote descriptors
were still connected
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Diffstat (limited to 'dependencies/nDPIsrvd.h')
-rw-r--r-- | dependencies/nDPIsrvd.h | 457 |
1 files changed, 351 insertions, 106 deletions
diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 33dd1a217..535941aa1 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -12,14 +12,20 @@ #include <sys/un.h> #include <unistd.h> +#ifndef JSMN_PARENT_LINKS +#define JSMN_PARENT_LINKS 1 +#endif + #include "config.h" #include "jsmn.h" #include "utarray.h" #include "uthash.h" +#ifdef ENABLE_MEMORY_PROFILING +#include <stdarg.h> +#endif + #define nDPIsrvd_MAX_JSON_TOKENS 128 -#define nDPIsrvd_FLOW_KEY_TOKENS 3 -#define nDPIsrvd_FLOW_KEY_STRLEN 24 #define nDPIsrvd_JSON_KEY_STRLEN 32 #define nDPIsrvd_STRLEN_SZ(s) (sizeof(s) / sizeof(s[0]) - sizeof(s[0])) @@ -32,7 +38,7 @@ #define TOKEN_VALUE_TO_ULL(token, value) token_value_to_ull(token, value) #define FIRST_ENUM_VALUE 1 -#define LAST_ENUM_VALUE CONVERSION_LAST_ENUM_VALUE +#define LAST_ENUM_VALUE CLEANUP_REASON_LAST_ENUM_VALUE enum nDPIsrvd_connect_return { @@ -87,41 +93,68 @@ enum nDPIsrvd_conversion_return CONVERSION_LAST_ENUM_VALUE }; -typedef unsigned long long int nDPIsrvd_ull; -typedef nDPIsrvd_ull * nDPIsrvd_ull_ptr; - -struct nDPIsrvd_flow_key +enum nDPIsrvd_cleanup_reason { - char key[nDPIsrvd_FLOW_KEY_STRLEN]; + CLEANUP_REASON_DAEMON_INIT = CONVERSION_LAST_ENUM_VALUE, // can happen if kill -SIGKILL $(pidof nDPId) or restart + // after SIGSEGV + CLEANUP_REASON_DAEMON_SHUTDOWN, // graceful shutdown e.g. kill -SIGTERM $(pidof nDPId) + CLEANUP_REASON_FLOW_END, + CLEANUP_REASON_FLOW_IDLE, + CLEANUP_REASON_FLOW_TIMEOUT, + CLEANUP_REASON_APP_SHUTDOWN, + + CLEANUP_REASON_LAST_ENUM_VALUE }; +typedef unsigned long long int nDPIsrvd_ull; +typedef nDPIsrvd_ull * nDPIsrvd_ull_ptr; +typedef int nDPIsrvd_hashkey; + struct nDPIsrvd_flow { - struct nDPIsrvd_flow_key flow_key; + nDPIsrvd_hashkey flow_key; nDPIsrvd_ull id_as_ull; + nDPIsrvd_ull last_seen; + nDPIsrvd_ull idle_time; UT_hash_handle hh; uint8_t flow_user_data[0]; }; +struct nDPIsrvd_instance +{ + nDPIsrvd_hashkey alias_source_key; + nDPIsrvd_ull most_recent_flow_time; + struct nDPIsrvd_flow * flow_table; + UT_hash_handle hh; +}; + struct nDPIsrvd_json_token { char key[nDPIsrvd_JSON_KEY_STRLEN]; - UT_hash_handle hh; char const * value; int key_length; int value_length; int token_index; + UT_hash_handle hh; }; struct nDPIsrvd_socket; +static inline void * nDPIsrvd_calloc(size_t const n, size_t const size); +static inline void * nDPIsrvd_malloc(size_t const size); +static inline void nDPIsrvd_free(void * const freeable); #ifdef ENABLE_MEMORY_PROFILING static inline void * nDPIsrvd_uthash_malloc(size_t const size); static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size); +extern void nDPIsrvd_memprof_log(char const * const format, ...); #endif typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, struct nDPIsrvd_flow * const flow); -typedef void (*flow_end_callback)(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const flow); +typedef void (*flow_cleanup_callback)(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_flow * const flow, + enum nDPIsrvd_cleanup_reason reason); struct nDPIsrvd_address { @@ -162,9 +195,9 @@ struct nDPIsrvd_socket struct nDPIsrvd_address address; size_t flow_user_data_size; - struct nDPIsrvd_flow * flow_table; + struct nDPIsrvd_instance * instance_table; json_callback json_callback; - flow_end_callback flow_end_callback; + flow_cleanup_callback flow_cleanup_callback; struct nDPIsrvd_buffer buffer; struct nDPIsrvd_jsmn jsmn; @@ -180,7 +213,7 @@ struct nDPIsrvd_socket uint8_t global_user_data[0]; }; -static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock); +static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock); /* Slightly modified code: https://en.wikibooks.org/wiki/Algorithm_Implementation/Miscellaneous/Base64 */ #define WHITESPACE 64 @@ -286,6 +319,13 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value) "CONVERSION_NOT_A_NUMBER", "CONVERSION_RANGE_EXCEEDED", + "CLEANUP_REASON_DAEMON_INIT", + "CLEANUP_REASON_DAEMON_SHUTDOWN", + "CLEANUP_REASON_FLOW_END", + "CLEANUP_REASON_FLOW_IDLE", + "CLEANUP_REASON_FLOW_TIMEOUT", + "CLEANUP_REASON_APP_SHUTDOWN", + [LAST_ENUM_VALUE] = "LAST_ENUM_VALUE"}; if (enum_value < FIRST_ENUM_VALUE || enum_value >= LAST_ENUM_VALUE) @@ -298,12 +338,12 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value) static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, size_t buffer_size) { - if (buffer->ptr.raw != NULL && buffer->max != buffer_size) + if (buffer->ptr.raw != NULL) { return 1; /* Do not fail and realloc()? */ } - buffer->ptr.raw = (uint8_t *)malloc(buffer_size); + buffer->ptr.raw = (uint8_t *)nDPIsrvd_malloc(buffer_size); if (buffer->ptr.raw == NULL) { return 1; @@ -320,17 +360,17 @@ static inline int nDPIsrvd_buffer_init(struct nDPIsrvd_buffer * const buffer, si static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer) { - free(buffer->ptr.raw); + nDPIsrvd_free(buffer->ptr.raw); buffer->ptr.raw = NULL; } -static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_size, - size_t flow_user_data_size, - json_callback json_cb, - flow_end_callback flow_end_cb) +static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size, + size_t flow_user_data_size, + json_callback json_cb, + flow_cleanup_callback flow_cleanup_callback_cb) { static const UT_icd packet_data_icd = {sizeof(struct nDPIsrvd_json_token), NULL, NULL, NULL}; - struct nDPIsrvd_socket * sock = (struct nDPIsrvd_socket *)malloc(sizeof(*sock) + global_user_data_size); + struct nDPIsrvd_socket * sock = (struct nDPIsrvd_socket *)nDPIsrvd_calloc(1, sizeof(*sock) + global_user_data_size); if (json_cb == NULL) { @@ -339,8 +379,6 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz if (sock != NULL) { - memset(sock, 0, sizeof(*sock)); - sock->fd = -1; if (nDPIsrvd_buffer_init(&sock->buffer, NETWORK_BUFFER_MAX_SIZE) != 0) { @@ -350,7 +388,7 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz sock->flow_user_data_size = flow_user_data_size; sock->json_callback = json_cb; - sock->flow_end_callback = flow_end_cb; + sock->flow_cleanup_callback = flow_cleanup_callback_cb; utarray_new(sock->json.tokens, &packet_data_icd); if (sock->json.tokens == NULL) @@ -365,14 +403,49 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_siz return sock; error: nDPIsrvd_buffer_free(&sock->buffer); - nDPIsrvd_free(&sock); + nDPIsrvd_socket_free(&sock); return NULL; } -static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock) +static inline void nDPIsrvd_cleanup_flow(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_flow * const flow, + enum nDPIsrvd_cleanup_reason reason) +{ + if (sock->flow_cleanup_callback != NULL) + { + sock->flow_cleanup_callback(sock, instance, flow, reason); + } + HASH_DEL(instance->flow_table, flow); + nDPIsrvd_free(flow); +} + +static inline void nDPIsrvd_cleanup_instance(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + enum nDPIsrvd_cleanup_reason reason) { struct nDPIsrvd_flow * current_flow; struct nDPIsrvd_flow * ftmp; + + if (instance != NULL) + { + if (instance->flow_table != NULL) + { + HASH_ITER(hh, instance->flow_table, current_flow, ftmp) + { + nDPIsrvd_cleanup_flow(sock, instance, current_flow, reason); + } + instance->flow_table = NULL; + } + HASH_DEL(sock->instance_table, instance); + nDPIsrvd_free(instance); + } +} + +static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock) +{ + struct nDPIsrvd_instance * current_instance; + struct nDPIsrvd_instance * itmp; struct nDPIsrvd_json_token * current_json_token; struct nDPIsrvd_json_token * jtmp; @@ -395,22 +468,13 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock) utarray_free((*sock)->json.tokens); } - if ((*sock)->flow_table != NULL) + HASH_ITER(hh, (*sock)->instance_table, current_instance, itmp) { - HASH_ITER(hh, (*sock)->flow_table, current_flow, ftmp) - { - if ((*sock)->flow_end_callback != NULL) - { - (*sock)->flow_end_callback(*sock, current_flow); - } - HASH_DEL((*sock)->flow_table, current_flow); - free(current_flow); - } - (*sock)->flow_table = NULL; + nDPIsrvd_cleanup_instance(*sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN); } - + (*sock)->instance_table = NULL; nDPIsrvd_buffer_free(&(*sock)->buffer); - free(*sock); + nDPIsrvd_free(*sock); *sock = NULL; } @@ -418,8 +482,8 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock) static inline int nDPIsrvd_setup_address(struct nDPIsrvd_address * const address, char const * const destination) { size_t len = strlen(destination); - char * first_colon = strchr(destination, ':'); - char * last_colon = strrchr(destination, ':'); + char const * first_colon = strchr(destination, ':'); + char const * last_colon = strrchr(destination, ':'); memset(address, 0, sizeof(*address)); @@ -505,7 +569,9 @@ static inline enum nDPIsrvd_connect_return nDPIsrvd_connect(struct nDPIsrvd_sock static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * const sock) { - ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used); + ssize_t bytes_read = read(sock->fd, + sock->buffer.ptr.raw + sock->buffer.used, + sock->buffer.max - sock->buffer.used); if (bytes_read == 0) { @@ -663,88 +729,191 @@ static inline enum nDPIsrvd_conversion_return token_value_to_ull(struct nDPIsrvd return str_value_to_ull(token->value, value); } -static inline int nDPIsrvd_build_flow_key(struct nDPIsrvd_flow_key * const key, - struct nDPIsrvd_json_token const * const tokens[nDPIsrvd_FLOW_KEY_TOKENS]) +static nDPIsrvd_hashkey nDPIsrvd_build_key(char const * str, int len) +{ + uint32_t hash = 5381; + uint32_t c; + + while (len-- > 0 && (c = *str++) != 0) + { + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + + return hash; +} + +static inline int nDPIsrvd_build_instance_key(struct nDPIsrvd_json_token const * const alias, + struct nDPIsrvd_json_token const * const source, + nDPIsrvd_hashkey * const alias_source_key) { - if (tokens[0] == NULL || tokens[1] == NULL || tokens[2] == NULL) + if (alias == NULL || source == NULL) { return 1; } - if (snprintf(key->key, - nDPIsrvd_FLOW_KEY_STRLEN, - "%.*s-%.*s-%.*s", - tokens[0]->value_length, - tokens[0]->value, - tokens[1]->value_length, - tokens[1]->value, - tokens[2]->value_length, - tokens[2]->value) <= 0) + *alias_source_key = nDPIsrvd_build_key(alias->value, alias->value_length); + *alias_source_key ^= nDPIsrvd_build_key(source->value, source->value_length); + + return 0; +} + +static inline int nDPIsrvd_build_flow_key(struct nDPIsrvd_json_token const * const flow_id_token, + nDPIsrvd_hashkey * const flow_key) +{ + if (flow_id_token == NULL) { return 1; } + *flow_key = nDPIsrvd_build_key(flow_id_token->value, flow_id_token->value_length); + return 0; } -static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock, - struct nDPIsrvd_json_token const * const flow_id) +static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_json_token const * const alias, + struct nDPIsrvd_json_token const * const source) { - struct nDPIsrvd_json_token const * const tokens[nDPIsrvd_FLOW_KEY_TOKENS] = { - flow_id, - TOKEN_GET_SZ(sock, "alias"), - TOKEN_GET_SZ(sock, "source"), - }; - struct nDPIsrvd_flow_key key = {}; + struct nDPIsrvd_instance * instance; + nDPIsrvd_hashkey alias_source_key; - if (nDPIsrvd_build_flow_key(&key, tokens) != 0) + if (nDPIsrvd_build_instance_key(alias, source, &alias_source_key) != 0) { return NULL; } - struct nDPIsrvd_flow * flow = NULL; - HASH_FIND(hh, sock->flow_table, &key, sizeof(key), flow); + HASH_FIND_INT(sock->instance_table, &alias_source_key, instance); + + if (instance == NULL) + { + instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance)); + if (instance == NULL) + { + return NULL; + } + + instance->alias_source_key = alias_source_key; + HASH_ADD_INT(sock->instance_table, alias_source_key, instance); +#ifdef ENABLE_MEMORY_PROFILING + nDPIsrvd_memprof_log("Instance alias \"%.*s\" with source \"%.*s\" added: %zu bytes.", + alias->value_length, + alias->value, + source->value_length, + source->value, + sizeof(*instance)); +#endif + } + + return instance; +} + +static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance ** const instance) +{ + struct nDPIsrvd_flow * flow; + struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "alias"), + TOKEN_GET_SZ(sock, "source"), + TOKEN_GET_SZ(sock, "flow_id"), + TOKEN_GET_SZ(sock, "ts_msec"), + TOKEN_GET_SZ(sock, "flow_last_seen"), + TOKEN_GET_SZ(sock, "flow_idle_time")}; + nDPIsrvd_hashkey flow_key; + + *instance = nDPIsrvd_get_instance(sock, tokens[0], tokens[1]); + if (*instance == NULL || nDPIsrvd_build_flow_key(tokens[2], &flow_key) != 0) + { + return NULL; + } + + HASH_FIND_INT((*instance)->flow_table, &flow_key, flow); if (flow == NULL) { - flow = (struct nDPIsrvd_flow *)calloc(1, sizeof(*flow) + sock->flow_user_data_size); + flow = (struct nDPIsrvd_flow *)nDPIsrvd_calloc(1, sizeof(*flow) + sock->flow_user_data_size); if (flow == NULL) { return NULL; } - TOKEN_VALUE_TO_ULL(tokens[0], &flow->id_as_ull); - memcpy(flow->flow_key.key, key.key, nDPIsrvd_FLOW_KEY_STRLEN); - HASH_ADD(hh, sock->flow_table, flow_key, sizeof(flow->flow_key), flow); + flow->flow_key = flow_key; + TOKEN_VALUE_TO_ULL(tokens[2], &flow->id_as_ull); + HASH_ADD_INT((*instance)->flow_table, flow_key, flow); #ifdef ENABLE_MEMORY_PROFILING - printf("Flow %llu added: %zu bytes.\n", flow->id_as_ull, sizeof(*flow) + sock->flow_user_data_size); + nDPIsrvd_memprof_log("Flow %llu added: %zu bytes.", flow->id_as_ull, sizeof(*flow) + sock->flow_user_data_size); #endif } + if (tokens[3] != NULL) + { + nDPIsrvd_ull ts_msec; + TOKEN_VALUE_TO_ULL(tokens[3], &ts_msec); + if (ts_msec > (*instance)->most_recent_flow_time) + { + (*instance)->most_recent_flow_time = ts_msec; + } + } + + if (tokens[4] != NULL) + { + nDPIsrvd_ull flow_last_seen; + TOKEN_VALUE_TO_ULL(tokens[4], &flow_last_seen); + flow->last_seen = flow_last_seen; + } + + if (tokens[5] != NULL) + { + nDPIsrvd_ull flow_idle_time = 0; + TOKEN_VALUE_TO_ULL(tokens[5], &flow_idle_time); + flow->idle_time = flow_idle_time; + } + return flow; } static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, struct nDPIsrvd_flow * const current_flow) { - if (current_flow == NULL) + if (instance == NULL || current_flow == NULL) { - return 1; + return 0; } - struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); + struct nDPIsrvd_json_token const * const daemon_event_name = TOKEN_GET_SZ(sock, "daemon_event_name"); + if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "init") != 0) + { + nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_INIT); + } + if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "shutdown") != 0) + { + nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_SHUTDOWN); + } - if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0 || TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0) + struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); + int is_idle_flow; + if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle")) != 0 || + TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0) { - if (sock->flow_end_callback != NULL) - { - sock->flow_end_callback(sock, current_flow); - } - HASH_DEL(sock->flow_table, current_flow); #ifdef ENABLE_MEMORY_PROFILING - printf("Flow %llu deleted: %zu bytes.\n", current_flow->id_as_ull, sizeof(*current_flow) + sock->flow_user_data_size); + nDPIsrvd_memprof_log("Flow %llu deleted: %zu bytes.", + current_flow->id_as_ull, + sizeof(*current_flow) + sock->flow_user_data_size); #endif - free(current_flow); + nDPIsrvd_cleanup_flow(sock, + instance, + current_flow, + (is_idle_flow != 0 ? CLEANUP_REASON_FLOW_IDLE : CLEANUP_REASON_FLOW_END)); + } + else if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time) + { +#ifdef ENABLE_MEMORY_PROFILING + nDPIsrvd_memprof_log("Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: [%llu]", + current_flow->id_as_ull, + sizeof(*current_flow) + sock->flow_user_data_size, + current_flow->last_seen, current_flow->idle_time, instance->most_recent_flow_time, + instance->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time)); +#endif + nDPIsrvd_cleanup_flow(sock, instance, current_flow, CLEANUP_REASON_FLOW_TIMEOUT); } return 0; @@ -813,9 +982,9 @@ static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer) static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock) { - enum nDPIsrvd_parse_return ret; + enum nDPIsrvd_parse_return ret = PARSE_OK; - while ((ret = nDPIsrvd_parse_line(&sock->buffer, &sock->jsmn)) == PARSE_OK) + while (ret == PARSE_OK && (ret = nDPIsrvd_parse_line(&sock->buffer, &sock->jsmn)) == PARSE_OK) { char const * key = NULL; int key_length = 0; @@ -852,8 +1021,8 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock { struct nDPIsrvd_json_token jt = {.value = jsmn_token_get(sock, current_token), .value_length = jsmn_token_size(sock, current_token), - .hh = {}, - .token_index = current_token - 1}; + .token_index = current_token - 1, + .hh = {}}; if (key == NULL || key_length > nDPIsrvd_JSON_KEY_STRLEN || utarray_len(sock->json.tokens) == nDPIsrvd_MAX_JSON_TOKENS) @@ -875,21 +1044,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock } } - struct nDPIsrvd_json_token const * const flow_id = TOKEN_GET_SZ(sock, "flow_id"); + struct nDPIsrvd_instance * instance = NULL; struct nDPIsrvd_flow * flow = NULL; - if (flow_id != NULL) - { - flow = nDPIsrvd_get_flow(sock, flow_id); - if (flow == NULL) - { - ret = PARSE_FLOW_MGMT_ERROR; - } - } - if (ret == PARSE_OK && sock->json_callback(sock, flow) != CALLBACK_OK) + flow = nDPIsrvd_get_flow(sock, &instance); + if (ret == PARSE_OK && sock->json_callback(sock, instance, flow) != CALLBACK_OK) { ret = PARSE_JSON_CALLBACK_ERROR; } - if (flow_id != NULL && nDPIsrvd_check_flow_end(sock, flow) != 0) + if (nDPIsrvd_check_flow_end(sock, instance, flow) != 0) { ret = PARSE_FLOW_MGMT_ERROR; } @@ -912,6 +1074,55 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock return ret; } +static inline void * nDPIsrvd_calloc(size_t const n, size_t const size) +{ + void * p = nDPIsrvd_malloc(n * size); + + if (p == NULL) + { + return NULL; + } + memset(p, 0, n * size); + + return p; +} + +static inline void * nDPIsrvd_malloc(size_t const size) +{ + void * p = malloc(sizeof(uint64_t) + size); + + if (p == NULL) + { + return NULL; + } + + *(uint64_t *)p = size; +#ifdef ENABLE_MEMORY_PROFILING + nDPIsrvd_memprof_log("malloc(%zu)", size); +#endif + + return (uint8_t *)p + sizeof(uint64_t); +} + +static inline void nDPIsrvd_free(void * const freeable) +{ + void * p; + + if (freeable == NULL) + { + return; + } + + p = (uint8_t *)freeable - sizeof(uint64_t); + +#ifdef ENABLE_MEMORY_PROFILING + size_t size = *(uint64_t *)p; + nDPIsrvd_memprof_log("free(%zu)", size); +#endif + + free(p); +} + #ifdef ENABLE_MEMORY_PROFILING static inline void * nDPIsrvd_uthash_malloc(size_t const size) { @@ -921,28 +1132,62 @@ static inline void * nDPIsrvd_uthash_malloc(size_t const size) { return NULL; } - printf("malloc(%zu)\n", size); + nDPIsrvd_memprof_log("uthash malloc(%zu)", size); return p; } static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size) { - printf("free(%zu)\n", size); + nDPIsrvd_memprof_log("uthash free(%zu)", size); free(freeable); } #endif -static inline void nDPIsrvd_write_flow_info(int outfd, struct nDPIsrvd_socket const * const sock, void (*write_cb)(int outfd, struct nDPIsrvd_flow *)) +static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instance, + void (*verify_cb)(struct nDPIsrvd_flow const *, void * user_data), + void * user_data) { - struct nDPIsrvd_flow * current_flow; - struct nDPIsrvd_flow * ftmp; + int retval = 0; + struct nDPIsrvd_flow const * current_flow; + struct nDPIsrvd_flow const * ftmp; - HASH_ITER(hh, sock->flow_table, current_flow, ftmp) + HASH_ITER(hh, instance->flow_table, current_flow, ftmp) { - dprintf(outfd, "[Flow %4llu]", current_flow->id_as_ull); - write_cb(outfd, current_flow); - dprintf(outfd, "%c", '\n'); + if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time) + { + if (verify_cb != NULL) + { + verify_cb(current_flow, user_data); + } + retval = 1; + } + } + + return retval; +} + +static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock, + void (*info_cb)(struct nDPIsrvd_flow const *, void * user_data), + void * user_data) +{ + struct nDPIsrvd_instance const * current_instance; + struct nDPIsrvd_instance const * itmp; + struct nDPIsrvd_flow const * current_flow; + struct nDPIsrvd_flow const * ftmp; + + if (sock->instance_table != NULL) + { + HASH_ITER(hh, sock->instance_table, current_instance, itmp) + { + if (current_instance->flow_table != NULL) + { + HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp) + { + info_cb(current_flow, user_data); + } + } + } } } |