diff options
Diffstat (limited to 'dependencies/nDPIsrvd.h')
-rw-r--r-- | dependencies/nDPIsrvd.h | 288 |
1 files changed, 232 insertions, 56 deletions
diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 1e732f5c6..44bd4703c 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -53,7 +53,7 @@ enum nDPIsrvd_read_return { READ_OK = CONNECT_LAST_ENUM_VALUE, READ_PEER_DISCONNECT, - READ_ERROR, + READ_ERROR, /* check for errno */ READ_LAST_ENUM_VALUE }; @@ -118,18 +118,28 @@ struct nDPIsrvd_flow { nDPIsrvd_hashkey flow_key; nDPIsrvd_ull id_as_ull; + nDPIsrvd_hashkey thread_id; nDPIsrvd_ull last_seen; nDPIsrvd_ull idle_time; UT_hash_handle hh; uint8_t flow_user_data[0]; }; +struct nDPIsrvd_thread_data +{ + nDPIsrvd_hashkey thread_key; + nDPIsrvd_ull most_recent_flow_time; + UT_hash_handle hh; + uint8_t thread_user_data[0]; +}; + struct nDPIsrvd_instance { nDPIsrvd_hashkey alias_source_key; - nDPIsrvd_ull most_recent_flow_time; struct nDPIsrvd_flow * flow_table; + struct nDPIsrvd_thread_data * thread_data_table; UT_hash_handle hh; + uint8_t instance_user_data[0]; }; struct nDPIsrvd_json_token @@ -154,9 +164,14 @@ extern void nDPIsrvd_memprof_log(char const * const format, ...); typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_thread_data * const thread_data, struct nDPIsrvd_flow * const flow); +typedef void (*instance_cleanup_callback)(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + enum nDPIsrvd_cleanup_reason reason); typedef void (*flow_cleanup_callback)(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_thread_data * const thread_data, struct nDPIsrvd_flow * const flow, enum nDPIsrvd_cleanup_reason reason); @@ -198,9 +213,12 @@ struct nDPIsrvd_socket int fd; struct nDPIsrvd_address address; + size_t instance_user_data_size; + size_t thread_user_data_size; size_t flow_user_data_size; struct nDPIsrvd_instance * instance_table; json_callback json_callback; + instance_cleanup_callback instance_cleanup_callback; flow_cleanup_callback flow_cleanup_callback; struct nDPIsrvd_buffer buffer; @@ -373,8 +391,11 @@ static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer) } static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size, + size_t instance_user_data_size, + size_t thread_user_data_size, size_t flow_user_data_size, json_callback json_cb, + instance_cleanup_callback instance_cleanup_cb, flow_cleanup_callback flow_cleanup_callback_cb) { static const UT_icd packet_data_icd = {sizeof(struct nDPIsrvd_json_token), NULL, NULL, NULL}; @@ -393,9 +414,13 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d goto error; } sock->address.raw.sa_family = -1; + + sock->instance_user_data_size = instance_user_data_size; + sock->thread_user_data_size = thread_user_data_size; sock->flow_user_data_size = flow_user_data_size; sock->json_callback = json_cb; + sock->instance_cleanup_callback = instance_cleanup_cb; sock->flow_cleanup_callback = flow_cleanup_callback_cb; utarray_new(sock->json.tokens, &packet_data_icd); @@ -417,34 +442,72 @@ error: static inline void nDPIsrvd_cleanup_flow(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_thread_data * const thread_data, struct nDPIsrvd_flow * const flow, enum nDPIsrvd_cleanup_reason reason) { if (sock->flow_cleanup_callback != NULL) { - sock->flow_cleanup_callback(sock, instance, flow, reason); + sock->flow_cleanup_callback(sock, instance, thread_data, flow, reason); } HASH_DEL(instance->flow_table, flow); nDPIsrvd_free(flow); } +static inline void nDPIsrvd_cleanup_flows(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_thread_data * const thread_data, + enum nDPIsrvd_cleanup_reason reason) +{ + struct nDPIsrvd_flow * current_flow; + struct nDPIsrvd_flow * ftmp; + + if (instance->flow_table != NULL) + { +#ifdef ENABLE_MEMORY_PROFILING + nDPIsrvd_memprof_log("Cleaning up flows for instance 0x%x and thread %d.", + instance->alias_source_key, + thread_data->thread_key); +#endif + + HASH_ITER(hh, instance->flow_table, current_flow, ftmp) + { + if (current_flow->thread_id == thread_data->thread_key) + { + nDPIsrvd_cleanup_flow(sock, instance, thread_data, current_flow, reason); + } + } + } +} + static inline void nDPIsrvd_cleanup_instance(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, enum nDPIsrvd_cleanup_reason reason) { - struct nDPIsrvd_flow * current_flow; - struct nDPIsrvd_flow * ftmp; + struct nDPIsrvd_thread_data * current_thread_data; + struct nDPIsrvd_thread_data * ttmp; if (instance != NULL) { - if (instance->flow_table != NULL) +#ifdef ENABLE_MEMORY_PROFILING + nDPIsrvd_memprof_log("Cleaning up instance 0x%x.", instance->alias_source_key); +#endif + if (sock->instance_cleanup_callback != NULL) + { + sock->instance_cleanup_callback(sock, instance, reason); + } + + if (instance->thread_data_table != NULL) { - HASH_ITER(hh, instance->flow_table, current_flow, ftmp) + HASH_ITER(hh, instance->thread_data_table, current_thread_data, ttmp) { - nDPIsrvd_cleanup_flow(sock, instance, current_flow, reason); + nDPIsrvd_cleanup_flows(sock, instance, current_thread_data, reason); + HASH_DEL(instance->thread_data_table, current_thread_data); + nDPIsrvd_free(current_thread_data); } - instance->flow_table = NULL; + instance->thread_data_table = NULL; } + HASH_DEL(sock->instance_table, instance); nDPIsrvd_free(instance); } @@ -481,6 +544,7 @@ static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock) nDPIsrvd_cleanup_instance(*sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN); } (*sock)->instance_table = NULL; + nDPIsrvd_buffer_free(&(*sock)->buffer); nDPIsrvd_free(*sock); @@ -582,9 +646,7 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c return READ_OK; } - ssize_t bytes_read = read(sock->fd, - sock->buffer.ptr.raw + sock->buffer.used, - sock->buffer.max - sock->buffer.used); + ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used); if (bytes_read == 0) { @@ -799,7 +861,7 @@ static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_s if (instance == NULL) { - instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance)); + instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance) + sock->instance_user_data_size); if (instance == NULL) { return NULL; @@ -820,24 +882,100 @@ static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_s return instance; } +static inline struct nDPIsrvd_thread_data * nDPIsrvd_get_thread_data( + struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_json_token const * const thread_id_token, + struct nDPIsrvd_json_token const * const ts_msec_token) +{ + struct nDPIsrvd_thread_data * thread_data; + nDPIsrvd_hashkey thread_id; + + if (thread_id_token == NULL) + { + return NULL; + } + + { + nDPIsrvd_ull thread_key; + TOKEN_VALUE_TO_ULL(thread_id_token, &thread_key); + thread_id = thread_key; + } + + HASH_FIND_INT(instance->thread_data_table, &thread_id, thread_data); + + if (thread_data == NULL) + { + thread_data = + (struct nDPIsrvd_thread_data *)nDPIsrvd_calloc(1, sizeof(*thread_data) + sock->thread_user_data_size); + if (thread_data == NULL) + { + return NULL; + } + + thread_data->thread_key = thread_id; + HASH_ADD_INT(instance->thread_data_table, thread_key, thread_data); +#ifdef ENABLE_MEMORY_PROFILING + nDPIsrvd_memprof_log("Thread Data %d added: %zu bytes.", + thread_data->thread_key, + sizeof(*thread_data) + sock->thread_user_data_size); +#endif + } + + if (ts_msec_token != NULL) + { + nDPIsrvd_ull thread_ts_msec; + TOKEN_VALUE_TO_ULL(ts_msec_token, &thread_ts_msec); + + if (thread_ts_msec > thread_data->most_recent_flow_time) + { + thread_data->most_recent_flow_time = thread_ts_msec; + } + } + + return thread_data; +} + static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock, - struct nDPIsrvd_instance ** const instance) + struct nDPIsrvd_instance ** const instance, + struct nDPIsrvd_thread_data ** const thread_data) { struct nDPIsrvd_flow * flow; struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "alias"), TOKEN_GET_SZ(sock, "source"), + TOKEN_GET_SZ(sock, "thread_id"), TOKEN_GET_SZ(sock, "flow_id"), TOKEN_GET_SZ(sock, "thread_ts_msec"), TOKEN_GET_SZ(sock, "flow_last_seen"), TOKEN_GET_SZ(sock, "flow_idle_time")}; + enum + { + TOKEN_ALIAS = 0, + TOKEN_SOURCE, + TOKEN_THREAD_ID, + TOKEN_FLOW_ID, + TOKEN_THREAD_TS_MSEC, + TOKEN_FLOW_LAST_SEEN, + TOKEN_FLOW_IDLE_TIME + }; nDPIsrvd_hashkey flow_key; - *instance = nDPIsrvd_get_instance(sock, tokens[0], tokens[1]); - if (*instance == NULL || nDPIsrvd_build_flow_key(tokens[2], &flow_key) != 0) + *instance = nDPIsrvd_get_instance(sock, tokens[TOKEN_ALIAS], tokens[TOKEN_SOURCE]); + if (*instance == NULL) { return NULL; } + *thread_data = nDPIsrvd_get_thread_data(sock, *instance, tokens[TOKEN_THREAD_ID], tokens[TOKEN_THREAD_TS_MSEC]); + if (*thread_data == NULL) + { + return NULL; + } + + if (nDPIsrvd_build_flow_key(tokens[TOKEN_FLOW_ID], &flow_key) != 0) + { + return NULL; + } HASH_FIND_INT((*instance)->flow_table, &flow_key, flow); if (flow == NULL) @@ -849,34 +987,26 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * } flow->flow_key = flow_key; - TOKEN_VALUE_TO_ULL(tokens[2], &flow->id_as_ull); + flow->thread_id = (*thread_data)->thread_key; + + TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_ID], &flow->id_as_ull); HASH_ADD_INT((*instance)->flow_table, flow_key, flow); #ifdef ENABLE_MEMORY_PROFILING nDPIsrvd_memprof_log("Flow %llu added: %zu bytes.", flow->id_as_ull, sizeof(*flow) + sock->flow_user_data_size); #endif } - if (tokens[3] != NULL) - { - nDPIsrvd_ull thread_ts_msec; - TOKEN_VALUE_TO_ULL(tokens[3], &thread_ts_msec); - if (thread_ts_msec > (*instance)->most_recent_flow_time) - { - (*instance)->most_recent_flow_time = thread_ts_msec; - } - } - - if (tokens[4] != NULL) + if (tokens[TOKEN_FLOW_LAST_SEEN] != NULL) { nDPIsrvd_ull flow_last_seen; - TOKEN_VALUE_TO_ULL(tokens[4], &flow_last_seen); + TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_LAST_SEEN], &flow_last_seen); flow->last_seen = flow_last_seen; } - if (tokens[5] != NULL) + if (tokens[TOKEN_FLOW_IDLE_TIME] != NULL) { - nDPIsrvd_ull flow_idle_time = 0; - TOKEN_VALUE_TO_ULL(tokens[5], &flow_idle_time); + nDPIsrvd_ull flow_idle_time; + TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_IDLE_TIME], &flow_idle_time); flow->idle_time = flow_idle_time; } @@ -885,27 +1015,39 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_thread_data * const thread_data, struct nDPIsrvd_flow * const current_flow) { - if (instance == NULL || current_flow == NULL) + struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "daemon_event_name"), + TOKEN_GET_SZ(sock, "flow_event_name")}; + enum + { + TOKEN_DAEMON_EVENT_NAME = 0, + TOKEN_FLOW_EVENT_NAME + }; + + if (instance == NULL) { return 0; } - struct nDPIsrvd_json_token const * const daemon_event_name = TOKEN_GET_SZ(sock, "daemon_event_name"); - if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "init") != 0) + if (TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_DAEMON_EVENT_NAME], "init") != 0) { - nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_INIT); + nDPIsrvd_cleanup_flows(sock, instance, thread_data, CLEANUP_REASON_DAEMON_INIT); } - if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "shutdown") != 0) + if (TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_DAEMON_EVENT_NAME], "shutdown") != 0) { - nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_SHUTDOWN); + nDPIsrvd_cleanup_flows(sock, instance, thread_data, CLEANUP_REASON_DAEMON_SHUTDOWN); + } + + if (current_flow == NULL) + { + return 0; } - struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); int is_idle_flow; - if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle")) != 0 || - TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0) + if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_FLOW_EVENT_NAME], "idle")) != 0 || + TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_FLOW_EVENT_NAME], "end") != 0) { #ifdef ENABLE_MEMORY_PROFILING nDPIsrvd_memprof_log("Flow %llu deleted: %zu bytes.", @@ -914,19 +1056,25 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, #endif nDPIsrvd_cleanup_flow(sock, instance, + thread_data, current_flow, (is_idle_flow != 0 ? CLEANUP_REASON_FLOW_IDLE : CLEANUP_REASON_FLOW_END)); } - else if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time) + else if (thread_data != NULL && + current_flow->last_seen + current_flow->idle_time < thread_data->most_recent_flow_time) { #ifdef ENABLE_MEMORY_PROFILING - nDPIsrvd_memprof_log("Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: [%llu]", - current_flow->id_as_ull, - sizeof(*current_flow) + sock->flow_user_data_size, - current_flow->last_seen, current_flow->idle_time, instance->most_recent_flow_time, - instance->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time)); + nDPIsrvd_memprof_log( + "Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: " + "[%llu]", + current_flow->id_as_ull, + sizeof(*current_flow) + sock->flow_user_data_size, + current_flow->last_seen, + current_flow->idle_time, + thread_data->most_recent_flow_time, + thread_data->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time)); #endif - nDPIsrvd_cleanup_flow(sock, instance, current_flow, CLEANUP_REASON_FLOW_TIMEOUT); + nDPIsrvd_cleanup_flow(sock, instance, thread_data, current_flow, CLEANUP_REASON_FLOW_TIMEOUT); } return 0; @@ -1068,13 +1216,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock } struct nDPIsrvd_instance * instance = NULL; + struct nDPIsrvd_thread_data * thread_data = NULL; struct nDPIsrvd_flow * flow = NULL; - flow = nDPIsrvd_get_flow(sock, &instance); - if (ret == PARSE_OK && sock->json_callback(sock, instance, flow) != CALLBACK_OK) + flow = nDPIsrvd_get_flow(sock, &instance, &thread_data); + if (ret == PARSE_OK && sock->json_callback(sock, instance, thread_data, flow) != CALLBACK_OK) { ret = PARSE_JSON_CALLBACK_ERROR; } - if (nDPIsrvd_check_flow_end(sock, instance, flow) != 0) + if (nDPIsrvd_check_flow_end(sock, instance, thread_data, flow) != 0) { ret = PARSE_FLOW_MGMT_ERROR; } @@ -1168,7 +1317,9 @@ static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size #endif static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instance, - void (*verify_cb)(struct nDPIsrvd_flow const *, void * user_data), + void (*verify_cb)(struct nDPIsrvd_thread_data const * const, + struct nDPIsrvd_flow const *, + void * user_data), void * user_data) { int retval = 0; @@ -1177,11 +1328,30 @@ static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instanc HASH_ITER(hh, instance->flow_table, current_flow, ftmp) { - if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time) + struct nDPIsrvd_thread_data * current_thread_data; + + HASH_FIND_INT(instance->thread_data_table, ¤t_flow->thread_id, current_thread_data); + if (current_thread_data == NULL) + { + if (verify_cb != NULL) + { + verify_cb(current_thread_data, current_flow, user_data); + } + retval = 1; + } + else if (current_flow->thread_id != current_thread_data->thread_key) + { + if (verify_cb != NULL) + { + verify_cb(current_thread_data, current_flow, user_data); + } + retval = 1; + } + else if (current_flow->last_seen + current_flow->idle_time < current_thread_data->most_recent_flow_time) { if (verify_cb != NULL) { - verify_cb(current_flow, user_data); + verify_cb(current_thread_data, current_flow, user_data); } retval = 1; } @@ -1191,11 +1361,16 @@ static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instanc } static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock, - void (*info_cb)(struct nDPIsrvd_flow const *, void * user_data), + void (*info_cb)(struct nDPIsrvd_socket const *, + struct nDPIsrvd_instance const *, + struct nDPIsrvd_thread_data const *, + struct nDPIsrvd_flow const *, + void *), void * user_data) { struct nDPIsrvd_instance const * current_instance; struct nDPIsrvd_instance const * itmp; + struct nDPIsrvd_thread_data * current_thread_data; struct nDPIsrvd_flow const * current_flow; struct nDPIsrvd_flow const * ftmp; @@ -1207,7 +1382,8 @@ static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock, { HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp) { - info_cb(current_flow, user_data); + HASH_FIND_INT(current_instance->thread_data_table, ¤t_flow->thread_id, current_thread_data); + info_cb(sock, current_instance, current_thread_data, current_flow, user_data); } } } |