diff options
author | Luca <deri@ntop.org> | 2017-08-02 20:15:21 +0200 |
---|---|---|
committer | Luca <deri@ntop.org> | 2017-08-02 20:15:21 +0200 |
commit | 3ba3a08141f0c60ab8e970e744ecf7540319b093 (patch) | |
tree | c504f171ae566c3b14b7c890a67597fdc58eb043 /example/ndpiReader.c | |
parent | 1d4eeaa32824d5b10b2a3e1ccf4245709a2374d1 (diff) |
Implemented flow sort based on total bytes so that we can (with -v X) immediately spot elephants and mice
Diffstat (limited to 'example/ndpiReader.c')
-rw-r--r-- | example/ndpiReader.c | 65 |
1 files changed, 53 insertions, 12 deletions
diff --git a/example/ndpiReader.c b/example/ndpiReader.c index 189321968..e0dda2cab 100644 --- a/example/ndpiReader.c +++ b/example/ndpiReader.c @@ -97,7 +97,15 @@ static time_t capture_for = 0; static time_t capture_until = 0; static u_int32_t num_flows; -struct info_pair{ +struct flow_info { + struct ndpi_flow_info *flow; + u_int16_t thread_id; +}; + +static struct flow_info *all_flows; + + +struct info_pair { char addr[48]; char proto[48]; /*app level protocol*/ int count; @@ -167,7 +175,7 @@ static u_int16_t extcap_packet_filter = (u_int16_t)-1; // struct associated to a workflow for a thread struct reader_thread { - struct ndpi_workflow * workflow; + struct ndpi_workflow *workflow; pthread_t pthread; u_int64_t last_idle_scan_time; u_int32_t idle_scan_idx; @@ -331,6 +339,13 @@ int cmpProto(const void *_a, const void *_b) { return(strcmp(a->name, b->name)); } +int cmpFlows(const void *_a, const void *_b) { + struct flow_info *a = (struct flow_info*)_a; + struct flow_info *b = (struct flow_info*)_b; + + return((a->flow->src2dst_bytes + a->flow->dst2src_bytes) < (b->flow->src2dst_bytes + b->flow->dst2src_bytes) ? 1 : -1); +} + void extcap_config() { int i, argidx = 0; struct ndpi_detection_module_struct *ndpi_mod; @@ -555,8 +570,7 @@ static void parseOptions(int argc, char **argv) { } } - if(!bpf_filter_flag) { - + if(!bpf_filter_flag) { if(do_capture) { quiet_mode = 1; extcap_capture(); @@ -670,7 +684,7 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) { /** * @brief Print the flow */ -static void printFlow(u_int16_t thread_id, struct ndpi_flow_info *flow) { +static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t thread_id) { #ifdef HAVE_JSON_C json_object *jObj; #endif @@ -680,7 +694,7 @@ static void printFlow(u_int16_t thread_id, struct ndpi_flow_info *flow) { return; if(!json_flag) { - fprintf(out, "\t%u", ++num_flows); + fprintf(out, "\t%u", id); fprintf(out, "\t%s ", ipProto2Name(flow->protocol)); @@ -785,8 +799,11 @@ static void node_print_unknown_proto_walker(const void *node, ndpi_VISIT which, if(flow->detected_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN) return; - if((which == ndpi_preorder) || (which == ndpi_leaf)) /* Avoid walking the same node multiple times */ - printFlow(thread_id, flow); + if((which == ndpi_preorder) || (which == ndpi_leaf)) { + /* Avoid walking the same node multiple times */ + all_flows[num_flows].thread_id = thread_id, all_flows[num_flows].flow = flow; + num_flows++; + } } /** @@ -799,8 +816,11 @@ static void node_print_known_proto_walker(const void *node, ndpi_VISIT which, in if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) return; - if((which == ndpi_preorder) || (which == ndpi_leaf)) /* Avoid walking the same node multiple times */ - printFlow(thread_id, flow); + if((which == ndpi_preorder) || (which == ndpi_leaf)) { + /* Avoid walking the same node multiple times */ + all_flows[num_flows].thread_id = thread_id, all_flows[num_flows].flow = flow; + num_flows++; + } } @@ -837,7 +857,6 @@ static void node_proto_guess_walker(const void *node, ndpi_VISIT which, int dept if(enable_protocol_guess) { if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) { node_guess_undetected_protocol(thread_id, flow); - // printFlow(thread_id, flow); } } @@ -885,6 +904,7 @@ void updateScanners(struct single_flow_info **scanners, const char *saddr, u_int HASH_ADD_INT(f->ports, port, pp); } + else pp->num_flows++; } } @@ -1148,7 +1168,7 @@ static void on_protocol_discovered(struct ndpi_workflow * workflow, } } - printFlow(thread_id, flow); + // printFlow(thread_id, flow); } } @@ -1804,7 +1824,16 @@ static void printResults(u_int64_t tot_usec) { if((verbose == 1) || (verbose == 2)) { FILE *out = results_file ? results_file : stdout; + u_int32_t total_flows = 0; + + for(thread_id = 0; thread_id < num_threads; thread_id++) + total_flows += ndpi_thread_info[thread_id].workflow->num_allocated_flows; + if((all_flows = (struct flow_info*)malloc(sizeof(struct flow_info)*total_flows)) == NULL) { + printf("Fatal error: not enough memory\n"); + exit(-1); + } + if(!json_flag) fprintf(out, "\n"); num_flows = 0; @@ -1813,6 +1842,11 @@ static void printResults(u_int64_t tot_usec) { ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], node_print_known_proto_walker, &thread_id); } + qsort(all_flows, num_flows, sizeof(struct flow_info), cmpFlows); + + for(i=0; i<num_flows; i++) + printFlow(i+1, all_flows[i].flow, all_flows[i].thread_id); + for(thread_id = 0; thread_id < num_threads; thread_id++) { if(ndpi_thread_info[thread_id].workflow->stats.protocol_counter[0 /* 0 = Unknown */] > 0) { if(!json_flag) { @@ -1834,6 +1868,13 @@ static void printResults(u_int64_t tot_usec) { ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], node_print_unknown_proto_walker, &thread_id); } } + + qsort(all_flows, num_flows, sizeof(struct flow_info), cmpFlows); + + for(i=0; i<num_flows; i++) + printFlow(i+1, all_flows[i].flow, all_flows[i].thread_id); + + free(all_flows); } if(json_flag != 0) { |