diff options
author | Luca Deri <deri@ntop.org> | 2018-11-10 16:10:22 +0100 |
---|---|---|
committer | Luca Deri <deri@ntop.org> | 2018-11-10 16:10:22 +0100 |
commit | 71b2c19cf28f72ad2e876719af88c7841c2aea84 (patch) | |
tree | 8d5d8c1be904f1b8ff88aaa3adcac0bdcfbbfdff | |
parent | 6d929bf4cc48d54db91678a24ef0afb2e20382d7 (diff) |
Added DPDK support to ndpiReader
-rw-r--r-- | configure.seed | 11 | ||||
-rw-r--r-- | example/Makefile.dpdk.in | 27 | ||||
-rw-r--r-- | example/Makefile.in | 5 | ||||
-rw-r--r-- | example/README.DPDK | 31 | ||||
-rw-r--r-- | example/ndpiReader.c | 106 | ||||
-rw-r--r-- | example/ndpi_util.c | 78 | ||||
-rw-r--r-- | example/ndpi_util.h | 18 |
7 files changed, 242 insertions, 34 deletions
diff --git a/configure.seed b/configure.seed index f3b267904..11220f885 100644 --- a/configure.seed +++ b/configure.seed @@ -40,6 +40,14 @@ AC_CHECK_HEADERS([netinet/in.h stdint.h stdlib.h string.h unistd.h]) PCAP_HOME=$HOME/PF_RING/userland +DPDK_TARGET= +if test -d $HOME/DPDK; then : + echo "Enabling DPDK support in ndpiReader" + DPDK_TARGET=dpdk +else + echo "DPDK support disabled (missing $HOME/DPDK)" +fi + if test -d $PCAP_HOME; then : echo -n "" else @@ -138,7 +146,7 @@ AC_ARG_ENABLE([debug-messages], AC_CHECK_LIB(pthread, pthread_setaffinity_np, AC_DEFINE_UNQUOTED(HAVE_PTHREAD_SETAFFINITY_NP, 1, [libc has pthread_setaffinity_np])) -AC_CONFIG_FILES([Makefile example/Makefile tests/Makefile libndpi.pc src/include/ndpi_define.h src/lib/Makefile]) +AC_CONFIG_FILES([Makefile example/Makefile example/Makefile.dpdk tests/Makefile libndpi.pc src/include/ndpi_define.h src/lib/Makefile]) AC_CONFIG_HEADERS(src/include/ndpi_config.h) AC_SUBST(GIT_RELEASE) AC_SUBST(NDPI_MAJOR) @@ -152,6 +160,7 @@ AC_SUBST(PCAP_LIB) AC_SUBST(DL_LIB) AC_SUBST(HS_LIB) AC_SUBST(HS_INC) +AC_SUBST(DPDK_TARGET) AC_SUBST(HAVE_PTHREAD_SETAFFINITY_NP) AC_OUTPUT diff --git a/example/Makefile.dpdk.in b/example/Makefile.dpdk.in new file mode 100644 index 000000000..8519d7d0e --- /dev/null +++ b/example/Makefile.dpdk.in @@ -0,0 +1,27 @@ +# +# Run 'make -f Makefile.dpdk' to compile the DPDK examples +# +# See http://core.dpdk.org/doc/quick-start/ for DPDK installation and setup +# +ifeq ($(RTE_SDK),) +#$(error "Please define RTE_SDK environment variable") +RTE_SDK = $(HOME)/DPDK +RTE_TARGET = build +endif + +# Default target, can be overridden by command line or environment +RTE_TARGET ?= x86_64-native-linuxapp-gcc + +include $(RTE_SDK)/mk/rte.vars.mk + +APP = ndpiReader +LIBNDPI = $(PWD)/../src/lib/libndpi.a + +SRCS-y := ndpi_util.c ndpiReader.c + +CFLAGS += -g +CFLAGS += -Wno-strict-prototypes -Wno-missing-prototypes -Wno-missing-declarations -Wno-unused-parameter -I $(PWD)/../src/include @CFLAGS@ -DUSE_DPDK +LDLIBS = $(LIBNDPI) -lpthread @LDFLAGS@ + +include $(RTE_SDK)/mk/rte.extapp.mk + diff --git a/example/Makefile.in b/example/Makefile.in index 8c18f94dc..a5ca6acce 100644 --- a/example/Makefile.in +++ b/example/Makefile.in @@ -5,7 +5,7 @@ LDFLAGS=$(LIBNDPI) -lpcap -lpthread @LDFLAGS@ OBJS=ndpiReader.o ndpi_util.o PREFIX?=/usr/local -all: ndpiReader +all: ndpiReader @DPDK_TARGET@ ndpiReader: $(OBJS) $(LIBNDPI) $(CXX) $(CFLAGS) $(OBJS) -o $@ $(LDFLAGS) @@ -16,6 +16,9 @@ ndpiReader: $(OBJS) $(LIBNDPI) install: cp ndpiReader $(DESTDIR)$(PREFIX)/bin +dpdk: + make -f Makefile.dpdk + clean: /bin/rm -f *.o ndpiReader diff --git a/example/README.DPDK b/example/README.DPDK new file mode 100644 index 000000000..472597c3f --- /dev/null +++ b/example/README.DPDK @@ -0,0 +1,31 @@ +Prerequisites +------------- + +You need to install and compile DPDK in your HOME directory as explained in +See http://core.dpdk.org/doc/quick-start/ for DPDK installation and setup + +Once DPDK is built make sure to create a symbolic link + +$ cd +$ ln -s dpdk-18.08 DPDK + +so the build process will use the DPDK directory letting you have multiple +DPDK versions available on your system + + +Build +----- +Everything will happen automagically but if you want to do it by hand +do: make -f Makefile.dpdk + + +Run Application +--------------- +Supposing to capture packets from device eno1 you can start the +application as follows: + +sudo ./build/ndpiReader -c 1 --vdev=net_pcap0,iface=eno1 -- -v 1 + +NOTE: +- ndpiReader without DPDK support sits in this directory +- ndpiReader with DPDK support can be found inside the ./build directory diff --git a/example/ndpiReader.c b/example/ndpiReader.c index faa453ee2..89f5714a8 100644 --- a/example/ndpiReader.c +++ b/example/ndpiReader.c @@ -204,7 +204,9 @@ typedef struct ndpi_id { // used memory counters u_int32_t current_ndpi_memory = 0, max_ndpi_memory = 0; - +#ifdef USE_DPDK +static int dpdk_port_id = 0, dpdk_run_capture = 1; +#endif void test_lib(); /* Forward */ @@ -227,7 +229,11 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle); static void help(u_int long_help) { printf("Welcome to nDPI %s\n\n", ndpi_revision()); - printf("ndpiReader -i <file|device> [-f <filter>][-s <duration>][-m <duration>]\n" + printf("ndpiReader " +#ifndef USE_DPDK + "-i <file|device> " +#endif + "[-f <filter>][-s <duration>][-m <duration>]\n" " [-p <protos>][-l <loops> [-q][-d][-h][-t][-v <level>]\n" " [-n <threads>][-w <file>][-c <file>][-j <file>][-x <file>]\n\n" "Usage:\n" @@ -457,7 +463,18 @@ static void parseOptions(int argc, char **argv) { if(trace) fprintf(trace, " #### %s #### \n", __FUNCTION__); #endif - while ((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) { +#ifdef USE_DPDK + { + int ret = rte_eal_init(argc, argv); + + if(ret < 0) + rte_exit(EXIT_FAILURE, "Error with EAL initialization\n"); + + argc -= ret, argv += ret; + } +#endif + + while((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) { #ifdef DEBUG_TRACE if(trace) fprintf(trace, " #### -%c [%s] #### \n", opt, optarg ? optarg : ""); #endif @@ -603,7 +620,7 @@ static void parseOptions(int argc, char **argv) { case '9': extcap_packet_filter = ndpi_get_proto_by_name(ndpi_info_mod, optarg); - if (extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg); + if(extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg); break; case 257: @@ -616,6 +633,7 @@ static void parseOptions(int argc, char **argv) { } } +#ifndef USE_DPDK if(!bpf_filter_flag) { if(do_capture) { quiet_mode = 1; @@ -630,7 +648,7 @@ static void parseOptions(int argc, char **argv) { if(strchr(_pcap_file[0], ',')) { /* multiple ingress interfaces */ num_threads = 0; /* setting number of threads = number of interfaces */ __pcap_file = strtok(_pcap_file[0], ","); - while (__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) { + while(__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) { _pcap_file[num_threads++] = __pcap_file; __pcap_file = strtok(NULL, ","); } @@ -647,13 +665,14 @@ static void parseOptions(int argc, char **argv) { if(num_cores > 1 && bind_mask != NULL) { char *core_id = strtok(bind_mask, ":"); thread_id = 0; - while (core_id != NULL && thread_id < num_threads) { + while(core_id != NULL && thread_id < num_threads) { core_affinity[thread_id++] = atoi(core_id) % num_cores; core_id = strtok(NULL, ":"); } } #endif } +#endif #ifdef DEBUG_TRACE if(trace) fclose(trace); @@ -719,7 +738,7 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) { } *--cp = '.'; addr >>= 8; - } while (--n > 0); + } while(--n > 0); /* Convert the string to lowercase */ retStr = (char*)(cp+1); @@ -769,7 +788,7 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.app_protocol)); if(flow->detected_protocol.category != 0) - fprintf(out, "[cat: %s/%u]", + fprintf(out, "[cat: %s/%u]", ndpi_category_get_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.category), (unsigned int)flow->detected_protocol.category); @@ -924,7 +943,7 @@ static void node_proto_guess_walker(const void *node, ndpi_VISIT which, int dept } process_ndpi_collected_info(ndpi_thread_info[thread_id].workflow, flow); - + ndpi_thread_info[thread_id].workflow->stats.protocol_counter[flow->detected_protocol.app_protocol] += flow->src2dst_packets + flow->dst2src_packets; ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes[flow->detected_protocol.app_protocol] += flow->src2dst_bytes + flow->dst2src_bytes; ndpi_thread_info[thread_id].workflow->stats.protocol_flows[flow->detected_protocol.app_protocol]++; @@ -985,7 +1004,7 @@ int updateIpTree(u_int32_t key, u_int8_t version, if(rootp == (addr_node **)0) return 0; - while (*rootp != (addr_node *)0) { + while(*rootp != (addr_node *)0) { /* Knuth's T1: */ if((version == (*rootp)->version) && (key == (*rootp)->addr)) { /* T2: */ @@ -1015,7 +1034,7 @@ int updateIpTree(u_int32_t key, u_int8_t version, /* *********************************************** */ void freeIpTree(addr_node *root) { - if (root == NULL) + if(root == NULL) return; freeIpTree(root->left); @@ -1210,9 +1229,9 @@ static void deleteReceivers(struct receiver *receivers) { /* *********************************************** */ /* implementation of: https://jeroen.massar.ch/presentations/files/FloCon2010-TopK.pdf * - * if (table1.size < max1 || acceptable){ + * if(table1.size < max1 || acceptable){ * create new element and add to the table1 - * if (table1.size > max2) { + * if(table1.size > max2) { * cut table1 back to max1 * merge table 1 to table2 * if(table2.size > max1) @@ -2266,9 +2285,13 @@ free_stats: * @brief Force a pcap_dispatch() or pcap_loop() call to return */ static void breakPcapLoop(u_int16_t thread_id) { +#ifdef USE_DPDK + dpdk_run_capture = 0; +#else if(ndpi_thread_info[thread_id].workflow->pcap_handle != NULL) { pcap_breakloop(ndpi_thread_info[thread_id].workflow->pcap_handle); } +#endif } /** @@ -2335,15 +2358,26 @@ static void configurePcapHandle(pcap_t * pcap_handle) { * @brief Open a pcap file or a specified device - Always returns a valid pcap_t */ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_file) { - u_int snaplen = 1536; int promisc = 1; char pcap_error_buffer[PCAP_ERRBUF_SIZE]; pcap_t * pcap_handle = NULL; /* trying to open a live interface */ - if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen, promisc, - 500, pcap_error_buffer)) == NULL) { +#ifdef USE_DPDK + struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS, + MBUF_CACHE_SIZE, 0, + RTE_MBUF_DEFAULT_BUF_SIZE, + rte_socket_id()); + + if(mbuf_pool == NULL) + rte_exit(EXIT_FAILURE, "Cannot create mbuf pool: are hugepages ok?\n"); + + if(dpdk_port_init(dpdk_port_id, mbuf_pool) != 0) + rte_exit(EXIT_FAILURE, "DPDK: Cannot init port %u: please see README.dpdk\n", dpdk_port_id); +#else + if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen, + promisc, 500, pcap_error_buffer)) == NULL) { capture_for = capture_until = 0; live_capture = 0; @@ -2370,11 +2404,17 @@ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_fi } else { live_capture = 1; - if((!json_flag) && (!quiet_mode)) + if((!json_flag) && (!quiet_mode)) { +#ifdef USE_DPDK + printf("Capturing from DPDK (port 0)...\n"); +#else printf("Capturing live traffic from device %s...\n", pcap_file); +#endif + } } configurePcapHandle(pcap_handle); +#endif /* !DPDK */ if(capture_for > 0) { if((!json_flag) && (!quiet_mode)) @@ -2520,13 +2560,13 @@ static void runPcapLoop(u_int16_t thread_id) { * @brief Process a running thread */ void * processing_thread(void *_thread_id) { - long thread_id = (long) _thread_id; char pcap_error_buffer[PCAP_ERRBUF_SIZE]; #if defined(linux) && defined(HAVE_PTHREAD_SETAFFINITY_NP) if(core_affinity[thread_id] >= 0) { cpu_set_t cpuset; + CPU_ZERO(&cpuset); CPU_SET(core_affinity[thread_id], &cpuset); @@ -2539,6 +2579,33 @@ void * processing_thread(void *_thread_id) { #endif if((!json_flag) && (!quiet_mode)) printf("Running thread %ld...\n", thread_id); +#ifdef USE_DPDK + while(dpdk_run_capture) { + struct rte_mbuf *bufs[BURST_SIZE]; + u_int16_t num = rte_eth_rx_burst(dpdk_port_id, 0, bufs, BURST_SIZE); + u_int i; + + if(num == 0) { + usleep(1); + continue; + } + + for(i = 0; i < PREFETCH_OFFSET && i < num; i++) + rte_prefetch0(rte_pktmbuf_mtod(bufs[i], void *)); + + for(i = 0; i < num; i++) { + char *data = rte_pktmbuf_mtod(bufs[i], char *); + int len = rte_pktmbuf_pkt_len(bufs[i]); + struct pcap_pkthdr h; + + h.len = h.caplen = len; + gettimeofday(&h.ts, NULL); + + pcap_process_packet((u_char*)&thread_id, &h, (const u_char *)data); + rte_pktmbuf_free(bufs[i]); + } + } +#else pcap_loop: runPcapLoop(thread_id); @@ -2551,6 +2618,7 @@ pcap_loop: goto pcap_loop; } } +#endif return NULL; } @@ -3239,7 +3307,7 @@ int orginal_main(int argc, char **argv) { automataUnitTest(); ndpi_info_mod = ndpi_init_detection_module(); - if (ndpi_info_mod == NULL) return -1; + if(ndpi_info_mod == NULL) return -1; memset(ndpi_thread_info, 0, sizeof(ndpi_thread_info)); diff --git a/example/ndpi_util.c b/example/ndpi_util.c index 977a246ec..1a0d45914 100644 --- a/example/ndpi_util.c +++ b/example/ndpi_util.c @@ -219,7 +219,7 @@ void ndpi_flow_info_freer(void *node) { /* ***************************************************** */ void ndpi_workflow_free(struct ndpi_workflow * workflow) { - int i; + u_int i; for(i=0; i<workflow->prefs.num_roots; i++) ndpi_tdestroy(workflow->ndpi_flows_root[i], ndpi_flow_info_freer); @@ -232,8 +232,8 @@ void ndpi_workflow_free(struct ndpi_workflow * workflow) { /* ***************************************************** */ int ndpi_workflow_node_cmp(const void *a, const void *b) { - struct ndpi_flow_info *fa = (struct ndpi_flow_info*)a; - struct ndpi_flow_info *fb = (struct ndpi_flow_info*)b; + const struct ndpi_flow_info *fa = (const struct ndpi_flow_info*)a; + const struct ndpi_flow_info *fb = (const struct ndpi_flow_info*)b; if(fa->hashval < fb->hashval) return(-1); else if(fa->hashval > fb->hashval) return(1); @@ -307,7 +307,7 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow u_int32_t idx, l4_offset, hashval; struct ndpi_flow_info flow; void *ret; - u_int8_t *l3, *l4; + const u_int8_t *l3, *l4; /* Note: to keep things simple (ndpiReader is just a demo app) @@ -322,10 +322,10 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow return NULL; l4_offset = iph->ihl * 4; - l3 = (u_int8_t*)iph; + l3 = (const u_int8_t*)iph; } else { l4_offset = sizeof(struct ndpi_ipv6hdr); - l3 = (u_int8_t*)iph6; + l3 = (const u_int8_t*)iph6; } if(l4_packet_len < 64) @@ -345,7 +345,7 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow workflow->stats.max_packet_len = l4_packet_len; *proto = iph->protocol; - l4 = ((u_int8_t *) l3 + l4_offset); + l4 = ((const u_int8_t *) l3 + l4_offset); if(iph->protocol == IPPROTO_TCP && l4_packet_len >= 20) { u_int tcp_len; @@ -355,7 +355,7 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow *tcph = (struct ndpi_tcphdr *)l4; *sport = ntohs((*tcph)->source), *dport = ntohs((*tcph)->dest); tcp_len = ndpi_min(4*(*tcph)->doff, l4_packet_len); - *payload = &l4[tcp_len]; + *payload = (u_int8_t*)&l4[tcp_len]; *payload_len = ndpi_max(0, l4_packet_len-4*(*tcph)->doff); } else if(iph->protocol == IPPROTO_UDP && l4_packet_len >= 8) { // udp @@ -363,8 +363,8 @@ static struct ndpi_flow_info *get_ndpi_flow_info(struct ndpi_workflow * workflow workflow->stats.udp_count++; *udph = (struct ndpi_udphdr *)l4; *sport = ntohs((*udph)->source), *dport = ntohs((*udph)->dest); - *payload = &l4[sizeof(struct ndpi_udphdr)]; - *payload_len = ndpi_max(0, l4_packet_len-sizeof(struct ndpi_udphdr)); + *payload = (u_int8_t*)&l4[sizeof(struct ndpi_udphdr)]; + *payload_len = (l4_packet_len > sizeof(struct ndpi_udphdr)) ? l4_packet_len-sizeof(struct ndpi_udphdr) : 0; } else { // non tcp/udp protocols *sport = *dport = 0; @@ -507,7 +507,7 @@ static struct ndpi_flow_info *get_ndpi_flow_info6(struct ndpi_workflow * workflo iph.protocol = iph6->ip6_hdr.ip6_un1_nxt; if(iph.protocol == IPPROTO_DSTOPTS /* IPv6 destination option */) { - u_int8_t *options = (u_int8_t*)iph6 + sizeof(const struct ndpi_ipv6hdr); + const u_int8_t *options = (const u_int8_t*)iph6 + sizeof(const struct ndpi_ipv6hdr); iph.protocol = options[0]; } @@ -530,7 +530,7 @@ void process_ndpi_collected_info(struct ndpi_workflow * workflow, struct ndpi_fl /* BITTORRENT */ if(flow->detected_protocol.app_protocol == NDPI_PROTOCOL_BITTORRENT) { - int i, j, n = 0; + u_int i, j, n = 0; for(i=0, j = 0; j < sizeof(flow->bittorent_hash)-1; i++) { sprintf(&flow->bittorent_hash[j], "%02x", @@ -751,8 +751,14 @@ struct ndpi_proto ndpi_workflow_process_packet (struct ndpi_workflow * workflow, workflow->last_time = time; /*** check Data Link type ***/ - const int datalink_type = pcap_datalink(workflow->pcap_handle); + int datalink_type; +#ifdef USE_DPDK + datalink_type = DLT_EN10MB; +#else + datalink_type = (int)pcap_datalink(workflow->pcap_handle); +#endif + datalink_check: switch(datalink_type) { case DLT_NULL: @@ -1076,3 +1082,49 @@ u_int32_t ethernet_crc32(const void* data, size_t n_bytes) { __crc32(data, n_bytes, &crc); return crc; } + +/* *********************************************** */ + +#ifdef USE_DPDK + +static const struct rte_eth_conf port_conf_default = { + .rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN } +}; + +/* ************************************ */ + +int dpdk_port_init(int port, struct rte_mempool *mbuf_pool) { + struct rte_eth_conf port_conf = port_conf_default; + const u_int16_t rx_rings = 1, tx_rings = 1; + int retval; + u_int16_t q; + + /* 1 RX queue */ + retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf); + + if(retval != 0) + return retval; + + for (q = 0; q < rx_rings; q++) { + retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, rte_eth_dev_socket_id(port), NULL, mbuf_pool); + if(retval < 0) + return retval; + } + + for (q = 0; q < tx_rings; q++) { + retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, rte_eth_dev_socket_id(port), NULL); + if(retval < 0) + return retval; + } + + retval = rte_eth_dev_start(port); + + if(retval < 0) + return retval; + + rte_eth_promiscuous_enable(port); + + return 0; +} + +#endif diff --git a/example/ndpi_util.h b/example/ndpi_util.h index d1d461490..eb9ab8e65 100644 --- a/example/ndpi_util.h +++ b/example/ndpi_util.h @@ -31,6 +31,24 @@ #include <pcap.h> +#ifdef USE_DPDK +#include <rte_eal.h> +#include <rte_ether.h> +#include <rte_ethdev.h> +#include <rte_cycles.h> +#include <rte_lcore.h> +#include <rte_mbuf.h> + +#define RX_RING_SIZE 128 +#define TX_RING_SIZE 512 +#define NUM_MBUFS 8191 +#define MBUF_CACHE_SIZE 250 +#define BURST_SIZE 32 +#define PREFETCH_OFFSET 3 + +extern int dpdk_port_init(int port, struct rte_mempool *mbuf_pool); +#endif + #define MAX_NUM_READER_THREADS 16 #define IDLE_SCAN_PERIOD 10 /* msec (use TICK_RESOLUTION = 1000) */ #define MAX_IDLE_TIME 30000 |