summaryrefslogtreecommitdiff
path: root/nDPId-test.c
diff options
context:
space:
mode:
Diffstat (limited to 'nDPId-test.c')
-rw-r--r--nDPId-test.c154
1 files changed, 150 insertions, 4 deletions
diff --git a/nDPId-test.c b/nDPId-test.c
index c28aa41e1..a37cec300 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -53,9 +53,29 @@ struct nDPId_return_value
unsigned long long int cur_active_flows;
unsigned long long int cur_idle_flows;
+#ifdef ENABLE_ZLIB
+ unsigned long long int total_compressions;
+ unsigned long long int total_compression_diff;
+ unsigned long long int current_compression_diff;
+#endif
+
unsigned long long int total_events_serialized;
};
+struct distributor_instance_user_data
+{
+ unsigned long long int flow_cleanup_count;
+ unsigned long long int daemon_event_count;
+};
+
+struct distributor_thread_user_data
+{
+ unsigned long long int flow_new_count;
+ unsigned long long int flow_end_count;
+ unsigned long long int flow_idle_count;
+ unsigned long long int daemon_event_count;
+};
+
struct distributor_global_user_data
{
unsigned long long int total_packets_processed;
@@ -80,6 +100,9 @@ struct distributor_global_user_data
unsigned long long int cur_active_flows;
unsigned long long int cur_idle_flows;
+ struct distributor_instance_user_data instance_user_data;
+ struct distributor_thread_user_data thread_user_data;
+
int flow_cleanup_error;
};
@@ -258,16 +281,25 @@ error:
static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow)
{
struct distributor_global_user_data * const global_stats =
(struct distributor_global_user_data *)sock->global_user_data;
+ struct distributor_instance_user_data * instance_stats =
+ (struct distributor_instance_user_data *)instance->instance_user_data;
+ struct distributor_thread_user_data * thread_stats = NULL;
struct distributor_flow_user_data * flow_stats = NULL;
+ (void)thread_data;
#if 0
printf("Distributor: %.*s\n", (int)sock->buffer.json_string_length, sock->buffer.json_string);
#endif
+ if (thread_data != NULL)
+ {
+ thread_stats = (struct distributor_thread_user_data *)thread_data->thread_user_data;
+ }
if (flow != NULL)
{
flow_stats = (struct distributor_flow_user_data *)flow->flow_user_data;
@@ -292,6 +324,9 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
if (daemon_event_name != NULL)
{
+ instance_stats->daemon_event_count++;
+ thread_stats->daemon_event_count++;
+
if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "shutdown") != 0)
{
struct nDPIsrvd_json_token const * const total_events_serialized =
@@ -320,6 +355,7 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
{
global_stats->cur_active_flows++;
global_stats->flow_new_count++;
+ thread_stats->flow_new_count++;
unsigned int hash_count = HASH_COUNT(instance->flow_table);
if (hash_count != global_stats->cur_active_flows)
@@ -336,12 +372,14 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
global_stats->cur_active_flows--;
global_stats->cur_idle_flows++;
global_stats->flow_end_count++;
+ thread_stats->flow_end_count++;
}
if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0)
{
global_stats->cur_active_flows--;
global_stats->cur_idle_flows++;
global_stats->flow_idle_count++;
+ thread_stats->flow_idle_count++;
}
if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "detected") != 0)
{
@@ -424,8 +462,32 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s
return CALLBACK_OK;
}
+static void distributor_instance_cleanup_callback(struct nDPIsrvd_socket * const sock,
+ struct nDPIsrvd_instance * const instance,
+ enum nDPIsrvd_cleanup_reason reason)
+{
+ struct distributor_global_user_data * const global_stats =
+ (struct distributor_global_user_data *)sock->global_user_data;
+ struct nDPIsrvd_thread_data * current_thread_data;
+ struct nDPIsrvd_thread_data * ttmp;
+
+ (void)reason;
+
+ HASH_ITER(hh, instance->thread_data_table, current_thread_data, ttmp)
+ {
+ struct distributor_thread_user_data * const tud =
+ (struct distributor_thread_user_data *)current_thread_data->thread_user_data;
+ global_stats->thread_user_data.daemon_event_count += tud->daemon_event_count;
+ global_stats->thread_user_data.flow_new_count += tud->flow_new_count;
+ global_stats->thread_user_data.flow_end_count += tud->flow_end_count;
+ global_stats->thread_user_data.flow_idle_count += tud->flow_idle_count;
+ }
+ global_stats->instance_user_data = *(struct distributor_instance_user_data *)instance->instance_user_data;
+}
+
static void distributor_flow_cleanup_callback(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
+ struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow,
enum nDPIsrvd_cleanup_reason reason)
{
@@ -433,7 +495,9 @@ static void distributor_flow_cleanup_callback(struct nDPIsrvd_socket * const soc
(struct distributor_global_user_data *)sock->global_user_data;
struct distributor_flow_user_data * const flow_stats = (struct distributor_flow_user_data *)flow->flow_user_data;
- (void)instance;
+ (void)thread_data;
+
+ ((struct distributor_instance_user_data *)instance->instance_user_data)->flow_cleanup_count++;
switch (reason)
{
@@ -495,8 +559,11 @@ static void * distributor_client_mainloop_thread(void * const arg)
struct distributor_return_value * const drv = (struct distributor_return_value *)arg;
struct thread_return_value * const trv = &drv->thread_return_value;
struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data),
+ sizeof(struct distributor_instance_user_data),
+ sizeof(struct distributor_thread_user_data),
sizeof(struct distributor_flow_user_data),
distributor_json_callback,
+ distributor_instance_cleanup_callback,
distributor_flow_cleanup_callback);
struct distributor_global_user_data * stats;
@@ -653,8 +720,6 @@ static void * distributor_client_mainloop_thread(void * const arg)
}
}
- drv->stats = *stats;
-
struct nDPIsrvd_instance * current_instance;
struct nDPIsrvd_instance * itmp;
struct nDPIsrvd_flow * current_flow;
@@ -667,8 +732,12 @@ static void * distributor_client_mainloop_thread(void * const arg)
THREAD_ERROR(trv);
break;
}
+
+ nDPIsrvd_cleanup_instance(mock_sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN);
}
+ drv->stats = *stats;
+
error:
del_event(dis_epollfd, signalfd);
del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]);
@@ -724,6 +793,12 @@ static void * nDPId_mainloop_thread(void * const arg)
nrv->cur_active_flows += reader_threads[i].workflow->cur_active_flows;
nrv->cur_idle_flows += reader_threads[i].workflow->cur_idle_flows;
+#ifdef ENABLE_ZLIB
+ nrv->total_compressions += reader_threads[i].workflow->total_compressions;
+ nrv->total_compression_diff += reader_threads[i].workflow->total_compression_diff;
+ nrv->current_compression_diff += reader_threads[i].workflow->current_compression_diff;
+#endif
+
nrv->total_events_serialized += reader_threads[i].workflow->total_events_serialized;
}
@@ -1104,7 +1179,7 @@ int main(int argc, char ** argv)
{
logger(1,
"%s: Amount of total active flows not equal to the amount of received 'detected', 'guessed and "
- "'not-detected' events: %llu != "
+ "'not-detected' flow events: %llu != "
"%llu + %llu + %llu",
argv[0],
nDPId_return.total_active_flows,
@@ -1114,6 +1189,48 @@ int main(int argc, char ** argv)
return 1;
}
+ if (distributor_return.stats.instance_user_data.daemon_event_count !=
+ distributor_return.stats.thread_user_data.daemon_event_count)
+ {
+ logger(1,
+ "%s: Amount of received daemon events differs between instance and thread: %llu != %llu",
+ argv[0],
+ distributor_return.stats.instance_user_data.daemon_event_count,
+ distributor_return.stats.thread_user_data.daemon_event_count);
+ return 1;
+ }
+
+ if (distributor_return.stats.instance_user_data.flow_cleanup_count - distributor_return.stats.total_flow_timeouts !=
+ distributor_return.stats.flow_end_count + distributor_return.stats.flow_idle_count)
+ {
+ logger(1,
+ "%s: Amount of flow cleanup callback calls differs between received 'end' and 'idle' flow events: %llu "
+ "!= %llu + %llu",
+ argv[0],
+ distributor_return.stats.instance_user_data.flow_cleanup_count -
+ distributor_return.stats.total_flow_timeouts,
+ distributor_return.stats.flow_end_count,
+ distributor_return.stats.flow_idle_count);
+ return 1;
+ }
+
+ if (distributor_return.stats.flow_new_count != distributor_return.stats.thread_user_data.flow_new_count ||
+ distributor_return.stats.flow_end_count != distributor_return.stats.thread_user_data.flow_end_count ||
+ distributor_return.stats.flow_idle_count != distributor_return.stats.thread_user_data.flow_idle_count)
+ {
+ logger(1,
+ "%s: Thread user data counters not equal to the global user data counters: %llu != %llu or %llu != %llu "
+ "or %llu != %llu",
+ argv[0],
+ distributor_return.stats.flow_new_count,
+ distributor_return.stats.thread_user_data.flow_new_count,
+ distributor_return.stats.flow_end_count,
+ distributor_return.stats.thread_user_data.flow_end_count,
+ distributor_return.stats.flow_idle_count,
+ distributor_return.stats.thread_user_data.flow_idle_count);
+ return 1;
+ }
+
#ifdef ENABLE_ZLIB
if (zlib_compressions != zlib_decompressions)
{
@@ -1125,6 +1242,35 @@ int main(int argc, char ** argv)
(unsigned long long int)zlib_decompressions);
return 1;
}
+ if (nDPId_return.current_compression_diff != 0)
+ {
+ logger(1,
+ "%s: %s (%llu bytes)",
+ argv[0],
+ "ZLib compression inconsistency detected. It should be 0.",
+ nDPId_return.current_compression_diff);
+ return 1;
+ }
+ if (nDPId_return.total_compressions != zlib_compressions)
+ {
+ logger(1,
+ "%s: %s (%llu != %llu)",
+ argv[0],
+ "ZLib global<->workflow compression / decompression inconsistency detected.",
+ (unsigned long long int)zlib_compressions,
+ nDPId_return.current_compression_diff);
+ return 1;
+ }
+ if (nDPId_return.total_compression_diff != zlib_compression_bytes)
+ {
+ logger(1,
+ "%s: %s (%llu bytes != %llu bytes)",
+ argv[0],
+ "ZLib global<->workflow compression / decompression inconsistency detected.",
+ (unsigned long long int)zlib_compression_bytes,
+ nDPId_return.total_compression_diff);
+ return 1;
+ }
#endif
return 0;