diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-02-01 15:36:00 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-02-06 15:41:30 +0100 |
commit | 69b4e662c773c3cfdad29035e7668457e5960c99 (patch) | |
tree | b6b73c02402cebe43749880c6a118d23c488bc09 | |
parent | 257cce1dccb19190ebcb4c48bac421a21daa4a56 (diff) |
nDPIsrvd C API overhaul and massive simplification.
* nDPIsrvd.h does flow mgmt out of the box
* dissect received JSON strings via callback
* added new JSON key/values for packet-flows (usecTimestamp/L3/L4 info)
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | config.h | 22 | ||||
-rw-r--r-- | dependencies/nDPIsrvd.h | 525 | ||||
-rw-r--r-- | examples/c-captured/c-captured.c | 368 | ||||
-rw-r--r-- | examples/go-dashboard/main.go | 10 | ||||
-rwxr-xr-x | examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py | 4 | ||||
-rwxr-xr-x | examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py | 4 | ||||
-rw-r--r-- | nDPId.c | 108 |
8 files changed, 611 insertions, 431 deletions
@@ -41,6 +41,7 @@ endif # PKG_CONFIG_BIN ifeq ($(ENABLE_MEMORY_PROFILING),yes) PROJECT_CFLAGS += -DENABLE_MEMORY_PROFILING=1 +UTHASH_CFLAGS += -Duthash_malloc=nDPIsrvd_uthash_malloc -Duthash_free=nDPIsrvd_uthash_free endif ifeq ($(ENABLE_DEBUG),yes) @@ -5,27 +5,27 @@ #define COLLECTOR_UNIX_SOCKET "/tmp/ndpid-collector.sock" #define DISTRIBUTOR_UNIX_SOCKET "/tmp/ndpid-distributor.sock" #define DISTRIBUTOR_HOST "127.0.0.1" -#define DISTRIBUTOR_PORT 7000 +#define DISTRIBUTOR_PORT 7000u /* * NOTE: Buffer size needs to keep in sync with other implementations * e.g. dependencies/nDPIsrvd.py */ -#define NETWORK_BUFFER_MAX_SIZE 12288 /* 8192 + 4096 */ -#define NETWORK_BUFFER_LENGTH_DIGITS 5 +#define NETWORK_BUFFER_MAX_SIZE 12288u /* 8192 + 4096 */ +#define NETWORK_BUFFER_LENGTH_DIGITS 5u #define NETWORK_BUFFER_LENGTH_DIGITS_STR "5" /* nDPId default config options */ #define nDPId_PIDFILE "/tmp/ndpid.pid" -#define nDPId_MAX_FLOWS_PER_THREAD 4096 -#define nDPId_MAX_IDLE_FLOWS_PER_THREAD 512 -#define nDPId_TICK_RESOLUTION 1000 -#define nDPId_MAX_READER_THREADS 32 -#define nDPId_IDLE_SCAN_PERIOD 10000 /* 10 sec */ -#define nDPId_IDLE_TIME 600000 /* 600 sec */ -#define nDPId_TCP_POST_END_FLOW_TIME 60000 /* 60 sec */ +#define nDPId_MAX_FLOWS_PER_THREAD 4096u +#define nDPId_MAX_IDLE_FLOWS_PER_THREAD 512u +#define nDPId_TICK_RESOLUTION 1000u +#define nDPId_MAX_READER_THREADS 32u +#define nDPId_IDLE_SCAN_PERIOD 10000u /* 10 sec */ +#define nDPId_IDLE_TIME 600000u /* 600 sec */ +#define nDPId_TCP_POST_END_FLOW_TIME 60000u /* 60 sec */ #define nDPId_THREAD_DISTRIBUTION_SEED 0x03dd018b -#define nDPId_PACKETS_PER_FLOW_TO_SEND 15 +#define nDPId_PACKETS_PER_FLOW_TO_SEND 15u #define nDPId_FLOW_STRUCT_SEED 0x5defc104 /* nDPIsrvd default config options */ diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h index 001e06500..75dcda021 100644 --- a/dependencies/nDPIsrvd.h +++ b/dependencies/nDPIsrvd.h @@ -14,19 +14,109 @@ #include "config.h" #include "jsmn/jsmn.h" +#include "utarray.h" #include "uthash.h" +#define nDPIsrvd_MAX_JSON_TOKENS 128 +#define nDPIsrvd_FLOW_ID_STRLEN 24 +#define nDPIsrvd_JSON_KEY_STRLEN 32 + +#define nDPIsrvd_STRLEN_SZ(s) (sizeof(s)/sizeof(s[0]) - sizeof(s[0])) +#define TOKEN_GET_SZ(sock, key) token_get(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key)) +#define TOKEN_GET_VALUE_SZ(sock, key, value_length) token_get_value(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key), value_length) +#define TOKEN_VALUE_EQUALS_SZ(token, string_to_check) token_value_equals(token, string_to_check, nDPIsrvd_STRLEN_SZ(string_to_check)) +#define TOKEN_VALUE_TO_ULL(token, value) token_value_to_ull(token, value) + +#define FIRST_ENUM_VALUE 1 +#define LAST_ENUM_VALUE CONVERSION_LAST_ENUM_VALUE + +enum nDPIsrvd_connect_return +{ + CONNECT_OK = FIRST_ENUM_VALUE, + CONNECT_ERROR_SOCKET, + CONNECT_ERROR_PTON, + CONNECT_ERROR, + CONNECT_LAST_ENUM_VALUE +}; + +enum nDPIsrvd_read_return +{ + READ_OK = CONNECT_LAST_ENUM_VALUE, + READ_PEER_DISCONNECT, + READ_ERROR, + READ_LAST_ENUM_VALUE +}; + +enum nDPIsrvd_parse_return +{ + PARSE_OK = READ_LAST_ENUM_VALUE, + PARSE_INVALID_OPENING_CHAR, + PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT, + PARSE_SIZE_MISSING, + PARSE_STRING_TOO_BIG, + PARSE_INVALID_CLOSING_CHAR, + PARSE_JSMN_ERROR, + PARSE_JSON_CALLBACK_ERROR, + PARSE_JSON_MGMT_ERROR, + PARSE_FLOW_MGMT_ERROR, + PARSE_LAST_ENUM_VALUE +}; + +enum nDPIsrvd_callback_return +{ + CALLBACK_OK = PARSE_LAST_ENUM_VALUE, + CALLBACK_ERROR, + CALLBACK_LAST_ENUM_VALUE +}; + +enum nDPIsrvd_conversion_return +{ + CONVERSION_OK = CALLBACK_LAST_ENUM_VALUE, + CONVERISON_KEY_NOT_FOUND, + CONVERSION_NOT_A_NUMBER, + CONVERSION_RANGE_EXCEEDED, + CONVERSION_LAST_ENUM_VALUE +}; + +typedef unsigned long long int nDPIsrvd_ull; +typedef nDPIsrvd_ull * nDPIsrvd_ull_ptr; + struct nDPIsrvd_flow { - char id[24]; + char id[nDPIsrvd_FLOW_ID_STRLEN]; + nDPIsrvd_ull id_as_ull; + UT_hash_handle hh; + uint8_t flow_user_data[0]; +}; + +struct nDPIsrvd_json_token +{ + char key[nDPIsrvd_FLOW_ID_STRLEN]; + int key_length; UT_hash_handle hh; - uint8_t user_data[0]; + char const * value; + int value_length; }; +struct nDPIsrvd_socket; +#ifdef ENABLE_MEMORY_PROFILING +static inline void * nDPIsrvd_uthash_malloc(size_t const size); +static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size); +#endif + +typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_flow * const flow); +typedef void (*flow_end_callback)(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_flow * const flow); + struct nDPIsrvd_socket { int fd; int socket_family; + size_t flow_user_data_size; + struct nDPIsrvd_flow * flow_table; + json_callback json_callback; + flow_end_callback flow_end_callback; union { struct @@ -46,74 +136,29 @@ struct nDPIsrvd_socket size_t used; char * json_string; size_t json_string_start; - unsigned long long int json_string_length; + nDPIsrvd_ull json_string_length; } buffer; + /* jsmn JSON parser */ struct { jsmn_parser parser; - jsmntok_t tokens[128]; - int current_token; + jsmntok_t tokens[nDPIsrvd_MAX_JSON_TOKENS]; int tokens_found; - struct - { - char const * key; - int key_length; - char const * value; - int value_length; - } key_value; } jsmn; + /* easy and fast JSON key/value access via hash table and a static array */ struct { - char const * event_name; - int event_name_len; - char const * flow_id; - int flow_id_len; - } current; -}; + UT_array * tokens; + struct nDPIsrvd_json_token * token_table; + } json; -#define FIRST_ENUM_VALUE 1 -#define LAST_ENUM_VALUE CALLBACK_LAST_ENUM_VALUE - -enum nDPIsrvd_connect_return -{ - CONNECT_OK = FIRST_ENUM_VALUE, - CONNECT_ERROR_SOCKET, - CONNECT_ERROR_PTON, - CONNECT_ERROR, - CONNECT_LAST_ENUM_VALUE + size_t global_user_data_size; + uint8_t global_user_data[0]; }; -enum nDPIsrvd_read_return -{ - READ_OK = CONNECT_LAST_ENUM_VALUE, - READ_PEER_DISCONNECT, - READ_ERROR, - READ_LAST_ENUM_VALUE -}; - -enum nDPIsrvd_parse_return -{ - PARSE_OK = READ_LAST_ENUM_VALUE, - PARSE_INVALID_OPENING_CHAR, - PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT, - PARSE_SIZE_MISSING, - PARSE_STRING_TOO_BIG, - PARSE_INVALID_CLOSING_CHAR, - PARSE_JSMN_ERROR, - PARSE_CALLBACK_ERROR, - PARSE_LAST_ENUM_VALUE -}; - -enum nDPIsrvd_callback_return -{ - CALLBACK_OK = PARSE_LAST_ENUM_VALUE, - CALLBACK_ERROR, - CALLBACK_LAST_ENUM_VALUE -}; - -typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock, void * const user_data); +static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock); /* Slightly modified code: https://en.wikibooks.org/wiki/Algorithm_Implementation/Miscellaneous/Base64 */ #define WHITESPACE 64 @@ -205,7 +250,14 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value) "PARSE_STRING_TOO_BIG", "PARSE_INVALID_CLOSING_CHAR", "PARSE_JSMN_ERROR", - "PARSE_CALLBACK_ERROR"}; + "PARSE_JSON_CALLBACK_ERROR", + "PARSE_JSON_MGMT_ERROR", + "PARSE_FLOW_MGMT_ERROR", + "CONVERSION_OK", + "CONVERISON_KEY_NOT_FOUND", + "CONVERSION_NOT_A_NUMBER", + "CONVERSION_RANGE_EXCEEDED", + [LAST_ENUM_VALUE] = "LAST_ENUM_VALUE"}; if (enum_value < FIRST_ENUM_VALUE || enum_value >= LAST_ENUM_VALUE) { @@ -215,32 +267,83 @@ static inline char const * nDPIsrvd_enum_to_string(int enum_value) return enum_str[enum_value - FIRST_ENUM_VALUE]; } -static inline struct nDPIsrvd_socket * nDPIsrvd_init(void) +static inline struct nDPIsrvd_socket * nDPIsrvd_init(size_t global_user_data_size, + size_t flow_user_data_size, + json_callback json_cb, + flow_end_callback flow_end_cb) { - struct nDPIsrvd_socket * sock = (struct nDPIsrvd_socket *)malloc(sizeof(*sock)); + static const UT_icd packet_data_icd = {sizeof(struct nDPIsrvd_json_token), NULL, NULL, NULL}; + struct nDPIsrvd_socket * sock = (struct nDPIsrvd_socket *)malloc(sizeof(*sock) + global_user_data_size); + + if (json_cb == NULL) + { + goto error; + } if (sock != NULL) { + memset(sock, 0, sizeof(*sock)); + sock->fd = -1; sock->socket_family = -1; + sock->flow_user_data_size = flow_user_data_size; + + sock->json_callback = json_cb; + sock->flow_end_callback = flow_end_cb; + + utarray_new(sock->json.tokens, &packet_data_icd); + if (sock->json.tokens == NULL) + { + goto error; + } + utarray_reserve(sock->json.tokens, nDPIsrvd_MAX_JSON_TOKENS); + + sock->global_user_data_size = global_user_data_size; } return sock; +error: + nDPIsrvd_free(&sock); + return NULL; } -static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock, struct nDPIsrvd_flow ** const flow_table) +static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock) { struct nDPIsrvd_flow * current_flow; - struct nDPIsrvd_flow * tmp; + struct nDPIsrvd_flow * ftmp; + struct nDPIsrvd_json_token * current_json_token; + struct nDPIsrvd_json_token * jtmp; + + if (sock == NULL || *sock == NULL) + { + return; + } + + if ((*sock)->json.token_table != NULL) + { + HASH_ITER(hh, (*sock)->json.token_table, current_json_token, jtmp) + { + HASH_DEL((*sock)->json.token_table, current_json_token); + } + (*sock)->json.token_table = NULL; + } + + if ((*sock)->json.tokens != NULL) + { + utarray_free((*sock)->json.tokens); + } - if (flow_table != NULL) + if ((*sock)->flow_table != NULL) { - HASH_ITER(hh, *flow_table, current_flow, tmp) + HASH_ITER(hh, (*sock)->flow_table, current_flow, ftmp) { - HASH_DEL(*flow_table, current_flow); + if ((*sock)->flow_end_callback != NULL) { + (*sock)->flow_end_callback(*sock, current_flow); + } + HASH_DEL((*sock)->flow_table, current_flow); free(current_flow); } - *flow_table = NULL; + (*sock)->flow_table = NULL; } free(*sock); @@ -321,132 +424,145 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c return READ_OK; } -static inline int token_event_equals(struct nDPIsrvd_socket const * const sock, char const * const event_value) +static inline int jsmn_token_is_key(int current_token_index) { - return sock->current.event_name != NULL && sock->current.event_name_len > 0 && - (int)strlen(event_value) == sock->current.event_name_len && - strncmp(sock->current.event_name, event_value, sock->current.event_name_len) == 0; + return current_token_index % 2; } -static inline int token_is_key(struct nDPIsrvd_socket const * const sock) +static inline char const * jsmn_token_get(struct nDPIsrvd_socket const * const sock, int current_token_index) { - return sock->jsmn.current_token % 2; + return sock->buffer.json_string + sock->jsmn.tokens[current_token_index].start; } -static inline char const * token_get(struct nDPIsrvd_socket const * const sock) +static inline int jsmn_token_size(struct nDPIsrvd_socket const * const sock, int current_token_index) { - return sock->buffer.json_string + sock->jsmn.tokens[sock->jsmn.current_token].start; + return sock->jsmn.tokens[current_token_index].end - sock->jsmn.tokens[current_token_index].start; } -static inline int token_size(struct nDPIsrvd_socket const * const sock) +static inline int jsmn_token_is_jsmn_type(struct nDPIsrvd_socket const * const sock, int current_token_index, jsmntype_t type_to_check) { - return sock->jsmn.tokens[sock->jsmn.current_token].end - sock->jsmn.tokens[sock->jsmn.current_token].start; + return sock->jsmn.tokens[current_token_index].type == type_to_check; } -static inline int token_is_start(struct nDPIsrvd_socket const * const sock) +static inline struct nDPIsrvd_json_token const * +token_get(struct nDPIsrvd_socket const * const sock, char const * const key, size_t key_length) { - return sock->jsmn.current_token == 0; + struct nDPIsrvd_json_token * token = NULL; + HASH_FIND(hh, sock->json.token_table, key, key_length, token); + return token; } -static inline int token_is_end(struct nDPIsrvd_socket const * const sock) +static inline char const * +token_get_value(struct nDPIsrvd_socket const * const sock, char const * const key, size_t key_length, size_t * value_length) { - return sock->jsmn.current_token == sock->jsmn.tokens_found; -} + struct nDPIsrvd_json_token const * const token = token_get(sock, key, key_length); + if (token != NULL) + { + if (value_length != NULL) + { + *value_length = token->value_length; + } + return token->value; + } -static inline int token_is_key_value_pair(struct nDPIsrvd_socket const * const sock) -{ - return sock->jsmn.current_token > 0 && sock->jsmn.current_token < sock->jsmn.tokens_found; + return NULL; } -static inline int token_is_jsmn_type(struct nDPIsrvd_socket const * const sock, jsmntype_t type_to_check) +static inline int token_value_equals(struct nDPIsrvd_json_token const * const token, char const * const value, size_t value_length) { - if (token_is_key_value_pair(sock) == 0) + if (token == NULL || token->value == NULL) { return 0; } - return sock->jsmn.tokens[sock->jsmn.current_token].type == type_to_check; + return strncmp(token->value, value, token->value_length) == 0 && + token->value_length == (int)value_length; } -static inline int key_equals(struct nDPIsrvd_socket const * const sock, char const * const name) +static inline enum nDPIsrvd_conversion_return +token_value_to_ull(struct nDPIsrvd_json_token const * const token, nDPIsrvd_ull_ptr const value) { - if (sock->jsmn.key_value.key == NULL || sock->jsmn.key_value.key_length == 0) + if (token == NULL || token->value == NULL || token->value_length == 0) { - return 0; + return CONVERISON_KEY_NOT_FOUND; } - return (int)strlen(name) == sock->jsmn.key_value.key_length && - strncmp(name, sock->jsmn.key_value.key, sock->jsmn.key_value.key_length) == 0; -} - -static inline int value_equals(struct nDPIsrvd_socket const * const sock, char const * const name) -{ - if (sock->jsmn.key_value.value == NULL || sock->jsmn.key_value.value_length == 0) { - return 0; + char const * const value_as_string = token->value; + char * endptr = NULL; + *value = strtoull(value_as_string, &endptr, 10); + + if (value_as_string == endptr) + { + return CONVERSION_NOT_A_NUMBER; + } + if (errno == ERANGE) + { + return CONVERSION_RANGE_EXCEEDED; + } } - return (int)strlen(name) == sock->jsmn.key_value.value_length && - strncmp(name, sock->jsmn.key_value.value, sock->jsmn.key_value.value_length) == 0; + return CONVERSION_OK; } -static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock, - struct nDPIsrvd_flow ** const flow_table, - size_t user_data_size) +static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock) { - if (token_is_start(sock) == 1) - { - memset(&sock->current, 0, sizeof(sock->current)); - } - else if (token_is_end(sock) == 1) + struct nDPIsrvd_json_token const * const flow_id = TOKEN_GET_SZ(sock, "flow_id"); + + if (flow_id != NULL) { - if (sock->current.event_name != NULL && sock->current.flow_id != NULL) + if (flow_id->value_length > nDPIsrvd_FLOW_ID_STRLEN) { + return NULL; + } + + struct nDPIsrvd_flow * flow = NULL; + HASH_FIND(hh, sock->flow_table, flow_id->value, (size_t)flow_id->value_length, flow); + + if (flow == NULL) { - if (strncmp(sock->current.event_name, "new", sock->current.event_name_len) == 0) - { - struct nDPIsrvd_flow * f = (struct nDPIsrvd_flow *)calloc(1, sizeof(*f) + user_data_size); - if (f == NULL) - { - return NULL; - } - snprintf(f->id, sizeof(f->id), "%.*s", sock->current.flow_id_len, sock->current.flow_id); - HASH_ADD(hh, *flow_table, id, (size_t)sock->current.flow_id_len, f); - return f; - } - else + flow = (struct nDPIsrvd_flow *)calloc(1, sizeof(*flow) + sock->flow_user_data_size); + if (flow == NULL) { - struct nDPIsrvd_flow * f = NULL; - HASH_FIND(hh, *flow_table, sock->current.flow_id, (size_t)sock->current.flow_id_len, f); - return f; + return NULL; } + + TOKEN_VALUE_TO_ULL(flow_id, &flow->id_as_ull); + snprintf(flow->id, nDPIsrvd_FLOW_ID_STRLEN, "%.*s", flow_id->value_length, flow_id->value); + HASH_ADD(hh, sock->flow_table, id,flow_id->value_length, flow); } + + return flow; } - else if (token_is_key_value_pair(sock) == 1) + + return NULL; +} + +static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const current_flow) +{ + if (current_flow == NULL) { - if (key_equals(sock, "packet_event_name") == 1) - { - sock->current.event_name = sock->jsmn.key_value.value; - sock->current.event_name_len = sock->jsmn.key_value.value_length; - } - else if (key_equals(sock, "flow_event_name") == 1) - { - sock->current.event_name = sock->jsmn.key_value.value; - sock->current.event_name_len = sock->jsmn.key_value.value_length; - } - else if (key_equals(sock, "flow_id") == 1) - { - sock->current.flow_id = sock->jsmn.key_value.value; - sock->current.flow_id_len = sock->jsmn.key_value.value_length; + return 1; + } + + struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); + + if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0 && + TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0) + { + if (sock->flow_end_callback != NULL) { + sock->flow_end_callback(sock, current_flow); } + HASH_DEL(sock->flow_table, current_flow); + free(current_flow); } - return NULL; + return 0; } -static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket * const sock, - json_callback cb, - void * user_data) +static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket * const sock) { + enum nDPIsrvd_parse_return ret = PARSE_OK; + while (sock->buffer.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1) { if (sock->buffer.raw[NETWORK_BUFFER_LENGTH_DIGITS] != '{') @@ -486,65 +602,95 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket * sock->jsmn.tokens_found = jsmn_parse(&sock->jsmn.parser, (char *)(sock->buffer.raw + sock->buffer.json_string_start), sock->buffer.json_string_length - sock->buffer.json_string_start, - sock->jsmn.tokens, - sizeof(sock->jsmn.tokens) / sizeof(sock->jsmn.tokens[0])); + sock->jsmn.tokens, nDPIsrvd_MAX_JSON_TOKENS); if (sock->jsmn.tokens_found < 0 || sock->jsmn.tokens[0].type != JSMN_OBJECT) { return PARSE_JSMN_ERROR; } - sock->jsmn.key_value.key = NULL; - sock->jsmn.key_value.key_length = 0; - sock->jsmn.key_value.value = NULL; - sock->jsmn.key_value.value_length = 0; - sock->jsmn.current_token = 0; - - if (cb(sock, user_data) != CALLBACK_OK) - { - return PARSE_CALLBACK_ERROR; - } - - for (sock->jsmn.current_token = 1; sock->jsmn.current_token < sock->jsmn.tokens_found; - sock->jsmn.current_token++) + char const * key = NULL; + int key_length = 0; + for (int current_token = 1; current_token < sock->jsmn.tokens_found; current_token++) { - if (token_is_key(sock) == 1) + if (jsmn_token_is_key(current_token) == 1) { - sock->jsmn.key_value.key = token_get(sock); - sock->jsmn.key_value.key_length = token_size(sock); + if (key != NULL) + { + ret = PARSE_JSMN_ERROR; + break; + } + + key = jsmn_token_get(sock, current_token); + key_length = jsmn_token_size(sock, current_token); - if (sock->jsmn.key_value.key == NULL || sock->jsmn.key_value.value != NULL) + if (key == NULL) { - return PARSE_JSMN_ERROR; + ret = PARSE_JSMN_ERROR; + break; } } else { - sock->jsmn.key_value.value = token_get(sock); - sock->jsmn.key_value.value_length = token_size(sock); + struct nDPIsrvd_json_token * token = NULL; + HASH_FIND(hh, sock->json.token_table, key, (size_t)key_length, token); - if (sock->jsmn.key_value.key == NULL || sock->jsmn.key_value.value == NULL) + if (token != NULL) { - return PARSE_JSMN_ERROR; - } - if (cb(sock, user_data) != CALLBACK_OK) - { - return PARSE_CALLBACK_ERROR; + token->value = jsmn_token_get(sock, current_token); + token->value_length = jsmn_token_size(sock, current_token); + } else { + struct nDPIsrvd_json_token jt = { + .value = jsmn_token_get(sock, current_token), + .value_length = jsmn_token_size(sock, current_token), + .hh = {} + }; + + if (key == NULL || key_length > nDPIsrvd_JSON_KEY_STRLEN || + utarray_len(sock->json.tokens) == nDPIsrvd_MAX_JSON_TOKENS) + { + ret = PARSE_JSON_MGMT_ERROR; + break; + } + + jt.key_length = key_length; + snprintf(jt.key, nDPIsrvd_JSON_KEY_STRLEN, "%.*s", key_length, key); + utarray_push_back(sock->json.tokens, &jt); + HASH_ADD_STR(sock->json.token_table, key, + (struct nDPIsrvd_json_token *)utarray_back(sock->json.tokens)); } - sock->jsmn.key_value.key = NULL; - sock->jsmn.key_value.key_length = 0; - sock->jsmn.key_value.value = NULL; - sock->jsmn.key_value.value_length = 0; + key = NULL; + key_length = 0; } } - if (cb(sock, user_data) != CALLBACK_OK) + struct nDPIsrvd_flow * flow = NULL; + flow = nDPIsrvd_get_flow(sock); + if (flow == NULL) + { + ret = PARSE_FLOW_MGMT_ERROR; + } + if (ret == PARSE_OK && + sock->json_callback(sock, flow) != CALLBACK_OK) + { + ret = PARSE_JSON_CALLBACK_ERROR; + } + if (nDPIsrvd_check_flow_end(sock, flow) != 0) { - return PARSE_CALLBACK_ERROR; + ret = PARSE_FLOW_MGMT_ERROR; } - sock->jsmn.current_token = -1; sock->jsmn.tokens_found = 0; + { + struct nDPIsrvd_json_token * current_token = NULL; + struct nDPIsrvd_json_token * jtmp = NULL; + + HASH_ITER(hh, sock->json.token_table, current_token, jtmp) + { + current_token->value = NULL; + current_token->value_length = 0; + } + } memmove(sock->buffer.raw, sock->buffer.raw + sock->buffer.json_string_length, @@ -554,7 +700,28 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket * sock->buffer.json_string_start = 0; } - return PARSE_OK; + return ret; } +#ifdef ENABLE_MEMORY_PROFILING +static inline void * nDPIsrvd_uthash_malloc(size_t const size) +{ + void * p = malloc(size); + + if (p == NULL) + { + return NULL; + } + printf("malloc(%zu)\n", size); + + return p; +} + +static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size) +{ + printf("free(%zu)\n", size); + free(freeable); +} +#endif + #endif diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c index b75b4e463..316cc97d3 100644 --- a/examples/c-captured/c-captured.c +++ b/examples/c-captured/c-captured.c @@ -1,5 +1,7 @@ #include <arpa/inet.h> #include <errno.h> +#include <ndpi_main.h> +#include <ndpi_typedefs.h> #include <pcap/pcap.h> #include <signal.h> #include <stdio.h> @@ -16,9 +18,10 @@ struct packet_data { - uint64_t packet_ts; - size_t packet_len; - size_t base64_packet_size; + nDPIsrvd_ull packet_ts_sec; + nDPIsrvd_ull packet_ts_usec; + nDPIsrvd_ull packet_len; + int base64_packet_size; union { char * base64_packet; char const * base64_packet_const; @@ -27,30 +30,17 @@ struct packet_data struct flow_user_data { + uint8_t flow_new_seen; + uint8_t detection_finished; uint8_t guessed; uint8_t detected; - int pkt_datalink; + nDPIsrvd_ull flow_datalink; + nDPIsrvd_ull flow_max_packets; + nDPIsrvd_ull flow_l4_header_len; + nDPIsrvd_ull flow_total_l4_payload_len; UT_array * packets; }; -struct callback_tmp_data -{ - uint8_t guessed; - uint8_t detected; - int pkt_datalink; - - uint8_t flow_end_or_idle; - uint8_t is_packet_flow; - - struct packet_data pkt; -}; - -struct callback_user_data -{ - struct nDPIsrvd_flow * flow_table; - struct callback_tmp_data tmp; -}; - struct nDPIsrvd_socket * sock = NULL; static int main_thread_shutdown = 0; static char const serv_listen_path[] = DISTRIBUTOR_UNIX_SOCKET; @@ -149,13 +139,13 @@ static int packet_write_pcap_file(UT_array const * const pd_array, int pkt_datal if (nDPIsrvd_base64decode(pd_elt->base64_packet, pd_elt->base64_packet_size, pkt_buf, &pkt_buf_len) != 0 || pkt_buf_len == 0) { - printf("packet base64 decode failed (%zu bytes): %s\n", pd_elt->base64_packet_size, pd_elt->base64_packet); + printf("packet base64 decode failed (%d bytes): %s\n", pd_elt->base64_packet_size, pd_elt->base64_packet); } else { struct pcap_pkthdr phdr; - phdr.ts.tv_sec = 0; - phdr.ts.tv_usec = 0; + phdr.ts.tv_sec = pd_elt->packet_ts_sec; + phdr.ts.tv_usec = pd_elt->packet_ts_usec; phdr.caplen = pkt_buf_len; phdr.len = pkt_buf_len; pcap_dump((unsigned char *)pd, &phdr, pkt_buf); @@ -184,199 +174,184 @@ static void packet_data_print(UT_array const * const pd_array) { break; } - printf("\tpacket-data base64 length: %zu\n", pd_elt->base64_packet_size); + printf("\tpacket-data base64 length: %d\n", pd_elt->base64_packet_size); } while ((pd_elt = (struct packet_data *)utarray_next(pd_array, pd_elt)) != NULL); } #else #define packet_data_print(pd_array) #endif -enum nDPIsrvd_callback_return nDPIsrvd_json_callback(struct nDPIsrvd_socket * const sock, void * const user_data) +static void perror_ull(enum nDPIsrvd_conversion_return retval, char const * const prefix) { - struct callback_user_data * const cb_user_data = (struct callback_user_data *)user_data; - struct nDPIsrvd_flow * flow = nDPIsrvd_get_flow(sock, &cb_user_data->flow_table, sizeof(struct flow_user_data)); - struct flow_user_data * flow_user = (struct flow_user_data *)(flow != NULL ? flow->user_data : NULL); - - if (token_is_start(sock) == 1) /* Start of a JSON string. */ + switch (retval) { - memset(&cb_user_data->tmp, 0, sizeof(cb_user_data->tmp)); - cb_user_data->tmp.pkt_datalink = -1; -#ifdef VERBOSE - printf("JSON "); -#endif - return CALLBACK_OK; + case CONVERSION_OK: + return; + + case CONVERISON_KEY_NOT_FOUND: + fprintf(stderr, "%s `: Key not found.\n", prefix); + break; + case CONVERSION_NOT_A_NUMBER: + fprintf(stderr, "%s: Not a valid number.\n", prefix); + break; + case CONVERSION_RANGE_EXCEEDED: + fprintf(stderr, "%s: Number too large.\n", prefix); + break; + + default: + fprintf(stderr, "Internal error, invalid conversion return value.\n"); } - else if (token_is_end(sock) == 1) /* End of a JSON string. */ +} + +static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_flow * const flow) +{ + struct flow_user_data * const flow_user = (struct flow_user_data *)flow->flow_user_data; + +#ifdef VERBOSE + struct nDPIsrvd_json_token * current_token = NULL; + struct nDPIsrvd_json_token * jtmp = NULL; + + HASH_ITER(hh, sock->json.token_table, current_token, jtmp) { - if (flow != NULL) + if (current_token->value != NULL) { - if (cb_user_data->tmp.is_packet_flow == 1) - { - if (flow_user->packets == NULL) - { - utarray_new(flow_user->packets, &packet_data_icd); - } - if (flow_user->packets != NULL) - { - utarray_push_back(flow_user->packets, &cb_user_data->tmp.pkt); - } - flow_user->pkt_datalink = cb_user_data->tmp.pkt_datalink; - } - else - { - if (cb_user_data->tmp.guessed != 0) - { - flow_user->guessed = cb_user_data->tmp.guessed; - } - if (cb_user_data->tmp.detected != 0) - { - flow_user->detected = cb_user_data->tmp.detected; - } - } - if (cb_user_data->tmp.flow_end_or_idle == 1 && (flow_user->guessed != 0 || flow_user->detected == 0)) - { - if (flow_user->packets != NULL) - { - packet_data_print(flow_user->packets); - char pcap_filename[64]; - if (generate_pcap_filename(flow, flow_user, pcap_filename, sizeof(pcap_filename)) == NULL) - { - fprintf(stderr, "%s\n", "Internal error, exit .."); - return CALLBACK_ERROR; - } - printf("dump flow with id %s to %s\n", flow->id, pcap_filename); - if (packet_write_pcap_file(flow_user->packets, flow_user->pkt_datalink, pcap_filename) != 0) - { - return CALLBACK_ERROR; - } - utarray_free(flow_user->packets); - flow_user->packets = NULL; - } - } -#ifdef VERBOSE - printf("GUESSED: %u, DETECTED: %u ", flow_user->guessed, flow_user->detected); -#endif + printf("[%.*s : %.*s] ", + current_token->key_length, current_token->key, + current_token->value_length, current_token->value); } -#ifdef VERBOSE - printf("EoF\n"); -#endif - return CALLBACK_OK; } + printf("EoF\n"); +#endif - if (token_is_key_value_pair(sock) != 1) + if (flow_user == NULL || flow_user->detection_finished != 0) { - fprintf(stderr, "%s\n", "Internal error, exit .."); - return CALLBACK_ERROR; + return CALLBACK_OK; } - if (key_equals(sock, "packet_event_name") == 1) + if (TOKEN_VALUE_EQUALS_SZ(TOKEN_GET_SZ(sock, "packet_event_name"), "packet-flow") != 0) { - if (value_equals(sock, "packet-flow") == 1) + struct nDPIsrvd_json_token const * const pkt = TOKEN_GET_SZ(sock, "pkt"); + if (pkt == NULL) { - cb_user_data->tmp.is_packet_flow = 1; + return CALLBACK_ERROR; } - } - else if (key_equals(sock, "pkt") == 1) - { - cb_user_data->tmp.pkt.base64_packet_const = sock->jsmn.key_value.value; - cb_user_data->tmp.pkt.base64_packet_size = sock->jsmn.key_value.value_length; - } - else if (key_equals(sock, "pkt_ts") == 1) - { - char * endptr = NULL; - unsigned long long int value = strtoull(sock->jsmn.key_value.value, &endptr, 10); - if (sock->jsmn.key_value.value == endptr) + if (flow_user->packets == NULL) { - fprintf(stderr, - "pkt_ts `%.*s': Value `%.*s' is not a valid number.\n", - sock->jsmn.key_value.key_length, - sock->jsmn.key_value.key, - sock->jsmn.key_value.value_length, - sock->jsmn.key_value.value); - return CALLBACK_ERROR; + utarray_new(flow_user->packets, &packet_data_icd); } - if (errno == ERANGE) + if (flow_user->packets == NULL) { - fprintf(stderr, - "pkt_ts `%.*s': Number too large.\n", - sock->jsmn.key_value.key_length, - sock->jsmn.key_value.key); return CALLBACK_ERROR; } - cb_user_data->tmp.pkt.packet_ts = value; + + nDPIsrvd_ull pkt_ts_sec = 0ull; + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_ts_sec"), &pkt_ts_sec), "pkt_ts_sec"); + + nDPIsrvd_ull pkt_ts_usec = 0ull; + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_ts_usec"), &pkt_ts_usec), "pkt_ts_usec"); + + nDPIsrvd_ull pkt_len = 0ull; + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_len"), &pkt_len), "pkt_len"); + + nDPIsrvd_ull pkt_l4_len = 0ull; + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "pkt_l4_len"), &pkt_l4_len), "pkt_l4_len"); + + struct packet_data pd = { + .packet_ts_sec = pkt_ts_sec, + .packet_ts_usec = pkt_ts_usec, + .packet_len = pkt_len, + .base64_packet_size = pkt->value_length, + .base64_packet_const = pkt->value + }; + utarray_push_back(flow_user->packets, &pd); + flow_user->flow_total_l4_payload_len += pkt_l4_len - flow_user->flow_l4_header_len; } - else if (key_equals(sock, "pkt_len") == 1) + { - char * endptr = NULL; - unsigned long long int value = strtoull(sock->jsmn.key_value.value, &endptr, 10); - if (sock->jsmn.key_value.value == endptr) + struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name"); + if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "new") != 0) { - fprintf(stderr, - "pkt_len `%.*s': Value `%.*s' is not a valid number.\n", - sock->jsmn.key_value.key_length, - sock->jsmn.key_value.key, - sock->jsmn.key_value.value_length, - sock->jsmn.key_value.value); - return CALLBACK_ERROR; - } - if (errno == ERANGE) + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_datalink"), &flow_user->flow_datalink), "flow_datalink"); + perror_ull(TOKEN_VALUE_TO_ULL(TOKEN_GET_SZ(sock, "flow_max_packets"), &flow_user->flow_max_packets), "flow_max_packets"); + + struct nDPIsrvd_json_token const * const l4_proto = TOKEN_GET_SZ(sock, "l4_proto"); + if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "tcp") != 0) + { + flow_user->flow_l4_header_len = sizeof(struct ndpi_tcphdr); + } else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "udp") != 0) + { + flow_user->flow_l4_header_len = sizeof(struct ndpi_udphdr); + } else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "icmp") != 0) + { + flow_user->flow_l4_header_len = sizeof(struct ndpi_icmphdr); + } else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "icmp6") != 0) + { + flow_user->flow_l4_header_len = sizeof(struct ndpi_icmp6hdr); + } + + flow_user->flow_new_seen = 1; + + return CALLBACK_OK; + } else if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "guessed") != 0) { - fprintf(stderr, - "pkt_len `%.*s': Number too large.\n", - sock->jsmn.key_value.key_length, - sock->jsmn.key_value.key); - return CALLBACK_ERROR; - } - cb_user_data->tmp.pkt.packet_len = value; - } - else if (key_equals(sock, "pkt_datalink") == 1) - { - char * endptr = NULL; - unsigned long long int value = strtoull(sock->jsmn.key_value.value, &endptr, 10); - if (sock->jsmn.key_value.value == endptr) + flow_user->guessed = 1; + flow_user->detection_finished = 1; + } else if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "not-detected") != 0) { - fprintf(stderr, - "pkt_datalink `%.*s': Value `%.*s' is not a valid number.\n", - sock->jsmn.key_value.key_length, - sock->jsmn.key_value.key, - sock->jsmn.key_value.value_length, - sock->jsmn.key_value.value); - return CALLBACK_ERROR; - } - if (errno == ERANGE || value > (unsigned long long int)((uint32_t)-1)) + flow_user->detected = 0; + flow_user->detection_finished = 1; + } else if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "detected") != 0) { - fprintf(stderr, - "pkt_datalink `%.*s': Number too large.\n", - sock->jsmn.key_value.key_length, - sock->jsmn.key_value.key); - return CALLBACK_ERROR; + flow_user->detected = 1; + flow_user->detection_finished = 1; + if (flow_user->packets != NULL) + { + utarray_free(flow_user->packets); + flow_user->packets = NULL; + } + + return CALLBACK_OK; } - cb_user_data->tmp.pkt_datalink = value; - } - else if (key_equals(sock, "flow_event_name") == 1) - { - if (value_equals(sock, "end") == 1 || value_equals(sock, "idle") == 1) + + if (flow_user->flow_new_seen == 0) { - cb_user_data->tmp.flow_end_or_idle = 1; + return CALLBACK_OK; } - else if (value_equals(sock, "guessed") == 1) + + if (flow_user->packets == NULL || flow_user->flow_max_packets == 0 || utarray_len(flow_user->packets) == 0) { - cb_user_data->tmp.guessed = 1; + printf("flow %s: No packets captured.\n", flow->id); + + return CALLBACK_OK; } - else if (value_equals(sock, "detected") == 1) + + if (flow_user->detection_finished != 0 && + (flow_user->guessed != 0 || flow_user->detected == 0)) { - cb_user_data->tmp.detected = 1; + packet_data_print(flow_user->packets); + if (flow_user->flow_total_l4_payload_len > 0) + { + char pcap_filename[64]; + if (generate_pcap_filename(flow, flow_user, pcap_filename, sizeof(pcap_filename)) == NULL) + { + fprintf(stderr, "%s\n", "Internal error, exit .."); + return CALLBACK_ERROR; + } + printf("flow %s: save to %s\n", flow->id, pcap_filename); + if (packet_write_pcap_file(flow_user->packets, flow_user->flow_datalink, pcap_filename) != 0) + { + return CALLBACK_ERROR; + } + } else { + printf("flow %s: captured packets do not have any l4 payload\n", flow->id); + } + + utarray_free(flow_user->packets); + flow_user->packets = NULL; } } -#ifdef VERBOSE - printf("[%.*s : %.*s] ", - sock->jsmn.key_value.key_length, - sock->jsmn.key_value.key, - sock->jsmn.key_value.value_length, - sock->jsmn.key_value.value); -#endif - return CALLBACK_OK; } @@ -390,12 +365,21 @@ static void sighandler(int signum) } } -int main(int argc, char ** argv) +static void captured_flow_end_callback(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const flow) { - struct callback_user_data cb_user_data; + (void)sock; + + struct flow_user_data * const ud = (struct flow_user_data *)flow->flow_user_data; + if (ud != NULL && ud->packets != NULL) + { + utarray_free(ud->packets); + ud->packets = NULL; + } +} - memset(&cb_user_data, 0, sizeof(cb_user_data)); - sock = nDPIsrvd_init(); +int main(int argc, char ** argv) +{ + sock = nDPIsrvd_init(0, sizeof(struct flow_user_data), captured_json_callback, captured_flow_end_callback); if (sock == NULL) { fprintf(stderr, "%s: nDPIsrvd socket memory allocation failed!\n", argv[0]); @@ -406,8 +390,9 @@ int main(int argc, char ** argv) signal(SIGTERM, sighandler); signal(SIGPIPE, sighandler); - enum nDPIsrvd_connect_return connect_ret; + enum nDPIsrvd_connect_return connect_ret = CONNECT_ERROR; + printf("Recv buffer size: %u\n", NETWORK_BUFFER_MAX_SIZE); if (argc == 2) { printf("Connecting to UNIX socket: %s\n", argv[1]); @@ -430,7 +415,7 @@ int main(int argc, char ** argv) if (connect_ret != CONNECT_OK) { fprintf(stderr, "%s: nDPIsrvd socket connect failed!\n", argv[0]); - nDPIsrvd_free(&sock, &cb_user_data.flow_table); + nDPIsrvd_free(&sock); return 1; } @@ -444,7 +429,7 @@ int main(int argc, char ** argv) break; } - enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse(sock, nDPIsrvd_json_callback, &cb_user_data); + enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse(sock); if (parse_ret != PARSE_OK) { fprintf(stderr, "%s: nDPIsrvd parse failed with: %s\n", argv[0], nDPIsrvd_enum_to_string(parse_ret)); @@ -452,18 +437,7 @@ int main(int argc, char ** argv) } } - struct nDPIsrvd_flow * current_flow; - struct nDPIsrvd_flow * tmp; - HASH_ITER(hh, cb_user_data.flow_table, current_flow, tmp) - { - struct flow_user_data * const ud = (struct flow_user_data *)current_flow->user_data; - if (ud != NULL && ud->packets != NULL) - { - utarray_free(ud->packets); - ud->packets = NULL; - } - } - nDPIsrvd_free(&sock, &cb_user_data.flow_table); + nDPIsrvd_free(&sock); return 0; } diff --git a/examples/go-dashboard/main.go b/examples/go-dashboard/main.go index ef468f3d9..13c5d462b 100644 --- a/examples/go-dashboard/main.go +++ b/examples/go-dashboard/main.go @@ -29,17 +29,19 @@ type packet_event struct { FlowID uint32 `json:"flow_id"` FlowPacketID uint64 `json:"flow_packet_id"` - MaxPackets uint8 `json:"max_packets"` PacketEventID uint8 `json:"packet_event_id"` PacketEventName string `json:"packet_event_name"` PacketOversize bool `json:"pkt_oversize"` - PacketTimestamp uint64 `json:"pkt_ts"` + PacketTimestampS uint64 `json:"pkt_ts_sec"` + PacketTimestampUs uint64 `json:"pkt_ts_usec"` PacketLength uint32 `json:"pkt_len"` + PacketL4Length uint32 `json:"pkt_l4_len"` Packet string `json:"pkt"` PacketCaptureLength uint32 `json:"pkt_caplen"` PacketType uint32 `json:"pkt_type"` - PacketIpOffset uint32 `json:"pkt_ipoffset"` + PacketL3Offset uint32 `json:"pkt_l3_offset"` + PacketL4Offset uint32 `json:"pkt_l4_offset"` } type flow_event struct { @@ -54,6 +56,8 @@ type flow_event struct { FlowMinLayer4DataLength uint64 `json:"flow_min_l4_data_len"` FlowMaxLayer4DataLength uint64 `json:"flow_max_l4_data_len"` FlowAvgLayer4DataLength uint64 `json:"flow_avg_l4_data_len"` + FlowDatalinkLayer uint8 `json:"flow_datalink"` + MaxPackets uint8 `json:"flow_max_packets"` IsMidstreamFlow uint32 `json:"midstream"` } diff --git a/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py index 899d61e47..f04e4d955 100755 --- a/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py +++ b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py @@ -51,12 +51,12 @@ def parse_json_str(json_str): if flow_id not in FLOWS: return - FLOWS[flow_id].addPacket(buffer_decoded, j['pkt_type'], j['pkt_ipoffset']) + FLOWS[flow_id].addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) if j['packet_event_name'] == 'packet': flow = PcapPacket() - flow.addPacket(buffer_decoded, j['pkt_type'], j['pkt_ipoffset']) + flow.addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) if __name__ == '__main__': diff --git a/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py b/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py index aff70a2b8..1a07e2e90 100755 --- a/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py +++ b/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py @@ -47,12 +47,12 @@ def parse_json_str(json_str): if flow_id not in FLOWS: return - FLOWS[flow_id].addPacket(buffer_decoded, j['pkt_type'], j['pkt_ipoffset']) + FLOWS[flow_id].addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) if j['packet_event_name'] == 'packet': flow = PcapPacket() - flow.addPacket(buffer_decoded, j['pkt_type'], j['pkt_ipoffset']) + flow.addPacket(buffer_decoded, j['pkt_type'], j['pkt_l3_offset']) if __name__ == '__main__': @@ -253,7 +253,7 @@ static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = { static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; int main_thread_shutdown = 0; -static uint64_t global_flow_id = 0; +static uint64_t global_flow_id = 1; #ifdef ENABLE_MEMORY_PROFILING static uint64_t ndpi_memory_alloc_count = 0; @@ -1296,7 +1296,9 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa struct pcap_pkthdr const * const header, uint8_t const * const packet, uint16_t pkt_type, - uint16_t pkt_ipoffset, + uint16_t pkt_l3_offset, + uint16_t pkt_l4_offset, + uint16_t pkt_l4_len, struct nDPId_flow_info const * const flow, enum packet_event event) { @@ -1319,7 +1321,6 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa } ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow->flow_id); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow->packets_processed); - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "max_packets", max_packets_per_flow_to_send); } ndpi_serialize_string_int32(&workflow->ndpi_serializer, "packet_event_id", event); @@ -1338,22 +1339,21 @@ static void jsonize_packet_event(struct nDPId_reader_thread * const reader_threa size_t base64_data_len = sizeof(base64_data); int base64_retval = base64encode(packet, header->caplen, base64_data, &base64_data_len); - if (ndpi_serialize_string_boolean(&workflow->ndpi_serializer, - "pkt_oversize", - base64_data_len > sizeof(base64_data)) != 0 || - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pkt_ts", header->ts.tv_sec) != 0 || - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->len) != 0 || - (base64_retval == 0 && base64_data_len > 0 && - ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0) || - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_caplen", header->caplen) != 0 || - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_type", pkt_type) != 0 || - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_ipoffset", pkt_ipoffset) != 0 || - ndpi_serialize_string_int32(&workflow->ndpi_serializer, - "pkt_datalink", - pcap_datalink(reader_thread->workflow->pcap_handle)) != 0) + ndpi_serialize_string_boolean(&workflow->ndpi_serializer, "pkt_oversize", base64_data_len > sizeof(base64_data)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pkt_ts_sec", header->ts.tv_sec); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pkt_ts_usec", header->ts.tv_usec); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_caplen", header->caplen); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_type", pkt_type); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l3_offset", pkt_l3_offset); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->len); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len); + + if (base64_retval == 0 && base64_data_len > 0 && + ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, base64_data_len) != 0) { syslog(LOG_DAEMON | LOG_ERR, - "[%8llu, %d] JSON serialize buffer failed", + "[%8llu, %d] JSON serializing base64 packet buffer failed", reader_thread->workflow->packets_captured, reader_thread->array_index); } @@ -1385,9 +1385,15 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, { case FLOW_EVENT_INVALID: case FLOW_EVENT_COUNT: + break; + case FLOW_EVENT_NEW: case FLOW_EVENT_END: case FLOW_EVENT_IDLE: + ndpi_serialize_string_int32(&workflow->ndpi_serializer, + "flow_datalink", + pcap_datalink(reader_thread->workflow->pcap_handle)); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_max_packets", max_packets_per_flow_to_send); break; case FLOW_EVENT_NOT_DETECTED: @@ -1697,7 +1703,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre } else { - jsonize_packet_event(reader_thread, header, packet, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, UNKNOWN_DATALINK_LAYER, "%s%u%s%u", @@ -1713,7 +1719,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre case DLT_EN10MB: if (header->len < sizeof(struct ndpi_ethhdr)) { - jsonize_packet_event(reader_thread, header, packet, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, ETHERNET_PACKET_TOO_SHORT, NULL); return 1; } @@ -1726,7 +1732,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)) { jsonize_packet_event( - reader_thread, header, packet, *layer3_type, *ip_offset, NULL, PACKET_EVENT_PAYLOAD); + reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, IP4_PACKET_TOO_SHORT, NULL); return 1; } @@ -1735,7 +1741,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)) { jsonize_packet_event( - reader_thread, header, packet, *layer3_type, *ip_offset, NULL, PACKET_EVENT_PAYLOAD); + reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, IP6_PACKET_TOO_SHORT, NULL); return 1; } @@ -1744,7 +1750,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 1; default: jsonize_packet_event( - reader_thread, header, packet, *layer3_type, *ip_offset, NULL, PACKET_EVENT_PAYLOAD); + reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, ETHERNET_PACKET_UNKNOWN, "%s%u", "type", *layer3_type); return 1; } @@ -1758,7 +1764,7 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre *ip_offset = 0; break; default: - jsonize_packet_event(reader_thread, header, packet, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, UNKNOWN_DATALINK_LAYER, "%s%u", "datalink", datalink_type); return 1; } @@ -1867,7 +1873,7 @@ static void ndpi_process_packet(uint8_t * const args, } else { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); return; } @@ -1877,7 +1883,7 @@ static void ndpi_process_packet(uint8_t * const args, { if (header->caplen < header->len) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE, "%s%u %s%u", @@ -1893,7 +1899,7 @@ static void ndpi_process_packet(uint8_t * const args, { if (ip_size < sizeof(*ip)) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf( reader_thread, IP4_SIZE_SMALLER_THAN_HEADER, "%s%u %s%zu", "ip_size", ip_size, "expected", sizeof(*ip)); return; @@ -1904,7 +1910,7 @@ static void ndpi_process_packet(uint8_t * const args, if (ndpi_detection_get_l4( (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf( reader_thread, IP4_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); return; @@ -1919,7 +1925,7 @@ static void ndpi_process_packet(uint8_t * const args, { if (ip_size < sizeof(ip6->ip6_hdr)) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, IP6_SIZE_SMALLER_THAN_HEADER, "%s%u %s%zu", @@ -1934,7 +1940,7 @@ static void ndpi_process_packet(uint8_t * const args, if (ndpi_detection_get_l4( (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf( reader_thread, IP6_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); return; @@ -1959,7 +1965,7 @@ static void ndpi_process_packet(uint8_t * const args, } else { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); return; } @@ -1971,7 +1977,8 @@ static void ndpi_process_packet(uint8_t * const args, if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event( + reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, TCP_PACKET_TOO_SHORT, "%s%u %s%zu", @@ -1993,7 +2000,8 @@ static void ndpi_process_packet(uint8_t * const args, if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event( + reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, UDP_PACKET_TOO_SHORT, "%s%u %s%zu", @@ -2094,7 +2102,15 @@ static void ndpi_process_packet(uint8_t * const args, { if (add_new_flow(workflow, &flow_basic, FT_SKIPPED, hashed_index) == NULL) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", @@ -2110,7 +2126,15 @@ static void ndpi_process_packet(uint8_t * const args, { if (add_new_flow(workflow, &flow_basic, FT_SKIPPED, hashed_index) == NULL) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", @@ -2123,7 +2147,8 @@ static void ndpi_process_packet(uint8_t * const args, if (workflow->cur_active_flows == workflow->max_active_flows) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event( + reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf(reader_thread, MAX_FLOW_TO_TRACK, "%s%llu %s%llu %s%llu", @@ -2139,7 +2164,8 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process = (struct nDPId_flow_info *)add_new_flow(workflow, &flow_basic, FT_INFO, hashed_index); if (flow_to_process == NULL) { - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_packet_event( + reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf( reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process)); return; @@ -2216,7 +2242,15 @@ static void ndpi_process_packet(uint8_t * const args, jsonize_flow_event(reader_thread, flow_to_process, FLOW_EVENT_NEW); } - jsonize_packet_event(reader_thread, header, packet, type, ip_offset, flow_to_process, PACKET_EVENT_PAYLOAD_FLOW); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + flow_to_process, + PACKET_EVENT_PAYLOAD_FLOW); /* We currently process max. 254 packets per flow. TODO: The user should decide this! */ if (flow_to_process->ndpi_flow.num_processed_pkts == 0xFF) |