aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToni Uhlig <matzeton@googlemail.com>2021-01-10 19:08:03 +0100
committerToni Uhlig <matzeton@googlemail.com>2021-01-10 19:12:05 +0100
commitf5d5c076a3a7f2d5ef87d195a67f5c624e09c137 (patch)
tree189e8caaaf2cd0e6857f31316f295740af6afd0f
parentb82a535a45c5eaadbbc558c6063e35dd1c99ce1f (diff)
Added internal/external packet processing modes. #2
* fixed incorrect handling of skipped flows Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r--config.h2
-rw-r--r--nDPId.c511
2 files changed, 308 insertions, 205 deletions
diff --git a/config.h b/config.h
index 45af506ac..c345f15cf 100644
--- a/config.h
+++ b/config.h
@@ -21,7 +21,7 @@
#define nDPId_MAX_READER_THREADS 32
#define nDPId_IDLE_SCAN_PERIOD 10000 /* 10 sec */
#define nDPId_IDLE_TIME 600000 /* 600 sec */
-#define nDPId_POST_END_FLOW_TIME 60000 /* 60 sec */
+#define nDPId_TCP_POST_END_FLOW_TIME 60000 /* 60 sec */
#define nDPId_THREAD_DISTRIBUTION_SEED 0x03dd018b
#define nDPId_PACKETS_PER_FLOW_TO_SEND 15
#define nDPId_FLOW_STRUCT_SEED 0x5defc104
diff --git a/nDPId.c b/nDPId.c
index 26a2b6713..ca41e7505 100644
--- a/nDPId.c
+++ b/nDPId.c
@@ -43,31 +43,51 @@ union nDPId_ip {
} v6;
};
-struct nDPId_flow_info
+enum nDPId_flow_type
{
- uint32_t flow_id;
- unsigned long long int packets_processed;
- uint64_t first_seen;
- uint64_t last_seen;
- uint64_t hashval;
+ FT_UNKNOWN = 0,
+ FT_SKIPPED,
+ FT_INFO
+};
+/*
+ * Minimal per-flow information required for flow mgmt and timeout handling.
+ */
+struct nDPId_flow_basic
+{
+ enum nDPId_flow_type type;
+ uint64_t hashval;
enum nDPId_l3_type l3_type;
-
union nDPId_ip src;
union nDPId_ip dst;
+ uint8_t l4_protocol;
+ uint8_t tcp_fin_rst_seen : 1;
+ uint8_t tcp_is_midstream_flow : 1;
+ uint8_t reserved_00[2];
+ uint16_t src_port;
+ uint16_t dst_port;
+ uint64_t last_seen;
+};
+
+struct nDPId_flow_skipped
+{
+ struct nDPId_flow_basic flow_basic;
+};
+
+struct nDPId_flow_info
+{
+ struct nDPId_flow_basic flow_basic;
+
+ uint32_t flow_id;
+ unsigned long long int packets_processed;
+ uint64_t first_seen;
uint16_t min_l4_data_len;
uint16_t max_l4_data_len;
unsigned long long int total_l4_data_len;
- uint16_t src_port;
- uint16_t dst_port;
- uint8_t is_midstream_flow : 1;
- uint8_t flow_fin_rst_seen : 1;
uint8_t detection_completed : 1;
- uint8_t reserved_00 : 5;
- uint8_t reserved_01[3];
- uint8_t l4_protocol;
+ uint8_t reserved_00[3];
uint32_t last_ndpi_flow_struct_hash;
struct ndpi_proto detected_l7_protocol;
@@ -99,6 +119,7 @@ struct nDPId_workflow
unsigned long long int packets_captured;
unsigned long long int packets_processed;
+ unsigned long long int total_skipped_flows;
unsigned long long int total_l4_data_len;
unsigned long long int detected_flow_protocols;
@@ -253,7 +274,7 @@ static unsigned long long int tick_resolution = nDPId_TICK_RESOLUTION;
static unsigned long long int reader_thread_count = nDPId_MAX_READER_THREADS / 2;
static unsigned long long int idle_scan_period = nDPId_IDLE_SCAN_PERIOD;
static unsigned long long int max_idle_time = nDPId_IDLE_TIME;
-static unsigned long long int max_post_end_flow_time = nDPId_POST_END_FLOW_TIME;
+static unsigned long long int tcp_max_post_end_flow_time = nDPId_TCP_POST_END_FLOW_TIME;
static unsigned long long int max_packets_per_flow_to_send = nDPId_PACKETS_PER_FLOW_TO_SEND;
enum nDPId_subopts
@@ -264,7 +285,7 @@ enum nDPId_subopts
MAX_READER_THREADS,
IDLE_SCAN_PERIOD,
MAX_IDLE_TIME,
- MAX_POST_END_FLOW_TIME,
+ TCP_MAX_POST_END_FLOW_TIME,
MAX_PACKETS_PER_FLOW_TO_SEND,
};
static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-thread",
@@ -273,7 +294,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th
[MAX_READER_THREADS] = "max-reader-threads",
[IDLE_SCAN_PERIOD] = "idle-scan-period",
[MAX_IDLE_TIME] = "max-idle-time",
- [MAX_POST_END_FLOW_TIME] = "max-post-end-flow-time",
+ [TCP_MAX_POST_END_FLOW_TIME] = "tcp-max-post-end-flow-time",
[MAX_PACKETS_PER_FLOW_TO_SEND] = "max-packets-per-flow-to-send",
NULL};
@@ -322,11 +343,13 @@ static void get_v4_ip_from_sockaddr(struct sockaddr_in * saddr, union nDPId_ip *
dest->v4.ip = saddr->sin_addr.s_addr;
}
+#if 0
static void get_v6_ip_from_sockaddr(struct sockaddr_in6 * saddr, union nDPId_ip * dest)
{
dest->v6.ip6[0] = *(uint64_t *)&saddr->sin6_addr.s6_addr[0];
dest->v6.ip6[0] = *(uint64_t *)&saddr->sin6_addr.s6_addr[7];
}
+#endif
/*
* Only IPv4 supported for now!
@@ -336,6 +359,7 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev)
{
struct ifaddrs * ifaddrs = NULL;
struct ifaddrs * ifa;
+ int sock = -1;
if (getifaddrs(&ifaddrs) != 0 || ifaddrs == NULL)
{
@@ -366,7 +390,13 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev)
if (strcmp(ifa->ifa_name, pcap_dev) == 0 && ifnamelen == strlen(pcap_dev))
{
- int const sock = socket(ifa->ifa_addr->sa_family, SOCK_DGRAM, IPPROTO_IP);
+ if (type == L3_IP6)
+ {
+ syslog(LOG_DAEMON, "IPv6 network interfaces in combination with -I / -E is not supported for now");
+ continue;
+ }
+
+ sock = socket(ifa->ifa_addr->sa_family, SOCK_DGRAM, IPPROTO_IP);
struct ifreq ifr;
if (sock < 0)
@@ -386,14 +416,7 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev)
{
break;
}
- if (ifa->ifa_addr->sa_family == AF_INET)
- {
- get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_netmask);
- }
- else
- {
- get_v6_ip_from_sockaddr((struct sockaddr_in6 *)&ifr.ifr_netmask, &pcap_dev_netmask);
- }
+ get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_netmask);
memset(&ifr, 0, sizeof(ifr));
memcpy(ifr.ifr_name, ifa->ifa_name, ifnamelen);
@@ -403,26 +426,16 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev)
{
break;
}
- if (ifa->ifa_addr->sa_family == AF_INET)
- {
- get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_ip);
- }
- else
- {
- get_v6_ip_from_sockaddr((struct sockaddr_in6 *)&ifr.ifr_netmask, &pcap_dev_ip);
- }
+ get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_ip);
ip_netmask_to_subnet(&pcap_dev_ip, &pcap_dev_netmask, &pcap_dev_subnet, type);
- char addr[INET6_ADDRSTRLEN];
- char netm[INET6_ADDRSTRLEN];
- char subn[INET6_ADDRSTRLEN];
- void * saddr =
- (ifa->ifa_addr->sa_family == AF_INET ? (void *)&pcap_dev_ip.v4.ip : (void *)&pcap_dev_ip.v6.ip6);
- void * snetm = (ifa->ifa_addr->sa_family == AF_INET ? (void *)&pcap_dev_netmask.v4.ip
- : (void *)&pcap_dev_netmask.v6.ip6);
- void * ssubn = (ifa->ifa_addr->sa_family == AF_INET ? (void *)&pcap_dev_subnet.v4.ip
- : (void *)&pcap_dev_subnet.v6.ip6);
+ char addr[INET_ADDRSTRLEN];
+ char netm[INET_ADDRSTRLEN];
+ char subn[INET_ADDRSTRLEN];
+ void * saddr = &pcap_dev_ip.v4.ip;
+ void * snetm = &pcap_dev_netmask.v4.ip;
+ void * ssubn = &pcap_dev_subnet.v4.ip;
syslog(LOG_DAEMON,
"%s address/netmask/subnet: %s/%s/%s",
pcap_file_or_interface,
@@ -431,11 +444,16 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev)
inet_ntop(ifa->ifa_addr->sa_family, ssubn, subn, sizeof(subn)));
freeifaddrs(ifaddrs);
+ close(sock);
return 0;
}
}
freeifaddrs(ifaddrs);
+ if (sock >= 0)
+ {
+ close(sock);
+ }
return 1;
}
@@ -499,6 +517,7 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
return NULL;
}
+ workflow->total_skipped_flows = 0;
workflow->total_active_flows = 0;
workflow->max_active_flows = max_flows_per_thread;
workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *));
@@ -533,10 +552,14 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device)
static void ndpi_flow_info_freer(void * const node)
{
- struct nDPId_flow_info * const flow = (struct nDPId_flow_info *)node;
+ struct nDPId_flow_basic * const flow_basic = (struct nDPId_flow_basic *)node;
- ndpi_free_flow_data(&flow->ndpi_flow);
- ndpi_free(flow);
+ if (flow_basic->type == FT_INFO)
+ {
+ struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
+ ndpi_free_flow_data(&flow_info->ndpi_flow);
+ }
+ ndpi_free(flow_basic);
}
static void free_workflow(struct nDPId_workflow ** const workflow)
@@ -644,7 +667,7 @@ static int setup_reader_threads(void)
return 0;
}
-static int ip_tuples_equal(struct nDPId_flow_info const * const A, struct nDPId_flow_info const * const B)
+static int ip_tuples_equal(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B)
{
// generate a warning if the enum changes
switch (A->l3_type)
@@ -665,7 +688,7 @@ static int ip_tuples_equal(struct nDPId_flow_info const * const A, struct nDPId_
return 0;
}
-static int ip_tuples_compare(struct nDPId_flow_info const * const A, struct nDPId_flow_info const * const B)
+static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B)
{
// generate a warning if the enum changes
switch (A->l3_type)
@@ -712,11 +735,11 @@ static int ip_tuples_compare(struct nDPId_flow_info const * const A, struct nDPI
static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
{
struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data;
- struct nDPId_flow_info * const flow = *(struct nDPId_flow_info **)A;
+ struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A;
(void)depth;
- if (workflow == NULL || flow == NULL)
+ if (workflow == NULL || flow_basic == NULL)
{
return;
}
@@ -728,46 +751,50 @@ static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int de
if (which == ndpi_preorder || which == ndpi_leaf)
{
- if ((flow->flow_fin_rst_seen == 1 && flow->last_seen + max_post_end_flow_time < workflow->last_time) ||
- flow->last_seen + max_idle_time < workflow->last_time)
+ if (flow_basic->last_seen + max_idle_time < workflow->last_time ||
+ (flow_basic->tcp_fin_rst_seen == 1 &&
+ flow_basic->last_seen + tcp_max_post_end_flow_time < workflow->last_time))
{
- workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow;
- workflow->total_idle_flows++;
+ workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic;
+ if (flow_basic->type == FT_INFO)
+ {
+ workflow->total_idle_flows++;
+ }
}
}
}
static int ndpi_workflow_node_cmp(void const * const A, void const * const B)
{
- struct nDPId_flow_info const * const flow_info_a = (struct nDPId_flow_info *)A;
- struct nDPId_flow_info const * const flow_info_b = (struct nDPId_flow_info *)B;
+ struct nDPId_flow_basic const * const flow_basic_a = (struct nDPId_flow_basic *)A;
+ struct nDPId_flow_basic const * const flow_basic_b = (struct nDPId_flow_basic *)B;
- if (flow_info_a->hashval < flow_info_b->hashval)
+ if (flow_basic_a->hashval < flow_basic_b->hashval)
{
- return (-1);
+ return -1;
}
- else if (flow_info_a->hashval > flow_info_b->hashval)
+ else if (flow_basic_a->hashval > flow_basic_b->hashval)
{
- return (1);
+ return 1;
}
/* Flows have the same hash */
- if (flow_info_a->l4_protocol < flow_info_b->l4_protocol)
+ if (flow_basic_a->l4_protocol < flow_basic_b->l4_protocol)
{
- return (-1);
+ return -1;
}
- else if (flow_info_a->l4_protocol > flow_info_b->l4_protocol)
+ else if (flow_basic_a->l4_protocol > flow_basic_b->l4_protocol)
{
- return (1);
+ return 1;
}
- if (ip_tuples_equal(flow_info_a, flow_info_b) != 0 && flow_info_a->src_port == flow_info_b->src_port &&
- flow_info_a->dst_port == flow_info_b->dst_port)
+ if (ip_tuples_equal(flow_basic_a, flow_basic_b) != 0 && flow_basic_a->src_port == flow_basic_b->src_port &&
+ flow_basic_a->dst_port == flow_basic_b->dst_port)
{
- return (0);
+ return 0;
}
- return ip_tuples_compare(flow_info_a, flow_info_b);
+ return ip_tuples_compare(flow_basic_a, flow_basic_b);
}
static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, size_t idle_scan_index)
@@ -776,42 +803,48 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread,
while (workflow->cur_idle_flows > 0)
{
- struct nDPId_flow_info * const f =
- (struct nDPId_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];
+ struct nDPId_flow_basic * const flow_basic =
+ (struct nDPId_flow_basic *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];
- if (f->detection_completed == 0)
+ if (flow_basic->type == FT_INFO)
{
- uint8_t protocol_was_guessed = 0;
+ struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic;
- if (ndpi_is_protocol_detected(workflow->ndpi_struct, f->guessed_l7_protocol) == 0)
- {
- f->guessed_l7_protocol =
- ndpi_detection_giveup(workflow->ndpi_struct, &f->ndpi_flow, 1, &protocol_was_guessed);
- }
- else
+ if (flow_info->detection_completed == 0)
{
- protocol_was_guessed = 1;
- }
+ uint8_t protocol_was_guessed = 0;
- if (protocol_was_guessed != 0)
+ if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0)
+ {
+ flow_info->guessed_l7_protocol =
+ ndpi_detection_giveup(workflow->ndpi_struct, &flow_info->ndpi_flow, 1, &protocol_was_guessed);
+ }
+ else
+ {
+ protocol_was_guessed = 1;
+ }
+
+ if (protocol_was_guessed != 0)
+ {
+ jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_GUESSED);
+ }
+ else
+ {
+ jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_NOT_DETECTED);
+ }
+ }
+ if (flow_basic->tcp_fin_rst_seen != 0)
{
- jsonize_flow_event(reader_thread, f, FLOW_EVENT_GUESSED);
+ jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_END);
}
else
{
- jsonize_flow_event(reader_thread, f, FLOW_EVENT_NOT_DETECTED);
+ jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_IDLE);
}
}
- if (f->flow_fin_rst_seen != 0)
- {
- jsonize_flow_event(reader_thread, f, FLOW_EVENT_END);
- }
- else
- {
- jsonize_flow_event(reader_thread, f, FLOW_EVENT_IDLE);
- }
- ndpi_tdelete(f, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp);
- ndpi_flow_info_freer(f);
+
+ ndpi_tdelete(flow_basic, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp);
+ ndpi_flow_info_freer(flow_basic);
workflow->cur_active_flows--;
}
}
@@ -838,26 +871,26 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f
char src_name[48] = {};
char dst_name[48] = {};
- switch (flow->l3_type)
+ switch (flow->flow_basic.l3_type)
{
case L3_IP:
ndpi_serialize_string_string(serializer, "l3_proto", "ip4");
- if (inet_ntop(AF_INET, &flow->src.v4.ip, src_name, sizeof(src_name)) == NULL)
+ if (inet_ntop(AF_INET, &flow->flow_basic.src.v4.ip, src_name, sizeof(src_name)) == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv4 source ip to string: %s", strerror(errno));
}
- if (inet_ntop(AF_INET, &flow->dst.v4.ip, dst_name, sizeof(dst_name)) == NULL)
+ if (inet_ntop(AF_INET, &flow->flow_basic.dst.v4.ip, dst_name, sizeof(dst_name)) == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv4 destination ip to string: %s", strerror(errno));
}
break;
case L3_IP6:
ndpi_serialize_string_string(serializer, "l3_proto", "ip6");
- if (inet_ntop(AF_INET6, &flow->src.v6.ip6[0], src_name, sizeof(src_name)) == NULL)
+ if (inet_ntop(AF_INET6, &flow->flow_basic.src.v6.ip6[0], src_name, sizeof(src_name)) == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv6 source ip to string: %s", strerror(errno));
}
- if (inet_ntop(AF_INET6, &flow->dst.v6.ip6[0], dst_name, sizeof(dst_name)) == NULL)
+ if (inet_ntop(AF_INET6, &flow->flow_basic.dst.v6.ip6[0], dst_name, sizeof(dst_name)) == NULL)
{
syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv6 destination ip to string: %s", strerror(errno));
}
@@ -871,16 +904,16 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f
ndpi_serialize_string_string(serializer, "src_ip", src_name);
ndpi_serialize_string_string(serializer, "dst_ip", dst_name);
- if (flow->src_port)
+ if (flow->flow_basic.src_port)
{
- ndpi_serialize_string_uint32(serializer, "src_port", flow->src_port);
+ ndpi_serialize_string_uint32(serializer, "src_port", flow->flow_basic.src_port);
}
- if (flow->dst_port)
+ if (flow->flow_basic.dst_port)
{
- ndpi_serialize_string_uint32(serializer, "dst_port", flow->dst_port);
+ ndpi_serialize_string_uint32(serializer, "dst_port", flow->flow_basic.dst_port);
}
- switch (flow->l4_protocol)
+ switch (flow->flow_basic.l4_protocol)
{
case IPPROTO_TCP:
ndpi_serialize_string_string(serializer, "l4_proto", "tcp");
@@ -895,7 +928,7 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f
ndpi_serialize_string_string(serializer, "l4_proto", "icmp6");
break;
default:
- ndpi_serialize_string_uint32(serializer, "l4_proto", flow->l4_protocol);
+ ndpi_serialize_string_uint32(serializer, "l4_proto", flow->flow_basic.l4_protocol);
break;
}
}
@@ -938,7 +971,9 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "reader-thread-count", reader_thread_count);
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "idle-scan-period", idle_scan_period);
ndpi_serialize_string_int64(&workflow->ndpi_serializer, "max-idle-time", max_idle_time);
- ndpi_serialize_string_int64(&workflow->ndpi_serializer, "max-post-end-flow-time", max_post_end_flow_time);
+ ndpi_serialize_string_int64(&workflow->ndpi_serializer,
+ "tcp-max-post-end-flow-time",
+ tcp_max_post_end_flow_time);
ndpi_serialize_string_int64(&workflow->ndpi_serializer,
"max-packets-per-flow-to-send",
max_packets_per_flow_to_send);
@@ -951,21 +986,24 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl
ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow->flow_id);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow->packets_processed);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow->first_seen);
- ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow->last_seen);
+ ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow->flow_basic.last_seen);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_tot_l4_data_len", flow->total_l4_data_len);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_min_l4_data_len", flow->min_l4_data_len);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_max_l4_data_len", flow->max_l4_data_len);
ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
"flow_avg_l4_data_len",
(flow->packets_processed > 0 ? flow->total_l4_data_len / flow->packets_processed : 0));
- ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow->is_midstream_flow);
+ ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow->flow_basic.tcp_is_midstream_flow);
}
static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread)
{
struct sockaddr_un saddr;
- close(reader_thread->json_sockfd);
+ if (reader_thread->json_sockfd >= 0)
+ {
+ close(reader_thread->json_sockfd);
+ }
reader_thread->json_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
if (reader_thread->json_sockfd < 0)
@@ -1623,13 +1661,50 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre
return 0;
}
+static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const workflow,
+ struct nDPId_flow_basic * orig_flow_basic,
+ enum nDPId_flow_type type,
+ size_t hashed_index)
+{
+ size_t s;
+
+ switch (type)
+ {
+ case FT_UNKNOWN:
+ return NULL;
+ case FT_SKIPPED:
+ workflow->total_skipped_flows++;
+ s = sizeof(struct nDPId_flow_skipped);
+ break;
+ case FT_INFO:
+ s = sizeof(struct nDPId_flow_info);
+ break;
+ }
+
+ struct nDPId_flow_basic * flow_basic = (struct nDPId_flow_basic *)ndpi_malloc(s);
+ if (flow_basic == NULL)
+ {
+ return NULL;
+ }
+ memset(flow_basic, 0, s);
+ *flow_basic = *orig_flow_basic;
+ flow_basic->type = type;
+ if (ndpi_tsearch(flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL)
+ {
+ ndpi_free(flow_basic);
+ return NULL;
+ }
+
+ return flow_basic;
+}
+
static void ndpi_process_packet(uint8_t * const args,
struct pcap_pkthdr const * const header,
uint8_t const * const packet)
{
struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)args;
struct nDPId_workflow * workflow;
- struct nDPId_flow_info flow = {};
+ struct nDPId_flow_basic flow_basic = {};
size_t hashed_index;
void * tree_result;
@@ -1719,10 +1794,10 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- flow.l3_type = L3_IP;
+ flow_basic.l3_type = L3_IP;
if (ndpi_detection_get_l4(
- (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0)
+ (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0)
{
jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD);
jsonize_basic_eventf(
@@ -1730,9 +1805,9 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- flow.src.v4.ip = ip->saddr;
- flow.dst.v4.ip = ip->daddr;
- uint32_t min_addr = (flow.src.v4.ip > flow.dst.v4.ip ? flow.dst.v4.ip : flow.src.v4.ip);
+ flow_basic.src.v4.ip = ip->saddr;
+ flow_basic.dst.v4.ip = ip->daddr;
+ uint32_t min_addr = (flow_basic.src.v4.ip > flow_basic.dst.v4.ip ? flow_basic.dst.v4.ip : flow_basic.src.v4.ip);
thread_index = min_addr + ip->protocol;
}
else if (ip6 != NULL)
@@ -1750,9 +1825,9 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- flow.l3_type = L3_IP6;
+ flow_basic.l3_type = L3_IP6;
if (ndpi_detection_get_l4(
- (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
+ (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
{
jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD);
jsonize_basic_eventf(
@@ -1760,20 +1835,20 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- flow.src.v6.ip6[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
- flow.src.v6.ip6[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
- flow.dst.v6.ip6[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
- flow.dst.v6.ip6[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
+ flow_basic.src.v6.ip6[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
+ flow_basic.src.v6.ip6[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
+ flow_basic.dst.v6.ip6[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
+ flow_basic.dst.v6.ip6[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
uint64_t min_addr[2];
- if (flow.src.v6.ip6[0] > flow.dst.v6.ip6[0] && flow.src.v6.ip6[1] > flow.dst.v6.ip6[1])
+ if (flow_basic.src.v6.ip6[0] > flow_basic.dst.v6.ip6[0] && flow_basic.src.v6.ip6[1] > flow_basic.dst.v6.ip6[1])
{
- min_addr[0] = flow.dst.v6.ip6[0];
- min_addr[1] = flow.dst.v6.ip6[0];
+ min_addr[0] = flow_basic.dst.v6.ip6[0];
+ min_addr[1] = flow_basic.dst.v6.ip6[0];
}
else
{
- min_addr[0] = flow.src.v6.ip6[0];
- min_addr[1] = flow.src.v6.ip6[0];
+ min_addr[0] = flow_basic.src.v6.ip6[0];
+ min_addr[1] = flow_basic.src.v6.ip6[0];
}
thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt;
}
@@ -1785,7 +1860,7 @@ static void ndpi_process_packet(uint8_t * const args,
}
/* process layer4 e.g. TCP / UDP */
- if (flow.l4_protocol == IPPROTO_TCP)
+ if (flow_basic.l4_protocol == IPPROTO_TCP)
{
const struct ndpi_tcphdr * tcp;
@@ -1802,12 +1877,12 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
tcp = (struct ndpi_tcphdr *)l4_ptr;
- flow.is_midstream_flow = (tcp->syn == 0 ? 1 : 0);
- flow.flow_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0);
- flow.src_port = ntohs(tcp->source);
- flow.dst_port = ntohs(tcp->dest);
+ flow_basic.tcp_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0);
+ flow_basic.tcp_is_midstream_flow = (tcp->syn == 0 ? 1 : 0);
+ flow_basic.src_port = ntohs(tcp->source);
+ flow_basic.dst_port = ntohs(tcp->dest);
}
- else if (flow.l4_protocol == IPPROTO_UDP)
+ else if (flow_basic.l4_protocol == IPPROTO_UDP)
{
const struct ndpi_udphdr * udp;
@@ -1824,12 +1899,12 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
udp = (struct ndpi_udphdr *)l4_ptr;
- flow.src_port = ntohs(udp->source);
- flow.dst_port = ntohs(udp->dest);
+ flow_basic.src_port = ntohs(udp->source);
+ flow_basic.dst_port = ntohs(udp->dest);
}
/* distribute flows to threads while keeping stability (same flow goes always to same thread) */
- thread_index += (flow.src_port < flow.dst_port ? flow.dst_port : flow.src_port);
+ thread_index += (flow_basic.src_port < flow_basic.dst_port ? flow_basic.dst_port : flow_basic.src_port);
thread_index %= reader_thread_count;
if (thread_index != reader_thread->array_index)
{
@@ -1839,69 +1914,69 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->total_l4_data_len += l4_len;
/* calculate flow hash for btree find, search(insert) */
- switch (flow.l3_type)
+ switch (flow_basic.l3_type)
{
case L3_IP:
- if (ndpi_flowv4_flow_hash(flow.l4_protocol,
- flow.src.v4.ip,
- flow.dst.v4.ip,
- flow.src_port,
- flow.dst_port,
+ if (ndpi_flowv4_flow_hash(flow_basic.l4_protocol,
+ flow_basic.src.v4.ip,
+ flow_basic.dst.v4.ip,
+ flow_basic.src_port,
+ flow_basic.dst_port,
0,
0,
- (uint8_t *)&flow.hashval,
- sizeof(flow.hashval)) != 0)
+ (uint8_t *)&flow_basic.hashval,
+ sizeof(flow_basic.hashval)) != 0)
{
- flow.hashval = flow.src.v4.ip + flow.dst.v4.ip; // fallback
+ flow_basic.hashval = flow_basic.src.v4.ip + flow_basic.dst.v4.ip; // fallback
}
break;
case L3_IP6:
- if (ndpi_flowv6_flow_hash(flow.l4_protocol,
+ if (ndpi_flowv6_flow_hash(flow_basic.l4_protocol,
&ip6->ip6_src,
&ip6->ip6_dst,
- flow.src_port,
- flow.dst_port,
+ flow_basic.src_port,
+ flow_basic.dst_port,
0,
0,
- (uint8_t *)&flow.hashval,
- sizeof(flow.hashval)) != 0)
+ (uint8_t *)&flow_basic.hashval,
+ sizeof(flow_basic.hashval)) != 0)
{
- flow.hashval = flow.src.v6.ip6[0] + flow.src.v6.ip6[1];
- flow.hashval += flow.dst.v6.ip6[0] + flow.dst.v6.ip6[1];
+ flow_basic.hashval = flow_basic.src.v6.ip6[0] + flow_basic.src.v6.ip6[1];
+ flow_basic.hashval += flow_basic.dst.v6.ip6[0] + flow_basic.dst.v6.ip6[1];
}
break;
}
- flow.hashval += flow.l4_protocol + flow.src_port + flow.dst_port;
+ flow_basic.hashval += flow_basic.l4_protocol + flow_basic.src_port + flow_basic.dst_port;
- hashed_index = flow.hashval % workflow->max_active_flows;
- tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
+ hashed_index = flow_basic.hashval % workflow->max_active_flows;
+ tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
if (tree_result == NULL)
{
/* flow not found in btree: switch src <-> dst and try to find it again */
- uint64_t orig_src_ip[2] = {flow.src.v6.ip6[0], flow.src.v6.ip6[1]};
- uint64_t orig_dst_ip[2] = {flow.dst.v6.ip6[0], flow.dst.v6.ip6[1]};
- uint16_t orig_src_port = flow.src_port;
- uint16_t orig_dst_port = flow.dst_port;
-
- flow.src.v6.ip6[0] = orig_dst_ip[0];
- flow.src.v6.ip6[1] = orig_dst_ip[1];
- flow.dst.v6.ip6[0] = orig_src_ip[0];
- flow.dst.v6.ip6[1] = orig_src_ip[1];
- flow.src_port = orig_dst_port;
- flow.dst_port = orig_src_port;
-
- tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
+ uint64_t orig_src_ip[2] = {flow_basic.src.v6.ip6[0], flow_basic.src.v6.ip6[1]};
+ uint64_t orig_dst_ip[2] = {flow_basic.dst.v6.ip6[0], flow_basic.dst.v6.ip6[1]};
+ uint16_t orig_src_port = flow_basic.src_port;
+ uint16_t orig_dst_port = flow_basic.dst_port;
+
+ flow_basic.src.v6.ip6[0] = orig_dst_ip[0];
+ flow_basic.src.v6.ip6[1] = orig_dst_ip[1];
+ flow_basic.dst.v6.ip6[0] = orig_src_ip[0];
+ flow_basic.dst.v6.ip6[1] = orig_src_ip[1];
+ flow_basic.src_port = orig_dst_port;
+ flow_basic.dst_port = orig_src_port;
+
+ tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
if (tree_result != NULL)
{
direction_changed = 1;
}
- flow.src.v6.ip6[0] = orig_src_ip[0];
- flow.src.v6.ip6[1] = orig_src_ip[1];
- flow.dst.v6.ip6[0] = orig_dst_ip[0];
- flow.dst.v6.ip6[1] = orig_dst_ip[1];
- flow.src_port = orig_src_port;
- flow.dst_port = orig_dst_port;
+ flow_basic.src.v6.ip6[0] = orig_src_ip[0];
+ flow_basic.src.v6.ip6[1] = orig_src_ip[1];
+ flow_basic.dst.v6.ip6[0] = orig_dst_ip[0];
+ flow_basic.dst.v6.ip6[1] = orig_dst_ip[1];
+ flow_basic.src_port = orig_src_port;
+ flow_basic.dst_port = orig_dst_port;
}
if (tree_result == NULL)
@@ -1910,15 +1985,33 @@ static void ndpi_process_packet(uint8_t * const args,
if (process_internal_initial_direction != 0)
{
- if (is_ip_in_subnet(&flow.src, &pcap_dev_netmask, &pcap_dev_subnet, flow.l3_type) == 0)
+ if (is_ip_in_subnet(&flow_basic.src, &pcap_dev_netmask, &pcap_dev_subnet, flow_basic.l3_type) == 0)
{
+ if (add_new_flow(workflow, &flow_basic, FT_SKIPPED, hashed_index) == NULL)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ FLOW_MEMORY_ALLOCATION_FAILED,
+ "%s%zu",
+ "size",
+ sizeof(struct nDPId_flow_skipped));
+ }
return;
}
}
else if (process_external_initial_direction != 0)
{
- if (is_ip_in_subnet(&flow.src, &pcap_dev_netmask, &pcap_dev_subnet, flow.l3_type) != 0)
+ if (is_ip_in_subnet(&flow_basic.src, &pcap_dev_netmask, &pcap_dev_subnet, flow_basic.l3_type) != 0)
{
+ if (add_new_flow(workflow, &flow_basic, FT_SKIPPED, hashed_index) == NULL)
+ {
+ jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD);
+ jsonize_basic_eventf(reader_thread,
+ FLOW_MEMORY_ALLOCATION_FAILED,
+ "%s%zu",
+ "size",
+ sizeof(struct nDPId_flow_skipped));
+ }
return;
}
}
@@ -1938,7 +2031,7 @@ static void ndpi_process_packet(uint8_t * const args,
return;
}
- flow_to_process = (struct nDPId_flow_info *)ndpi_malloc(sizeof(*flow_to_process));
+ flow_to_process = (struct nDPId_flow_info *)add_new_flow(workflow, &flow_basic, FT_INFO, hashed_index);
if (flow_to_process == NULL)
{
jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD);
@@ -1949,7 +2042,6 @@ static void ndpi_process_packet(uint8_t * const args,
workflow->cur_active_flows++;
workflow->total_active_flows++;
- memcpy(flow_to_process, &flow, sizeof(*flow_to_process));
#ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4
flow_to_process->flow_id = __sync_fetch_and_add(&global_flow_id, 1);
#else
@@ -1968,12 +2060,6 @@ static void ndpi_process_packet(uint8_t * const args,
0,
(SIZEOF_ID_STRUCT > sizeof(struct ndpi_id_struct) ? SIZEOF_ID_STRUCT : sizeof(struct ndpi_id_struct)));
- if (ndpi_tsearch(flow_to_process, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL)
- {
- /* Possible Leak, but should not happen as we'd abort earlier. */
- return;
- }
-
ndpi_src = &flow_to_process->ndpi_src;
ndpi_dst = &flow_to_process->ndpi_dst;
@@ -1981,7 +2067,20 @@ static void ndpi_process_packet(uint8_t * const args,
}
else
{
- flow_to_process = *(struct nDPId_flow_info **)tree_result;
+ struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result;
+ /* Update last seen timestamp for timeout handling. */
+ flow_basic_to_process->last_seen = time_ms;
+ /* TCP-FIN: indicates that at least one side wants to end the connection (timeout handling as well) */
+ if (flow_basic.tcp_fin_rst_seen != 0)
+ {
+ flow_basic_to_process->tcp_fin_rst_seen = 1;
+ }
+
+ if (flow_basic_to_process->type != FT_INFO)
+ {
+ return;
+ }
+ flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process;
if (direction_changed != 0)
{
@@ -1997,13 +2096,10 @@ static void ndpi_process_packet(uint8_t * const args,
flow_to_process->packets_processed++;
flow_to_process->total_l4_data_len += l4_len;
- /* update timestamps, important for timeout handling */
if (flow_to_process->first_seen == 0)
{
flow_to_process->first_seen = time_ms;
}
- flow_to_process->last_seen = time_ms;
-
if (l4_len > flow_to_process->max_l4_data_len)
{
flow_to_process->max_l4_data_len = l4_len;
@@ -2022,12 +2118,6 @@ static void ndpi_process_packet(uint8_t * const args,
jsonize_packet_event(reader_thread, header, packet, type, ip_offset, flow_to_process, PACKET_EVENT_PAYLOAD_FLOW);
- /* TCP-FIN: indicates that at least one side wants to end the connection */
- if (flow.flow_fin_rst_seen != 0)
- {
- flow_to_process->flow_fin_rst_seen = 1;
- }
-
/* We currently process max. 254 packets per flow. TODO: The user should decide this! */
if (flow_to_process->ndpi_flow.num_processed_pkts == 0xFF)
{
@@ -2210,11 +2300,11 @@ static int start_reader_threads(void)
static void ndpi_shutdown_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
{
struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data;
- struct nDPId_flow_info * const flow = *(struct nDPId_flow_info **)A;
+ struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A;
(void)depth;
- if (workflow == NULL || flow == NULL)
+ if (workflow == NULL || flow_basic == NULL)
{
return;
}
@@ -2226,8 +2316,11 @@ static void ndpi_shutdown_walker(void const * const A, ndpi_VISIT which, int dep
if (which == ndpi_preorder || which == ndpi_leaf)
{
- workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow;
- workflow->total_idle_flows++;
+ workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic;
+ if (flow_basic->type == FT_INFO)
+ {
+ workflow->total_idle_flows++;
+ }
}
}
@@ -2235,6 +2328,7 @@ static int stop_reader_threads(void)
{
unsigned long long int total_packets_processed = 0;
unsigned long long int total_l4_data_len = 0;
+ unsigned long long int total_flows_skipped = 0;
unsigned long long int total_flows_captured = 0;
unsigned long long int total_flows_idle = 0;
unsigned long long int total_flows_detected = 0;
@@ -2285,16 +2379,18 @@ static int stop_reader_threads(void)
total_packets_processed += reader_threads[i].workflow->packets_processed;
total_l4_data_len += reader_threads[i].workflow->total_l4_data_len;
+ total_flows_skipped += reader_threads[i].workflow->total_skipped_flows;
total_flows_captured += reader_threads[i].workflow->total_active_flows;
total_flows_idle += reader_threads[i].workflow->total_idle_flows;
total_flows_detected += reader_threads[i].workflow->detected_flow_protocols;
printf(
- "Stopping Thread %d, processed %10llu packets, %12llu bytes, total flows: %8llu, "
+ "Stopping Thread %d, processed %10llu packets, %12llu bytes, skipped flows: %8llu, processed flows: %8llu, "
"idle flows: %8llu, detected flows: %8llu\n",
reader_threads[i].array_index,
reader_threads[i].workflow->packets_processed,
reader_threads[i].workflow->total_l4_data_len,
+ reader_threads[i].workflow->total_skipped_flows,
reader_threads[i].workflow->total_active_flows,
reader_threads[i].workflow->total_idle_flows,
reader_threads[i].workflow->detected_flow_protocols);
@@ -2303,7 +2399,8 @@ static int stop_reader_threads(void)
printf("Total packets captured.: %llu\n", reader_threads[0].workflow->packets_captured);
printf("Total packets processed: %llu\n", total_packets_processed);
printf("Total layer4 data size.: %llu\n", total_l4_data_len);
- printf("Total flows captured...: %llu\n", total_flows_captured);
+ printf("Total flows ignopred...: %llu\n", total_flows_skipped);
+ printf("Total flows processed..: %llu\n", total_flows_captured);
printf("Total flows timed out..: %llu\n", total_flows_idle);
printf("Total flows detected...: %llu\n", total_flows_detected);
@@ -2374,8 +2471,8 @@ static void print_subopt_usage(void)
case MAX_IDLE_TIME:
fprintf(stderr, "%llu\n", max_idle_time);
break;
- case MAX_POST_END_FLOW_TIME:
- fprintf(stderr, "%llu\n", max_post_end_flow_time);
+ case TCP_MAX_POST_END_FLOW_TIME:
+ fprintf(stderr, "%llu\n", tcp_max_post_end_flow_time);
break;
case MAX_PACKETS_PER_FLOW_TO_SEND:
fprintf(stderr, "%llu\n", max_packets_per_flow_to_send);
@@ -2518,8 +2615,8 @@ static int parse_options(int argc, char ** argv)
case MAX_IDLE_TIME:
max_idle_time = value_llu;
break;
- case MAX_POST_END_FLOW_TIME:
- max_post_end_flow_time = value_llu;
+ case TCP_MAX_POST_END_FLOW_TIME:
+ tcp_max_post_end_flow_time = value_llu;
break;
case MAX_PACKETS_PER_FLOW_TO_SEND:
max_packets_per_flow_to_send = value_llu;
@@ -2592,12 +2689,12 @@ static int validate_options(char const * const arg0)
fprintf(stderr, "%s: Value not in range: max-idle-time[%llu] > 60\n", arg0, max_idle_time);
retval = 1;
}
- if (max_post_end_flow_time > max_idle_time)
+ if (tcp_max_post_end_flow_time > max_idle_time)
{
fprintf(stderr,
"%s: Value not in range: max-post-end-flow-time[%llu] < max_idle_time[%llu]\n",
arg0,
- max_post_end_flow_time,
+ tcp_max_post_end_flow_time,
max_idle_time);
retval = 1;
}
@@ -2608,6 +2705,12 @@ static int validate_options(char const * const arg0)
arg0);
retval = 1;
}
+ if (process_internal_initial_direction != 0 || process_external_initial_direction != 0)
+ {
+ fprintf(stderr,
+ "%s: Internal and External packet processing may lead to incorrect results for flows that were active before the daemon started.\n",
+ arg0);
+ }
return retval;
}