diff options
author | Daniele De Lorenzi <daniele.delorenzi@fastnetserv.net> | 2018-12-20 11:04:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-20 11:04:03 +0100 |
commit | 2aea4da9adc3ba87346d01d20bd815004016db4f (patch) | |
tree | 91c94e1645640407f32e0cf5b1097444f6f26271 /example/ndpiReader.c | |
parent | 3b1047b0c8136b85010554ac31f7845c68b5898b (diff) | |
parent | d3be349fa0d03477be1c84fad23fcc37df9bcf67 (diff) |
Merge pull request #10 from ntop/dev
Repo sync
Diffstat (limited to 'example/ndpiReader.c')
-rw-r--r-- | example/ndpiReader.c | 324 |
1 files changed, 200 insertions, 124 deletions
diff --git a/example/ndpiReader.c b/example/ndpiReader.c index 5a8f91139..6c3dfeee8 100644 --- a/example/ndpiReader.c +++ b/example/ndpiReader.c @@ -81,7 +81,8 @@ static json_object *jArray_topStats; static u_int8_t live_capture = 0; static u_int8_t undetected_flows_deleted = 0; /** User preferences **/ -static u_int8_t enable_protocol_guess = 1, verbose = 0, json_flag = 0; +u_int8_t enable_protocol_guess = 1; +static u_int8_t verbose = 0, json_flag = 0; int nDPI_LogLevel = 0; char *_debug_protocols = NULL; static u_int8_t stats_flag = 0, bpf_filter_flag = 0; @@ -93,11 +94,11 @@ static u_int16_t decode_tunnels = 0; static u_int16_t num_loops = 1; static u_int8_t shutdown_app = 0, quiet_mode = 0; static u_int8_t num_threads = 1; -static struct timeval begin, end; +static struct timeval startup_time, begin, end; #ifdef linux static int core_affinity[MAX_NUM_READER_THREADS]; #endif -static struct timeval pcap_start, pcap_end; +static struct timeval pcap_start = { 0, 0}, pcap_end = { 0, 0 }; /** Detection parameters **/ static time_t capture_for = 0; static time_t capture_until = 0; @@ -111,7 +112,6 @@ struct flow_info { static struct flow_info *all_flows; - struct info_pair { u_int32_t addr; u_int8_t version; /* IP version */ @@ -135,16 +135,15 @@ struct port_stats { u_int32_t cumulative_addr; /*cumulative some of IP addresses */ addr_node *addr_tree; /* tree of distinct IP addresses */ struct info_pair top_ip_addrs[MAX_NUM_IP_ADDRESS]; - u_int8_t hasTopHost; /* as boolean flag*/ - u_int32_t top_host; /*host that is contributed to > 95% of traffic*/ - u_int8_t version; /* top host's ip version */ - char proto[16]; /*application level protocol of top host */ - UT_hash_handle hh; /* makes this structure hashable */ + u_int8_t hasTopHost; /* as boolean flag */ + u_int32_t top_host; /* host that is contributed to > 95% of traffic */ + u_int8_t version; /* top host's ip version */ + char proto[16]; /* application level protocol of top host */ + UT_hash_handle hh; /* makes this structure hashable */ }; struct port_stats *srcStats = NULL, *dstStats = NULL; - // struct to hold count of flows received by destination ports struct port_flow_info { u_int32_t port; /* key */ @@ -206,7 +205,9 @@ typedef struct ndpi_id { // used memory counters u_int32_t current_ndpi_memory = 0, max_ndpi_memory = 0; - +#ifdef USE_DPDK +static int dpdk_port_id = 0, dpdk_run_capture = 1; +#endif void test_lib(); /* Forward */ @@ -229,7 +230,11 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle); static void help(u_int long_help) { printf("Welcome to nDPI %s\n\n", ndpi_revision()); - printf("ndpiReader -i <file|device> [-f <filter>][-s <duration>][-m <duration>]\n" + printf("ndpiReader " +#ifndef USE_DPDK + "-i <file|device> " +#endif + "[-f <filter>][-s <duration>][-m <duration>]\n" " [-p <protos>][-l <loops> [-q][-d][-h][-t][-v <level>]\n" " [-n <threads>][-w <file>][-c <file>][-j <file>][-x <file>]\n\n" "Usage:\n" @@ -349,6 +354,8 @@ struct ndpi_proto_sorter { char name[16]; }; +/* ********************************** */ + int cmpProto(const void *_a, const void *_b) { struct ndpi_proto_sorter *a = (struct ndpi_proto_sorter*)_a; struct ndpi_proto_sorter *b = (struct ndpi_proto_sorter*)_b; @@ -356,6 +363,8 @@ int cmpProto(const void *_a, const void *_b) { return(strcmp(a->name, b->name)); } +/* ********************************** */ + int cmpFlows(const void *_a, const void *_b) { struct ndpi_flow_info *fa = ((struct flow_info*)_a)->flow; struct ndpi_flow_info *fb = ((struct flow_info*)_b)->flow; @@ -375,12 +384,14 @@ int cmpFlows(const void *_a, const void *_b) { return(0); } +/* ********************************** */ + void extcap_config() { int i, argidx = 0; struct ndpi_proto_sorter *protos; u_int ndpi_num_supported_protocols = ndpi_get_ndpi_num_supported_protocols(ndpi_info_mod); ndpi_proto_defaults_t *proto_defaults = ndpi_get_proto_defaults(ndpi_info_mod); - + /* -i <interface> */ printf("arg {number=%d}{call=-i}{display=Capture Interface}{type=string}" "{tooltip=The interface name}\n", argidx++); @@ -453,7 +464,18 @@ static void parseOptions(int argc, char **argv) { if(trace) fprintf(trace, " #### %s #### \n", __FUNCTION__); #endif - while ((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) { +#ifdef USE_DPDK + { + int ret = rte_eal_init(argc, argv); + + if(ret < 0) + rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); + + argc -= ret, argv += ret; + } +#endif + + while((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) { #ifdef DEBUG_TRACE if(trace) fprintf(trace, " #### -%c [%s] #### \n", opt, optarg ? optarg : ""); #endif @@ -547,7 +569,7 @@ static void parseOptions(int argc, char **argv) { case 'j': #ifndef HAVE_JSON_C - printf("WARNING: this copy of ndpiReader has been compiled without JSON-C: json export disabled\n"); + printf("WARNING: this copy of ndpiReader has been compiled without json-c: JSON export disabled\n"); #else _jsonFilePath = optarg; json_flag = 1; @@ -599,9 +621,9 @@ static void parseOptions(int argc, char **argv) { case '9': extcap_packet_filter = ndpi_get_proto_by_name(ndpi_info_mod, optarg); - if (extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg); + if(extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg); break; - + case 257: _debug_protocols = strdup(optarg); break; @@ -612,6 +634,7 @@ static void parseOptions(int argc, char **argv) { } } +#ifndef USE_DPDK if(!bpf_filter_flag) { if(do_capture) { quiet_mode = 1; @@ -626,7 +649,7 @@ static void parseOptions(int argc, char **argv) { if(strchr(_pcap_file[0], ',')) { /* multiple ingress interfaces */ num_threads = 0; /* setting number of threads = number of interfaces */ __pcap_file = strtok(_pcap_file[0], ","); - while (__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) { + while(__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) { _pcap_file[num_threads++] = __pcap_file; __pcap_file = strtok(NULL, ","); } @@ -643,25 +666,26 @@ static void parseOptions(int argc, char **argv) { if(num_cores > 1 && bind_mask != NULL) { char *core_id = strtok(bind_mask, ":"); thread_id = 0; - while (core_id != NULL && thread_id < num_threads) { + while(core_id != NULL && thread_id < num_threads) { core_affinity[thread_id++] = atoi(core_id) % num_cores; core_id = strtok(NULL, ":"); } } #endif } +#endif #ifdef DEBUG_TRACE if(trace) fclose(trace); #endif } +/* ********************************** */ /** * @brief From IPPROTO to string NAME */ static char* ipProto2Name(u_int16_t proto_id) { - static char proto[8]; switch(proto_id) { @@ -689,12 +713,12 @@ static char* ipProto2Name(u_int16_t proto_id) { return(proto); } +/* ********************************** */ /** * @brief A faster replacement for inet_ntoa(). */ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) { - char *cp, *retStr; uint byte; int n; @@ -715,7 +739,7 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) { } *--cp = '.'; addr >>= 8; - } while (--n > 0); + } while(--n > 0); /* Convert the string to lowercase */ retStr = (char*)(cp+1); @@ -723,6 +747,8 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) { return(retStr); } +/* ********************************** */ + /** * @brief Print the flow */ @@ -761,10 +787,12 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa fprintf(out, "[proto: %u/%s]", flow->detected_protocol.app_protocol, ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.app_protocol)); - + if(flow->detected_protocol.category != 0) - fprintf(out, "[cat: %s]", ndpi_category_get_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, - flow->detected_protocol.category)); + fprintf(out, "[cat: %s/%u]", + ndpi_category_get_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, + flow->detected_protocol.category), + (unsigned int)flow->detected_protocol.category); fprintf(out, "[%u pkts/%llu bytes ", flow->src2dst_packets, (long long unsigned int) flow->src2dst_bytes); fprintf(out, "%s %u pkts/%llu bytes]", @@ -790,16 +818,20 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa json_object_object_add(jObj,"host_b.port",json_object_new_int(ntohs(flow->dst_port))); if(flow->detected_protocol.master_protocol) - json_object_object_add(jObj,"detected.master_protocol",json_object_new_int(flow->detected_protocol.master_protocol)); + json_object_object_add(jObj,"detected.master_protocol", + json_object_new_int(flow->detected_protocol.master_protocol)); - json_object_object_add(jObj,"detected.app_protocol",json_object_new_int(flow->detected_protocol.app_protocol)); + json_object_object_add(jObj,"detected.app_protocol", + json_object_new_int(flow->detected_protocol.app_protocol)); if(flow->detected_protocol.master_protocol) { char tmp[256]; snprintf(tmp, sizeof(tmp), "%s.%s", - ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.master_protocol), - ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.app_protocol)); + ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, + flow->detected_protocol.master_protocol), + ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, + flow->detected_protocol.app_protocol)); json_object_object_add(jObj,"detected.protocol.name", json_object_new_string(tmp)); @@ -834,12 +866,13 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa } } +/* ********************************** */ /** * @brief Unknown Proto Walker */ -static void node_print_unknown_proto_walker(const void *node, ndpi_VISIT which, int depth, void *user_data) { - +static void node_print_unknown_proto_walker(const void *node, + ndpi_VISIT which, int depth, void *user_data) { struct ndpi_flow_info *flow = *(struct ndpi_flow_info**)node; u_int16_t thread_id = *((u_int16_t*)user_data); @@ -852,11 +885,13 @@ static void node_print_unknown_proto_walker(const void *node, ndpi_VISIT which, } } +/* ********************************** */ + /** * @brief Known Proto Walker */ -static void node_print_known_proto_walker(const void *node, ndpi_VISIT which, int depth, void *user_data) { - +static void node_print_known_proto_walker(const void *node, + ndpi_VISIT which, int depth, void *user_data) { struct ndpi_flow_info *flow = *(struct ndpi_flow_info**)node; u_int16_t thread_id = *((u_int16_t*)user_data); @@ -869,25 +904,7 @@ static void node_print_known_proto_walker(const void *node, ndpi_VISIT which, in } } - -/** - * @brief Guess Undetected Protocol - */ -static u_int16_t node_guess_undetected_protocol(u_int16_t thread_id, struct ndpi_flow_info *flow) { - - flow->detected_protocol = ndpi_guess_undetected_protocol(ndpi_thread_info[thread_id].workflow->ndpi_struct, - flow->protocol, - ntohl(flow->src_ip), - ntohs(flow->src_port), - ntohl(flow->dst_ip), - ntohs(flow->dst_port)); - // printf("Guess state: %u\n", flow->detected_protocol); - if(flow->detected_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN) - ndpi_thread_info[thread_id].workflow->stats.guessed_flow_protocols++; - - return(flow->detected_protocol.app_protocol); -} - +/* ********************************** */ /** * @brief Proto Guess Walker @@ -898,15 +915,10 @@ static void node_proto_guess_walker(const void *node, ndpi_VISIT which, int dept if((which == ndpi_preorder) || (which == ndpi_leaf)) { /* Avoid walking the same node multiple times */ if((!flow->detection_completed) && flow->ndpi_flow) - flow->detected_protocol = ndpi_detection_giveup(ndpi_thread_info[0].workflow->ndpi_struct, flow->ndpi_flow); - - if(enable_protocol_guess) { - if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) { - node_guess_undetected_protocol(thread_id, flow); - } - } + flow->detected_protocol = ndpi_detection_giveup(ndpi_thread_info[0].workflow->ndpi_struct, flow->ndpi_flow, enable_protocol_guess); process_ndpi_collected_info(ndpi_thread_info[thread_id].workflow, flow); + ndpi_thread_info[thread_id].workflow->stats.protocol_counter[flow->detected_protocol.app_protocol] += flow->src2dst_packets + flow->dst2src_packets; ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes[flow->detected_protocol.app_protocol] += flow->src2dst_bytes + flow->dst2src_bytes; ndpi_thread_info[thread_id].workflow->stats.protocol_flows[flow->detected_protocol.app_protocol]++; @@ -967,7 +979,7 @@ int updateIpTree(u_int32_t key, u_int8_t version, if(rootp == (addr_node **)0) return 0; - while (*rootp != (addr_node *)0) { + while(*rootp != (addr_node *)0) { /* Knuth's T1: */ if((version == (*rootp)->version) && (key == (*rootp)->addr)) { /* T2: */ @@ -997,7 +1009,7 @@ int updateIpTree(u_int32_t key, u_int8_t version, /* *********************************************** */ void freeIpTree(addr_node *root) { - if (root == NULL) + if(root == NULL) return; freeIpTree(root->left); @@ -1192,9 +1204,9 @@ static void deleteReceivers(struct receiver *receivers) { /* *********************************************** */ /* implementation of: https://jeroen.massar.ch/presentations/files/FloCon2010-TopK.pdf * - * if (table1.size < max1 || acceptable){ + * if(table1.size < max1 || acceptable){ * create new element and add to the table1 - * if (table1.size > max2) { + * if(table1.size > max2) { * cut table1 back to max1 * merge table 1 to table2 * if(table2.size > max1) @@ -1352,7 +1364,6 @@ static void port_stats_walker(const void *node, ndpi_VISIT which, int depth, voi * @brief Idle Scan Walker */ static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth, void *user_data) { - struct ndpi_flow_info *flow = *(struct ndpi_flow_info **) node; u_int16_t thread_id = *((u_int16_t *) user_data); @@ -1379,24 +1390,12 @@ static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth, /** - * @brief On Protocol Discover - call node_guess_undetected_protocol() for protocol + * @brief On Protocol Discover - demo callback */ static void on_protocol_discovered(struct ndpi_workflow * workflow, struct ndpi_flow_info * flow, void * udata) { - - const u_int16_t thread_id = (uintptr_t) udata; - - if(verbose > 1) { - if(enable_protocol_guess) { - if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) { - flow->detected_protocol.app_protocol = node_guess_undetected_protocol(thread_id, flow), - flow->detected_protocol.master_protocol = NDPI_PROTOCOL_UNKNOWN; - } - } - - // printFlow(thread_id, flow); - } + ; } #if 0 @@ -1444,7 +1443,6 @@ static void debug_printf(u_int32_t protocol, void *id_struct, * @brief Setup for detection begin */ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) { - NDPI_PROTOCOL_BITMASK all; struct ndpi_workflow_prefs prefs; @@ -1463,19 +1461,23 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) { ndpi_set_detection_preferences(ndpi_thread_info[thread_id].workflow->ndpi_struct, ndpi_pref_dns_dissect_response, 0); ndpi_set_detection_preferences(ndpi_thread_info[thread_id].workflow->ndpi_struct, - ndpi_pref_enable_category_substring_match, 0); + ndpi_pref_enable_category_substring_match, 1); ndpi_workflow_set_flow_detected_callback(ndpi_thread_info[thread_id].workflow, - on_protocol_discovered, (void *)(uintptr_t)thread_id); + on_protocol_discovered, + (void *)(uintptr_t)thread_id); // enable all protocols NDPI_BITMASK_SET_ALL(all); ndpi_set_protocol_detection_bitmask2(ndpi_thread_info[thread_id].workflow->ndpi_struct, &all); // clear memory for results - memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter, 0, sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter)); - memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes, 0, sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes)); - memset(ndpi_thread_info[thread_id].workflow->stats.protocol_flows, 0, sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_flows)); + memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter, 0, + sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter)); + memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes, 0, + sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes)); + memset(ndpi_thread_info[thread_id].workflow->stats.protocol_flows, 0, + sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_flows)); if(_protoFilePath != NULL) ndpi_load_protocols_file(ndpi_thread_info[thread_id].workflow->ndpi_struct, _protoFilePath); @@ -1487,10 +1489,10 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) { while(fd) { char buffer[512], *line, *name, *category; int i; - + if(!(line = fgets(buffer, sizeof(buffer), fd))) break; - + if(((i = strlen(line)) <= 1) || (line[0] == '#')) continue; else @@ -1501,9 +1503,16 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) { category = strtok(NULL, "\t"); if(category) { + int fields[4]; + // printf("Loading %s\t%s\n", name, category); - ndpi_load_hostname_category(ndpi_thread_info[thread_id].workflow->ndpi_struct, - name, (ndpi_protocol_category_t)atoi(category)); + + if(sscanf(name, "%d.%d.%d.%d", &fields[0], &fields[1], &fields[2], &fields[3]) == 4) + ndpi_load_ip_category(ndpi_thread_info[thread_id].workflow->ndpi_struct, + name, (ndpi_protocol_category_t)atoi(category)); + else + ndpi_load_hostname_category(ndpi_thread_info[thread_id].workflow->ndpi_struct, + name, (ndpi_protocol_category_t)atoi(category)); } } } @@ -1599,7 +1608,9 @@ static void json_open_stats_file() { static void json_close_stats_file() { json_object *jObjFinal = json_object_new_object(); - json_object_object_add(jObjFinal,"duration.in.seconds",json_object_new_int(pcap_analysis_duration)); + + json_object_object_add(jObjFinal,"duration.in.seconds", + json_object_new_int(pcap_analysis_duration)); json_object_object_add(jObjFinal,"statistics", jArray_topStats); fprintf(stats_fp,"%s\n",json_object_to_json_string(jObjFinal)); fclose(stats_fp); @@ -1879,7 +1890,7 @@ void printPortStats(struct port_stats *stats) { /** * @brief Print result */ -static void printResults(u_int64_t tot_usec) { +static void printResults(u_int64_t processing_time_usec, u_int64_t setup_time_usec) { u_int32_t i; u_int64_t total_flow_bytes = 0; u_int32_t avg_pkt_size = 0; @@ -1888,6 +1899,7 @@ static void printResults(u_int64_t tot_usec) { char buf[32]; #ifdef HAVE_JSON_C FILE *json_fp = NULL; + u_int8_t dont_close_json_fp = 0; json_object *jObj_main = NULL, *jObj_trafficStats, *jArray_detProto = NULL, *jObj; #endif long long unsigned int breed_stats[NUM_BREEDS] = { 0 }; @@ -1900,8 +1912,10 @@ static void printResults(u_int64_t tot_usec) { continue; for(i=0; i<NUM_ROOTS; i++) { - ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], node_proto_guess_walker, &thread_id); - if(verbose == 3 || stats_flag) ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], port_stats_walker, &thread_id); + ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], + node_proto_guess_walker, &thread_id); + if(verbose == 3 || stats_flag) ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], + port_stats_walker, &thread_id); } /* Stats aggregation */ @@ -1939,7 +1953,9 @@ static void printResults(u_int64_t tot_usec) { printf("\tFlow Memory (per flow): %-13s\n", formatBytes(sizeof(struct ndpi_flow_struct), buf, sizeof(buf))); printf("\tActual Memory: %-13s\n", formatBytes(current_ndpi_memory, buf, sizeof(buf))); printf("\tPeak Memory: %-13s\n", formatBytes(max_ndpi_memory, buf, sizeof(buf))); - + printf("\tSetup Time: %lu msec\n", (unsigned long)(setup_time_usec/1000)); + printf("\tPacket Processing Time: %lu msec\n", (unsigned long)(processing_time_usec/1000)); + if(!json_flag) { printf("\nTraffic statistics:\n"); printf("\tEthernet bytes: %-13llu (includes ethernet CRC/IFC/trailer)\n", @@ -1970,13 +1986,15 @@ static void printResults(u_int64_t tot_usec) { printf("\tPacket Len 1024-1500: %-13lu\n", (unsigned long)cumulative_stats.packet_len[4]); printf("\tPacket Len > 1500: %-13lu\n", (unsigned long)cumulative_stats.packet_len[5]); - if(tot_usec > 0) { + if(processing_time_usec > 0) { char buf[32], buf1[32], when[64]; - float t = (float)(cumulative_stats.ip_packet_count*1000000)/(float)tot_usec; - float b = (float)(cumulative_stats.total_wire_bytes * 8 *1000000)/(float)tot_usec; + float t = (float)(cumulative_stats.ip_packet_count*1000000)/(float)processing_time_usec; + float b = (float)(cumulative_stats.total_wire_bytes * 8 *1000000)/(float)processing_time_usec; float traffic_duration; - if(live_capture) traffic_duration = tot_usec; + + if(live_capture) traffic_duration = processing_time_usec; else traffic_duration = (pcap_end.tv_sec*1000000 + pcap_end.tv_usec) - (pcap_start.tv_sec*1000000 + pcap_start.tv_usec); + printf("\tnDPI throughput: %s pps / %s/sec\n", formatPackets(t, buf), formatTraffic(b, 1, buf1)); t = (float)(cumulative_stats.ip_packet_count*1000000)/(float)traffic_duration; b = (float)(cumulative_stats.total_wire_bytes * 8 *1000000)/(float)traffic_duration; @@ -1986,7 +2004,7 @@ static void printResults(u_int64_t tot_usec) { strftime(when, sizeof(when), "%d/%b/%Y %H:%M:%S", localtime(&pcap_end.tv_sec)); printf("\tAnalysis end: %s\n", when); printf("\tTraffic throughput: %s pps / %s/sec\n", formatPackets(t, buf), formatTraffic(b, 1, buf1)); - printf("\tTraffic duration: %.3f sec\n", traffic_duration/1000000); + printf("\tTraffic duration: %.3f sec\n", traffic_duration/1000000); } if(enable_protocol_guess) @@ -1996,10 +2014,14 @@ static void printResults(u_int64_t tot_usec) { if(json_flag) { #ifdef HAVE_JSON_C - if((json_fp = fopen(_jsonFilePath,"w")) == NULL) { + if(!strcmp(_jsonFilePath, "-")) + json_fp = stderr, dont_close_json_fp = 1; + else if((json_fp = fopen(_jsonFilePath,"w")) == NULL) { printf("Error creating .json file %s\n", _jsonFilePath); json_flag = 0; - } else { + } + + if(json_flag) { jObj_main = json_object_new_object(); jObj_trafficStats = json_object_new_object(); jArray_detProto = json_object_new_array(); @@ -2150,7 +2172,7 @@ static void printResults(u_int64_t tot_usec) { json_object_object_add(jObj_main,"unknown.flows",jArray_unknown_flows); fprintf(json_fp,"%s\n",json_object_to_json_string(jObj_main)); - fclose(json_fp); + if(!dont_close_json_fp) fclose(json_fp); #endif } @@ -2231,9 +2253,13 @@ free_stats: * @brief Force a pcap_dispatch() or pcap_loop() call to return */ static void breakPcapLoop(u_int16_t thread_id) { +#ifdef USE_DPDK + dpdk_run_capture = 0; +#else if(ndpi_thread_info[thread_id].workflow->pcap_handle != NULL) { pcap_breakloop(ndpi_thread_info[thread_id].workflow->pcap_handle); } +#endif } /** @@ -2300,15 +2326,26 @@ static void configurePcapHandle(pcap_t * pcap_handle) { * @brief Open a pcap file or a specified device - Always returns a valid pcap_t */ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_file) { - u_int snaplen = 1536; int promisc = 1; char pcap_error_buffer[PCAP_ERRBUF_SIZE]; pcap_t * pcap_handle = NULL; /* trying to open a live interface */ - if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen, promisc, - 500, pcap_error_buffer)) == NULL) { +#ifdef USE_DPDK + struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS, + MBUF_CACHE_SIZE, 0, + RTE_MBUF_DEFAULT_BUF_SIZE, + rte_socket_id()); + + if(mbuf_pool == NULL) + rte_exit(EXIT_FAILURE, "Cannot create mbuf pool: are hugepages ok?\n"); + + if(dpdk_port_init(dpdk_port_id, mbuf_pool) != 0) + rte_exit(EXIT_FAILURE, "DPDK: Cannot init port %u: please see README.dpdk\n", dpdk_port_id); +#else + if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen, + promisc, 500, pcap_error_buffer)) == NULL) { capture_for = capture_until = 0; live_capture = 0; @@ -2335,11 +2372,17 @@ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_fi } else { live_capture = 1; - if((!json_flag) && (!quiet_mode)) + if((!json_flag) && (!quiet_mode)) { +#ifdef USE_DPDK + printf("Capturing from DPDK (port 0)...\n"); +#else printf("Capturing live traffic from device %s...\n", pcap_file); +#endif + } } configurePcapHandle(pcap_handle); +#endif /* !DPDK */ if(capture_for > 0) { if((!json_flag) && (!quiet_mode)) @@ -2357,7 +2400,7 @@ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_fi /** * @brief Check pcap packet */ -static void pcap_process_packet(u_char *args, +static void ndpi_process_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *packet) { struct ndpi_proto p; @@ -2382,11 +2425,11 @@ static void pcap_process_packet(u_char *args, if(live_capture) { if(ndpi_thread_info[thread_id].last_idle_scan_time + IDLE_SCAN_PERIOD < ndpi_thread_info[thread_id].workflow->last_time) { /* scan for idle flows */ - ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_idle_scan_walker, &thread_id); + ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], + node_idle_scan_walker, &thread_id); /* remove idle flows (unfortunately we cannot do this inline) */ - while (ndpi_thread_info[thread_id].num_idle_flows > 0) { - + while(ndpi_thread_info[thread_id].num_idle_flows > 0) { /* search and delete the idle flow from the "ndpi_flow_root" (see struct reader thread) - here flows are the node of a b-tree */ ndpi_tdelete(ndpi_thread_info[thread_id].idle_flows[--ndpi_thread_info[thread_id].num_idle_flows], &ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], @@ -2450,12 +2493,13 @@ static void pcap_process_packet(u_char *args, if((pcap_end.tv_sec-pcap_start.tv_sec) > pcap_analysis_duration) { int i; - u_int64_t tot_usec; + u_int64_t processing_time_usec, setup_time_usec; gettimeofday(&end, NULL); - tot_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec); - - printResults(tot_usec); + processing_time_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec); + setup_time_usec = begin.tv_sec*1000000 + begin.tv_usec - (startup_time.tv_sec*1000000 + startup_time.tv_usec); + + printResults(processing_time_usec, setup_time_usec); for(i=0; i<ndpi_thread_info[thread_id].workflow->prefs.num_roots; i++) { ndpi_tdestroy(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], ndpi_flow_info_freer); @@ -2464,7 +2508,8 @@ static void pcap_process_packet(u_char *args, memset(&ndpi_thread_info[thread_id].workflow->stats, 0, sizeof(struct ndpi_stats)); } - printf("\n-------------------------------------------\n\n"); + if(!quiet_mode) + printf("\n-------------------------------------------\n\n"); memcpy(&begin, &end, sizeof(begin)); memcpy(&pcap_start, &pcap_end, sizeof(pcap_start)); @@ -2477,20 +2522,20 @@ static void pcap_process_packet(u_char *args, */ static void runPcapLoop(u_int16_t thread_id) { if((!shutdown_app) && (ndpi_thread_info[thread_id].workflow->pcap_handle != NULL)) - pcap_loop(ndpi_thread_info[thread_id].workflow->pcap_handle, -1, &pcap_process_packet, (u_char*)&thread_id); + pcap_loop(ndpi_thread_info[thread_id].workflow->pcap_handle, -1, &ndpi_process_packet, (u_char*)&thread_id); } /** * @brief Process a running thread */ void * processing_thread(void *_thread_id) { - long thread_id = (long) _thread_id; char pcap_error_buffer[PCAP_ERRBUF_SIZE]; #if defined(linux) && defined(HAVE_PTHREAD_SETAFFINITY_NP) if(core_affinity[thread_id] >= 0) { cpu_set_t cpuset; + CPU_ZERO(&cpuset); CPU_SET(core_affinity[thread_id], &cpuset); @@ -2503,6 +2548,33 @@ void * processing_thread(void *_thread_id) { #endif if((!json_flag) && (!quiet_mode)) printf("Running thread %ld...\n", thread_id); +#ifdef USE_DPDK + while(dpdk_run_capture) { + struct rte_mbuf *bufs[BURST_SIZE]; + u_int16_t num = rte_eth_rx_burst(dpdk_port_id, 0, bufs, BURST_SIZE); + u_int i; + + if(num == 0) { + usleep(1); + continue; + } + + for(i = 0; i < PREFETCH_OFFSET && i < num; i++) + rte_prefetch0(rte_pktmbuf_mtod(bufs[i], void *)); + + for(i = 0; i < num; i++) { + char *data = rte_pktmbuf_mtod(bufs[i], char *); + int len = rte_pktmbuf_pkt_len(bufs[i]); + struct pcap_pkthdr h; + + h.len = h.caplen = len; + gettimeofday(&h.ts, NULL); + + ndpi_process_packet((u_char*)&thread_id, &h, (const u_char *)data); + rte_pktmbuf_free(bufs[i]); + } + } +#else pcap_loop: runPcapLoop(thread_id); @@ -2515,6 +2587,7 @@ pcap_loop: goto pcap_loop; } } +#endif return NULL; } @@ -2525,7 +2598,7 @@ pcap_loop: */ void test_lib() { struct timeval end; - u_int64_t tot_usec; + u_int64_t processing_time_usec, setup_time_usec; long thread_id; #ifdef HAVE_JSON_C @@ -2577,10 +2650,11 @@ void test_lib() { } gettimeofday(&end, NULL); - tot_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec); + processing_time_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec); + setup_time_usec = begin.tv_sec*1000000 + begin.tv_usec - (startup_time.tv_sec*1000000 + startup_time.tv_usec); /* Printing cumulative results */ - printResults(tot_usec); + printResults(processing_time_usec, setup_time_usec); if(stats_flag) { #ifdef HAVE_JSON_C @@ -2599,7 +2673,7 @@ void test_lib() { void automataUnitTest() { void *automa; - assert(automa = ndpi_init_automa()); + assert((automa = ndpi_init_automa())); assert(ndpi_add_string_to_automa(automa, "hello") == 0); assert(ndpi_add_string_to_automa(automa, "world") == 0); ndpi_finalize_automa(automa); @@ -3193,17 +3267,19 @@ int orginal_main(int argc, char **argv) { #else int main(int argc, char **argv) { #endif - int i; - + int i; + if(ndpi_get_api_version() != NDPI_API_VERSION) { printf("nDPI Library version mismatch: please make sure this code and the nDPI library are in sync\n"); return(-1); } - + automataUnitTest(); + gettimeofday(&startup_time, NULL); ndpi_info_mod = ndpi_init_detection_module(); - if (ndpi_info_mod == NULL) return -1; + + if(ndpi_info_mod == NULL) return -1; memset(ndpi_thread_info, 0, sizeof(ndpi_thread_info)); |