diff options
Diffstat (limited to 'nDPId-test.c')
-rw-r--r-- | nDPId-test.c | 154 |
1 files changed, 150 insertions, 4 deletions
diff --git a/nDPId-test.c b/nDPId-test.c index c28aa41e1..a37cec300 100644 --- a/nDPId-test.c +++ b/nDPId-test.c @@ -53,9 +53,29 @@ struct nDPId_return_value unsigned long long int cur_active_flows; unsigned long long int cur_idle_flows; +#ifdef ENABLE_ZLIB + unsigned long long int total_compressions; + unsigned long long int total_compression_diff; + unsigned long long int current_compression_diff; +#endif + unsigned long long int total_events_serialized; }; +struct distributor_instance_user_data +{ + unsigned long long int flow_cleanup_count; + unsigned long long int daemon_event_count; +}; + +struct distributor_thread_user_data +{ + unsigned long long int flow_new_count; + unsigned long long int flow_end_count; + unsigned long long int flow_idle_count; + unsigned long long int daemon_event_count; +}; + struct distributor_global_user_data { unsigned long long int total_packets_processed; @@ -80,6 +100,9 @@ struct distributor_global_user_data unsigned long long int cur_active_flows; unsigned long long int cur_idle_flows; + struct distributor_instance_user_data instance_user_data; + struct distributor_thread_user_data thread_user_data; + int flow_cleanup_error; }; @@ -258,16 +281,25 @@ error: static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_thread_data * const thread_data, struct nDPIsrvd_flow * const flow) { struct distributor_global_user_data * const global_stats = (struct distributor_global_user_data *)sock->global_user_data; + struct distributor_instance_user_data * instance_stats = + (struct distributor_instance_user_data *)instance->instance_user_data; + struct distributor_thread_user_data * thread_stats = NULL; struct distributor_flow_user_data * flow_stats = NULL; + (void)thread_data; #if 0 printf("Distributor: %.*s\n", (int)sock->buffer.json_string_length, sock->buffer.json_string); #endif + if (thread_data != NULL) + { + thread_stats = (struct distributor_thread_user_data *)thread_data->thread_user_data; + } if (flow != NULL) { flow_stats = (struct distributor_flow_user_data *)flow->flow_user_data; @@ -292,6 +324,9 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s if (daemon_event_name != NULL) { + instance_stats->daemon_event_count++; + thread_stats->daemon_event_count++; + if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "shutdown") != 0) { struct nDPIsrvd_json_token const * const total_events_serialized = @@ -320,6 +355,7 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s { global_stats->cur_active_flows++; global_stats->flow_new_count++; + thread_stats->flow_new_count++; unsigned int hash_count = HASH_COUNT(instance->flow_table); if (hash_count != global_stats->cur_active_flows) @@ -336,12 +372,14 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s global_stats->cur_active_flows--; global_stats->cur_idle_flows++; global_stats->flow_end_count++; + thread_stats->flow_end_count++; } if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle") != 0) { global_stats->cur_active_flows--; global_stats->cur_idle_flows++; global_stats->flow_idle_count++; + thread_stats->flow_idle_count++; } if (TOKEN_VALUE_EQUALS_SZ(flow_event_name, "detected") != 0) { @@ -424,8 +462,32 @@ static enum nDPIsrvd_callback_return distributor_json_callback(struct nDPIsrvd_s return CALLBACK_OK; } +static void distributor_instance_cleanup_callback(struct nDPIsrvd_socket * const sock, + struct nDPIsrvd_instance * const instance, + enum nDPIsrvd_cleanup_reason reason) +{ + struct distributor_global_user_data * const global_stats = + (struct distributor_global_user_data *)sock->global_user_data; + struct nDPIsrvd_thread_data * current_thread_data; + struct nDPIsrvd_thread_data * ttmp; + + (void)reason; + + HASH_ITER(hh, instance->thread_data_table, current_thread_data, ttmp) + { + struct distributor_thread_user_data * const tud = + (struct distributor_thread_user_data *)current_thread_data->thread_user_data; + global_stats->thread_user_data.daemon_event_count += tud->daemon_event_count; + global_stats->thread_user_data.flow_new_count += tud->flow_new_count; + global_stats->thread_user_data.flow_end_count += tud->flow_end_count; + global_stats->thread_user_data.flow_idle_count += tud->flow_idle_count; + } + global_stats->instance_user_data = *(struct distributor_instance_user_data *)instance->instance_user_data; +} + static void distributor_flow_cleanup_callback(struct nDPIsrvd_socket * const sock, struct nDPIsrvd_instance * const instance, + struct nDPIsrvd_thread_data * const thread_data, struct nDPIsrvd_flow * const flow, enum nDPIsrvd_cleanup_reason reason) { @@ -433,7 +495,9 @@ static void distributor_flow_cleanup_callback(struct nDPIsrvd_socket * const soc (struct distributor_global_user_data *)sock->global_user_data; struct distributor_flow_user_data * const flow_stats = (struct distributor_flow_user_data *)flow->flow_user_data; - (void)instance; + (void)thread_data; + + ((struct distributor_instance_user_data *)instance->instance_user_data)->flow_cleanup_count++; switch (reason) { @@ -495,8 +559,11 @@ static void * distributor_client_mainloop_thread(void * const arg) struct distributor_return_value * const drv = (struct distributor_return_value *)arg; struct thread_return_value * const trv = &drv->thread_return_value; struct nDPIsrvd_socket * mock_sock = nDPIsrvd_socket_init(sizeof(struct distributor_global_user_data), + sizeof(struct distributor_instance_user_data), + sizeof(struct distributor_thread_user_data), sizeof(struct distributor_flow_user_data), distributor_json_callback, + distributor_instance_cleanup_callback, distributor_flow_cleanup_callback); struct distributor_global_user_data * stats; @@ -653,8 +720,6 @@ static void * distributor_client_mainloop_thread(void * const arg) } } - drv->stats = *stats; - struct nDPIsrvd_instance * current_instance; struct nDPIsrvd_instance * itmp; struct nDPIsrvd_flow * current_flow; @@ -667,8 +732,12 @@ static void * distributor_client_mainloop_thread(void * const arg) THREAD_ERROR(trv); break; } + + nDPIsrvd_cleanup_instance(mock_sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN); } + drv->stats = *stats; + error: del_event(dis_epollfd, signalfd); del_event(dis_epollfd, mock_testfds[PIPE_TEST_READ]); @@ -724,6 +793,12 @@ static void * nDPId_mainloop_thread(void * const arg) nrv->cur_active_flows += reader_threads[i].workflow->cur_active_flows; nrv->cur_idle_flows += reader_threads[i].workflow->cur_idle_flows; +#ifdef ENABLE_ZLIB + nrv->total_compressions += reader_threads[i].workflow->total_compressions; + nrv->total_compression_diff += reader_threads[i].workflow->total_compression_diff; + nrv->current_compression_diff += reader_threads[i].workflow->current_compression_diff; +#endif + nrv->total_events_serialized += reader_threads[i].workflow->total_events_serialized; } @@ -1104,7 +1179,7 @@ int main(int argc, char ** argv) { logger(1, "%s: Amount of total active flows not equal to the amount of received 'detected', 'guessed and " - "'not-detected' events: %llu != " + "'not-detected' flow events: %llu != " "%llu + %llu + %llu", argv[0], nDPId_return.total_active_flows, @@ -1114,6 +1189,48 @@ int main(int argc, char ** argv) return 1; } + if (distributor_return.stats.instance_user_data.daemon_event_count != + distributor_return.stats.thread_user_data.daemon_event_count) + { + logger(1, + "%s: Amount of received daemon events differs between instance and thread: %llu != %llu", + argv[0], + distributor_return.stats.instance_user_data.daemon_event_count, + distributor_return.stats.thread_user_data.daemon_event_count); + return 1; + } + + if (distributor_return.stats.instance_user_data.flow_cleanup_count - distributor_return.stats.total_flow_timeouts != + distributor_return.stats.flow_end_count + distributor_return.stats.flow_idle_count) + { + logger(1, + "%s: Amount of flow cleanup callback calls differs between received 'end' and 'idle' flow events: %llu " + "!= %llu + %llu", + argv[0], + distributor_return.stats.instance_user_data.flow_cleanup_count - + distributor_return.stats.total_flow_timeouts, + distributor_return.stats.flow_end_count, + distributor_return.stats.flow_idle_count); + return 1; + } + + if (distributor_return.stats.flow_new_count != distributor_return.stats.thread_user_data.flow_new_count || + distributor_return.stats.flow_end_count != distributor_return.stats.thread_user_data.flow_end_count || + distributor_return.stats.flow_idle_count != distributor_return.stats.thread_user_data.flow_idle_count) + { + logger(1, + "%s: Thread user data counters not equal to the global user data counters: %llu != %llu or %llu != %llu " + "or %llu != %llu", + argv[0], + distributor_return.stats.flow_new_count, + distributor_return.stats.thread_user_data.flow_new_count, + distributor_return.stats.flow_end_count, + distributor_return.stats.thread_user_data.flow_end_count, + distributor_return.stats.flow_idle_count, + distributor_return.stats.thread_user_data.flow_idle_count); + return 1; + } + #ifdef ENABLE_ZLIB if (zlib_compressions != zlib_decompressions) { @@ -1125,6 +1242,35 @@ int main(int argc, char ** argv) (unsigned long long int)zlib_decompressions); return 1; } + if (nDPId_return.current_compression_diff != 0) + { + logger(1, + "%s: %s (%llu bytes)", + argv[0], + "ZLib compression inconsistency detected. It should be 0.", + nDPId_return.current_compression_diff); + return 1; + } + if (nDPId_return.total_compressions != zlib_compressions) + { + logger(1, + "%s: %s (%llu != %llu)", + argv[0], + "ZLib global<->workflow compression / decompression inconsistency detected.", + (unsigned long long int)zlib_compressions, + nDPId_return.current_compression_diff); + return 1; + } + if (nDPId_return.total_compression_diff != zlib_compression_bytes) + { + logger(1, + "%s: %s (%llu bytes != %llu bytes)", + argv[0], + "ZLib global<->workflow compression / decompression inconsistency detected.", + (unsigned long long int)zlib_compression_bytes, + nDPId_return.total_compression_diff); + return 1; + } #endif return 0; |