summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlns <matzeton@googlemail.com>2020-06-06 12:55:31 +0200
committerlns <matzeton@googlemail.com>2020-06-06 12:55:31 +0200
commita7069d0b72b793628b0c1c1f036aa91cb883a084 (patch)
tree076434c335433ecea7043fd26b1c443104e8d957
parentfd33b7355f95db09a26a5e1442a325927938a99e (diff)
flow tracking
-rw-r--r--main.c229
1 files changed, 175 insertions, 54 deletions
diff --git a/main.c b/main.c
index 2d4b0b15f..8dc937bef 100644
--- a/main.c
+++ b/main.c
@@ -2,6 +2,7 @@
#include <errno.h>
#include <linux/if_ether.h>
#include <netinet/in.h>
+#include <ndpi/ndpi_api.h>
#include <ndpi/ndpi_main.h>
#include <pcap/pcap.h>
#include <pthread.h>
@@ -16,7 +17,7 @@ enum nDPId_l3_type {
struct nDPId_flow_info {
uint32_t flow_id;
- uint32_t hashval;
+ uint64_t hashval;
enum nDPId_l3_type l3_type;
union {
struct {
@@ -24,8 +25,8 @@ struct nDPId_flow_info {
uint32_t dst;
} v4;
struct {
- struct ndpi_in6_addr src;
- struct ndpi_in6_addr dst;
+ uint64_t src[2];
+ uint64_t dst[2];
} v6;
} ip_tuple;
uint16_t src_port;
@@ -41,7 +42,6 @@ struct nDPId_workflow {
void ** ndpi_flows_root;
size_t max_available_flows;
size_t num_allocated_flows;
-
struct ndpi_detection_module_struct * ndpi_struct;
};
@@ -55,15 +55,23 @@ struct nDPId_reader_thread {
static struct nDPId_reader_thread reader_threads[MAX_READER_THREADS] = {};
static int reader_thread_count = MAX_READER_THREADS;
static int main_thread_shutdown = 0;
+static uint32_t flow_id = 0;
static struct nDPId_workflow * init_workflow(void)
{
+ char pcap_error_buffer[PCAP_ERRBUF_SIZE];
struct nDPId_workflow * workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow));
if (workflow == NULL) {
return NULL;
}
+ workflow->pcap_handle = pcap_open_live("wifi0" /* "lo" */, /* 1536 */ 65535, 1, 250, pcap_error_buffer);
+ if (workflow->pcap_handle == NULL) {
+ fprintf(stderr, "pcap_open_live: %s\n", pcap_error_buffer);
+ return NULL;
+ }
+
ndpi_init_prefs init_prefs = ndpi_no_prefs;
workflow->ndpi_struct = ndpi_init_detection_module(init_prefs);
if (workflow->ndpi_struct == NULL) {
@@ -86,46 +94,36 @@ static struct nDPId_workflow * init_workflow(void)
return workflow;
}
-static void free_workflow(struct nDPId_workflow ** const workflow)
+static void ndpi_flow_info_freer(void * const node)
{
- if (*workflow == NULL) {
- return;
- }
+ struct nDPId_flow_info * const flow = (struct nDPId_flow_info *)node;
- if ((*workflow)->ndpi_struct != NULL) {
- ndpi_exit_detection_module((*workflow)->ndpi_struct);
- }
- ndpi_free(*workflow);
- *workflow = NULL;
+ ndpi_flow_free(flow->ndpi_flow);
+ ndpi_free(flow);
}
-static int setup_detection(struct nDPId_workflow * const workflow)
+static void free_workflow(struct nDPId_workflow ** const workflow)
{
- char pcap_error_buffer[PCAP_ERRBUF_SIZE];
+ struct nDPId_workflow * const w = *workflow;
- if (workflow == NULL) {
- return 1;
+ if (w == NULL) {
+ return;
}
- workflow->pcap_handle = pcap_open_live("wifi0" /* "lo" */, /* 1536 */ 65535, 1, 250, pcap_error_buffer);
- if (workflow->pcap_handle == NULL) {
- fprintf(stderr, "pcap_open_live: %s\n", pcap_error_buffer);
- return 1;
+ if (w->pcap_handle != NULL) {
+ pcap_close(w->pcap_handle);
+ w->pcap_handle = NULL;
}
- return 0;
-}
-
-static void free_detection(struct nDPId_workflow * const workflow)
-{
- if (workflow == NULL) {
- return;
+ if (w->ndpi_struct != NULL) {
+ ndpi_exit_detection_module(w->ndpi_struct);
}
-
- if (workflow->pcap_handle != NULL) {
- pcap_close(workflow->pcap_handle);
- workflow->pcap_handle = NULL;
+ for(size_t i = 0; i < w->max_available_flows; i++) {
+ ndpi_tdestroy(w->ndpi_flows_root[i], ndpi_flow_info_freer);
}
+ ndpi_free(w->ndpi_flows_root);
+ ndpi_free(w);
+ *workflow = NULL;
}
static int setup_reader_threads(void)
@@ -136,8 +134,7 @@ static int setup_reader_threads(void)
for (int i = 0; i < reader_thread_count; ++i) {
reader_threads[i].workflow = init_workflow();
- if (reader_threads[i].workflow == NULL ||
- setup_detection(reader_threads[i].workflow) != 0)
+ if (reader_threads[i].workflow == NULL)
{
return 1;
}
@@ -170,9 +167,9 @@ static void print_packet_info(int thread_array_index,
src_addr_str, dst_addr_str);
break;
case ETH_P_IPV6:
- inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.src.u6_addr.u6_addr8[0],
+ inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.src[0],
src_addr_str, sizeof(src_addr_str));
- inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.dst.u6_addr.u6_addr8[0],
+ inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.dst[0],
dst_addr_str, sizeof(dst_addr_str));
ret = snprintf(buf + used, sizeof(buf) - used, "IP6[%s -> %s]",
src_addr_str, dst_addr_str);
@@ -214,14 +211,79 @@ static void print_packet_info(int thread_array_index,
printf("%.*s\n", used, buf);
}
+static int ip_tuples_equal(struct nDPId_flow_info const * const A,
+ struct nDPId_flow_info const * const B)
+{
+ if (A->l3_type == L3_IP) {
+ return A->ip_tuple.v4.src == B->ip_tuple.v4.src &&
+ A->ip_tuple.v4.dst == B->ip_tuple.v4.dst;
+ } else if (A->l3_type == L3_IP6) {
+ return A->ip_tuple.v6.src[0] == B->ip_tuple.v6.src[0] &&
+ A->ip_tuple.v6.src[1] == B->ip_tuple.v6.src[1] &&
+ A->ip_tuple.v6.dst[0] == B->ip_tuple.v6.dst[0] &&
+ A->ip_tuple.v6.dst[1] == B->ip_tuple.v6.dst[1];
+ }
+ return 0;
+}
+
+static int ndpi_workflow_node_cmp(void const * const a, void const * const b) {
+ struct nDPId_flow_info const * const fa = (struct nDPId_flow_info*)a;
+ struct nDPId_flow_info const * const fb = (struct nDPId_flow_info*)b;
+
+ if (fa->hashval < fb->hashval) {
+ return(-1);
+ } else if (fa->hashval > fb->hashval) {
+ return(1);
+ }
+
+ /* Flows have the same hash */
+ if (fa->protocol < fb->protocol) {
+ return(-1);
+ } else if (fa->protocol > fb->protocol) {
+ return(1);
+ }
+
+ if (ip_tuples_equal(fa, fb) != 0 &&
+ fa->src_port == fb->src_port &&
+ fa->dst_port == fb->dst_port)
+ {
+ return(0);
+ }
+
+ if (fa->ip_tuple.v4.src < fb->ip_tuple.v4.src) {
+ return(-1);
+ } else if(fa->ip_tuple.v4.dst > fb->ip_tuple.v4.dst) {
+ return(1);
+ }
+ if (fa->ip_tuple.v4.src < fb->ip_tuple.v4.src) {
+ return(-1);
+ } else if(fa->src_port > fb->src_port) {
+ return(1);
+ }
+ if(fa->ip_tuple.v4.dst < fb->ip_tuple.v4.dst) {
+ return(-2);
+ } else if(fa->ip_tuple.v4.dst > fb->ip_tuple.v4.dst) {
+ return(1);
+ }
+ if (fa->dst_port < fb->dst_port) {
+ return(-1);
+ } else if (fa->dst_port > fb->dst_port) {
+ return(1);
+ }
+
+ return(0); /* notreached */
+}
+
static void ndpi_process_packet(uint8_t * const args,
- const struct pcap_pkthdr * const header,
- const uint8_t * const packet)
+ struct pcap_pkthdr const * const header,
+ uint8_t const * const packet)
{
struct nDPId_reader_thread * const reader_thread =
(struct nDPId_reader_thread *)args;
struct nDPId_workflow * workflow;
- struct nDPId_flow_info flow;
+ struct nDPId_flow_info flow = {};
+ void * opaque_flow;
+ size_t hashed_index;
const struct ndpi_ethhdr * ethernet;
const struct ndpi_iphdr * ip;
@@ -333,19 +395,19 @@ static void ndpi_process_packet(uint8_t * const args,
flow.protocol = ip6->ip6_hdr.ip6_un1_nxt;
l4_offset = ip_offset + sizeof(ip6->ip6_hdr);
- flow.ip_tuple.v6.src.u6_addr.u6_addr64[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
- flow.ip_tuple.v6.src.u6_addr.u6_addr64[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
- flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
- flow.ip_tuple.v6.dst.u6_addr.u6_addr64[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
+ flow.ip_tuple.v6.src[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
+ flow.ip_tuple.v6.src[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
+ flow.ip_tuple.v6.dst[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
+ flow.ip_tuple.v6.dst[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
uint64_t min_addr[2];
- if (flow.ip_tuple.v6.src.u6_addr.u6_addr64[0] > flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0] &&
- flow.ip_tuple.v6.src.u6_addr.u6_addr64[1] > flow.ip_tuple.v6.dst.u6_addr.u6_addr64[1])
+ if (flow.ip_tuple.v6.src[0] > flow.ip_tuple.v6.dst[0] &&
+ flow.ip_tuple.v6.src[1] > flow.ip_tuple.v6.dst[1])
{
- min_addr[0] = flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0];
- min_addr[1] = flow.ip_tuple.v6.dst.u6_addr.u6_addr64[0];
+ min_addr[0] = flow.ip_tuple.v6.dst[0];
+ min_addr[1] = flow.ip_tuple.v6.dst[0];
} else {
- min_addr[0] = flow.ip_tuple.v6.src.u6_addr.u6_addr64[0];
- min_addr[1] = flow.ip_tuple.v6.src.u6_addr.u6_addr64[0];
+ min_addr[0] = flow.ip_tuple.v6.src[0];
+ min_addr[1] = flow.ip_tuple.v6.src[0];
}
thread_index = min_addr[0] + min_addr[1];
thread_index %= reader_thread_count;
@@ -373,9 +435,69 @@ static void ndpi_process_packet(uint8_t * const args,
}
print_packet_info(reader_thread->array_index, header, type, &flow);
+
+ if (flow.l3_type == L3_IP) {
+ flow.hashval = flow.ip_tuple.v4.src + flow.ip_tuple.v4.dst;
+ } else if (flow.l3_type == L3_IP6) {
+ flow.hashval = flow.ip_tuple.v6.src[0] + flow.ip_tuple.v6.src[1];
+ flow.hashval += flow.ip_tuple.v6.dst[0] + flow.ip_tuple.v6.dst[1];
+ }
+ flow.hashval += flow.protocol + flow.src_port + flow.dst_port;
+
+ hashed_index = flow.hashval % workflow->max_available_flows;
+ opaque_flow = ndpi_tfind(&flow, &workflow->ndpi_flows_root[hashed_index], ndpi_workflow_node_cmp);
+ if (opaque_flow == NULL) {
+ uint64_t orig_src_ip[2] = { flow.ip_tuple.v6.src[0], flow.ip_tuple.v6.src[1] };
+ uint64_t orig_dst_ip[2] = { flow.ip_tuple.v6.dst[0], flow.ip_tuple.v6.dst[1] };
+ uint16_t orig_src_port = flow.src_port;
+ uint16_t orig_dst_port = flow.dst_port;
+
+ flow.ip_tuple.v6.src[0] = orig_dst_ip[0];
+ flow.ip_tuple.v6.src[1] = orig_dst_ip[1];
+ flow.ip_tuple.v6.dst[0] = orig_src_ip[0];
+ flow.ip_tuple.v6.dst[1] = orig_src_ip[1];
+ flow.src_port = orig_dst_port;
+ flow.dst_port = orig_src_port;
+
+ opaque_flow = ndpi_tfind(&flow, &workflow->ndpi_flows_root[hashed_index], ndpi_workflow_node_cmp);
+
+ flow.ip_tuple.v6.src[0] = orig_src_ip[0];
+ flow.ip_tuple.v6.src[1] = orig_src_ip[1];
+ flow.ip_tuple.v6.dst[0] = orig_dst_ip[0];
+ flow.ip_tuple.v6.dst[1] = orig_dst_ip[1];
+ flow.src_port = orig_src_port;
+ flow.dst_port = orig_dst_port;
+ }
+
+ if (opaque_flow == NULL) {
+ if (workflow->num_allocated_flows == workflow->max_available_flows) {
+ fprintf(stderr, "Max flows to track reached: %zu\n", workflow->max_available_flows);
+ return;
+ }
+
+ struct nDPId_flow_info * const newflow = (struct nDPId_flow_info *)ndpi_malloc(sizeof(*newflow));
+ if (newflow == NULL) {
+ fprintf(stderr, "Not enough memory for flow info\n");
+ return;
+ }
+
+ workflow->num_allocated_flows++;
+ memcpy(newflow, &flow, sizeof(*newflow));
+ newflow->flow_id = flow_id++;
+
+ newflow->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT);
+ if (newflow->ndpi_flow == NULL) {
+ fprintf(stderr, "Not enough memory for flow struct\n");
+ return;
+ }
+ memset(newflow->ndpi_flow, 0, SIZEOF_FLOW_STRUCT);
+
+ printf("New flow with id %u\n", newflow->flow_id);
+ ndpi_tsearch(newflow, &workflow->ndpi_flows_root[hashed_index], ndpi_workflow_node_cmp); /* Add */
+ }
}
-static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread)
+static void run_pcap_loop(struct nDPId_reader_thread const * const reader_thread)
{
if (reader_thread->workflow != NULL &&
reader_thread->workflow->pcap_handle != NULL) {
@@ -400,7 +522,7 @@ static void break_pcap_loop(struct nDPId_reader_thread * const reader_thread)
static void * processing_thread(void * const ndpi_thread_arg)
{
- struct nDPId_reader_thread * const reader_thread =
+ struct nDPId_reader_thread const * const reader_thread =
(struct nDPId_reader_thread *)ndpi_thread_arg;
printf("Starting ThreadID %d\n", reader_thread->array_index);
@@ -452,7 +574,7 @@ static int stop_reader_threads(void)
for (int i = 0; i < reader_thread_count; ++i) {
if (reader_threads[i].workflow == NULL) {
- continue;;
+ continue;
}
printf("Stopping ThreadID %d\n", reader_threads[i].array_index);
@@ -467,14 +589,13 @@ static int stop_reader_threads(void)
fprintf(stderr, "pthread_join: %s\n", strerror(errno));
}
- free_detection(reader_threads[i].workflow);
free_workflow(&reader_threads[i].workflow);
}
return 0;
}
-void sighandler(int signum)
+static void sighandler(int signum)
{
fprintf(stderr, "Got a %d\n", signum);