aboutsummaryrefslogtreecommitdiff
path: root/example
diff options
context:
space:
mode:
authorLuca <deri@ntop.org>2017-08-02 20:15:21 +0200
committerLuca <deri@ntop.org>2017-08-02 20:15:21 +0200
commit3ba3a08141f0c60ab8e970e744ecf7540319b093 (patch)
treec504f171ae566c3b14b7c890a67597fdc58eb043 /example
parent1d4eeaa32824d5b10b2a3e1ccf4245709a2374d1 (diff)
Implemented flow sort based on total bytes so that we can (with -v X) immediately spot elephants and mice
Diffstat (limited to 'example')
-rw-r--r--example/ndpiReader.c65
-rw-r--r--example/ndpi_util.c5
-rw-r--r--example/ndpi_util.h1
3 files changed, 57 insertions, 14 deletions
diff --git a/example/ndpiReader.c b/example/ndpiReader.c
index 189321968..e0dda2cab 100644
--- a/example/ndpiReader.c
+++ b/example/ndpiReader.c
@@ -97,7 +97,15 @@ static time_t capture_for = 0;
static time_t capture_until = 0;
static u_int32_t num_flows;
-struct info_pair{
+struct flow_info {
+ struct ndpi_flow_info *flow;
+ u_int16_t thread_id;
+};
+
+static struct flow_info *all_flows;
+
+
+struct info_pair {
char addr[48];
char proto[48]; /*app level protocol*/
int count;
@@ -167,7 +175,7 @@ static u_int16_t extcap_packet_filter = (u_int16_t)-1;
// struct associated to a workflow for a thread
struct reader_thread {
- struct ndpi_workflow * workflow;
+ struct ndpi_workflow *workflow;
pthread_t pthread;
u_int64_t last_idle_scan_time;
u_int32_t idle_scan_idx;
@@ -331,6 +339,13 @@ int cmpProto(const void *_a, const void *_b) {
return(strcmp(a->name, b->name));
}
+int cmpFlows(const void *_a, const void *_b) {
+ struct flow_info *a = (struct flow_info*)_a;
+ struct flow_info *b = (struct flow_info*)_b;
+
+ return((a->flow->src2dst_bytes + a->flow->dst2src_bytes) < (b->flow->src2dst_bytes + b->flow->dst2src_bytes) ? 1 : -1);
+}
+
void extcap_config() {
int i, argidx = 0;
struct ndpi_detection_module_struct *ndpi_mod;
@@ -555,8 +570,7 @@ static void parseOptions(int argc, char **argv) {
}
}
- if(!bpf_filter_flag) {
-
+ if(!bpf_filter_flag) {
if(do_capture) {
quiet_mode = 1;
extcap_capture();
@@ -670,7 +684,7 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) {
/**
* @brief Print the flow
*/
-static void printFlow(u_int16_t thread_id, struct ndpi_flow_info *flow) {
+static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t thread_id) {
#ifdef HAVE_JSON_C
json_object *jObj;
#endif
@@ -680,7 +694,7 @@ static void printFlow(u_int16_t thread_id, struct ndpi_flow_info *flow) {
return;
if(!json_flag) {
- fprintf(out, "\t%u", ++num_flows);
+ fprintf(out, "\t%u", id);
fprintf(out, "\t%s ", ipProto2Name(flow->protocol));
@@ -785,8 +799,11 @@ static void node_print_unknown_proto_walker(const void *node, ndpi_VISIT which,
if(flow->detected_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN) return;
- if((which == ndpi_preorder) || (which == ndpi_leaf)) /* Avoid walking the same node multiple times */
- printFlow(thread_id, flow);
+ if((which == ndpi_preorder) || (which == ndpi_leaf)) {
+ /* Avoid walking the same node multiple times */
+ all_flows[num_flows].thread_id = thread_id, all_flows[num_flows].flow = flow;
+ num_flows++;
+ }
}
/**
@@ -799,8 +816,11 @@ static void node_print_known_proto_walker(const void *node, ndpi_VISIT which, in
if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) return;
- if((which == ndpi_preorder) || (which == ndpi_leaf)) /* Avoid walking the same node multiple times */
- printFlow(thread_id, flow);
+ if((which == ndpi_preorder) || (which == ndpi_leaf)) {
+ /* Avoid walking the same node multiple times */
+ all_flows[num_flows].thread_id = thread_id, all_flows[num_flows].flow = flow;
+ num_flows++;
+ }
}
@@ -837,7 +857,6 @@ static void node_proto_guess_walker(const void *node, ndpi_VISIT which, int dept
if(enable_protocol_guess) {
if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) {
node_guess_undetected_protocol(thread_id, flow);
- // printFlow(thread_id, flow);
}
}
@@ -885,6 +904,7 @@ void updateScanners(struct single_flow_info **scanners, const char *saddr, u_int
HASH_ADD_INT(f->ports, port, pp);
}
+
else pp->num_flows++;
}
}
@@ -1148,7 +1168,7 @@ static void on_protocol_discovered(struct ndpi_workflow * workflow,
}
}
- printFlow(thread_id, flow);
+ // printFlow(thread_id, flow);
}
}
@@ -1804,7 +1824,16 @@ static void printResults(u_int64_t tot_usec) {
if((verbose == 1) || (verbose == 2)) {
FILE *out = results_file ? results_file : stdout;
+ u_int32_t total_flows = 0;
+
+ for(thread_id = 0; thread_id < num_threads; thread_id++)
+ total_flows += ndpi_thread_info[thread_id].workflow->num_allocated_flows;
+ if((all_flows = (struct flow_info*)malloc(sizeof(struct flow_info)*total_flows)) == NULL) {
+ printf("Fatal error: not enough memory\n");
+ exit(-1);
+ }
+
if(!json_flag) fprintf(out, "\n");
num_flows = 0;
@@ -1813,6 +1842,11 @@ static void printResults(u_int64_t tot_usec) {
ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], node_print_known_proto_walker, &thread_id);
}
+ qsort(all_flows, num_flows, sizeof(struct flow_info), cmpFlows);
+
+ for(i=0; i<num_flows; i++)
+ printFlow(i+1, all_flows[i].flow, all_flows[i].thread_id);
+
for(thread_id = 0; thread_id < num_threads; thread_id++) {
if(ndpi_thread_info[thread_id].workflow->stats.protocol_counter[0 /* 0 = Unknown */] > 0) {
if(!json_flag) {
@@ -1834,6 +1868,13 @@ static void printResults(u_int64_t tot_usec) {
ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], node_print_unknown_proto_walker, &thread_id);
}
}
+
+ qsort(all_flows, num_flows, sizeof(struct flow_info), cmpFlows);
+
+ for(i=0; i<num_flows; i++)
+ printFlow(i+1, all_flows[i].flow, all_flows[i].thread_id);
+
+ free(all_flows);
}
if(json_flag != 0) {
diff --git a/example/ndpi_util.c b/example/ndpi_util.c
index 1fc3af15a..618a51882 100644
--- a/example/ndpi_util.c
+++ b/example/ndpi_util.c
@@ -106,7 +106,6 @@ static void free_wrapper(void *freeable) {
/* ***************************************************** */
struct ndpi_workflow * ndpi_workflow_init(const struct ndpi_workflow_prefs * prefs, pcap_t * pcap_handle) {
-
set_ndpi_malloc(malloc_wrapper), set_ndpi_free(free_wrapper);
set_ndpi_flow_malloc(NULL), set_ndpi_flow_free(NULL);
/* TODO: just needed here to init ndpi malloc wrapper */
@@ -309,7 +308,8 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow
if(newflow == NULL) {
NDPI_LOG(0, workflow->ndpi_struct, NDPI_LOG_ERROR, "[NDPI] %s(1): not enough memory\n", __FUNCTION__);
return(NULL);
- }
+ } else
+ workflow->num_allocated_flows++;
memset(newflow, 0, sizeof(struct ndpi_flow_info));
newflow->hashval = hashval;
@@ -544,6 +544,7 @@ static struct ndpi_proto packet_processing(struct ndpi_workflow * workflow,
/* TODO: When half_free is deprecated, get rid of this */
ndpi_free_flow_info_half(flow);
}
+
return(flow->detected_protocol);
}
diff --git a/example/ndpi_util.h b/example/ndpi_util.h
index 45101cf10..5fe3dc66b 100644
--- a/example/ndpi_util.h
+++ b/example/ndpi_util.h
@@ -130,6 +130,7 @@ typedef struct ndpi_workflow {
/* allocated by prefs */
void **ndpi_flows_root;
struct ndpi_detection_module_struct *ndpi_struct;
+ u_int32_t num_allocated_flows;
} ndpi_workflow_t;