diff options
author | lns <matzeton@googlemail.com> | 2020-06-06 12:55:31 +0200 |
---|---|---|
committer | lns <matzeton@googlemail.com> | 2020-06-06 12:55:31 +0200 |
commit | a7069d0b72b793628b0c1c1f036aa91cb883a084 (patch) | |
tree | 076434c335433ecea7043fd26b1c443104e8d957 | |
parent | fd33b7355f95db09a26a5e1442a325927938a99e (diff) |
flow tracking
-rw-r--r-- | main.c | 229 |
1 files changed, 175 insertions, 54 deletions
@@ -2,6 +2,7 @@ #include <errno.h> #include <linux/if_ether.h> #include <netinet/in.h> +#include <ndpi/ndpi_api.h> #include <ndpi/ndpi_main.h> #include <pcap/pcap.h> #include <pthread.h> @@ -16,7 +17,7 @@ enum nDPId_l3_type { struct nDPId_flow_info { uint32_t flow_id; - uint32_t hashval; + uint64_t hashval; enum nDPId_l3_type l3_type; union { struct { @@ -24,8 +25,8 @@ struct nDPId_flow_info { uint32_t dst; } v4; struct { - struct ndpi_in6_addr src; - struct ndpi_in6_addr dst; + uint64_t src[2]; + uint64_t dst[2]; } v6; } ip_tuple; uint16_t src_port; @@ -41,7 +42,6 @@ struct nDPId_workflow { void ** ndpi_flows_root; size_t max_available_flows; size_t num_allocated_flows; - struct ndpi_detection_module_struct * ndpi_struct; }; @@ -55,15 +55,23 @@ struct nDPId_reader_thread { static struct nDPId_reader_thread reader_threads[MAX_READER_THREADS] = {}; static int reader_thread_count = MAX_READER_THREADS; static int main_thread_shutdown = 0; +static uint32_t flow_id = 0; static struct nDPId_workflow * init_workflow(void) { + char pcap_error_buffer[PCAP_ERRBUF_SIZE]; struct nDPId_workflow * workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow)); if (workflow == NULL) { return NULL; } + workflow->pcap_handle = pcap_open_live("wifi0" /* "lo" */, /* 1536 */ 65535, 1, 250, pcap_error_buffer); + if (workflow->pcap_handle == NULL) { + fprintf(stderr, "pcap_open_live: %s\n", pcap_error_buffer); + return NULL; + } + ndpi_init_prefs init_prefs = ndpi_no_prefs; workflow->ndpi_struct = ndpi_init_detection_module(init_prefs); if (workflow->ndpi_struct == NULL) { @@ -86,46 +94,36 @@ static struct nDPId_workflow * init_workflow(void) return workflow; } -static void free_workflow(struct nDPId_workflow ** const workflow) +static void ndpi_flow_info_freer(void * const node) { - if (*workflow == NULL) { - return; - } + struct nDPId_flow_info * const flow = (struct nDPId_flow_info *)node; - if ((*workflow)->ndpi_struct != NULL) { - ndpi_exit_detection_module((*workflow)->ndpi_struct); - } - ndpi_free(*workflow); - *workflow = NULL; + ndpi_flow_free(flow->ndpi_flow); + ndpi_free(flow); } -static int setup_detection(struct nDPId_workflow * const workflow) +static void free_workflow(struct nDPId_workflow ** const workflow) { - char pcap_error_buffer[PCAP_ERRBUF_SIZE]; + struct nDPId_workflow * const w = *workflow; - if (workflow == NULL) { - return 1; + if (w == NULL) { + return; } - workflow->pcap_handle = pcap_open_live("wifi0" /* "lo" */, /* 1536 */ 65535, 1, 250, pcap_error_buffer); - if (workflow->pcap_handle == NULL) { - fprintf(stderr, "pcap_open_live: %s\n", pcap_error_buffer); - return 1; + if (w->pcap_handle != NULL) { + pcap_close(w->pcap_handle); + w->pcap_handle = NULL; } - return 0; -} - -static void free_detection(struct nDPId_workflow * const workflow) -{ - if (workflow == NULL) { - return; + if (w->ndpi_struct != NULL) { + ndpi_exit_detection_module(w->ndpi_struct); } - - if (workflow->pcap_handle != NULL) { - pcap_close(workflow->pcap_handle); - workflow->pcap_handle = NULL; + for(size_t i = 0; i < w->max_available_flows; i++) { + ndpi_tdestroy(w->ndpi_flows_root[i], ndpi_flow_info_freer); } + ndpi_free(w->ndpi_flows_root); + ndpi_free(w); + *workflow = NULL; } static int setup_reader_threads(void) @@ -136,8 +134,7 @@ static int setup_reader_threads(void) for (int i = 0; i < reader_thread_count; ++i) { reader_threads[i].workflow = init_workflow(); - if (reader_threads[i].workflow == NULL || - setup_detection(reader_threads[i].workflow) != 0) + if (reader_threads[i].workflow == NULL) { return 1; } @@ -170,9 +167,9 @@ static void print_packet_info(int thread_array_index, src_addr_str, dst_addr_str); break; case ETH_P_IPV6: - inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.src.u6_addr.u6_addr8[0], + inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.src[0], src_addr_str, sizeof(src_addr_str)); - inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.dst.u6_addr.u6_addr8[0], + inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.dst[0], dst_addr_str, sizeof(dst_addr_str)); ret = snprintf(buf + used, sizeof(buf) - used, "IP6[%s -> %s]", src_addr_str, dst_addr_str); @@ -214,14 +211,79 @@ static void print_packet_info(int thread_array_index, printf("%.*s\n", used, buf); } +static int ip_tuples_equal(struct nDPId_flow_info const * const A, + struct nDPId_flow_info const * const B) +{ + if (A->l3_type == L3_IP) { + return A->ip_tuple.v4.src == B->ip_tuple.v4.src && + A->ip_tuple.v4.dst == B->ip_tuple.v4.dst; + } else if (A->l3_type == L3_IP6) { + return A->ip_tuple.v6.src[0] == B->ip_tuple.v6.src[0] && + A->ip_tuple.v6.src[1] == B->ip_tuple.v6.src[1] && + A->ip_tuple.v6.dst[0] == B->ip_tuple.v6.dst[0] && + A->ip_tuple.v6.dst[1] == B->ip_tuple.v6.dst[1]; + } + return 0; +} + +static int ndpi_workflow_node_cmp(void const * const a, void const * const b) { + struct nDPId_flow_info const * const fa = (struct nDPId_flow_info*)a; + struct nDPId_flow_info const * const fb = (struct nDPId_flow_info*)b; + + if (fa->hashval < fb->hashval) { + return(-1); + } else if (fa->hashval > fb->hashval) { + return(1); + } + + /* Flows have the same hash */ + if (fa->protocol < fb->protocol) { + return(-1); + } else if (fa->protocol > fb->protocol) { + return(1); + } + + if (ip_tuples_equal(fa, fb) != 0 && + fa->src_port == fb->src_port && + fa->dst_port == fb->dst_port) + { + return(0); + } + + if (fa->ip_tuple.v4.src < fb->ip_tuple.v4.src) { + return(-1); + } else if(fa->ip_tuple.v4.dst > fb->ip_tuple.v4.dst) { + return(1); + } + if (fa->ip_tuple.v4.src < fb->ip_tuple.v4.src) { + return(-1); + } else if(fa->src_port > fb->src_port) { + return(1); + } + if(fa->ip_tuple.v4.dst < fb->ip_tuple.v4.dst) { + return(-2); + } else if(fa->ip_tuple.v4.dst > fb->ip_tuple.v4.dst) { + return(1); + } + if (fa->dst_port < fb->dst_port) { + return(-1); + } else if (fa->dst_port > fb->dst_port) { + return(1); + } + + return(0); /* notreached */ +} + static void ndpi_process_packet(uint8_t * const args, - const struct pcap_pkthdr * const header, - const uint8_t * const packet) + struct pcap_pkthdr const * const header, + uint8_t const * const packet) { struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)args; struct nDPId_workflow * workflow; - struct nDPId_flow_info flow; + struct nDPId_flow_info flow = {}; + void * opaque_flow; + size_t hashed_index; const struct ndpi_ethhdr * ethernet; const struct ndpi_iphdr * ip; @@ -333,19 +395,19 @@ static void ndpi_process_packet(uint8_t * const args, flow.protocol = ip6->ip6_hdr.ip6_un1_nxt; l4_offset = ip_offset + sizeof(ip6->ip6_hdr); - flow.ip_tuple.v6.src.u6_addr.u6_addr64[0] = ip6->ip6_src.u6_addr.u6_addr64[0]; - flow.ip_tuple.v6.src.u6_addr.u6_addr64[1] = ip6->ip6_src.u6_addr.u6_addr64[1]; - flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0] = ip6->ip6_dst.u6_addr.u6_addr64[0]; - flow.ip_tuple.v6.dst.u6_addr.u6_addr64[1] = ip6->ip6_dst.u6_addr.u6_addr64[1]; + flow.ip_tuple.v6.src[0] = ip6->ip6_src.u6_addr.u6_addr64[0]; + flow.ip_tuple.v6.src[1] = ip6->ip6_src.u6_addr.u6_addr64[1]; + flow.ip_tuple.v6.dst[0] = ip6->ip6_dst.u6_addr.u6_addr64[0]; + flow.ip_tuple.v6.dst[1] = ip6->ip6_dst.u6_addr.u6_addr64[1]; uint64_t min_addr[2]; - if (flow.ip_tuple.v6.src.u6_addr.u6_addr64[0] > flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0] && - flow.ip_tuple.v6.src.u6_addr.u6_addr64[1] > flow.ip_tuple.v6.dst.u6_addr.u6_addr64[1]) + if (flow.ip_tuple.v6.src[0] > flow.ip_tuple.v6.dst[0] && + flow.ip_tuple.v6.src[1] > flow.ip_tuple.v6.dst[1]) { - min_addr[0] = flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0]; - min_addr[1] = flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0]; + min_addr[0] = flow.ip_tuple.v6.dst[0]; + min_addr[1] = flow.ip_tuple.v6.dst[0]; } else { - min_addr[0] = flow.ip_tuple.v6.src.u6_addr.u6_addr64[0]; - min_addr[1] = flow.ip_tuple.v6.src.u6_addr.u6_addr64[0]; + min_addr[0] = flow.ip_tuple.v6.src[0]; + min_addr[1] = flow.ip_tuple.v6.src[0]; } thread_index = min_addr[0] + min_addr[1]; thread_index %= reader_thread_count; @@ -373,9 +435,69 @@ static void ndpi_process_packet(uint8_t * const args, } print_packet_info(reader_thread->array_index, header, type, &flow); + + if (flow.l3_type == L3_IP) { + flow.hashval = flow.ip_tuple.v4.src + flow.ip_tuple.v4.dst; + } else if (flow.l3_type == L3_IP6) { + flow.hashval = flow.ip_tuple.v6.src[0] + flow.ip_tuple.v6.src[1]; + flow.hashval += flow.ip_tuple.v6.dst[0] + flow.ip_tuple.v6.dst[1]; + } + flow.hashval += flow.protocol + flow.src_port + flow.dst_port; + + hashed_index = flow.hashval % workflow->max_available_flows; + opaque_flow = ndpi_tfind(&flow, &workflow->ndpi_flows_root[hashed_index], ndpi_workflow_node_cmp); + if (opaque_flow == NULL) { + uint64_t orig_src_ip[2] = { flow.ip_tuple.v6.src[0], flow.ip_tuple.v6.src[1] }; + uint64_t orig_dst_ip[2] = { flow.ip_tuple.v6.dst[0], flow.ip_tuple.v6.dst[1] }; + uint16_t orig_src_port = flow.src_port; + uint16_t orig_dst_port = flow.dst_port; + + flow.ip_tuple.v6.src[0] = orig_dst_ip[0]; + flow.ip_tuple.v6.src[1] = orig_dst_ip[1]; + flow.ip_tuple.v6.dst[0] = orig_src_ip[0]; + flow.ip_tuple.v6.dst[1] = orig_src_ip[1]; + flow.src_port = orig_dst_port; + flow.dst_port = orig_src_port; + + opaque_flow = ndpi_tfind(&flow, &workflow->ndpi_flows_root[hashed_index], ndpi_workflow_node_cmp); + + flow.ip_tuple.v6.src[0] = orig_src_ip[0]; + flow.ip_tuple.v6.src[1] = orig_src_ip[1]; + flow.ip_tuple.v6.dst[0] = orig_dst_ip[0]; + flow.ip_tuple.v6.dst[1] = orig_dst_ip[1]; + flow.src_port = orig_src_port; + flow.dst_port = orig_dst_port; + } + + if (opaque_flow == NULL) { + if (workflow->num_allocated_flows == workflow->max_available_flows) { + fprintf(stderr, "Max flows to track reached: %zu\n", workflow->max_available_flows); + return; + } + + struct nDPId_flow_info * const newflow = (struct nDPId_flow_info *)ndpi_malloc(sizeof(*newflow)); + if (newflow == NULL) { + fprintf(stderr, "Not enough memory for flow info\n"); + return; + } + + workflow->num_allocated_flows++; + memcpy(newflow, &flow, sizeof(*newflow)); + newflow->flow_id = flow_id++; + + newflow->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT); + if (newflow->ndpi_flow == NULL) { + fprintf(stderr, "Not enough memory for flow struct\n"); + return; + } + memset(newflow->ndpi_flow, 0, SIZEOF_FLOW_STRUCT); + + printf("New flow with id %u\n", newflow->flow_id); + ndpi_tsearch(newflow, &workflow->ndpi_flows_root[hashed_index], ndpi_workflow_node_cmp); /* Add */ + } } -static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) +static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread) { if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) { @@ -400,7 +522,7 @@ static void break_pcap_loop(struct nDPId_reader_thread * const reader_thread) static void * processing_thread(void * const ndpi_thread_arg) { - struct nDPId_reader_thread * const reader_thread = + struct nDPId_reader_thread const * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg; printf("Starting ThreadID %d\n", reader_thread->array_index); @@ -452,7 +574,7 @@ static int stop_reader_threads(void) for (int i = 0; i < reader_thread_count; ++i) { if (reader_threads[i].workflow == NULL) { - continue;; + continue; } printf("Stopping ThreadID %d\n", reader_threads[i].array_index); @@ -467,14 +589,13 @@ static int stop_reader_threads(void) fprintf(stderr, "pthread_join: %s\n", strerror(errno)); } - free_detection(reader_threads[i].workflow); free_workflow(&reader_threads[i].workflow); } return 0; } -void sighandler(int signum) +static void sighandler(int signum) { fprintf(stderr, "Got a %d\n", signum); |