aboutsummaryrefslogtreecommitdiff
path: root/nDPId-test.c
diff options
context:
space:
mode:
Diffstat (limited to 'nDPId-test.c')
-rw-r--r--nDPId-test.c121
1 files changed, 79 insertions, 42 deletions
diff --git a/nDPId-test.c b/nDPId-test.c
index d48784f38..39b42d591 100644
--- a/nDPId-test.c
+++ b/nDPId-test.c
@@ -44,6 +44,8 @@ struct nDPId_return_value
{
struct thread_return_value thread_return_value;
+ size_t thread_index;
+
unsigned long long int packets_captured;
unsigned long long int packets_processed;
unsigned long long int total_skipped_flows;
@@ -1287,64 +1289,66 @@ static void * nDPId_mainloop_thread(void * const arg)
if (thread_block_signals() != 0)
{
logger(1, "nDPId block signals failed: %s", strerror(errno));
+ THREAD_ERROR_GOTO(trr);
}
if (setup_reader_threads() != 0)
{
- THREAD_ERROR(trr);
- goto error;
+ logger(1, "%s", "nDPId setup reader threads failed");
+ THREAD_ERROR_GOTO(trr);
}
/* Replace nDPId JSON socket fd with the one in our pipe and hope that no socket specific code-path triggered. */
- reader_threads[0].collector_sockfd = mock_pipefds[PIPE_nDPId];
- reader_threads[0].collector_sock_last_errno = 0;
- if (set_collector_block(&reader_threads[0]) != 0)
+ reader_threads[nrv->thread_index].collector_sockfd = mock_pipefds[PIPE_nDPId];
+ reader_threads[nrv->thread_index].collector_sock_last_errno = 0;
+ if (set_collector_block(&reader_threads[nrv->thread_index]) != 0)
{
- goto error;
+ THREAD_ERROR_GOTO(trr);
}
logger(0, "nDPId thread initialize done");
thread_signal(&start_condition);
thread_wait(&start_condition);
- jsonize_daemon(&reader_threads[0], DAEMON_EVENT_INIT);
+ jsonize_daemon(&reader_threads[nrv->thread_index], DAEMON_EVENT_INIT);
/* restore SIGPIPE to the default handler (Termination) */
if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
{
- goto error;
+ logger(1, "nDPId restore SIGPIPE to the default handler: %s", strerror(errno));
+ THREAD_ERROR_GOTO(trr);
}
- run_capture_loop(&reader_threads[0]);
+
+ run_capture_loop(&reader_threads[nrv->thread_index]);
process_remaining_flows();
- for (size_t i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i)
- {
- nrv->packets_captured += reader_threads[i].workflow->packets_captured;
- nrv->packets_processed += reader_threads[i].workflow->packets_processed;
- nrv->total_skipped_flows += reader_threads[i].workflow->total_skipped_flows;
- nrv->total_l4_payload_len += reader_threads[i].workflow->total_l4_payload_len;
- nrv->not_detected_flow_protocols += reader_threads[i].workflow->total_not_detected_flows;
- nrv->guessed_flow_protocols += reader_threads[i].workflow->total_guessed_flows;
- nrv->detected_flow_protocols += reader_threads[i].workflow->total_detected_flows;
- nrv->flow_detection_updates += reader_threads[i].workflow->total_flow_detection_updates;
- nrv->flow_updates += reader_threads[i].workflow->total_flow_updates;
+ nrv->packets_captured += reader_threads[nrv->thread_index].workflow->packets_captured;
+ nrv->packets_processed += reader_threads[nrv->thread_index].workflow->packets_processed;
+ nrv->total_skipped_flows += reader_threads[nrv->thread_index].workflow->total_skipped_flows;
+ nrv->total_l4_payload_len += reader_threads[nrv->thread_index].workflow->total_l4_payload_len;
- nrv->total_active_flows += reader_threads[i].workflow->total_active_flows;
- nrv->total_idle_flows += reader_threads[i].workflow->total_idle_flows;
- nrv->cur_active_flows += reader_threads[i].workflow->cur_active_flows;
- nrv->cur_idle_flows += reader_threads[i].workflow->cur_idle_flows;
+ nrv->not_detected_flow_protocols += reader_threads[nrv->thread_index].workflow->total_not_detected_flows;
+ nrv->guessed_flow_protocols += reader_threads[nrv->thread_index].workflow->total_guessed_flows;
+ nrv->detected_flow_protocols += reader_threads[nrv->thread_index].workflow->total_detected_flows;
+ nrv->flow_detection_updates += reader_threads[nrv->thread_index].workflow->total_flow_detection_updates;
+ nrv->flow_updates += reader_threads[nrv->thread_index].workflow->total_flow_updates;
+
+ nrv->total_active_flows += reader_threads[nrv->thread_index].workflow->total_active_flows;
+ nrv->total_idle_flows += reader_threads[nrv->thread_index].workflow->total_idle_flows;
+ nrv->cur_active_flows += reader_threads[nrv->thread_index].workflow->cur_active_flows;
+ nrv->cur_idle_flows += reader_threads[nrv->thread_index].workflow->cur_idle_flows;
#ifdef ENABLE_ZLIB
- nrv->total_compressions += reader_threads[i].workflow->total_compressions;
- nrv->total_compression_diff += reader_threads[i].workflow->total_compression_diff;
- nrv->current_compression_diff += reader_threads[i].workflow->current_compression_diff;
+ nrv->total_compressions += reader_threads[nrv->thread_index].workflow->total_compressions;
+ nrv->total_compression_diff += reader_threads[nrv->thread_index].workflow->total_compression_diff;
+ nrv->current_compression_diff += reader_threads[nrv->thread_index].workflow->current_compression_diff;
#endif
- nrv->total_events_serialized += reader_threads[i].workflow->total_events_serialized;
- }
+ nrv->total_events_serialized += reader_threads[nrv->thread_index].workflow->total_events_serialized;
error:
- free_reader_threads();
+ thread_signal(&start_condition);
close(mock_pipefds[PIPE_nDPId]);
+ mock_pipefds[PIPE_nDPId] = -1;
logger(0, "%s", "nDPId worker thread exits..");
return NULL;
@@ -1352,7 +1356,7 @@ error:
static void usage(char const * const arg0)
{
- fprintf(stderr, "usage: %s [path-to-pcap-file]\n", arg0);
+ fprintf(stderr, "usage: %s [path-to-pcap-file] [optional-nDPId-config-file]\n", arg0);
}
static int thread_wait_for_termination(pthread_t thread, time_t wait_time_secs, struct thread_return_value * const trv)
@@ -1639,7 +1643,7 @@ error:
distributor_return.thread_return_value.val != 0)
int main(int argc, char ** argv)
{
- if (argc != 1 && argc != 2)
+ if (argc != 1 && argc != 2 && argc != 3)
{
usage(argv[0]);
return 1;
@@ -1667,7 +1671,47 @@ int main(int argc, char ** argv)
return retval;
}
+ if (access(argv[1], R_OK) != 0)
+ {
+ logger(1, "%s: pcap file `%s' does not exist or is not readable", argv[0], argv[1]);
+ return 1;
+ }
+
+ set_config_defaults(&config_map[0], nDPIsrvd_ARRAY_LENGTH(config_map));
+ set_config_defaults(&general_config_map[0], nDPIsrvd_ARRAY_LENGTH(general_config_map));
+ set_config_defaults(&tuning_config_map[0], nDPIsrvd_ARRAY_LENGTH(tuning_config_map));
+ {
+ int ret;
+
+ if (argc == 3)
+ {
+ set_cmdarg_string(&nDPId_options.config_file, argv[2]);
+ }
+
+ if (IS_CMDARG_SET(nDPId_options.config_file) != 0 &&
+ (ret = parse_config_file(GET_CMDARG_STR(nDPId_options.config_file), nDPId_parsed_config_line, NULL)) != 0)
+ {
+ if (ret > 0)
+ {
+ logger(1, "Config file `%s' is malformed", GET_CMDARG_STR(nDPId_options.config_file));
+ }
+ else if (ret == -ENOENT)
+ {
+ logger(1, "Path `%s' is not a regular file", GET_CMDARG_STR(nDPId_options.config_file));
+ }
+ else
+ {
+ logger(1,
+ "Could not open file `%s' for reading: %s",
+ GET_CMDARG_STR(nDPId_options.config_file),
+ strerror(errno));
+ }
+ return 1;
+ }
+ }
+
set_cmdarg_ull(&nDPIsrvd_options.max_write_buffers, 32);
+ set_cmdarg_string(&nDPId_options.pcap_file_or_interface, argv[1]);
set_cmdarg_boolean(&nDPId_options.enable_data_analysis, 1);
set_cmdarg_ull(&nDPId_options.max_packets_per_flow_to_send, 5);
#ifdef ENABLE_ZLIB
@@ -1682,15 +1726,7 @@ int main(int argc, char ** argv)
set_cmdarg_ull(&nDPId_options.reader_thread_count, 1); /* Please do not change this! Generating meaningful pcap
diff's relies on a single reader thread! */
set_cmdarg_string(&nDPId_options.instance_alias, "nDPId-test");
- if (access(argv[1], R_OK) != 0)
- {
- logger(1, "%s: pcap file `%s' does not exist or is not readable", argv[0], argv[1]);
- return 1;
- }
- set_cmdarg_string(&nDPId_options.pcap_file_or_interface, argv[1]);
- set_config_defaults(&config_map[0], nDPIsrvd_ARRAY_LENGTH(config_map));
- set_config_defaults(&general_config_map[0], nDPIsrvd_ARRAY_LENGTH(general_config_map));
- set_config_defaults(&tuning_config_map[0], nDPIsrvd_ARRAY_LENGTH(tuning_config_map));
+
if (validate_options() != 0)
{
return 1;
@@ -1713,7 +1749,7 @@ int main(int argc, char ** argv)
}
pthread_t nDPId_thread;
- struct nDPId_return_value nDPId_return = {};
+ struct nDPId_return_value nDPId_return = {.thread_index = 0};
if (pthread_create(&nDPId_thread, NULL, nDPId_mainloop_thread, &nDPId_return) != 0)
{
return 1;
@@ -1759,6 +1795,7 @@ int main(int argc, char ** argv)
}
logger(0, "%s", "All worker threads terminated..");
+ free_reader_threads();
if (THREADS_RETURNED_ERROR() != 0)
{