aboutsummaryrefslogtreecommitdiff
path: root/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'main.c')
-rw-r--r--main.c89
1 files changed, 82 insertions, 7 deletions
diff --git a/main.c b/main.c
index cc73cc730..1f6e579cf 100644
--- a/main.c
+++ b/main.c
@@ -15,9 +15,13 @@
#error "nDPI 3.2.0 requiired"
#endif
-#define MAX_FLOW_ROOTS_PER_THREAD 10 //1024
+#define MAX_FLOW_ROOTS_PER_THREAD 1024
+#define MAX_IDLE_FLOWS_PER_THREAD 128
#define TICK_RESOLUTION 1000
#define MAX_READER_THREADS 8
+#define IDLE_SCAN_PERIOD 1000 /* msec (TICK_RESOLUTION = 1000) */
+#define MAX_IDLE_TIME 300000 /* msec (TICK_RESOLUTION = 1000) */
+#define INITIAL_THREAD_HASH 0x03dd018b
enum nDPId_l3_type {
L3_IP, L3_IP6
@@ -58,11 +62,18 @@ struct nDPId_workflow {
pcap_t * pcap_handle;
unsigned long long int thread_packets_processed;
uint64_t last_idle_scan_time;
+ size_t idle_scan_index;
uint64_t last_time;
void ** ndpi_flow_roots;
size_t max_available_flows;
size_t num_allocated_flows;
+ size_t cur_allocated_flows;
+
+ void ** ndpi_flows_idle;
+ size_t max_idle_flows;
+ size_t num_idle_flows;
+ size_t cur_idle_flows;
struct ndpi_detection_module_struct * ndpi_struct;
};
@@ -111,6 +122,14 @@ static struct nDPId_workflow * init_workflow(void)
return NULL;
}
+ workflow->num_idle_flows = 0;
+ workflow->max_idle_flows = MAX_IDLE_FLOWS_PER_THREAD;
+ workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *));
+ if (workflow->ndpi_flows_idle == NULL) {
+ free_workflow(&workflow);
+ return NULL;
+ }
+
NDPI_PROTOCOL_BITMASK protos;
NDPI_BITMASK_SET_ALL(protos);
ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos);
@@ -149,6 +168,7 @@ static void free_workflow(struct nDPId_workflow ** const workflow)
ndpi_tdestroy(w->ndpi_flow_roots[i], ndpi_flow_info_freer);
}
ndpi_free(w->ndpi_flow_roots);
+ ndpi_free(w->ndpi_flows_idle);
ndpi_free(w);
*workflow = NULL;
}
@@ -325,6 +345,32 @@ static void ndpi_workflow_node_walk(void const * const A, ndpi_VISIT which, int
}
#endif
+static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
+{
+ struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data;
+ struct nDPId_flow_info * const flow = *(struct nDPId_flow_info **)A;
+
+ (void)depth;
+
+ if (workflow == NULL || flow == NULL) {
+ return;
+ }
+
+ if (workflow->cur_idle_flows == MAX_IDLE_FLOWS_PER_THREAD) {
+ return;
+ }
+
+ if (which == ndpi_preorder || which == ndpi_leaf) {
+ if (flow->last_seen + MAX_IDLE_TIME < workflow->last_time) {
+ char src_addr_str[INET6_ADDRSTRLEN+1];
+ char dst_addr_str[INET6_ADDRSTRLEN+1];
+ ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str));
+ workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow;
+ workflow->num_idle_flows++;
+ }
+ }
+}
+
static int ndpi_workflow_node_cmp(void const * const A, void const * const B) {
struct nDPId_flow_info const * const flow_info_a = (struct nDPId_flow_info *)A;
struct nDPId_flow_info const * const flow_info_b = (struct nDPId_flow_info *)B;
@@ -381,7 +427,7 @@ static void ndpi_process_packet(uint8_t * const args,
uint32_t l4_data_len = 0;
uint16_t type;
uint16_t frag_off = 0;
- int thread_index = 0x0daa8ef6; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
+ int thread_index = INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
if (reader_thread == NULL) {
return;
@@ -392,12 +438,34 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
+ time_ms = ((uint64_t) header->ts.tv_sec) * TICK_RESOLUTION + header->ts.tv_usec / (1000000 / TICK_RESOLUTION);
+ workflow->last_time = time_ms;
+
#if 0
for (size_t i = 0; i < workflow->max_available_flows; ++i) {
ndpi_twalk(workflow->ndpi_flow_roots[i], ndpi_workflow_node_walk, workflow);
}
#endif
+ if (workflow->last_idle_scan_time + IDLE_SCAN_PERIOD < workflow->last_time) {
+ ndpi_twalk(workflow->ndpi_flow_roots[workflow->idle_scan_index], ndpi_idle_scan_walker, workflow);
+
+ while (workflow->cur_idle_flows > 0) {
+ struct nDPId_flow_info * const f = (struct nDPId_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];
+ printf("ThreadID %d, free idle flow with id %u\n", thread_index, f->flow_id);
+ ndpi_tdelete(f, &workflow->ndpi_flow_roots[workflow->idle_scan_index],
+ ndpi_workflow_node_cmp);
+ ndpi_flow_info_freer(f);
+ workflow->cur_allocated_flows--;
+ }
+
+ if (++workflow->idle_scan_index == workflow->max_available_flows) {
+ workflow->idle_scan_index = 0;
+ }
+
+ workflow->last_idle_scan_time = workflow->last_time;
+ }
+
switch (pcap_datalink(workflow->pcap_handle)) {
case DLT_NULL:
if (ntohl(*((uint32_t *)&packet[eth_offset])) == 0x00000002) {
@@ -579,9 +647,9 @@ static void ndpi_process_packet(uint8_t * const args,
}
if (tree_result == NULL) {
- if (workflow->num_allocated_flows == workflow->max_available_flows) {
- fprintf(stderr, "ThreadID %d, max flows to track reached: %zu\n", thread_index,
- workflow->max_available_flows);
+ if (workflow->cur_allocated_flows == workflow->max_available_flows) {
+ fprintf(stderr, "ThreadID %d, max flows to track reached: %zu, idle: %zu\n", thread_index,
+ workflow->max_available_flows, workflow->cur_idle_flows);
return;
}
@@ -591,6 +659,7 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
+ workflow->cur_allocated_flows++;
workflow->num_allocated_flows++;
memcpy(flow_to_process, &flow, sizeof(*flow_to_process));
flow_to_process->flow_id = flow_id++;
@@ -640,8 +709,6 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- time_ms = ((uint64_t) header->ts.tv_sec) * TICK_RESOLUTION + header->ts.tv_usec / (1000000 / TICK_RESOLUTION);
- workflow->last_time = time_ms;
if (flow_to_process->first_seen == 0) {
flow_to_process->first_seen = time_ms;
}
@@ -732,6 +799,8 @@ static int start_reader_threads(void)
static int stop_reader_threads(void)
{
unsigned long long int total_packets_processed = 0;
+ size_t total_flows_captured = 0;
+ size_t total_flows_idle = 0;
for (int i = 0; i < reader_thread_count; ++i) {
break_pcap_loop(&reader_threads[i]);
@@ -743,10 +812,16 @@ static int stop_reader_threads(void)
}
total_packets_processed += reader_threads[i].workflow->thread_packets_processed;
+ total_flows_captured += reader_threads[i].workflow->num_allocated_flows;
+ total_flows_idle += reader_threads[i].workflow->num_idle_flows;
+
printf("Stopping Thread %d, processed %llu packets\n",
reader_threads[i].array_index, reader_threads[i].workflow->thread_packets_processed);
}
printf("Total packets processed: %llu\n", total_packets_processed);
+ printf("Total flows captured...: %zu\n", total_flows_captured);
+ printf("Total flows timed out..: %zu\n", total_flows_idle);
+
for (int i = 0; i < reader_thread_count; ++i) {
if (reader_threads[i].workflow == NULL) {