aboutsummaryrefslogtreecommitdiff
path: root/example/ndpiReader.c
diff options
context:
space:
mode:
authorDaniele De Lorenzi <daniele.delorenzi@fastnetserv.net>2018-12-20 11:04:03 +0100
committerGitHub <noreply@github.com>2018-12-20 11:04:03 +0100
commit2aea4da9adc3ba87346d01d20bd815004016db4f (patch)
tree91c94e1645640407f32e0cf5b1097444f6f26271 /example/ndpiReader.c
parent3b1047b0c8136b85010554ac31f7845c68b5898b (diff)
parentd3be349fa0d03477be1c84fad23fcc37df9bcf67 (diff)
Merge pull request #10 from ntop/dev
Repo sync
Diffstat (limited to 'example/ndpiReader.c')
-rw-r--r--example/ndpiReader.c324
1 files changed, 200 insertions, 124 deletions
diff --git a/example/ndpiReader.c b/example/ndpiReader.c
index 5a8f91139..6c3dfeee8 100644
--- a/example/ndpiReader.c
+++ b/example/ndpiReader.c
@@ -81,7 +81,8 @@ static json_object *jArray_topStats;
static u_int8_t live_capture = 0;
static u_int8_t undetected_flows_deleted = 0;
/** User preferences **/
-static u_int8_t enable_protocol_guess = 1, verbose = 0, json_flag = 0;
+u_int8_t enable_protocol_guess = 1;
+static u_int8_t verbose = 0, json_flag = 0;
int nDPI_LogLevel = 0;
char *_debug_protocols = NULL;
static u_int8_t stats_flag = 0, bpf_filter_flag = 0;
@@ -93,11 +94,11 @@ static u_int16_t decode_tunnels = 0;
static u_int16_t num_loops = 1;
static u_int8_t shutdown_app = 0, quiet_mode = 0;
static u_int8_t num_threads = 1;
-static struct timeval begin, end;
+static struct timeval startup_time, begin, end;
#ifdef linux
static int core_affinity[MAX_NUM_READER_THREADS];
#endif
-static struct timeval pcap_start, pcap_end;
+static struct timeval pcap_start = { 0, 0}, pcap_end = { 0, 0 };
/** Detection parameters **/
static time_t capture_for = 0;
static time_t capture_until = 0;
@@ -111,7 +112,6 @@ struct flow_info {
static struct flow_info *all_flows;
-
struct info_pair {
u_int32_t addr;
u_int8_t version; /* IP version */
@@ -135,16 +135,15 @@ struct port_stats {
u_int32_t cumulative_addr; /*cumulative some of IP addresses */
addr_node *addr_tree; /* tree of distinct IP addresses */
struct info_pair top_ip_addrs[MAX_NUM_IP_ADDRESS];
- u_int8_t hasTopHost; /* as boolean flag*/
- u_int32_t top_host; /*host that is contributed to > 95% of traffic*/
- u_int8_t version; /* top host's ip version */
- char proto[16]; /*application level protocol of top host */
- UT_hash_handle hh; /* makes this structure hashable */
+ u_int8_t hasTopHost; /* as boolean flag */
+ u_int32_t top_host; /* host that is contributed to > 95% of traffic */
+ u_int8_t version; /* top host's ip version */
+ char proto[16]; /* application level protocol of top host */
+ UT_hash_handle hh; /* makes this structure hashable */
};
struct port_stats *srcStats = NULL, *dstStats = NULL;
-
// struct to hold count of flows received by destination ports
struct port_flow_info {
u_int32_t port; /* key */
@@ -206,7 +205,9 @@ typedef struct ndpi_id {
// used memory counters
u_int32_t current_ndpi_memory = 0, max_ndpi_memory = 0;
-
+#ifdef USE_DPDK
+static int dpdk_port_id = 0, dpdk_run_capture = 1;
+#endif
void test_lib(); /* Forward */
@@ -229,7 +230,11 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle);
static void help(u_int long_help) {
printf("Welcome to nDPI %s\n\n", ndpi_revision());
- printf("ndpiReader -i <file|device> [-f <filter>][-s <duration>][-m <duration>]\n"
+ printf("ndpiReader "
+#ifndef USE_DPDK
+ "-i <file|device> "
+#endif
+ "[-f <filter>][-s <duration>][-m <duration>]\n"
" [-p <protos>][-l <loops> [-q][-d][-h][-t][-v <level>]\n"
" [-n <threads>][-w <file>][-c <file>][-j <file>][-x <file>]\n\n"
"Usage:\n"
@@ -349,6 +354,8 @@ struct ndpi_proto_sorter {
char name[16];
};
+/* ********************************** */
+
int cmpProto(const void *_a, const void *_b) {
struct ndpi_proto_sorter *a = (struct ndpi_proto_sorter*)_a;
struct ndpi_proto_sorter *b = (struct ndpi_proto_sorter*)_b;
@@ -356,6 +363,8 @@ int cmpProto(const void *_a, const void *_b) {
return(strcmp(a->name, b->name));
}
+/* ********************************** */
+
int cmpFlows(const void *_a, const void *_b) {
struct ndpi_flow_info *fa = ((struct flow_info*)_a)->flow;
struct ndpi_flow_info *fb = ((struct flow_info*)_b)->flow;
@@ -375,12 +384,14 @@ int cmpFlows(const void *_a, const void *_b) {
return(0);
}
+/* ********************************** */
+
void extcap_config() {
int i, argidx = 0;
struct ndpi_proto_sorter *protos;
u_int ndpi_num_supported_protocols = ndpi_get_ndpi_num_supported_protocols(ndpi_info_mod);
ndpi_proto_defaults_t *proto_defaults = ndpi_get_proto_defaults(ndpi_info_mod);
-
+
/* -i <interface> */
printf("arg {number=%d}{call=-i}{display=Capture Interface}{type=string}"
"{tooltip=The interface name}\n", argidx++);
@@ -453,7 +464,18 @@ static void parseOptions(int argc, char **argv) {
if(trace) fprintf(trace, " #### %s #### \n", __FUNCTION__);
#endif
- while ((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) {
+#ifdef USE_DPDK
+ {
+ int ret = rte_eal_init(argc, argv);
+
+ if(ret < 0)
+ rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
+
+ argc -= ret, argv += ret;
+ }
+#endif
+
+ while((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) {
#ifdef DEBUG_TRACE
if(trace) fprintf(trace, " #### -%c [%s] #### \n", opt, optarg ? optarg : "");
#endif
@@ -547,7 +569,7 @@ static void parseOptions(int argc, char **argv) {
case 'j':
#ifndef HAVE_JSON_C
- printf("WARNING: this copy of ndpiReader has been compiled without JSON-C: json export disabled\n");
+ printf("WARNING: this copy of ndpiReader has been compiled without json-c: JSON export disabled\n");
#else
_jsonFilePath = optarg;
json_flag = 1;
@@ -599,9 +621,9 @@ static void parseOptions(int argc, char **argv) {
case '9':
extcap_packet_filter = ndpi_get_proto_by_name(ndpi_info_mod, optarg);
- if (extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg);
+ if(extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg);
break;
-
+
case 257:
_debug_protocols = strdup(optarg);
break;
@@ -612,6 +634,7 @@ static void parseOptions(int argc, char **argv) {
}
}
+#ifndef USE_DPDK
if(!bpf_filter_flag) {
if(do_capture) {
quiet_mode = 1;
@@ -626,7 +649,7 @@ static void parseOptions(int argc, char **argv) {
if(strchr(_pcap_file[0], ',')) { /* multiple ingress interfaces */
num_threads = 0; /* setting number of threads = number of interfaces */
__pcap_file = strtok(_pcap_file[0], ",");
- while (__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) {
+ while(__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) {
_pcap_file[num_threads++] = __pcap_file;
__pcap_file = strtok(NULL, ",");
}
@@ -643,25 +666,26 @@ static void parseOptions(int argc, char **argv) {
if(num_cores > 1 && bind_mask != NULL) {
char *core_id = strtok(bind_mask, ":");
thread_id = 0;
- while (core_id != NULL && thread_id < num_threads) {
+ while(core_id != NULL && thread_id < num_threads) {
core_affinity[thread_id++] = atoi(core_id) % num_cores;
core_id = strtok(NULL, ":");
}
}
#endif
}
+#endif
#ifdef DEBUG_TRACE
if(trace) fclose(trace);
#endif
}
+/* ********************************** */
/**
* @brief From IPPROTO to string NAME
*/
static char* ipProto2Name(u_int16_t proto_id) {
-
static char proto[8];
switch(proto_id) {
@@ -689,12 +713,12 @@ static char* ipProto2Name(u_int16_t proto_id) {
return(proto);
}
+/* ********************************** */
/**
* @brief A faster replacement for inet_ntoa().
*/
char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) {
-
char *cp, *retStr;
uint byte;
int n;
@@ -715,7 +739,7 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) {
}
*--cp = '.';
addr >>= 8;
- } while (--n > 0);
+ } while(--n > 0);
/* Convert the string to lowercase */
retStr = (char*)(cp+1);
@@ -723,6 +747,8 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) {
return(retStr);
}
+/* ********************************** */
+
/**
* @brief Print the flow
*/
@@ -761,10 +787,12 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa
fprintf(out, "[proto: %u/%s]",
flow->detected_protocol.app_protocol,
ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.app_protocol));
-
+
if(flow->detected_protocol.category != 0)
- fprintf(out, "[cat: %s]", ndpi_category_get_name(ndpi_thread_info[thread_id].workflow->ndpi_struct,
- flow->detected_protocol.category));
+ fprintf(out, "[cat: %s/%u]",
+ ndpi_category_get_name(ndpi_thread_info[thread_id].workflow->ndpi_struct,
+ flow->detected_protocol.category),
+ (unsigned int)flow->detected_protocol.category);
fprintf(out, "[%u pkts/%llu bytes ", flow->src2dst_packets, (long long unsigned int) flow->src2dst_bytes);
fprintf(out, "%s %u pkts/%llu bytes]",
@@ -790,16 +818,20 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa
json_object_object_add(jObj,"host_b.port",json_object_new_int(ntohs(flow->dst_port)));
if(flow->detected_protocol.master_protocol)
- json_object_object_add(jObj,"detected.master_protocol",json_object_new_int(flow->detected_protocol.master_protocol));
+ json_object_object_add(jObj,"detected.master_protocol",
+ json_object_new_int(flow->detected_protocol.master_protocol));
- json_object_object_add(jObj,"detected.app_protocol",json_object_new_int(flow->detected_protocol.app_protocol));
+ json_object_object_add(jObj,"detected.app_protocol",
+ json_object_new_int(flow->detected_protocol.app_protocol));
if(flow->detected_protocol.master_protocol) {
char tmp[256];
snprintf(tmp, sizeof(tmp), "%s.%s",
- ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.master_protocol),
- ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.app_protocol));
+ ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct,
+ flow->detected_protocol.master_protocol),
+ ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct,
+ flow->detected_protocol.app_protocol));
json_object_object_add(jObj,"detected.protocol.name",
json_object_new_string(tmp));
@@ -834,12 +866,13 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa
}
}
+/* ********************************** */
/**
* @brief Unknown Proto Walker
*/
-static void node_print_unknown_proto_walker(const void *node, ndpi_VISIT which, int depth, void *user_data) {
-
+static void node_print_unknown_proto_walker(const void *node,
+ ndpi_VISIT which, int depth, void *user_data) {
struct ndpi_flow_info *flow = *(struct ndpi_flow_info**)node;
u_int16_t thread_id = *((u_int16_t*)user_data);
@@ -852,11 +885,13 @@ static void node_print_unknown_proto_walker(const void *node, ndpi_VISIT which,
}
}
+/* ********************************** */
+
/**
* @brief Known Proto Walker
*/
-static void node_print_known_proto_walker(const void *node, ndpi_VISIT which, int depth, void *user_data) {
-
+static void node_print_known_proto_walker(const void *node,
+ ndpi_VISIT which, int depth, void *user_data) {
struct ndpi_flow_info *flow = *(struct ndpi_flow_info**)node;
u_int16_t thread_id = *((u_int16_t*)user_data);
@@ -869,25 +904,7 @@ static void node_print_known_proto_walker(const void *node, ndpi_VISIT which, in
}
}
-
-/**
- * @brief Guess Undetected Protocol
- */
-static u_int16_t node_guess_undetected_protocol(u_int16_t thread_id, struct ndpi_flow_info *flow) {
-
- flow->detected_protocol = ndpi_guess_undetected_protocol(ndpi_thread_info[thread_id].workflow->ndpi_struct,
- flow->protocol,
- ntohl(flow->src_ip),
- ntohs(flow->src_port),
- ntohl(flow->dst_ip),
- ntohs(flow->dst_port));
- // printf("Guess state: %u\n", flow->detected_protocol);
- if(flow->detected_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN)
- ndpi_thread_info[thread_id].workflow->stats.guessed_flow_protocols++;
-
- return(flow->detected_protocol.app_protocol);
-}
-
+/* ********************************** */
/**
* @brief Proto Guess Walker
@@ -898,15 +915,10 @@ static void node_proto_guess_walker(const void *node, ndpi_VISIT which, int dept
if((which == ndpi_preorder) || (which == ndpi_leaf)) { /* Avoid walking the same node multiple times */
if((!flow->detection_completed) && flow->ndpi_flow)
- flow->detected_protocol = ndpi_detection_giveup(ndpi_thread_info[0].workflow->ndpi_struct, flow->ndpi_flow);
-
- if(enable_protocol_guess) {
- if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) {
- node_guess_undetected_protocol(thread_id, flow);
- }
- }
+ flow->detected_protocol = ndpi_detection_giveup(ndpi_thread_info[0].workflow->ndpi_struct, flow->ndpi_flow, enable_protocol_guess);
process_ndpi_collected_info(ndpi_thread_info[thread_id].workflow, flow);
+
ndpi_thread_info[thread_id].workflow->stats.protocol_counter[flow->detected_protocol.app_protocol] += flow->src2dst_packets + flow->dst2src_packets;
ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes[flow->detected_protocol.app_protocol] += flow->src2dst_bytes + flow->dst2src_bytes;
ndpi_thread_info[thread_id].workflow->stats.protocol_flows[flow->detected_protocol.app_protocol]++;
@@ -967,7 +979,7 @@ int updateIpTree(u_int32_t key, u_int8_t version,
if(rootp == (addr_node **)0)
return 0;
- while (*rootp != (addr_node *)0) {
+ while(*rootp != (addr_node *)0) {
/* Knuth's T1: */
if((version == (*rootp)->version) && (key == (*rootp)->addr)) {
/* T2: */
@@ -997,7 +1009,7 @@ int updateIpTree(u_int32_t key, u_int8_t version,
/* *********************************************** */
void freeIpTree(addr_node *root) {
- if (root == NULL)
+ if(root == NULL)
return;
freeIpTree(root->left);
@@ -1192,9 +1204,9 @@ static void deleteReceivers(struct receiver *receivers) {
/* *********************************************** */
/* implementation of: https://jeroen.massar.ch/presentations/files/FloCon2010-TopK.pdf
*
- * if (table1.size < max1 || acceptable){
+ * if(table1.size < max1 || acceptable){
* create new element and add to the table1
- * if (table1.size > max2) {
+ * if(table1.size > max2) {
* cut table1 back to max1
* merge table 1 to table2
* if(table2.size > max1)
@@ -1352,7 +1364,6 @@ static void port_stats_walker(const void *node, ndpi_VISIT which, int depth, voi
* @brief Idle Scan Walker
*/
static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth, void *user_data) {
-
struct ndpi_flow_info *flow = *(struct ndpi_flow_info **) node;
u_int16_t thread_id = *((u_int16_t *) user_data);
@@ -1379,24 +1390,12 @@ static void node_idle_scan_walker(const void *node, ndpi_VISIT which, int depth,
/**
- * @brief On Protocol Discover - call node_guess_undetected_protocol() for protocol
+ * @brief On Protocol Discover - demo callback
*/
static void on_protocol_discovered(struct ndpi_workflow * workflow,
struct ndpi_flow_info * flow,
void * udata) {
-
- const u_int16_t thread_id = (uintptr_t) udata;
-
- if(verbose > 1) {
- if(enable_protocol_guess) {
- if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) {
- flow->detected_protocol.app_protocol = node_guess_undetected_protocol(thread_id, flow),
- flow->detected_protocol.master_protocol = NDPI_PROTOCOL_UNKNOWN;
- }
- }
-
- // printFlow(thread_id, flow);
- }
+ ;
}
#if 0
@@ -1444,7 +1443,6 @@ static void debug_printf(u_int32_t protocol, void *id_struct,
* @brief Setup for detection begin
*/
static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) {
-
NDPI_PROTOCOL_BITMASK all;
struct ndpi_workflow_prefs prefs;
@@ -1463,19 +1461,23 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) {
ndpi_set_detection_preferences(ndpi_thread_info[thread_id].workflow->ndpi_struct,
ndpi_pref_dns_dissect_response, 0);
ndpi_set_detection_preferences(ndpi_thread_info[thread_id].workflow->ndpi_struct,
- ndpi_pref_enable_category_substring_match, 0);
+ ndpi_pref_enable_category_substring_match, 1);
ndpi_workflow_set_flow_detected_callback(ndpi_thread_info[thread_id].workflow,
- on_protocol_discovered, (void *)(uintptr_t)thread_id);
+ on_protocol_discovered,
+ (void *)(uintptr_t)thread_id);
// enable all protocols
NDPI_BITMASK_SET_ALL(all);
ndpi_set_protocol_detection_bitmask2(ndpi_thread_info[thread_id].workflow->ndpi_struct, &all);
// clear memory for results
- memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter, 0, sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter));
- memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes, 0, sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes));
- memset(ndpi_thread_info[thread_id].workflow->stats.protocol_flows, 0, sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_flows));
+ memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter, 0,
+ sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter));
+ memset(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes, 0,
+ sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes));
+ memset(ndpi_thread_info[thread_id].workflow->stats.protocol_flows, 0,
+ sizeof(ndpi_thread_info[thread_id].workflow->stats.protocol_flows));
if(_protoFilePath != NULL)
ndpi_load_protocols_file(ndpi_thread_info[thread_id].workflow->ndpi_struct, _protoFilePath);
@@ -1487,10 +1489,10 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) {
while(fd) {
char buffer[512], *line, *name, *category;
int i;
-
+
if(!(line = fgets(buffer, sizeof(buffer), fd)))
break;
-
+
if(((i = strlen(line)) <= 1) || (line[0] == '#'))
continue;
else
@@ -1501,9 +1503,16 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle) {
category = strtok(NULL, "\t");
if(category) {
+ int fields[4];
+
// printf("Loading %s\t%s\n", name, category);
- ndpi_load_hostname_category(ndpi_thread_info[thread_id].workflow->ndpi_struct,
- name, (ndpi_protocol_category_t)atoi(category));
+
+ if(sscanf(name, "%d.%d.%d.%d", &fields[0], &fields[1], &fields[2], &fields[3]) == 4)
+ ndpi_load_ip_category(ndpi_thread_info[thread_id].workflow->ndpi_struct,
+ name, (ndpi_protocol_category_t)atoi(category));
+ else
+ ndpi_load_hostname_category(ndpi_thread_info[thread_id].workflow->ndpi_struct,
+ name, (ndpi_protocol_category_t)atoi(category));
}
}
}
@@ -1599,7 +1608,9 @@ static void json_open_stats_file() {
static void json_close_stats_file() {
json_object *jObjFinal = json_object_new_object();
- json_object_object_add(jObjFinal,"duration.in.seconds",json_object_new_int(pcap_analysis_duration));
+
+ json_object_object_add(jObjFinal,"duration.in.seconds",
+ json_object_new_int(pcap_analysis_duration));
json_object_object_add(jObjFinal,"statistics", jArray_topStats);
fprintf(stats_fp,"%s\n",json_object_to_json_string(jObjFinal));
fclose(stats_fp);
@@ -1879,7 +1890,7 @@ void printPortStats(struct port_stats *stats) {
/**
* @brief Print result
*/
-static void printResults(u_int64_t tot_usec) {
+static void printResults(u_int64_t processing_time_usec, u_int64_t setup_time_usec) {
u_int32_t i;
u_int64_t total_flow_bytes = 0;
u_int32_t avg_pkt_size = 0;
@@ -1888,6 +1899,7 @@ static void printResults(u_int64_t tot_usec) {
char buf[32];
#ifdef HAVE_JSON_C
FILE *json_fp = NULL;
+ u_int8_t dont_close_json_fp = 0;
json_object *jObj_main = NULL, *jObj_trafficStats, *jArray_detProto = NULL, *jObj;
#endif
long long unsigned int breed_stats[NUM_BREEDS] = { 0 };
@@ -1900,8 +1912,10 @@ static void printResults(u_int64_t tot_usec) {
continue;
for(i=0; i<NUM_ROOTS; i++) {
- ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], node_proto_guess_walker, &thread_id);
- if(verbose == 3 || stats_flag) ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], port_stats_walker, &thread_id);
+ ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i],
+ node_proto_guess_walker, &thread_id);
+ if(verbose == 3 || stats_flag) ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i],
+ port_stats_walker, &thread_id);
}
/* Stats aggregation */
@@ -1939,7 +1953,9 @@ static void printResults(u_int64_t tot_usec) {
printf("\tFlow Memory (per flow): %-13s\n", formatBytes(sizeof(struct ndpi_flow_struct), buf, sizeof(buf)));
printf("\tActual Memory: %-13s\n", formatBytes(current_ndpi_memory, buf, sizeof(buf)));
printf("\tPeak Memory: %-13s\n", formatBytes(max_ndpi_memory, buf, sizeof(buf)));
-
+ printf("\tSetup Time: %lu msec\n", (unsigned long)(setup_time_usec/1000));
+ printf("\tPacket Processing Time: %lu msec\n", (unsigned long)(processing_time_usec/1000));
+
if(!json_flag) {
printf("\nTraffic statistics:\n");
printf("\tEthernet bytes: %-13llu (includes ethernet CRC/IFC/trailer)\n",
@@ -1970,13 +1986,15 @@ static void printResults(u_int64_t tot_usec) {
printf("\tPacket Len 1024-1500: %-13lu\n", (unsigned long)cumulative_stats.packet_len[4]);
printf("\tPacket Len > 1500: %-13lu\n", (unsigned long)cumulative_stats.packet_len[5]);
- if(tot_usec > 0) {
+ if(processing_time_usec > 0) {
char buf[32], buf1[32], when[64];
- float t = (float)(cumulative_stats.ip_packet_count*1000000)/(float)tot_usec;
- float b = (float)(cumulative_stats.total_wire_bytes * 8 *1000000)/(float)tot_usec;
+ float t = (float)(cumulative_stats.ip_packet_count*1000000)/(float)processing_time_usec;
+ float b = (float)(cumulative_stats.total_wire_bytes * 8 *1000000)/(float)processing_time_usec;
float traffic_duration;
- if(live_capture) traffic_duration = tot_usec;
+
+ if(live_capture) traffic_duration = processing_time_usec;
else traffic_duration = (pcap_end.tv_sec*1000000 + pcap_end.tv_usec) - (pcap_start.tv_sec*1000000 + pcap_start.tv_usec);
+
printf("\tnDPI throughput: %s pps / %s/sec\n", formatPackets(t, buf), formatTraffic(b, 1, buf1));
t = (float)(cumulative_stats.ip_packet_count*1000000)/(float)traffic_duration;
b = (float)(cumulative_stats.total_wire_bytes * 8 *1000000)/(float)traffic_duration;
@@ -1986,7 +2004,7 @@ static void printResults(u_int64_t tot_usec) {
strftime(when, sizeof(when), "%d/%b/%Y %H:%M:%S", localtime(&pcap_end.tv_sec));
printf("\tAnalysis end: %s\n", when);
printf("\tTraffic throughput: %s pps / %s/sec\n", formatPackets(t, buf), formatTraffic(b, 1, buf1));
- printf("\tTraffic duration: %.3f sec\n", traffic_duration/1000000);
+ printf("\tTraffic duration: %.3f sec\n", traffic_duration/1000000);
}
if(enable_protocol_guess)
@@ -1996,10 +2014,14 @@ static void printResults(u_int64_t tot_usec) {
if(json_flag) {
#ifdef HAVE_JSON_C
- if((json_fp = fopen(_jsonFilePath,"w")) == NULL) {
+ if(!strcmp(_jsonFilePath, "-"))
+ json_fp = stderr, dont_close_json_fp = 1;
+ else if((json_fp = fopen(_jsonFilePath,"w")) == NULL) {
printf("Error creating .json file %s\n", _jsonFilePath);
json_flag = 0;
- } else {
+ }
+
+ if(json_flag) {
jObj_main = json_object_new_object();
jObj_trafficStats = json_object_new_object();
jArray_detProto = json_object_new_array();
@@ -2150,7 +2172,7 @@ static void printResults(u_int64_t tot_usec) {
json_object_object_add(jObj_main,"unknown.flows",jArray_unknown_flows);
fprintf(json_fp,"%s\n",json_object_to_json_string(jObj_main));
- fclose(json_fp);
+ if(!dont_close_json_fp) fclose(json_fp);
#endif
}
@@ -2231,9 +2253,13 @@ free_stats:
* @brief Force a pcap_dispatch() or pcap_loop() call to return
*/
static void breakPcapLoop(u_int16_t thread_id) {
+#ifdef USE_DPDK
+ dpdk_run_capture = 0;
+#else
if(ndpi_thread_info[thread_id].workflow->pcap_handle != NULL) {
pcap_breakloop(ndpi_thread_info[thread_id].workflow->pcap_handle);
}
+#endif
}
/**
@@ -2300,15 +2326,26 @@ static void configurePcapHandle(pcap_t * pcap_handle) {
* @brief Open a pcap file or a specified device - Always returns a valid pcap_t
*/
static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_file) {
-
u_int snaplen = 1536;
int promisc = 1;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
pcap_t * pcap_handle = NULL;
/* trying to open a live interface */
- if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen, promisc,
- 500, pcap_error_buffer)) == NULL) {
+#ifdef USE_DPDK
+ struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS,
+ MBUF_CACHE_SIZE, 0,
+ RTE_MBUF_DEFAULT_BUF_SIZE,
+ rte_socket_id());
+
+ if(mbuf_pool == NULL)
+ rte_exit(EXIT_FAILURE, "Cannot create mbuf pool: are hugepages ok?\n");
+
+ if(dpdk_port_init(dpdk_port_id, mbuf_pool) != 0)
+ rte_exit(EXIT_FAILURE, "DPDK: Cannot init port %u: please see README.dpdk\n", dpdk_port_id);
+#else
+ if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen,
+ promisc, 500, pcap_error_buffer)) == NULL) {
capture_for = capture_until = 0;
live_capture = 0;
@@ -2335,11 +2372,17 @@ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_fi
} else {
live_capture = 1;
- if((!json_flag) && (!quiet_mode))
+ if((!json_flag) && (!quiet_mode)) {
+#ifdef USE_DPDK
+ printf("Capturing from DPDK (port 0)...\n");
+#else
printf("Capturing live traffic from device %s...\n", pcap_file);
+#endif
+ }
}
configurePcapHandle(pcap_handle);
+#endif /* !DPDK */
if(capture_for > 0) {
if((!json_flag) && (!quiet_mode))
@@ -2357,7 +2400,7 @@ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_fi
/**
* @brief Check pcap packet
*/
-static void pcap_process_packet(u_char *args,
+static void ndpi_process_packet(u_char *args,
const struct pcap_pkthdr *header,
const u_char *packet) {
struct ndpi_proto p;
@@ -2382,11 +2425,11 @@ static void pcap_process_packet(u_char *args,
if(live_capture) {
if(ndpi_thread_info[thread_id].last_idle_scan_time + IDLE_SCAN_PERIOD < ndpi_thread_info[thread_id].workflow->last_time) {
/* scan for idle flows */
- ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx], node_idle_scan_walker, &thread_id);
+ ndpi_twalk(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx],
+ node_idle_scan_walker, &thread_id);
/* remove idle flows (unfortunately we cannot do this inline) */
- while (ndpi_thread_info[thread_id].num_idle_flows > 0) {
-
+ while(ndpi_thread_info[thread_id].num_idle_flows > 0) {
/* search and delete the idle flow from the "ndpi_flow_root" (see struct reader thread) - here flows are the node of a b-tree */
ndpi_tdelete(ndpi_thread_info[thread_id].idle_flows[--ndpi_thread_info[thread_id].num_idle_flows],
&ndpi_thread_info[thread_id].workflow->ndpi_flows_root[ndpi_thread_info[thread_id].idle_scan_idx],
@@ -2450,12 +2493,13 @@ static void pcap_process_packet(u_char *args,
if((pcap_end.tv_sec-pcap_start.tv_sec) > pcap_analysis_duration) {
int i;
- u_int64_t tot_usec;
+ u_int64_t processing_time_usec, setup_time_usec;
gettimeofday(&end, NULL);
- tot_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec);
-
- printResults(tot_usec);
+ processing_time_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec);
+ setup_time_usec = begin.tv_sec*1000000 + begin.tv_usec - (startup_time.tv_sec*1000000 + startup_time.tv_usec);
+
+ printResults(processing_time_usec, setup_time_usec);
for(i=0; i<ndpi_thread_info[thread_id].workflow->prefs.num_roots; i++) {
ndpi_tdestroy(ndpi_thread_info[thread_id].workflow->ndpi_flows_root[i], ndpi_flow_info_freer);
@@ -2464,7 +2508,8 @@ static void pcap_process_packet(u_char *args,
memset(&ndpi_thread_info[thread_id].workflow->stats, 0, sizeof(struct ndpi_stats));
}
- printf("\n-------------------------------------------\n\n");
+ if(!quiet_mode)
+ printf("\n-------------------------------------------\n\n");
memcpy(&begin, &end, sizeof(begin));
memcpy(&pcap_start, &pcap_end, sizeof(pcap_start));
@@ -2477,20 +2522,20 @@ static void pcap_process_packet(u_char *args,
*/
static void runPcapLoop(u_int16_t thread_id) {
if((!shutdown_app) && (ndpi_thread_info[thread_id].workflow->pcap_handle != NULL))
- pcap_loop(ndpi_thread_info[thread_id].workflow->pcap_handle, -1, &pcap_process_packet, (u_char*)&thread_id);
+ pcap_loop(ndpi_thread_info[thread_id].workflow->pcap_handle, -1, &ndpi_process_packet, (u_char*)&thread_id);
}
/**
* @brief Process a running thread
*/
void * processing_thread(void *_thread_id) {
-
long thread_id = (long) _thread_id;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
#if defined(linux) && defined(HAVE_PTHREAD_SETAFFINITY_NP)
if(core_affinity[thread_id] >= 0) {
cpu_set_t cpuset;
+
CPU_ZERO(&cpuset);
CPU_SET(core_affinity[thread_id], &cpuset);
@@ -2503,6 +2548,33 @@ void * processing_thread(void *_thread_id) {
#endif
if((!json_flag) && (!quiet_mode)) printf("Running thread %ld...\n", thread_id);
+#ifdef USE_DPDK
+ while(dpdk_run_capture) {
+ struct rte_mbuf *bufs[BURST_SIZE];
+ u_int16_t num = rte_eth_rx_burst(dpdk_port_id, 0, bufs, BURST_SIZE);
+ u_int i;
+
+ if(num == 0) {
+ usleep(1);
+ continue;
+ }
+
+ for(i = 0; i < PREFETCH_OFFSET && i < num; i++)
+ rte_prefetch0(rte_pktmbuf_mtod(bufs[i], void *));
+
+ for(i = 0; i < num; i++) {
+ char *data = rte_pktmbuf_mtod(bufs[i], char *);
+ int len = rte_pktmbuf_pkt_len(bufs[i]);
+ struct pcap_pkthdr h;
+
+ h.len = h.caplen = len;
+ gettimeofday(&h.ts, NULL);
+
+ ndpi_process_packet((u_char*)&thread_id, &h, (const u_char *)data);
+ rte_pktmbuf_free(bufs[i]);
+ }
+ }
+#else
pcap_loop:
runPcapLoop(thread_id);
@@ -2515,6 +2587,7 @@ pcap_loop:
goto pcap_loop;
}
}
+#endif
return NULL;
}
@@ -2525,7 +2598,7 @@ pcap_loop:
*/
void test_lib() {
struct timeval end;
- u_int64_t tot_usec;
+ u_int64_t processing_time_usec, setup_time_usec;
long thread_id;
#ifdef HAVE_JSON_C
@@ -2577,10 +2650,11 @@ void test_lib() {
}
gettimeofday(&end, NULL);
- tot_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec);
+ processing_time_usec = end.tv_sec*1000000 + end.tv_usec - (begin.tv_sec*1000000 + begin.tv_usec);
+ setup_time_usec = begin.tv_sec*1000000 + begin.tv_usec - (startup_time.tv_sec*1000000 + startup_time.tv_usec);
/* Printing cumulative results */
- printResults(tot_usec);
+ printResults(processing_time_usec, setup_time_usec);
if(stats_flag) {
#ifdef HAVE_JSON_C
@@ -2599,7 +2673,7 @@ void test_lib() {
void automataUnitTest() {
void *automa;
- assert(automa = ndpi_init_automa());
+ assert((automa = ndpi_init_automa()));
assert(ndpi_add_string_to_automa(automa, "hello") == 0);
assert(ndpi_add_string_to_automa(automa, "world") == 0);
ndpi_finalize_automa(automa);
@@ -3193,17 +3267,19 @@ int orginal_main(int argc, char **argv) {
#else
int main(int argc, char **argv) {
#endif
- int i;
-
+ int i;
+
if(ndpi_get_api_version() != NDPI_API_VERSION) {
printf("nDPI Library version mismatch: please make sure this code and the nDPI library are in sync\n");
return(-1);
}
-
+
automataUnitTest();
+ gettimeofday(&startup_time, NULL);
ndpi_info_mod = ndpi_init_detection_module();
- if (ndpi_info_mod == NULL) return -1;
+
+ if(ndpi_info_mod == NULL) return -1;
memset(ndpi_thread_info, 0, sizeof(ndpi_thread_info));