diff options
Diffstat (limited to 'main.c')
-rw-r--r-- | main.c | 89 |
1 files changed, 82 insertions, 7 deletions
@@ -15,9 +15,13 @@ #error "nDPI 3.2.0 requiired" #endif -#define MAX_FLOW_ROOTS_PER_THREAD 10 //1024 +#define MAX_FLOW_ROOTS_PER_THREAD 1024 +#define MAX_IDLE_FLOWS_PER_THREAD 128 #define TICK_RESOLUTION 1000 #define MAX_READER_THREADS 8 +#define IDLE_SCAN_PERIOD 1000 /* msec (TICK_RESOLUTION = 1000) */ +#define MAX_IDLE_TIME 300000 /* msec (TICK_RESOLUTION = 1000) */ +#define INITIAL_THREAD_HASH 0x03dd018b enum nDPId_l3_type { L3_IP, L3_IP6 @@ -58,11 +62,18 @@ struct nDPId_workflow { pcap_t * pcap_handle; unsigned long long int thread_packets_processed; uint64_t last_idle_scan_time; + size_t idle_scan_index; uint64_t last_time; void ** ndpi_flow_roots; size_t max_available_flows; size_t num_allocated_flows; + size_t cur_allocated_flows; + + void ** ndpi_flows_idle; + size_t max_idle_flows; + size_t num_idle_flows; + size_t cur_idle_flows; struct ndpi_detection_module_struct * ndpi_struct; }; @@ -111,6 +122,14 @@ static struct nDPId_workflow * init_workflow(void) return NULL; } + workflow->num_idle_flows = 0; + workflow->max_idle_flows = MAX_IDLE_FLOWS_PER_THREAD; + workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *)); + if (workflow->ndpi_flows_idle == NULL) { + free_workflow(&workflow); + return NULL; + } + NDPI_PROTOCOL_BITMASK protos; NDPI_BITMASK_SET_ALL(protos); ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos); @@ -149,6 +168,7 @@ static void free_workflow(struct nDPId_workflow ** const workflow) ndpi_tdestroy(w->ndpi_flow_roots[i], ndpi_flow_info_freer); } ndpi_free(w->ndpi_flow_roots); + ndpi_free(w->ndpi_flows_idle); ndpi_free(w); *workflow = NULL; } @@ -325,6 +345,32 @@ static void ndpi_workflow_node_walk(void const * const A, ndpi_VISIT which, int } #endif +static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; + struct nDPId_flow_info * const flow = *(struct nDPId_flow_info **)A; + + (void)depth; + + if (workflow == NULL || flow == NULL) { + return; + } + + if (workflow->cur_idle_flows == MAX_IDLE_FLOWS_PER_THREAD) { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) { + if (flow->last_seen + MAX_IDLE_TIME < workflow->last_time) { + char src_addr_str[INET6_ADDRSTRLEN+1]; + char dst_addr_str[INET6_ADDRSTRLEN+1]; + ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str)); + workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow; + workflow->num_idle_flows++; + } + } +} + static int ndpi_workflow_node_cmp(void const * const A, void const * const B) { struct nDPId_flow_info const * const flow_info_a = (struct nDPId_flow_info *)A; struct nDPId_flow_info const * const flow_info_b = (struct nDPId_flow_info *)B; @@ -381,7 +427,7 @@ static void ndpi_process_packet(uint8_t * const args, uint32_t l4_data_len = 0; uint16_t type; uint16_t frag_off = 0; - int thread_index = 0x0daa8ef6; // generated with `dd if=/dev/random bs=1024 count=1 |& hd' + int thread_index = INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd' if (reader_thread == NULL) { return; @@ -392,12 +438,34 @@ static void ndpi_process_packet(uint8_t * const args, return; } + time_ms = ((uint64_t) header->ts.tv_sec) * TICK_RESOLUTION + header->ts.tv_usec / (1000000 / TICK_RESOLUTION); + workflow->last_time = time_ms; + #if 0 for (size_t i = 0; i < workflow->max_available_flows; ++i) { ndpi_twalk(workflow->ndpi_flow_roots[i], ndpi_workflow_node_walk, workflow); } #endif + if (workflow->last_idle_scan_time + IDLE_SCAN_PERIOD < workflow->last_time) { + ndpi_twalk(workflow->ndpi_flow_roots[workflow->idle_scan_index], ndpi_idle_scan_walker, workflow); + + while (workflow->cur_idle_flows > 0) { + struct nDPId_flow_info * const f = (struct nDPId_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows]; + printf("ThreadID %d, free idle flow with id %u\n", thread_index, f->flow_id); + ndpi_tdelete(f, &workflow->ndpi_flow_roots[workflow->idle_scan_index], + ndpi_workflow_node_cmp); + ndpi_flow_info_freer(f); + workflow->cur_allocated_flows--; + } + + if (++workflow->idle_scan_index == workflow->max_available_flows) { + workflow->idle_scan_index = 0; + } + + workflow->last_idle_scan_time = workflow->last_time; + } + switch (pcap_datalink(workflow->pcap_handle)) { case DLT_NULL: if (ntohl(*((uint32_t *)&packet[eth_offset])) == 0x00000002) { @@ -579,9 +647,9 @@ static void ndpi_process_packet(uint8_t * const args, } if (tree_result == NULL) { - if (workflow->num_allocated_flows == workflow->max_available_flows) { - fprintf(stderr, "ThreadID %d, max flows to track reached: %zu\n", thread_index, - workflow->max_available_flows); + if (workflow->cur_allocated_flows == workflow->max_available_flows) { + fprintf(stderr, "ThreadID %d, max flows to track reached: %zu, idle: %zu\n", thread_index, + workflow->max_available_flows, workflow->cur_idle_flows); return; } @@ -591,6 +659,7 @@ static void ndpi_process_packet(uint8_t * const args, return; } + workflow->cur_allocated_flows++; workflow->num_allocated_flows++; memcpy(flow_to_process, &flow, sizeof(*flow_to_process)); flow_to_process->flow_id = flow_id++; @@ -640,8 +709,6 @@ static void ndpi_process_packet(uint8_t * const args, return; } - time_ms = ((uint64_t) header->ts.tv_sec) * TICK_RESOLUTION + header->ts.tv_usec / (1000000 / TICK_RESOLUTION); - workflow->last_time = time_ms; if (flow_to_process->first_seen == 0) { flow_to_process->first_seen = time_ms; } @@ -732,6 +799,8 @@ static int start_reader_threads(void) static int stop_reader_threads(void) { unsigned long long int total_packets_processed = 0; + size_t total_flows_captured = 0; + size_t total_flows_idle = 0; for (int i = 0; i < reader_thread_count; ++i) { break_pcap_loop(&reader_threads[i]); @@ -743,10 +812,16 @@ static int stop_reader_threads(void) } total_packets_processed += reader_threads[i].workflow->thread_packets_processed; + total_flows_captured += reader_threads[i].workflow->num_allocated_flows; + total_flows_idle += reader_threads[i].workflow->num_idle_flows; + printf("Stopping Thread %d, processed %llu packets\n", reader_threads[i].array_index, reader_threads[i].workflow->thread_packets_processed); } printf("Total packets processed: %llu\n", total_packets_processed); + printf("Total flows captured...: %zu\n", total_flows_captured); + printf("Total flows timed out..: %zu\n", total_flows_idle); + for (int i = 0; i < reader_thread_count; ++i) { if (reader_threads[i].workflow == NULL) { |