aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile.old24
-rw-r--r--dependencies/nDPIsrvd.h303
-rw-r--r--examples/c-captured/c-captured.c13
-rw-r--r--examples/c-collectd/c-collectd.c17
-rwxr-xr-xexamples/py-flow-info/flow-info.py11
-rwxr-xr-xexamples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py11
-rwxr-xr-xexamples/py-json-stdout/json-stdout.py12
-rwxr-xr-xexamples/py-risky-flow-to-pcap/risky-flow-to-pcap.py11
-rwxr-xr-xexamples/py-schema-validation/py-schema-validation.py11
-rw-r--r--nDPId-test.c246
-rw-r--r--nDPId.c35
-rw-r--r--nDPIsrvd.c54
-rwxr-xr-xtest/run_tests.sh41
-rw-r--r--utils.c19
-rw-r--r--utils.h3
15 files changed, 537 insertions, 274 deletions
diff --git a/Makefile.old b/Makefile.old
index efc1da985..abe7e5339 100644
--- a/Makefile.old
+++ b/Makefile.old
@@ -65,7 +65,7 @@ RM = rm -f
INSTALL = install
ifeq ($(ENABLE_DEBUG),yes)
-INSTALL_ARGS = -s
+INSTALL_ARGS_STRIP = -s
endif
all: help nDPId nDPIsrvd nDPId-test examples
@@ -99,16 +99,20 @@ else
endif
install: all
- $(INSTALL) -d '$(DESTDIR)$(PREFIX)/bin' '$(DESTDIR)$(PREFIX)/sbin'
- $(INSTALL) $(INSTALL_ARGS) ./nDPId-test '$(DESTDIR)$(PREFIX)/bin'
- $(INSTALL) $(INSTALL_ARGS) ./nDPIsrvd '$(DESTDIR)$(PREFIX)/bin'
- $(INSTALL) $(INSTALL_ARGS) ./nDPId '$(DESTDIR)$(PREFIX)/sbin'
- $(INSTALL) $(INSTALL_ARGS) ./examples/c-captured/c-captured '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-captured'
- $(INSTALL) $(INSTALL_ARGS) ./examples/c-json-stdout/c-json-stdout '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-json-dump'
- $(INSTALL) $(INSTALL_ARGS) ./examples/c-collectd/c-collectd '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-collectd'
- $(INSTALL) $(INSTALL_ARGS) ./examples/py-flow-info/flow-info.py '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-flow-info.py'
+ $(INSTALL) -d \
+ '$(DESTDIR)$(PREFIX)/bin' \
+ '$(DESTDIR)$(PREFIX)/sbin' \
+ '$(DESTDIR)$(PREFIX)/share/nDPId'
+ $(INSTALL) $(INSTALL_ARGS_STRIP) ./nDPId-test '$(DESTDIR)$(PREFIX)/bin'
+ $(INSTALL) $(INSTALL_ARGS_STRIP) ./nDPIsrvd '$(DESTDIR)$(PREFIX)/bin'
+ $(INSTALL) $(INSTALL_ARGS_STRIP) ./nDPId '$(DESTDIR)$(PREFIX)/sbin'
+ $(INSTALL) $(INSTALL_ARGS_STRIP) ./examples/c-captured/c-captured '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-captured'
+ $(INSTALL) $(INSTALL_ARGS_STRIP) ./examples/c-json-stdout/c-json-stdout '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-json-dump'
+ $(INSTALL) $(INSTALL_ARGS_STRIP) ./examples/c-collectd/c-collectd '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-collectd'
+ $(INSTALL) ./dependencies/nDPIsrvd.py '$(DESTDIR)$(PREFIX)/share/nDPId/nDPIsrvd.py'
+ $(INSTALL) ./examples/py-flow-info/flow-info.py '$(DESTDIR)$(PREFIX)/bin/nDPIsrvd-flow-info.py'
ifneq ($(GOCC),)
- $(INSTALL) $(INSTALL_ARGS) -t '$(DESTDIR)$(PREFIX)/bin' examples/go-dashboard/go-dashboard
+ $(INSTALL) $(INSTALL_ARGS_STRIP) -t '$(DESTDIR)$(PREFIX)/bin' examples/go-dashboard/go-dashboard
endif
clean:
diff --git a/dependencies/nDPIsrvd.h b/dependencies/nDPIsrvd.h
index 756b10edf..f673a4d86 100644
--- a/dependencies/nDPIsrvd.h
+++ b/dependencies/nDPIsrvd.h
@@ -22,10 +22,12 @@
#define nDPIsrvd_FLOW_KEY_STRLEN 24
#define nDPIsrvd_JSON_KEY_STRLEN 32
-#define nDPIsrvd_STRLEN_SZ(s) (sizeof(s)/sizeof(s[0]) - sizeof(s[0]))
+#define nDPIsrvd_STRLEN_SZ(s) (sizeof(s) / sizeof(s[0]) - sizeof(s[0]))
#define TOKEN_GET_SZ(sock, key) token_get(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key))
-#define TOKEN_GET_VALUE_SZ(sock, key, value_length) token_get_value(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key), value_length)
-#define TOKEN_VALUE_EQUALS_SZ(token, string_to_check) token_value_equals(token, string_to_check, nDPIsrvd_STRLEN_SZ(string_to_check))
+#define TOKEN_GET_VALUE_SZ(sock, key, value_length) \
+ token_get_value(sock, (char const *)key, nDPIsrvd_STRLEN_SZ(key), value_length)
+#define TOKEN_VALUE_EQUALS_SZ(token, string_to_check) \
+ token_value_equals(token, string_to_check, nDPIsrvd_STRLEN_SZ(string_to_check))
#define TOKEN_VALUE_TO_ULL(token, value) token_value_to_ull(token, value)
#define FIRST_ENUM_VALUE 1
@@ -51,7 +53,8 @@ enum nDPIsrvd_read_return
enum nDPIsrvd_parse_return
{
- PARSE_OK = READ_LAST_ENUM_VALUE,
+ PARSE_OK = READ_LAST_ENUM_VALUE, /* can only be returned by nDPIsrvd_parse_line, not nDPIsrvd_parse_all */
+ PARSE_NEED_MORE_DATA, /* returned by nDPIsrvd_parse_line and nDPIsrvd_parse_all */
PARSE_INVALID_OPENING_CHAR,
PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT,
PARSE_SIZE_MISSING,
@@ -116,8 +119,7 @@ static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size
typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_flow * const flow);
-typedef void (*flow_end_callback)(struct nDPIsrvd_socket * const sock,
- struct nDPIsrvd_flow * const flow);
+typedef void (*flow_end_callback)(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const flow);
struct nDPIsrvd_address
{
@@ -130,6 +132,22 @@ struct nDPIsrvd_address
};
};
+struct nDPIsrvd_buffer
+{
+ char raw[NETWORK_BUFFER_MAX_SIZE];
+ size_t used;
+ char * json_string;
+ size_t json_string_start;
+ nDPIsrvd_ull json_string_length;
+};
+
+struct nDPIsrvd_jsmn
+{
+ jsmn_parser parser;
+ jsmntok_t tokens[nDPIsrvd_MAX_JSON_TOKENS];
+ int tokens_found;
+};
+
struct nDPIsrvd_socket
{
int fd;
@@ -140,22 +158,8 @@ struct nDPIsrvd_socket
json_callback json_callback;
flow_end_callback flow_end_callback;
- struct
- {
- char raw[NETWORK_BUFFER_MAX_SIZE];
- size_t used;
- char * json_string;
- size_t json_string_start;
- nDPIsrvd_ull json_string_length;
- } buffer;
-
- /* jsmn JSON parser */
- struct
- {
- jsmn_parser parser;
- jsmntok_t tokens[nDPIsrvd_MAX_JSON_TOKENS];
- int tokens_found;
- } jsmn;
+ struct nDPIsrvd_buffer buffer;
+ struct nDPIsrvd_jsmn jsmn;
/* easy and fast JSON key/value access via hash table and a static array */
struct
@@ -246,36 +250,35 @@ static inline int nDPIsrvd_base64decode(char * in, size_t inLen, unsigned char *
static inline char const * nDPIsrvd_enum_to_string(int enum_value)
{
- static char const * const enum_str[LAST_ENUM_VALUE + 1] = {
- "CONNECT_OK",
- "CONNECT_ERROR_SOCKET",
- "CONNECT_ERROR",
-
- "READ_OK",
- "READ_PEER_DISCONNECT",
- "READ_ERROR",
-
- "PARSE_OK",
- "PARSE_INVALID_OPENING_CHAR",
- "PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT",
- "PARSE_SIZE_MISSING",
- "PARSE_STRING_TOO_BIG",
- "PARSE_INVALID_CLOSING_CHAR",
- "PARSE_JSMN_ERROR",
- "PARSE_JSON_CALLBACK_ERROR",
- "PARSE_JSON_MGMT_ERROR",
- "PARSE_FLOW_MGMT_ERROR",
-
- "CALLBACK_OK",
- "CALLBACK_ERROR",
-
- "CONVERSION_OK",
- "CONVERISON_KEY_NOT_FOUND",
- "CONVERSION_NOT_A_NUMBER",
- "CONVERSION_RANGE_EXCEEDED",
-
- [LAST_ENUM_VALUE] = "LAST_ENUM_VALUE"
- };
+ static char const * const enum_str[LAST_ENUM_VALUE + 1] = {"CONNECT_OK",
+ "CONNECT_ERROR_SOCKET",
+ "CONNECT_ERROR",
+
+ "READ_OK",
+ "READ_PEER_DISCONNECT",
+ "READ_ERROR",
+
+ "PARSE_OK",
+ "PARSE_NEED_MORE_DATA",
+ "PARSE_INVALID_OPENING_CHAR",
+ "PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT",
+ "PARSE_SIZE_MISSING",
+ "PARSE_STRING_TOO_BIG",
+ "PARSE_INVALID_CLOSING_CHAR",
+ "PARSE_JSMN_ERROR",
+ "PARSE_JSON_CALLBACK_ERROR",
+ "PARSE_JSON_MGMT_ERROR",
+ "PARSE_FLOW_MGMT_ERROR",
+
+ "CALLBACK_OK",
+ "CALLBACK_ERROR",
+
+ "CONVERSION_OK",
+ "CONVERISON_KEY_NOT_FOUND",
+ "CONVERSION_NOT_A_NUMBER",
+ "CONVERSION_RANGE_EXCEEDED",
+
+ [LAST_ENUM_VALUE] = "LAST_ENUM_VALUE"};
if (enum_value < FIRST_ENUM_VALUE || enum_value >= LAST_ENUM_VALUE)
{
@@ -355,7 +358,8 @@ static inline void nDPIsrvd_free(struct nDPIsrvd_socket ** const sock)
{
HASH_ITER(hh, (*sock)->flow_table, current_flow, ftmp)
{
- if ((*sock)->flow_end_callback != NULL) {
+ if ((*sock)->flow_end_callback != NULL)
+ {
(*sock)->flow_end_callback(*sock, current_flow);
}
HASH_DEL((*sock)->flow_table, current_flow);
@@ -377,14 +381,17 @@ static inline int nDPIsrvd_setup_address(struct nDPIsrvd_address * const address
memset(address, 0, sizeof(*address));
- if (last_colon == NULL) {
+ if (last_colon == NULL)
+ {
address->raw.sa_family = AF_UNIX;
address->size = sizeof(address->un);
if (snprintf(address->un.sun_path, sizeof(address->un.sun_path), "%s", destination) <= 0)
{
return 1;
}
- } else {
+ }
+ else
+ {
char addr_buf[INET6_ADDRSTRLEN];
char const * address_start = destination;
char const * address_end = last_colon;
@@ -401,7 +408,9 @@ static inline int nDPIsrvd_setup_address(struct nDPIsrvd_address * const address
{
return 1;
}
- } else {
+ }
+ else
+ {
address->raw.sa_family = AF_INET6;
address->size = sizeof(address->in6);
address->in6.sin6_port = htons(atoi(last_colon + 1));
@@ -486,13 +495,16 @@ static inline int jsmn_token_size(struct nDPIsrvd_socket const * const sock, int
return sock->jsmn.tokens[current_token_index].end - sock->jsmn.tokens[current_token_index].start;
}
-static inline int jsmn_token_is_jsmn_type(struct nDPIsrvd_socket const * const sock, int current_token_index, jsmntype_t type_to_check)
+static inline int jsmn_token_is_jsmn_type(struct nDPIsrvd_socket const * const sock,
+ int current_token_index,
+ jsmntype_t type_to_check)
{
return sock->jsmn.tokens[current_token_index].type == type_to_check;
}
-static inline struct nDPIsrvd_json_token const *
-token_get(struct nDPIsrvd_socket const * const sock, char const * const key, size_t key_length)
+static inline struct nDPIsrvd_json_token const * token_get(struct nDPIsrvd_socket const * const sock,
+ char const * const key,
+ size_t key_length)
{
struct nDPIsrvd_json_token * token = NULL;
@@ -505,8 +517,10 @@ token_get(struct nDPIsrvd_socket const * const sock, char const * const key, siz
return NULL;
}
-static inline char const *
-token_get_value(struct nDPIsrvd_socket const * const sock, char const * const key, size_t key_length, size_t * value_length)
+static inline char const * token_get_value(struct nDPIsrvd_socket const * const sock,
+ char const * const key,
+ size_t key_length,
+ size_t * value_length)
{
struct nDPIsrvd_json_token const * const token = token_get(sock, key, key_length);
if (token != NULL)
@@ -521,19 +535,20 @@ token_get_value(struct nDPIsrvd_socket const * const sock, char const * const ke
return NULL;
}
-static inline int token_value_equals(struct nDPIsrvd_json_token const * const token, char const * const value, size_t value_length)
+static inline int token_value_equals(struct nDPIsrvd_json_token const * const token,
+ char const * const value,
+ size_t value_length)
{
if (token == NULL)
{
return 0;
}
- return strncmp(token->value, value, token->value_length) == 0 &&
- token->value_length == (int)value_length;
+ return strncmp(token->value, value, token->value_length) == 0 && token->value_length == (int)value_length;
}
-static inline enum nDPIsrvd_conversion_return
-str_value_to_ull(char const * const value_as_string, nDPIsrvd_ull_ptr const value)
+static inline enum nDPIsrvd_conversion_return str_value_to_ull(char const * const value_as_string,
+ nDPIsrvd_ull_ptr const value)
{
char * endptr = NULL;
*value = strtoull(value_as_string, &endptr, 10);
@@ -550,8 +565,8 @@ str_value_to_ull(char const * const value_as_string, nDPIsrvd_ull_ptr const valu
return CONVERSION_OK;
}
-static inline enum nDPIsrvd_conversion_return
-token_value_to_ull(struct nDPIsrvd_json_token const * const token, nDPIsrvd_ull_ptr const value)
+static inline enum nDPIsrvd_conversion_return token_value_to_ull(struct nDPIsrvd_json_token const * const token,
+ nDPIsrvd_ull_ptr const value)
{
if (token == NULL)
{
@@ -569,10 +584,15 @@ static inline int nDPIsrvd_build_flow_key(struct nDPIsrvd_flow_key * const key,
return 1;
}
- if (snprintf(key->key, nDPIsrvd_FLOW_KEY_STRLEN, "%.*s-%.*s-%.*s",
- tokens[0]->value_length, tokens[0]->value,
- tokens[1]->value_length, tokens[1]->value,
- tokens[2]->value_length, tokens[2]->value) <= 0)
+ if (snprintf(key->key,
+ nDPIsrvd_FLOW_KEY_STRLEN,
+ "%.*s-%.*s-%.*s",
+ tokens[0]->value_length,
+ tokens[0]->value,
+ tokens[1]->value_length,
+ tokens[1]->value,
+ tokens[2]->value_length,
+ tokens[2]->value) <= 0)
{
return 1;
}
@@ -584,7 +604,9 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
struct nDPIsrvd_json_token const * const flow_id)
{
struct nDPIsrvd_json_token const * const tokens[nDPIsrvd_FLOW_KEY_TOKENS] = {
- flow_id, TOKEN_GET_SZ(sock, "alias"), TOKEN_GET_SZ(sock, "source"),
+ flow_id,
+ TOKEN_GET_SZ(sock, "alias"),
+ TOKEN_GET_SZ(sock, "source"),
};
struct nDPIsrvd_flow_key key = {};
@@ -612,7 +634,8 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
return flow;
}
-static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_flow * const current_flow)
+static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_flow * const current_flow)
{
if (current_flow == NULL)
{
@@ -621,10 +644,10 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, s
struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
- if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0 &&
- TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0)
+ if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0 && TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0)
{
- if (sock->flow_end_callback != NULL) {
+ if (sock->flow_end_callback != NULL)
+ {
sock->flow_end_callback(sock, current_flow);
}
HASH_DEL(sock->flow_table, current_flow);
@@ -634,55 +657,72 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock, s
return 0;
}
-static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket * const sock)
+static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_line(struct nDPIsrvd_buffer * const buffer,
+ struct nDPIsrvd_jsmn * const jsmn)
{
- enum nDPIsrvd_parse_return ret = PARSE_OK;
+ if (buffer->used < NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ {
+ return PARSE_NEED_MORE_DATA;
+ }
+ if (buffer->raw[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
+ {
+ return PARSE_INVALID_OPENING_CHAR;
+ }
- while (sock->buffer.used >= NETWORK_BUFFER_LENGTH_DIGITS + 1)
+ errno = 0;
+ buffer->json_string_length = strtoull((const char *)buffer->raw, &buffer->json_string, 10);
+ buffer->json_string_length += buffer->json_string - buffer->raw;
+ buffer->json_string_start = buffer->json_string - buffer->raw;
+
+ if (errno == ERANGE)
{
- if (sock->buffer.raw[NETWORK_BUFFER_LENGTH_DIGITS] != '{')
- {
- return PARSE_INVALID_OPENING_CHAR;
- }
+ return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT;
+ }
+ if (buffer->json_string == buffer->raw)
+ {
+ return PARSE_SIZE_MISSING;
+ }
+ if (buffer->json_string_length > sizeof(buffer->raw))
+ {
+ return PARSE_STRING_TOO_BIG;
+ }
+ if (buffer->json_string_length > buffer->used)
+ {
+ return PARSE_NEED_MORE_DATA;
+ }
+ if (buffer->raw[buffer->json_string_length - 2] != '}' || buffer->raw[buffer->json_string_length - 1] != '\n')
+ {
+ return PARSE_INVALID_CLOSING_CHAR;
+ }
- errno = 0;
- sock->buffer.json_string_length = strtoull((const char *)sock->buffer.raw, &sock->buffer.json_string, 10);
- sock->buffer.json_string_length += sock->buffer.json_string - sock->buffer.raw;
- sock->buffer.json_string_start = sock->buffer.json_string - sock->buffer.raw;
+ jsmn_init(&jsmn->parser);
+ jsmn->tokens_found = jsmn_parse(&jsmn->parser,
+ (char *)(buffer->raw + buffer->json_string_start),
+ buffer->json_string_length - buffer->json_string_start,
+ jsmn->tokens,
+ nDPIsrvd_MAX_JSON_TOKENS);
+ if (jsmn->tokens_found < 0 || jsmn->tokens[0].type != JSMN_OBJECT)
+ {
+ return PARSE_JSMN_ERROR;
+ }
- if (errno == ERANGE)
- {
- return PARSE_SIZE_EXCEEDS_CONVERSION_LIMIT;
- }
- if (sock->buffer.json_string == sock->buffer.raw)
- {
- return PARSE_SIZE_MISSING;
- }
- if (sock->buffer.json_string_length > sizeof(sock->buffer.raw))
- {
- return PARSE_STRING_TOO_BIG;
- }
- if (sock->buffer.json_string_length > sock->buffer.used)
- {
- break;
- }
+ return PARSE_OK;
+}
- if (sock->buffer.raw[sock->buffer.json_string_length - 2] != '}' ||
- sock->buffer.raw[sock->buffer.json_string_length - 1] != '\n')
- {
- return PARSE_INVALID_CLOSING_CHAR;
- }
+static void nDPIsrvd_drain_buffer(struct nDPIsrvd_buffer * const buffer)
+{
+ memmove(buffer->raw, buffer->raw + buffer->json_string_length, buffer->used - buffer->json_string_length);
+ buffer->used -= buffer->json_string_length;
+ buffer->json_string_length = 0;
+ buffer->json_string_start = 0;
+}
- jsmn_init(&sock->jsmn.parser);
- sock->jsmn.tokens_found = jsmn_parse(&sock->jsmn.parser,
- (char *)(sock->buffer.raw + sock->buffer.json_string_start),
- sock->buffer.json_string_length - sock->buffer.json_string_start,
- sock->jsmn.tokens, nDPIsrvd_MAX_JSON_TOKENS);
- if (sock->jsmn.tokens_found < 0 || sock->jsmn.tokens[0].type != JSMN_OBJECT)
- {
- return PARSE_JSMN_ERROR;
- }
+static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_socket * const sock)
+{
+ enum nDPIsrvd_parse_return ret;
+ while ((ret = nDPIsrvd_parse_line(&sock->buffer, &sock->jsmn)) == PARSE_OK)
+ {
char const * key = NULL;
int key_length = 0;
for (int current_token = 1; current_token < sock->jsmn.tokens_found; current_token++)
@@ -713,12 +753,12 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
{
token->value = jsmn_token_get(sock, current_token);
token->value_length = jsmn_token_size(sock, current_token);
- } else {
- struct nDPIsrvd_json_token jt = {
- .value = jsmn_token_get(sock, current_token),
- .value_length = jsmn_token_size(sock, current_token),
- .hh = {}
- };
+ }
+ else
+ {
+ struct nDPIsrvd_json_token jt = {.value = jsmn_token_get(sock, current_token),
+ .value_length = jsmn_token_size(sock, current_token),
+ .hh = {}};
if (key == NULL || key_length > nDPIsrvd_JSON_KEY_STRLEN ||
utarray_len(sock->json.tokens) == nDPIsrvd_MAX_JSON_TOKENS)
@@ -728,9 +768,10 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
}
jt.key_length = key_length;
- snprintf(jt.key, nDPIsrvd_JSON_KEY_STRLEN, "%.*s", key_length, key);
+ snprintf(jt.key, nDPIsrvd_JSON_KEY_STRLEN, "%.*s", key_length, key);
utarray_push_back(sock->json.tokens, &jt);
- HASH_ADD_STR(sock->json.token_table, key,
+ HASH_ADD_STR(sock->json.token_table,
+ key,
(struct nDPIsrvd_json_token *)utarray_back(sock->json.tokens));
}
@@ -749,8 +790,7 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
ret = PARSE_FLOW_MGMT_ERROR;
}
}
- if (ret == PARSE_OK &&
- sock->json_callback(sock, flow) != CALLBACK_OK)
+ if (ret == PARSE_OK && sock->json_callback(sock, flow) != CALLBACK_OK)
{
ret = PARSE_JSON_CALLBACK_ERROR;
}
@@ -771,12 +811,7 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse(struct nDPIsrvd_socket *
}
}
- memmove(sock->buffer.raw,
- sock->buffer.raw + sock->buffer.json_string_length,
- sock->buffer.used - sock->buffer.json_string_length);
- sock->buffer.used -= sock->buffer.json_string_length;
- sock->buffer.json_string_length = 0;
- sock->buffer.json_string_start = 0;
+ nDPIsrvd_drain_buffer(&sock->buffer);
}
return ret;
diff --git a/examples/c-captured/c-captured.c b/examples/c-captured/c-captured.c
index b5f7646a3..a427a38fc 100644
--- a/examples/c-captured/c-captured.c
+++ b/examples/c-captured/c-captured.c
@@ -560,6 +560,14 @@ static int parse_options(int argc, char ** argv)
}
errno = 0;
+ if (datadir[0] != '/')
+ {
+ fprintf(stderr,
+ "%s: PCAP capture directory must be absolut i.e. starting with `/', path given: `%s'\n",
+ argv[0],
+ datadir);
+ return 1;
+ }
if (mkdir(datadir, S_IRWXU) != 0 && errno != EEXIST)
{
fprintf(stderr, "%s: Could not create directory %s: %s\n", argv[0], datadir, strerror(errno));
@@ -581,8 +589,8 @@ static int mainloop(void)
return 1;
}
- enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse(sock);
- if (parse_ret != PARSE_OK)
+ enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock);
+ if (parse_ret != PARSE_NEED_MORE_DATA)
{
syslog(LOG_DAEMON | LOG_ERR, "nDPIsrvd parse failed with: %s", nDPIsrvd_enum_to_string(parse_ret));
return 1;
@@ -645,6 +653,7 @@ int main(int argc, char ** argv)
int retval = mainloop();
nDPIsrvd_free(&sock);
+ daemonize_shutdown(pidfile);
closelog();
return retval;
diff --git a/examples/c-collectd/c-collectd.c b/examples/c-collectd/c-collectd.c
index 291676817..42e8b6020 100644
--- a/examples/c-collectd/c-collectd.c
+++ b/examples/c-collectd/c-collectd.c
@@ -89,6 +89,7 @@ static struct
uint64_t flow_l3_other_count;
uint64_t flow_l4_tcp_count;
uint64_t flow_l4_udp_count;
+ uint64_t flow_l4_icmp_count;
uint64_t flow_l4_other_count;
} collectd_statistics = {};
@@ -313,13 +314,15 @@ static void print_collectd_exec_output(void)
printf(COLLECTD_PUTVAL_N_FORMAT(flow_l3_ip4_count) COLLECTD_PUTVAL_N_FORMAT(flow_l3_ip6_count)
COLLECTD_PUTVAL_N_FORMAT(flow_l3_other_count) COLLECTD_PUTVAL_N_FORMAT(flow_l4_tcp_count)
- COLLECTD_PUTVAL_N_FORMAT(flow_l4_udp_count) COLLECTD_PUTVAL_N_FORMAT(flow_l4_other_count),
+ COLLECTD_PUTVAL_N_FORMAT(flow_l4_udp_count) COLLECTD_PUTVAL_N_FORMAT(flow_l4_icmp_count)
+ COLLECTD_PUTVAL_N_FORMAT(flow_l4_other_count),
COLLECTD_PUTVAL_N(flow_l3_ip4_count),
COLLECTD_PUTVAL_N(flow_l3_ip6_count),
COLLECTD_PUTVAL_N(flow_l3_other_count),
COLLECTD_PUTVAL_N(flow_l4_tcp_count),
COLLECTD_PUTVAL_N(flow_l4_udp_count),
+ COLLECTD_PUTVAL_N(flow_l4_icmp_count),
COLLECTD_PUTVAL_N(flow_l4_other_count));
memset(&collectd_statistics, 0, sizeof(collectd_statistics));
@@ -370,8 +373,8 @@ static int mainloop(int epollfd)
return 1;
}
- enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse(sock);
- if (parse_ret != PARSE_OK)
+ enum nDPIsrvd_parse_return parse_ret = nDPIsrvd_parse_all(sock);
+ if (parse_ret != PARSE_NEED_MORE_DATA)
{
LOG(LOG_DAEMON | LOG_ERR, "nDPIsrvd parse failed with: %s", nDPIsrvd_enum_to_string(parse_ret));
return 1;
@@ -424,14 +427,18 @@ static enum nDPIsrvd_callback_return captured_json_callback(struct nDPIsrvd_sock
}
struct nDPIsrvd_json_token const * const l4_proto = TOKEN_GET_SZ(sock, "l4_proto");
- if (TOKEN_VALUE_EQUALS_SZ(l3_proto, "tcp") != 0)
+ if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "tcp") != 0)
{
collectd_statistics.flow_l4_tcp_count++;
}
- else if (TOKEN_VALUE_EQUALS_SZ(l3_proto, "tcp") != 0)
+ else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "udp") != 0)
{
collectd_statistics.flow_l4_udp_count++;
}
+ else if (TOKEN_VALUE_EQUALS_SZ(l4_proto, "icmp") != 0)
+ {
+ collectd_statistics.flow_l4_icmp_count++;
+ }
else if (l4_proto != NULL)
{
collectd_statistics.flow_l4_other_count++;
diff --git a/examples/py-flow-info/flow-info.py b/examples/py-flow-info/flow-info.py
index f9b71df91..3b11a03b7 100755
--- a/examples/py-flow-info/flow-info.py
+++ b/examples/py-flow-info/flow-info.py
@@ -3,9 +3,14 @@
import os
import sys
-sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
-import nDPIsrvd
-from nDPIsrvd import nDPIsrvdSocket, TermColor
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
+try:
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
+except ModuleNotFoundError:
+ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
global args
diff --git a/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py
index 961adc3cf..c25a7601f 100755
--- a/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py
+++ b/examples/py-flow-undetected-to-pcap/flow-undetected-to-pcap.py
@@ -3,9 +3,14 @@
import os
import sys
-sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
-import nDPIsrvd
-from nDPIsrvd import TermColor, nDPIsrvdSocket, PcapPacket
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
+try:
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
+except ModuleNotFoundError:
+ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
def onJsonLineRecvd(json_dict, current_flow, global_user_data):
if current_flow is None:
diff --git a/examples/py-json-stdout/json-stdout.py b/examples/py-json-stdout/json-stdout.py
index 9f58d161e..a14447745 100755
--- a/examples/py-json-stdout/json-stdout.py
+++ b/examples/py-json-stdout/json-stdout.py
@@ -3,10 +3,14 @@
import os
import sys
-sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
-import nDPIsrvd
-from nDPIsrvd import nDPIsrvdSocket, TermColor
-
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
+try:
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
+except ModuleNotFoundError:
+ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
def onJsonLineRecvd(json_dict, current_flow, global_user_data):
print(json_dict)
diff --git a/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py b/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py
index 33c0be810..11165ed35 100755
--- a/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py
+++ b/examples/py-risky-flow-to-pcap/risky-flow-to-pcap.py
@@ -4,9 +4,14 @@ import base64
import os
import sys
-sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
-import nDPIsrvd
-from nDPIsrvd import TermColor, nDPIsrvdSocket, PcapPacket
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
+try:
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
+except ModuleNotFoundError:
+ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
def onJsonLineRecvd(json_dict, current_flow, global_user_data):
if current_flow is None:
diff --git a/examples/py-schema-validation/py-schema-validation.py b/examples/py-schema-validation/py-schema-validation.py
index ca269e0c3..583612516 100755
--- a/examples/py-schema-validation/py-schema-validation.py
+++ b/examples/py-schema-validation/py-schema-validation.py
@@ -3,9 +3,14 @@
import os
import sys
-sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
-import nDPIsrvd
-from nDPIsrvd import nDPIsrvdSocket, TermColor
+sys.path.append(os.path.dirname(sys.argv[0]) + '/../usr/share/nDPId')
+try:
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
+except ModuleNotFoundError:
+ sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
+ import nDPIsrvd
+ from nDPIsrvd import nDPIsrvdSocket, TermColor
class Stats:
lines_processed = 0
diff --git a/nDPId-test.c b/nDPId-test.c
index afe77adf7..528109a2a 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -16,11 +16,29 @@ enum
PIPE_COUNT = 2
};
-static int epollfd = -1;
+struct thread_return_value
+{
+ int val;
+};
+
static int mock_pipefds[PIPE_COUNT] = {};
static int mock_servfds[PIPE_COUNT] = {};
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
+#define MAX_REMOTE_DESCRIPTORS 2
+
+#define THREAD_ERROR(thread_arg) \
+ do \
+ { \
+ ((struct thread_return_value *)thread_arg)->val = 1; \
+ } while (0);
+#define THREAD_ERROR_GOTO(thread_arg) \
+ do \
+ { \
+ THREAD_ERROR(thread_arg); \
+ goto error; \
+ } while (0);
+
void mock_syslog_stderr(int p, const char * format, ...)
{
va_list ap;
@@ -47,98 +65,158 @@ static int setup_pipe(int pipefd[PIPE_COUNT])
static void * nDPIsrvd_mainloop_thread(void * const arg)
{
(void)arg;
+ int epollfd = create_evq();
struct remote_desc * mock_json_desc = NULL;
struct remote_desc * mock_serv_desc = NULL;
+ struct epoll_event events[32];
+ size_t const events_size = sizeof(events) / sizeof(events[0]);
+
+ if (epollfd < 0)
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
mock_json_desc = get_unused_remote_descriptor(JSON_SOCK, mock_pipefds[PIPE_nDPIsrvd]);
if (mock_json_desc == NULL)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
mock_serv_desc = get_unused_remote_descriptor(SERV_SOCK, mock_servfds[PIPE_WRITE]);
if (mock_serv_desc == NULL)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
strncpy(mock_serv_desc->event_serv.peer_addr, "0.0.0.0", sizeof(mock_serv_desc->event_serv.peer_addr));
mock_serv_desc->event_serv.peer.sin_port = 0;
if (add_event(epollfd, mock_pipefds[PIPE_nDPIsrvd], mock_json_desc) != 0)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
if (add_event(epollfd, mock_servfds[PIPE_WRITE], mock_serv_desc) != 0)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
- if (mainloop(epollfd) != 0)
+ while (1)
{
- goto error;
- }
+ int nready = epoll_wait(epollfd, events, events_size, -1);
+
+ if (nready < 0)
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
- while (handle_incoming_data(epollfd, mock_json_desc) == 0) {}
+ for (int i = 0; i < nready; i++)
+ {
+ if (events[i].data.ptr == mock_json_desc)
+ {
+ if (handle_incoming_data_event(epollfd, &events[i]) != 0)
+ {
+ goto error;
+ }
+ }
+ else
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
+ }
+ }
error:
+ del_event(epollfd, mock_pipefds[PIPE_nDPIsrvd]);
+ del_event(epollfd, mock_servfds[PIPE_WRITE]);
+ close(mock_pipefds[PIPE_nDPIsrvd]);
close(mock_servfds[PIPE_WRITE]);
+ close(epollfd);
return NULL;
}
-static void * distributor_mainloop_thread(void * const arg)
+static enum nDPIsrvd_parse_return parse_json_lines(struct io_buffer * const buffer)
{
- char buf[NETWORK_BUFFER_MAX_SIZE];
+ struct nDPIsrvd_buffer buf = {};
+ struct nDPIsrvd_jsmn jsmn = {};
+ size_t const n = (buffer->used > sizeof(buf.raw) ? sizeof(buf.raw) : buffer->used);
- (void)arg;
+ if (n > NETWORK_BUFFER_MAX_SIZE)
+ {
+ return PARSE_STRING_TOO_BIG;
+ }
+
+ memcpy(buf.raw, buffer->ptr, n);
+ buf.used = buffer->used;
+
+ enum nDPIsrvd_parse_return ret;
+ while ((ret = nDPIsrvd_parse_line(&buf, &jsmn)) == PARSE_OK)
+ {
+ if (jsmn.tokens_found == 0)
+ {
+ return PARSE_JSMN_ERROR;
+ }
+ nDPIsrvd_drain_buffer(&buf);
+ }
+
+ memcpy(buffer->ptr, buf.raw, buf.used);
+ buffer->used = buf.used;
- int dis_thread_shutdown = 0;
+ return ret;
+}
+
+static void * distributor_client_mainloop_thread(void * const arg)
+{
+ struct io_buffer client_buffer = {.ptr = (uint8_t *)malloc(NETWORK_BUFFER_MAX_SIZE),
+ .max = NETWORK_BUFFER_MAX_SIZE,
+ .used = 0};
int dis_epollfd = create_evq();
int signalfd = setup_signalfd(dis_epollfd);
-
struct epoll_event events[32];
size_t const events_size = sizeof(events) / sizeof(events[0]);
- if (dis_epollfd < 0)
+ if (client_buffer.ptr == NULL || dis_epollfd < 0 || signalfd < 0)
{
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
if (add_event(dis_epollfd, mock_servfds[PIPE_READ], NULL) != 0)
{
- goto error;
- }
- if (signalfd < 0)
- {
- goto error;
+ THREAD_ERROR_GOTO(arg);
}
- while (dis_thread_shutdown == 0)
+ while (1)
{
int nready = epoll_wait(dis_epollfd, events, events_size, -1);
for (int i = 0; i < nready; i++)
{
- if ((events[i].events & EPOLLERR) != 0)
- {
- dis_thread_shutdown = 1;
- break;
- }
- if ((events[i].events & EPOLLIN) == 0)
+ if ((events[i].events & EPOLLIN) == 0 && (events[i].events & EPOLLHUP) == 0)
{
- dis_thread_shutdown = 1;
- break;
+ THREAD_ERROR_GOTO(arg);
}
if (events[i].data.fd == mock_servfds[PIPE_READ])
{
- ssize_t bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf));
- if (bytes_read <= 0)
+ ssize_t bytes_read = read(mock_servfds[PIPE_READ],
+ client_buffer.ptr + client_buffer.used,
+ client_buffer.max - client_buffer.used);
+ if (bytes_read == 0)
{
- dis_thread_shutdown = 1;
- break;
+ goto error;
+ }
+ else if (bytes_read < 0)
+ {
+ THREAD_ERROR_GOTO(arg);
+ }
+ printf("%.*s", (int)bytes_read, client_buffer.ptr + client_buffer.used);
+ client_buffer.used += bytes_read;
+
+ enum nDPIsrvd_parse_return parse_ret = parse_json_lines(&client_buffer);
+ if (parse_ret != PARSE_NEED_MORE_DATA)
+ {
+ fprintf(stderr, "JSON parsing failed: %s\n", nDPIsrvd_enum_to_string(parse_ret));
+ THREAD_ERROR(arg);
}
- printf("%.*s", (int)bytes_read, buf);
}
else if (events[i].data.fd == signalfd)
{
@@ -148,45 +226,38 @@ static void * distributor_mainloop_thread(void * const arg)
s = read(signalfd, &fdsi, sizeof(struct signalfd_siginfo));
if (s != sizeof(struct signalfd_siginfo))
{
- dis_thread_shutdown = 1;
- break;
+ THREAD_ERROR(arg);
}
if (fdsi.ssi_signo == SIGINT || fdsi.ssi_signo == SIGTERM || fdsi.ssi_signo == SIGQUIT)
{
- dis_thread_shutdown = 1;
- break;
+ fprintf(stderr, "Got signal %d, abort.\n", fdsi.ssi_signo);
+ THREAD_ERROR(arg);
}
}
else
{
- dis_thread_shutdown = 1;
- break;
+ THREAD_ERROR(arg);
}
}
}
- ssize_t bytes_read;
- while ((bytes_read = read(mock_servfds[PIPE_READ], buf, sizeof(buf))) > 0)
- {
- printf("%.*s", (int)bytes_read, buf);
- }
error:
del_event(dis_epollfd, signalfd);
del_event(dis_epollfd, mock_servfds[PIPE_READ]);
close(dis_epollfd);
close(signalfd);
+ free(client_buffer.ptr);
return NULL;
}
static void * nDPId_mainloop_thread(void * const arg)
{
- (void)arg;
-
if (setup_reader_threads() != 0)
{
- exit(EXIT_FAILURE);
+ THREAD_ERROR(arg);
+ return NULL;
}
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
@@ -203,9 +274,33 @@ static void * nDPId_mainloop_thread(void * const arg)
static void usage(char const * const arg0)
{
- printf("usage: %s [path-to-pcap-file]\n", arg0);
+ fprintf(stderr, "usage: %s [path-to-pcap-file]\n", arg0);
+}
+
+static int thread_wait_for_termination(pthread_t thread, time_t wait_time_secs, struct thread_return_value * const trv)
+{
+ struct timespec ts;
+
+ if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
+ {
+ return -1;
+ }
+
+ ts.tv_sec += wait_time_secs;
+ int err = pthread_timedjoin_np(thread, (void **)&trv, &ts);
+
+ switch (err)
+ {
+ case EBUSY:
+ return 0;
+ case ETIMEDOUT:
+ return 0;
+ }
+
+ return 1;
}
+#define THREADS_RETURNED_ERROR() (nDPId_return.val != 0 || nDPIsrvd_return.val != 0 || distributor_return.val != 0)
int main(int argc, char ** argv)
{
if (argc != 2)
@@ -214,6 +309,11 @@ int main(int argc, char ** argv)
return -1;
}
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ {
+ return -1;
+ }
+
nDPId_options.reader_thread_count = 1; /* Please do not change this! Generating meaningful pcap diff's relies on a
single reader thread! */
nDPId_options.instance_alias = strdup("nDPId-test");
@@ -232,53 +332,57 @@ int main(int argc, char ** argv)
json_sockfd = -1;
serv_sockfd = -1;
- if (setup_remote_descriptors(2) != 0)
- {
- return -1;
- }
-
- epollfd = create_evq();
- if (epollfd < 0)
+ if (setup_remote_descriptors(MAX_REMOTE_DESCRIPTORS) != 0)
{
return -1;
}
pthread_t nDPId_thread;
- if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, NULL) != 0)
+ struct thread_return_value nDPId_return = {};
+ if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0)
{
return -1;
}
pthread_t nDPIsrvd_thread;
- if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, NULL) != 0)
+ struct thread_return_value nDPIsrvd_return = {};
+ if (pthread_create(&nDPIsrvd_thread, NULL, nDPIsrvd_mainloop_thread, &nDPIsrvd_return) != 0)
{
return -1;
}
pthread_t distributor_thread;
- if (pthread_create(&distributor_thread, NULL, distributor_mainloop_thread, NULL) != 0)
+ struct thread_return_value distributor_return = {};
+ if (pthread_create(&distributor_thread, NULL, distributor_client_mainloop_thread, &distributor_return) != 0)
{
return -1;
}
- if (pthread_join(nDPId_thread, NULL) != 0)
+ /* Try to gracefully shutdown all threads. */
+
+ while (thread_wait_for_termination(distributor_thread, 1, &distributor_return) == 0)
{
- return -1;
+ if (THREADS_RETURNED_ERROR() != 0)
+ {
+ return -1;
+ }
}
- pthread_kill(nDPIsrvd_thread, SIGINT);
-
- if (pthread_join(nDPIsrvd_thread, NULL) != 0)
+ while (thread_wait_for_termination(nDPId_thread, 1, &nDPId_return) == 0)
{
- return -1;
+ if (THREADS_RETURNED_ERROR() != 0)
+ {
+ return -1;
+ }
}
- pthread_kill(distributor_thread, SIGINT);
-
- if (pthread_join(distributor_thread, NULL) != 0)
+ while (thread_wait_for_termination(nDPIsrvd_thread, 1, &nDPIsrvd_return) == 0)
{
- return -1;
+ if (THREADS_RETURNED_ERROR() != 0)
+ {
+ return -1;
+ }
}
- return 0;
+ return THREADS_RETURNED_ERROR();
}
diff --git a/nDPId.c b/nDPId.c
index 6a087bf7c..c75116094 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -1532,11 +1532,13 @@ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread,
serialize_and_send(reader_thread);
}
-static void jsonize_format_error(struct nDPId_reader_thread * const reader_thread, uint32_t format_index)
+static void internal_format_error(ndpi_serializer * const serializer, char const * const format, uint32_t format_index)
{
- ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer, "serializer-error", "format");
- ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, "serializer-format-index", format_index);
- serialize_and_send(reader_thread);
+ syslog(LOG_DAEMON | LOG_ERR,
+ "BUG: Internal error detected for format string `%s' at format index %u",
+ format,
+ format_index);
+ ndpi_reset_serializer(serializer);
}
static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thread, char const * format, va_list ap)
@@ -1582,7 +1584,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
}
else
{
- jsonize_format_error(reader_thread, format_index);
+ internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return;
}
break;
@@ -1592,7 +1594,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
format_index++;
if (got_jsonkey != 1)
{
- jsonize_format_error(reader_thread, format_index);
+ internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return;
}
if (*format == 'l')
@@ -1634,7 +1636,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
}
else
{
- jsonize_format_error(reader_thread, format_index);
+ internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return;
}
format++;
@@ -1649,7 +1651,7 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
}
else
{
- jsonize_format_error(reader_thread, format_index);
+ internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return;
}
break;
@@ -1663,16 +1665,17 @@ static void vjsonize_basic_eventf(struct nDPId_reader_thread * const reader_thre
}
else
{
- jsonize_format_error(reader_thread, format_index);
+ internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return;
}
break;
+ /* format string separators */
case ' ':
case ',':
case '%':
break;
default:
- jsonize_format_error(reader_thread, format_index);
+ internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index);
return;
}
}
@@ -2202,9 +2205,9 @@ static void ndpi_process_packet(uint8_t * const args,
if (tree_result == NULL)
{
- /* flow still not found, must be new */
+ /* flow still not found, must be new or midstream */
- if (nDPId_options.process_internal_initial_direction != 0)
+ if (nDPId_options.process_internal_initial_direction != 0 && flow_basic.tcp_is_midstream_flow == 0)
{
if (is_ip_in_subnet(&flow_basic.src,
&nDPId_options.pcap_dev_netmask,
@@ -2231,7 +2234,7 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
}
- else if (nDPId_options.process_external_initial_direction != 0)
+ else if (nDPId_options.process_external_initial_direction != 0 && flow_basic.tcp_is_midstream_flow == 0)
{
if (is_ip_in_subnet(&flow_basic.src,
&nDPId_options.pcap_dev_netmask,
@@ -2761,7 +2764,7 @@ static int nDPId_parse_options(int argc, char ** argv)
"[-d] [-p pidfile]\n"
"\t \t"
"[-u user] [-g group] "
- "[-P path] [-C path] "
+ "[-P path] [-C path] [-J path] "
"[-a instance-alias] "
"[-o subopt=value]\n\n"
"\t-i\tInterface or file from where to read packets from.\n"
@@ -2932,6 +2935,10 @@ static int validate_options(char const * const arg0)
{
int retval = 0;
+ if (is_path_absolute("JSON socket", nDPId_options.json_sockpath) != 0)
+ {
+ retval = 1;
+ }
if (nDPId_options.instance_alias == NULL)
{
char hname[256];
diff --git a/nDPIsrvd.c b/nDPIsrvd.c
index 105fec2b0..f687a29e6 100644
--- a/nDPIsrvd.c
+++ b/nDPIsrvd.c
@@ -277,11 +277,19 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
{
nDPIsrvd_options.pidfile = strdup(nDPIsrvd_PIDFILE);
}
+ if (is_path_absolute("Pidfile", nDPIsrvd_options.pidfile) != 0)
+ {
+ return 1;
+ }
if (nDPIsrvd_options.json_sockpath == NULL)
{
nDPIsrvd_options.json_sockpath = strdup(COLLECTOR_UNIX_SOCKET);
}
+ if (is_path_absolute("JSON socket", nDPIsrvd_options.json_sockpath) != 0)
+ {
+ return 1;
+ }
if (nDPIsrvd_options.serv_optarg == NULL)
{
@@ -293,6 +301,10 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
fprintf(stderr, "%s: Could not parse address `%s'\n", argv[0], nDPIsrvd_options.serv_optarg);
return 1;
}
+ if (serv_address.raw.sa_family == AF_UNIX && is_path_absolute("SERV socket", nDPIsrvd_options.serv_optarg) != 0)
+ {
+ return 1;
+ }
if (optind < argc)
{
@@ -303,6 +315,28 @@ static int nDPIsrvd_parse_options(int argc, char ** argv)
return 0;
}
+static struct remote_desc * accept_remote(int server_fd,
+ enum sock_type socktype,
+ struct sockaddr * const sockaddr,
+ socklen_t * const addrlen)
+{
+ int client_fd = accept(server_fd, sockaddr, addrlen);
+ if (client_fd < 0)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
+ return NULL;
+ }
+
+ struct remote_desc * current = get_unused_remote_descriptor(socktype, client_fd);
+ if (current == NULL)
+ {
+ syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
+ return NULL;
+ }
+
+ return current;
+}
+
static int new_connection(int epollfd, int eventfd)
{
union {
@@ -330,17 +364,9 @@ static int new_connection(int epollfd, int eventfd)
return 1;
}
- int client_fd = accept(server_fd, (struct sockaddr *)&sockaddr, &peer_addr_len);
- if (client_fd < 0)
- {
- syslog(LOG_DAEMON | LOG_ERR, "Accept failed: %s", strerror(errno));
- return 1;
- }
-
- struct remote_desc * current = get_unused_remote_descriptor(stype, client_fd);
+ struct remote_desc * current = accept_remote(server_fd, stype, (struct sockaddr *)&sockaddr, &peer_addr_len);
if (current == NULL)
{
- syslog(LOG_DAEMON | LOG_ERR, "Max number of connections reached: %zu", remotes.desc_used);
return 1;
}
@@ -628,20 +654,20 @@ static int handle_incoming_data_event(int epollfd, struct epoll_event * const ev
{
struct remote_desc * current = (struct remote_desc *)event->data.ptr;
- if (current == NULL)
+ if ((event->events & EPOLLIN) == 0)
{
- syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid");
return 1;
}
- if (current->fd < 0)
+ if (current == NULL)
{
- syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
+ syslog(LOG_DAEMON | LOG_ERR, "%s", "remote descriptor got from event data invalid");
return 1;
}
- if ((event->events & EPOLLIN) == 0)
+ if (current->fd < 0)
{
+ syslog(LOG_DAEMON | LOG_ERR, "file descriptor `%d' got from event data invalid", current->fd);
return 1;
}
diff --git a/test/run_tests.sh b/test/run_tests.sh
index 8e2a73005..747880c9b 100755
--- a/test/run_tests.sh
+++ b/test/run_tests.sh
@@ -1,11 +1,19 @@
-#!/usr/bin/env sh
+#!/usr/bin/env bash
set -e
LINE_SPACES=${LINE_SPACES:-48}
MYDIR="$(realpath "$(dirname ${0})")"
nDPId_test_EXEC="${2:-"$(realpath "${MYDIR}/../nDPId-test")"}"
-nDPI_SOURCE_ROOT="${1}"
+nDPI_SOURCE_ROOT="$(realpath "${1}")"
+LOCKFILE="$(realpath "${0}").lock"
+
+touch "${LOCKFILE}"
+exec 42< "${LOCKFILE}"
+flock -x -n 42 || {
+ printf '%s\n' "Could not aquire file lock for ${0}. Already running instance?";
+ exit 1;
+}
if [ $# -ne 1 -a $# -ne 2 ]; then
cat <<EOF
@@ -36,16 +44,24 @@ mkdir -p /tmp/nDPId-test-stderr
set +e
RETVAL=0
for pcap_file in $(ls *.pcap*); do
+ printf '%s\n' "${nDPId_test_EXEC} ${pcap_file}" \
+ >"/tmp/nDPId-test-stderr/${pcap_file}.out"
+
${nDPId_test_EXEC} "${pcap_file}" \
>"${MYDIR}/results/${pcap_file}.out.new" \
- 2>"/tmp/nDPId-test-stderr/${pcap_file}.out"
+ 2>>"/tmp/nDPId-test-stderr/${pcap_file}.out"
+
+ printf "%-${LINE_SPACES}s\t" "${pcap_file}"
if [ $? -eq 0 ]; then
- if diff -u0 "${MYDIR}/results/${pcap_file}.out" \
- "${MYDIR}/results/${pcap_file}.out.new" >/dev/null; then
- printf "%-${LINE_SPACES}s\t%s\n" "${pcap_file}" '[OK]'
+ if [ ! -r "${MYDIR}/results/${pcap_file}.out" ]; then
+ printf '%s\n' '[NEW]'
+ RETVAL=1
+ elif diff -u0 "${MYDIR}/results/${pcap_file}.out" \
+ "${MYDIR}/results/${pcap_file}.out.new" >/dev/null; then
+ printf '%s\n' '[OK]'
else
- printf "%-${LINE_SPACES}s\t%s\n" "${pcap_file}" '[DIFF]'
+ printf '%s\n' '[DIFF]'
diff -u0 "${MYDIR}/results/${pcap_file}.out" \
"${MYDIR}/results/${pcap_file}.out.new"
mv -v "${MYDIR}/results/${pcap_file}.out.new" \
@@ -53,7 +69,7 @@ for pcap_file in $(ls *.pcap*); do
RETVAL=1
fi
else
- printf "%-${LINE_SPACES}s\t%s\n" "${pcap_file}" '[FAIL]'
+ printf '%s\n' '[FAIL]'
printf '%s\n' '----------------------------------------'
printf '%s\n' "-- STDERR of ${pcap_file}"
cat "/tmp/nDPId-test-stderr/${pcap_file}.out"
@@ -63,4 +79,13 @@ for pcap_file in $(ls *.pcap*); do
rm -f "${MYDIR}/results/${pcap_file}.out.new"
done
+cd "${MYDIR}"
+for out_file in $(ls results/*.out); do
+ pcap_file="${nDPI_TEST_DIR}/$(basename ${out_file%.out})"
+ if [ ! -r "${pcap_file}" ]; then
+ printf "%-${LINE_SPACES}s\t%s\n" "$(basename ${pcap_file})" '[MISSING]'
+ RETVAL=1
+ fi
+done
+
exit ${RETVAL}
diff --git a/utils.c b/utils.c
index cea51b4a1..16b233ced 100644
--- a/utils.c
+++ b/utils.c
@@ -55,6 +55,11 @@ static int create_pidfile(char const * const pidfile)
{
int pfd;
+ if (is_path_absolute("Pidfile", pidfile) != 0)
+ {
+ return 1;
+ }
+
pfd = open(pidfile, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (pfd < 0)
@@ -74,6 +79,20 @@ static int create_pidfile(char const * const pidfile)
return 0;
}
+int is_path_absolute(char const * const prefix,
+ char const * const path)
+{
+ if (path[0] != '/')
+ {
+ syslog(LOG_DAEMON | LOG_ERR,
+ "%s path must be absolut i.e. starting with a `/', path given: `%s'",
+ prefix, path);
+ return 1;
+ }
+
+ return 0;
+}
+
int daemonize_with_pidfile(char const * const pidfile)
{
pid_str ps = {};
diff --git a/utils.h b/utils.h
index f1765fc53..951cdea05 100644
--- a/utils.h
+++ b/utils.h
@@ -1,6 +1,9 @@
#ifndef UTILS_H
#define UTILS_H 1
+int is_path_absolute(char const * const prefix,
+ char const * const path);
+
void daemonize_enable(void);
int daemonize_with_pidfile(char const * const pidfile);