diff options
author | Toni Uhlig <matzeton@googlemail.com> | 2021-01-10 19:08:03 +0100 |
---|---|---|
committer | Toni Uhlig <matzeton@googlemail.com> | 2021-01-10 19:12:05 +0100 |
commit | f5d5c076a3a7f2d5ef87d195a67f5c624e09c137 (patch) | |
tree | 189e8caaaf2cd0e6857f31316f295740af6afd0f | |
parent | b82a535a45c5eaadbbc558c6063e35dd1c99ce1f (diff) |
Added internal/external packet processing modes. #2
* fixed incorrect handling of skipped flows
Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
-rw-r--r-- | config.h | 2 | ||||
-rw-r--r-- | nDPId.c | 511 |
2 files changed, 308 insertions, 205 deletions
@@ -21,7 +21,7 @@ #define nDPId_MAX_READER_THREADS 32 #define nDPId_IDLE_SCAN_PERIOD 10000 /* 10 sec */ #define nDPId_IDLE_TIME 600000 /* 600 sec */ -#define nDPId_POST_END_FLOW_TIME 60000 /* 60 sec */ +#define nDPId_TCP_POST_END_FLOW_TIME 60000 /* 60 sec */ #define nDPId_THREAD_DISTRIBUTION_SEED 0x03dd018b #define nDPId_PACKETS_PER_FLOW_TO_SEND 15 #define nDPId_FLOW_STRUCT_SEED 0x5defc104 @@ -43,31 +43,51 @@ union nDPId_ip { } v6; }; -struct nDPId_flow_info +enum nDPId_flow_type { - uint32_t flow_id; - unsigned long long int packets_processed; - uint64_t first_seen; - uint64_t last_seen; - uint64_t hashval; + FT_UNKNOWN = 0, + FT_SKIPPED, + FT_INFO +}; +/* + * Minimal per-flow information required for flow mgmt and timeout handling. + */ +struct nDPId_flow_basic +{ + enum nDPId_flow_type type; + uint64_t hashval; enum nDPId_l3_type l3_type; - union nDPId_ip src; union nDPId_ip dst; + uint8_t l4_protocol; + uint8_t tcp_fin_rst_seen : 1; + uint8_t tcp_is_midstream_flow : 1; + uint8_t reserved_00[2]; + uint16_t src_port; + uint16_t dst_port; + uint64_t last_seen; +}; + +struct nDPId_flow_skipped +{ + struct nDPId_flow_basic flow_basic; +}; + +struct nDPId_flow_info +{ + struct nDPId_flow_basic flow_basic; + + uint32_t flow_id; + unsigned long long int packets_processed; + uint64_t first_seen; uint16_t min_l4_data_len; uint16_t max_l4_data_len; unsigned long long int total_l4_data_len; - uint16_t src_port; - uint16_t dst_port; - uint8_t is_midstream_flow : 1; - uint8_t flow_fin_rst_seen : 1; uint8_t detection_completed : 1; - uint8_t reserved_00 : 5; - uint8_t reserved_01[3]; - uint8_t l4_protocol; + uint8_t reserved_00[3]; uint32_t last_ndpi_flow_struct_hash; struct ndpi_proto detected_l7_protocol; @@ -99,6 +119,7 @@ struct nDPId_workflow unsigned long long int packets_captured; unsigned long long int packets_processed; + unsigned long long int total_skipped_flows; unsigned long long int total_l4_data_len; unsigned long long int detected_flow_protocols; @@ -253,7 +274,7 @@ static unsigned long long int tick_resolution = nDPId_TICK_RESOLUTION; static unsigned long long int reader_thread_count = nDPId_MAX_READER_THREADS / 2; static unsigned long long int idle_scan_period = nDPId_IDLE_SCAN_PERIOD; static unsigned long long int max_idle_time = nDPId_IDLE_TIME; -static unsigned long long int max_post_end_flow_time = nDPId_POST_END_FLOW_TIME; +static unsigned long long int tcp_max_post_end_flow_time = nDPId_TCP_POST_END_FLOW_TIME; static unsigned long long int max_packets_per_flow_to_send = nDPId_PACKETS_PER_FLOW_TO_SEND; enum nDPId_subopts @@ -264,7 +285,7 @@ enum nDPId_subopts MAX_READER_THREADS, IDLE_SCAN_PERIOD, MAX_IDLE_TIME, - MAX_POST_END_FLOW_TIME, + TCP_MAX_POST_END_FLOW_TIME, MAX_PACKETS_PER_FLOW_TO_SEND, }; static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-thread", @@ -273,7 +294,7 @@ static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-th [MAX_READER_THREADS] = "max-reader-threads", [IDLE_SCAN_PERIOD] = "idle-scan-period", [MAX_IDLE_TIME] = "max-idle-time", - [MAX_POST_END_FLOW_TIME] = "max-post-end-flow-time", + [TCP_MAX_POST_END_FLOW_TIME] = "tcp-max-post-end-flow-time", [MAX_PACKETS_PER_FLOW_TO_SEND] = "max-packets-per-flow-to-send", NULL}; @@ -322,11 +343,13 @@ static void get_v4_ip_from_sockaddr(struct sockaddr_in * saddr, union nDPId_ip * dest->v4.ip = saddr->sin_addr.s_addr; } +#if 0 static void get_v6_ip_from_sockaddr(struct sockaddr_in6 * saddr, union nDPId_ip * dest) { dest->v6.ip6[0] = *(uint64_t *)&saddr->sin6_addr.s6_addr[0]; dest->v6.ip6[0] = *(uint64_t *)&saddr->sin6_addr.s6_addr[7]; } +#endif /* * Only IPv4 supported for now! @@ -336,6 +359,7 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) { struct ifaddrs * ifaddrs = NULL; struct ifaddrs * ifa; + int sock = -1; if (getifaddrs(&ifaddrs) != 0 || ifaddrs == NULL) { @@ -366,7 +390,13 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) if (strcmp(ifa->ifa_name, pcap_dev) == 0 && ifnamelen == strlen(pcap_dev)) { - int const sock = socket(ifa->ifa_addr->sa_family, SOCK_DGRAM, IPPROTO_IP); + if (type == L3_IP6) + { + syslog(LOG_DAEMON, "IPv6 network interfaces in combination with -I / -E is not supported for now"); + continue; + } + + sock = socket(ifa->ifa_addr->sa_family, SOCK_DGRAM, IPPROTO_IP); struct ifreq ifr; if (sock < 0) @@ -386,14 +416,7 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) { break; } - if (ifa->ifa_addr->sa_family == AF_INET) - { - get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_netmask); - } - else - { - get_v6_ip_from_sockaddr((struct sockaddr_in6 *)&ifr.ifr_netmask, &pcap_dev_netmask); - } + get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_netmask); memset(&ifr, 0, sizeof(ifr)); memcpy(ifr.ifr_name, ifa->ifa_name, ifnamelen); @@ -403,26 +426,16 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) { break; } - if (ifa->ifa_addr->sa_family == AF_INET) - { - get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_ip); - } - else - { - get_v6_ip_from_sockaddr((struct sockaddr_in6 *)&ifr.ifr_netmask, &pcap_dev_ip); - } + get_v4_ip_from_sockaddr((struct sockaddr_in *)&ifr.ifr_netmask, &pcap_dev_ip); ip_netmask_to_subnet(&pcap_dev_ip, &pcap_dev_netmask, &pcap_dev_subnet, type); - char addr[INET6_ADDRSTRLEN]; - char netm[INET6_ADDRSTRLEN]; - char subn[INET6_ADDRSTRLEN]; - void * saddr = - (ifa->ifa_addr->sa_family == AF_INET ? (void *)&pcap_dev_ip.v4.ip : (void *)&pcap_dev_ip.v6.ip6); - void * snetm = (ifa->ifa_addr->sa_family == AF_INET ? (void *)&pcap_dev_netmask.v4.ip - : (void *)&pcap_dev_netmask.v6.ip6); - void * ssubn = (ifa->ifa_addr->sa_family == AF_INET ? (void *)&pcap_dev_subnet.v4.ip - : (void *)&pcap_dev_subnet.v6.ip6); + char addr[INET_ADDRSTRLEN]; + char netm[INET_ADDRSTRLEN]; + char subn[INET_ADDRSTRLEN]; + void * saddr = &pcap_dev_ip.v4.ip; + void * snetm = &pcap_dev_netmask.v4.ip; + void * ssubn = &pcap_dev_subnet.v4.ip; syslog(LOG_DAEMON, "%s address/netmask/subnet: %s/%s/%s", pcap_file_or_interface, @@ -431,11 +444,16 @@ static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) inet_ntop(ifa->ifa_addr->sa_family, ssubn, subn, sizeof(subn))); freeifaddrs(ifaddrs); + close(sock); return 0; } } freeifaddrs(ifaddrs); + if (sock >= 0) + { + close(sock); + } return 1; } @@ -499,6 +517,7 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) return NULL; } + workflow->total_skipped_flows = 0; workflow->total_active_flows = 0; workflow->max_active_flows = max_flows_per_thread; workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *)); @@ -533,10 +552,14 @@ static struct nDPId_workflow * init_workflow(char const * const file_or_device) static void ndpi_flow_info_freer(void * const node) { - struct nDPId_flow_info * const flow = (struct nDPId_flow_info *)node; + struct nDPId_flow_basic * const flow_basic = (struct nDPId_flow_basic *)node; - ndpi_free_flow_data(&flow->ndpi_flow); - ndpi_free(flow); + if (flow_basic->type == FT_INFO) + { + struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic; + ndpi_free_flow_data(&flow_info->ndpi_flow); + } + ndpi_free(flow_basic); } static void free_workflow(struct nDPId_workflow ** const workflow) @@ -644,7 +667,7 @@ static int setup_reader_threads(void) return 0; } -static int ip_tuples_equal(struct nDPId_flow_info const * const A, struct nDPId_flow_info const * const B) +static int ip_tuples_equal(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B) { // generate a warning if the enum changes switch (A->l3_type) @@ -665,7 +688,7 @@ static int ip_tuples_equal(struct nDPId_flow_info const * const A, struct nDPId_ return 0; } -static int ip_tuples_compare(struct nDPId_flow_info const * const A, struct nDPId_flow_info const * const B) +static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B) { // generate a warning if the enum changes switch (A->l3_type) @@ -712,11 +735,11 @@ static int ip_tuples_compare(struct nDPId_flow_info const * const A, struct nDPI static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) { struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; - struct nDPId_flow_info * const flow = *(struct nDPId_flow_info **)A; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; (void)depth; - if (workflow == NULL || flow == NULL) + if (workflow == NULL || flow_basic == NULL) { return; } @@ -728,46 +751,50 @@ static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int de if (which == ndpi_preorder || which == ndpi_leaf) { - if ((flow->flow_fin_rst_seen == 1 && flow->last_seen + max_post_end_flow_time < workflow->last_time) || - flow->last_seen + max_idle_time < workflow->last_time) + if (flow_basic->last_seen + max_idle_time < workflow->last_time || + (flow_basic->tcp_fin_rst_seen == 1 && + flow_basic->last_seen + tcp_max_post_end_flow_time < workflow->last_time)) { - workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow; - workflow->total_idle_flows++; + workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic; + if (flow_basic->type == FT_INFO) + { + workflow->total_idle_flows++; + } } } } static int ndpi_workflow_node_cmp(void const * const A, void const * const B) { - struct nDPId_flow_info const * const flow_info_a = (struct nDPId_flow_info *)A; - struct nDPId_flow_info const * const flow_info_b = (struct nDPId_flow_info *)B; + struct nDPId_flow_basic const * const flow_basic_a = (struct nDPId_flow_basic *)A; + struct nDPId_flow_basic const * const flow_basic_b = (struct nDPId_flow_basic *)B; - if (flow_info_a->hashval < flow_info_b->hashval) + if (flow_basic_a->hashval < flow_basic_b->hashval) { - return (-1); + return -1; } - else if (flow_info_a->hashval > flow_info_b->hashval) + else if (flow_basic_a->hashval > flow_basic_b->hashval) { - return (1); + return 1; } /* Flows have the same hash */ - if (flow_info_a->l4_protocol < flow_info_b->l4_protocol) + if (flow_basic_a->l4_protocol < flow_basic_b->l4_protocol) { - return (-1); + return -1; } - else if (flow_info_a->l4_protocol > flow_info_b->l4_protocol) + else if (flow_basic_a->l4_protocol > flow_basic_b->l4_protocol) { - return (1); + return 1; } - if (ip_tuples_equal(flow_info_a, flow_info_b) != 0 && flow_info_a->src_port == flow_info_b->src_port && - flow_info_a->dst_port == flow_info_b->dst_port) + if (ip_tuples_equal(flow_basic_a, flow_basic_b) != 0 && flow_basic_a->src_port == flow_basic_b->src_port && + flow_basic_a->dst_port == flow_basic_b->dst_port) { - return (0); + return 0; } - return ip_tuples_compare(flow_info_a, flow_info_b); + return ip_tuples_compare(flow_basic_a, flow_basic_b); } static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, size_t idle_scan_index) @@ -776,42 +803,48 @@ static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, while (workflow->cur_idle_flows > 0) { - struct nDPId_flow_info * const f = - (struct nDPId_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows]; + struct nDPId_flow_basic * const flow_basic = + (struct nDPId_flow_basic *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows]; - if (f->detection_completed == 0) + if (flow_basic->type == FT_INFO) { - uint8_t protocol_was_guessed = 0; + struct nDPId_flow_info * const flow_info = (struct nDPId_flow_info *)flow_basic; - if (ndpi_is_protocol_detected(workflow->ndpi_struct, f->guessed_l7_protocol) == 0) - { - f->guessed_l7_protocol = - ndpi_detection_giveup(workflow->ndpi_struct, &f->ndpi_flow, 1, &protocol_was_guessed); - } - else + if (flow_info->detection_completed == 0) { - protocol_was_guessed = 1; - } + uint8_t protocol_was_guessed = 0; - if (protocol_was_guessed != 0) + if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_info->guessed_l7_protocol) == 0) + { + flow_info->guessed_l7_protocol = + ndpi_detection_giveup(workflow->ndpi_struct, &flow_info->ndpi_flow, 1, &protocol_was_guessed); + } + else + { + protocol_was_guessed = 1; + } + + if (protocol_was_guessed != 0) + { + jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_GUESSED); + } + else + { + jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_NOT_DETECTED); + } + } + if (flow_basic->tcp_fin_rst_seen != 0) { - jsonize_flow_event(reader_thread, f, FLOW_EVENT_GUESSED); + jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_END); } else { - jsonize_flow_event(reader_thread, f, FLOW_EVENT_NOT_DETECTED); + jsonize_flow_event(reader_thread, flow_info, FLOW_EVENT_IDLE); } } - if (f->flow_fin_rst_seen != 0) - { - jsonize_flow_event(reader_thread, f, FLOW_EVENT_END); - } - else - { - jsonize_flow_event(reader_thread, f, FLOW_EVENT_IDLE); - } - ndpi_tdelete(f, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp); - ndpi_flow_info_freer(f); + + ndpi_tdelete(flow_basic, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp); + ndpi_flow_info_freer(flow_basic); workflow->cur_active_flows--; } } @@ -838,26 +871,26 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f char src_name[48] = {}; char dst_name[48] = {}; - switch (flow->l3_type) + switch (flow->flow_basic.l3_type) { case L3_IP: ndpi_serialize_string_string(serializer, "l3_proto", "ip4"); - if (inet_ntop(AF_INET, &flow->src.v4.ip, src_name, sizeof(src_name)) == NULL) + if (inet_ntop(AF_INET, &flow->flow_basic.src.v4.ip, src_name, sizeof(src_name)) == NULL) { syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv4 source ip to string: %s", strerror(errno)); } - if (inet_ntop(AF_INET, &flow->dst.v4.ip, dst_name, sizeof(dst_name)) == NULL) + if (inet_ntop(AF_INET, &flow->flow_basic.dst.v4.ip, dst_name, sizeof(dst_name)) == NULL) { syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv4 destination ip to string: %s", strerror(errno)); } break; case L3_IP6: ndpi_serialize_string_string(serializer, "l3_proto", "ip6"); - if (inet_ntop(AF_INET6, &flow->src.v6.ip6[0], src_name, sizeof(src_name)) == NULL) + if (inet_ntop(AF_INET6, &flow->flow_basic.src.v6.ip6[0], src_name, sizeof(src_name)) == NULL) { syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv6 source ip to string: %s", strerror(errno)); } - if (inet_ntop(AF_INET6, &flow->dst.v6.ip6[0], dst_name, sizeof(dst_name)) == NULL) + if (inet_ntop(AF_INET6, &flow->flow_basic.dst.v6.ip6[0], dst_name, sizeof(dst_name)) == NULL) { syslog(LOG_DAEMON | LOG_ERR, "Could not convert IPv6 destination ip to string: %s", strerror(errno)); } @@ -871,16 +904,16 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f ndpi_serialize_string_string(serializer, "src_ip", src_name); ndpi_serialize_string_string(serializer, "dst_ip", dst_name); - if (flow->src_port) + if (flow->flow_basic.src_port) { - ndpi_serialize_string_uint32(serializer, "src_port", flow->src_port); + ndpi_serialize_string_uint32(serializer, "src_port", flow->flow_basic.src_port); } - if (flow->dst_port) + if (flow->flow_basic.dst_port) { - ndpi_serialize_string_uint32(serializer, "dst_port", flow->dst_port); + ndpi_serialize_string_uint32(serializer, "dst_port", flow->flow_basic.dst_port); } - switch (flow->l4_protocol) + switch (flow->flow_basic.l4_protocol) { case IPPROTO_TCP: ndpi_serialize_string_string(serializer, "l4_proto", "tcp"); @@ -895,7 +928,7 @@ static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_f ndpi_serialize_string_string(serializer, "l4_proto", "icmp6"); break; default: - ndpi_serialize_string_uint32(serializer, "l4_proto", flow->l4_protocol); + ndpi_serialize_string_uint32(serializer, "l4_proto", flow->flow_basic.l4_protocol); break; } } @@ -938,7 +971,9 @@ static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enu ndpi_serialize_string_int64(&workflow->ndpi_serializer, "reader-thread-count", reader_thread_count); ndpi_serialize_string_int64(&workflow->ndpi_serializer, "idle-scan-period", idle_scan_period); ndpi_serialize_string_int64(&workflow->ndpi_serializer, "max-idle-time", max_idle_time); - ndpi_serialize_string_int64(&workflow->ndpi_serializer, "max-post-end-flow-time", max_post_end_flow_time); + ndpi_serialize_string_int64(&workflow->ndpi_serializer, + "tcp-max-post-end-flow-time", + tcp_max_post_end_flow_time); ndpi_serialize_string_int64(&workflow->ndpi_serializer, "max-packets-per-flow-to-send", max_packets_per_flow_to_send); @@ -951,21 +986,24 @@ static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_fl ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_id", flow->flow_id); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow->packets_processed); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow->first_seen); - ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow->last_seen); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_last_seen", flow->flow_basic.last_seen); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_tot_l4_data_len", flow->total_l4_data_len); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_min_l4_data_len", flow->min_l4_data_len); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_max_l4_data_len", flow->max_l4_data_len); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_avg_l4_data_len", (flow->packets_processed > 0 ? flow->total_l4_data_len / flow->packets_processed : 0)); - ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow->is_midstream_flow); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow->flow_basic.tcp_is_midstream_flow); } static int connect_to_json_socket(struct nDPId_reader_thread * const reader_thread) { struct sockaddr_un saddr; - close(reader_thread->json_sockfd); + if (reader_thread->json_sockfd >= 0) + { + close(reader_thread->json_sockfd); + } reader_thread->json_sockfd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); if (reader_thread->json_sockfd < 0) @@ -1623,13 +1661,50 @@ static int process_datalink_layer(struct nDPId_reader_thread * const reader_thre return 0; } +static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const workflow, + struct nDPId_flow_basic * orig_flow_basic, + enum nDPId_flow_type type, + size_t hashed_index) +{ + size_t s; + + switch (type) + { + case FT_UNKNOWN: + return NULL; + case FT_SKIPPED: + workflow->total_skipped_flows++; + s = sizeof(struct nDPId_flow_skipped); + break; + case FT_INFO: + s = sizeof(struct nDPId_flow_info); + break; + } + + struct nDPId_flow_basic * flow_basic = (struct nDPId_flow_basic *)ndpi_malloc(s); + if (flow_basic == NULL) + { + return NULL; + } + memset(flow_basic, 0, s); + *flow_basic = *orig_flow_basic; + flow_basic->type = type; + if (ndpi_tsearch(flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL) + { + ndpi_free(flow_basic); + return NULL; + } + + return flow_basic; +} + static void ndpi_process_packet(uint8_t * const args, struct pcap_pkthdr const * const header, uint8_t const * const packet) { struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)args; struct nDPId_workflow * workflow; - struct nDPId_flow_info flow = {}; + struct nDPId_flow_basic flow_basic = {}; size_t hashed_index; void * tree_result; @@ -1719,10 +1794,10 @@ static void ndpi_process_packet(uint8_t * const args, return; } - flow.l3_type = L3_IP; + flow_basic.l3_type = L3_IP; if (ndpi_detection_get_l4( - (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0) + (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0) { jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf( @@ -1730,9 +1805,9 @@ static void ndpi_process_packet(uint8_t * const args, return; } - flow.src.v4.ip = ip->saddr; - flow.dst.v4.ip = ip->daddr; - uint32_t min_addr = (flow.src.v4.ip > flow.dst.v4.ip ? flow.dst.v4.ip : flow.src.v4.ip); + flow_basic.src.v4.ip = ip->saddr; + flow_basic.dst.v4.ip = ip->daddr; + uint32_t min_addr = (flow_basic.src.v4.ip > flow_basic.dst.v4.ip ? flow_basic.dst.v4.ip : flow_basic.src.v4.ip); thread_index = min_addr + ip->protocol; } else if (ip6 != NULL) @@ -1750,9 +1825,9 @@ static void ndpi_process_packet(uint8_t * const args, return; } - flow.l3_type = L3_IP6; + flow_basic.l3_type = L3_IP6; if (ndpi_detection_get_l4( - (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0) + (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0) { jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); jsonize_basic_eventf( @@ -1760,20 +1835,20 @@ static void ndpi_process_packet(uint8_t * const args, return; } - flow.src.v6.ip6[0] = ip6->ip6_src.u6_addr.u6_addr64[0]; - flow.src.v6.ip6[1] = ip6->ip6_src.u6_addr.u6_addr64[1]; - flow.dst.v6.ip6[0] = ip6->ip6_dst.u6_addr.u6_addr64[0]; - flow.dst.v6.ip6[1] = ip6->ip6_dst.u6_addr.u6_addr64[1]; + flow_basic.src.v6.ip6[0] = ip6->ip6_src.u6_addr.u6_addr64[0]; + flow_basic.src.v6.ip6[1] = ip6->ip6_src.u6_addr.u6_addr64[1]; + flow_basic.dst.v6.ip6[0] = ip6->ip6_dst.u6_addr.u6_addr64[0]; + flow_basic.dst.v6.ip6[1] = ip6->ip6_dst.u6_addr.u6_addr64[1]; uint64_t min_addr[2]; - if (flow.src.v6.ip6[0] > flow.dst.v6.ip6[0] && flow.src.v6.ip6[1] > flow.dst.v6.ip6[1]) + if (flow_basic.src.v6.ip6[0] > flow_basic.dst.v6.ip6[0] && flow_basic.src.v6.ip6[1] > flow_basic.dst.v6.ip6[1]) { - min_addr[0] = flow.dst.v6.ip6[0]; - min_addr[1] = flow.dst.v6.ip6[0]; + min_addr[0] = flow_basic.dst.v6.ip6[0]; + min_addr[1] = flow_basic.dst.v6.ip6[0]; } else { - min_addr[0] = flow.src.v6.ip6[0]; - min_addr[1] = flow.src.v6.ip6[0]; + min_addr[0] = flow_basic.src.v6.ip6[0]; + min_addr[1] = flow_basic.src.v6.ip6[0]; } thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt; } @@ -1785,7 +1860,7 @@ static void ndpi_process_packet(uint8_t * const args, } /* process layer4 e.g. TCP / UDP */ - if (flow.l4_protocol == IPPROTO_TCP) + if (flow_basic.l4_protocol == IPPROTO_TCP) { const struct ndpi_tcphdr * tcp; @@ -1802,12 +1877,12 @@ static void ndpi_process_packet(uint8_t * const args, return; } tcp = (struct ndpi_tcphdr *)l4_ptr; - flow.is_midstream_flow = (tcp->syn == 0 ? 1 : 0); - flow.flow_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0); - flow.src_port = ntohs(tcp->source); - flow.dst_port = ntohs(tcp->dest); + flow_basic.tcp_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0); + flow_basic.tcp_is_midstream_flow = (tcp->syn == 0 ? 1 : 0); + flow_basic.src_port = ntohs(tcp->source); + flow_basic.dst_port = ntohs(tcp->dest); } - else if (flow.l4_protocol == IPPROTO_UDP) + else if (flow_basic.l4_protocol == IPPROTO_UDP) { const struct ndpi_udphdr * udp; @@ -1824,12 +1899,12 @@ static void ndpi_process_packet(uint8_t * const args, return; } udp = (struct ndpi_udphdr *)l4_ptr; - flow.src_port = ntohs(udp->source); - flow.dst_port = ntohs(udp->dest); + flow_basic.src_port = ntohs(udp->source); + flow_basic.dst_port = ntohs(udp->dest); } /* distribute flows to threads while keeping stability (same flow goes always to same thread) */ - thread_index += (flow.src_port < flow.dst_port ? flow.dst_port : flow.src_port); + thread_index += (flow_basic.src_port < flow_basic.dst_port ? flow_basic.dst_port : flow_basic.src_port); thread_index %= reader_thread_count; if (thread_index != reader_thread->array_index) { @@ -1839,69 +1914,69 @@ static void ndpi_process_packet(uint8_t * const args, workflow->total_l4_data_len += l4_len; /* calculate flow hash for btree find, search(insert) */ - switch (flow.l3_type) + switch (flow_basic.l3_type) { case L3_IP: - if (ndpi_flowv4_flow_hash(flow.l4_protocol, - flow.src.v4.ip, - flow.dst.v4.ip, - flow.src_port, - flow.dst_port, + if (ndpi_flowv4_flow_hash(flow_basic.l4_protocol, + flow_basic.src.v4.ip, + flow_basic.dst.v4.ip, + flow_basic.src_port, + flow_basic.dst_port, 0, 0, - (uint8_t *)&flow.hashval, - sizeof(flow.hashval)) != 0) + (uint8_t *)&flow_basic.hashval, + sizeof(flow_basic.hashval)) != 0) { - flow.hashval = flow.src.v4.ip + flow.dst.v4.ip; // fallback + flow_basic.hashval = flow_basic.src.v4.ip + flow_basic.dst.v4.ip; // fallback } break; case L3_IP6: - if (ndpi_flowv6_flow_hash(flow.l4_protocol, + if (ndpi_flowv6_flow_hash(flow_basic.l4_protocol, &ip6->ip6_src, &ip6->ip6_dst, - flow.src_port, - flow.dst_port, + flow_basic.src_port, + flow_basic.dst_port, 0, 0, - (uint8_t *)&flow.hashval, - sizeof(flow.hashval)) != 0) + (uint8_t *)&flow_basic.hashval, + sizeof(flow_basic.hashval)) != 0) { - flow.hashval = flow.src.v6.ip6[0] + flow.src.v6.ip6[1]; - flow.hashval += flow.dst.v6.ip6[0] + flow.dst.v6.ip6[1]; + flow_basic.hashval = flow_basic.src.v6.ip6[0] + flow_basic.src.v6.ip6[1]; + flow_basic.hashval += flow_basic.dst.v6.ip6[0] + flow_basic.dst.v6.ip6[1]; } break; } - flow.hashval += flow.l4_protocol + flow.src_port + flow.dst_port; + flow_basic.hashval += flow_basic.l4_protocol + flow_basic.src_port + flow_basic.dst_port; - hashed_index = flow.hashval % workflow->max_active_flows; - tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp); + hashed_index = flow_basic.hashval % workflow->max_active_flows; + tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp); if (tree_result == NULL) { /* flow not found in btree: switch src <-> dst and try to find it again */ - uint64_t orig_src_ip[2] = {flow.src.v6.ip6[0], flow.src.v6.ip6[1]}; - uint64_t orig_dst_ip[2] = {flow.dst.v6.ip6[0], flow.dst.v6.ip6[1]}; - uint16_t orig_src_port = flow.src_port; - uint16_t orig_dst_port = flow.dst_port; - - flow.src.v6.ip6[0] = orig_dst_ip[0]; - flow.src.v6.ip6[1] = orig_dst_ip[1]; - flow.dst.v6.ip6[0] = orig_src_ip[0]; - flow.dst.v6.ip6[1] = orig_src_ip[1]; - flow.src_port = orig_dst_port; - flow.dst_port = orig_src_port; - - tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp); + uint64_t orig_src_ip[2] = {flow_basic.src.v6.ip6[0], flow_basic.src.v6.ip6[1]}; + uint64_t orig_dst_ip[2] = {flow_basic.dst.v6.ip6[0], flow_basic.dst.v6.ip6[1]}; + uint16_t orig_src_port = flow_basic.src_port; + uint16_t orig_dst_port = flow_basic.dst_port; + + flow_basic.src.v6.ip6[0] = orig_dst_ip[0]; + flow_basic.src.v6.ip6[1] = orig_dst_ip[1]; + flow_basic.dst.v6.ip6[0] = orig_src_ip[0]; + flow_basic.dst.v6.ip6[1] = orig_src_ip[1]; + flow_basic.src_port = orig_dst_port; + flow_basic.dst_port = orig_src_port; + + tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp); if (tree_result != NULL) { direction_changed = 1; } - flow.src.v6.ip6[0] = orig_src_ip[0]; - flow.src.v6.ip6[1] = orig_src_ip[1]; - flow.dst.v6.ip6[0] = orig_dst_ip[0]; - flow.dst.v6.ip6[1] = orig_dst_ip[1]; - flow.src_port = orig_src_port; - flow.dst_port = orig_dst_port; + flow_basic.src.v6.ip6[0] = orig_src_ip[0]; + flow_basic.src.v6.ip6[1] = orig_src_ip[1]; + flow_basic.dst.v6.ip6[0] = orig_dst_ip[0]; + flow_basic.dst.v6.ip6[1] = orig_dst_ip[1]; + flow_basic.src_port = orig_src_port; + flow_basic.dst_port = orig_dst_port; } if (tree_result == NULL) @@ -1910,15 +1985,33 @@ static void ndpi_process_packet(uint8_t * const args, if (process_internal_initial_direction != 0) { - if (is_ip_in_subnet(&flow.src, &pcap_dev_netmask, &pcap_dev_subnet, flow.l3_type) == 0) + if (is_ip_in_subnet(&flow_basic.src, &pcap_dev_netmask, &pcap_dev_subnet, flow_basic.l3_type) == 0) { + if (add_new_flow(workflow, &flow_basic, FT_SKIPPED, hashed_index) == NULL) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + FLOW_MEMORY_ALLOCATION_FAILED, + "%s%zu", + "size", + sizeof(struct nDPId_flow_skipped)); + } return; } } else if (process_external_initial_direction != 0) { - if (is_ip_in_subnet(&flow.src, &pcap_dev_netmask, &pcap_dev_subnet, flow.l3_type) != 0) + if (is_ip_in_subnet(&flow_basic.src, &pcap_dev_netmask, &pcap_dev_subnet, flow_basic.l3_type) != 0) { + if (add_new_flow(workflow, &flow_basic, FT_SKIPPED, hashed_index) == NULL) + { + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); + jsonize_basic_eventf(reader_thread, + FLOW_MEMORY_ALLOCATION_FAILED, + "%s%zu", + "size", + sizeof(struct nDPId_flow_skipped)); + } return; } } @@ -1938,7 +2031,7 @@ static void ndpi_process_packet(uint8_t * const args, return; } - flow_to_process = (struct nDPId_flow_info *)ndpi_malloc(sizeof(*flow_to_process)); + flow_to_process = (struct nDPId_flow_info *)add_new_flow(workflow, &flow_basic, FT_INFO, hashed_index); if (flow_to_process == NULL) { jsonize_packet_event(reader_thread, header, packet, type, ip_offset, NULL, PACKET_EVENT_PAYLOAD); @@ -1949,7 +2042,6 @@ static void ndpi_process_packet(uint8_t * const args, workflow->cur_active_flows++; workflow->total_active_flows++; - memcpy(flow_to_process, &flow, sizeof(*flow_to_process)); #ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4 flow_to_process->flow_id = __sync_fetch_and_add(&global_flow_id, 1); #else @@ -1968,12 +2060,6 @@ static void ndpi_process_packet(uint8_t * const args, 0, (SIZEOF_ID_STRUCT > sizeof(struct ndpi_id_struct) ? SIZEOF_ID_STRUCT : sizeof(struct ndpi_id_struct))); - if (ndpi_tsearch(flow_to_process, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL) - { - /* Possible Leak, but should not happen as we'd abort earlier. */ - return; - } - ndpi_src = &flow_to_process->ndpi_src; ndpi_dst = &flow_to_process->ndpi_dst; @@ -1981,7 +2067,20 @@ static void ndpi_process_packet(uint8_t * const args, } else { - flow_to_process = *(struct nDPId_flow_info **)tree_result; + struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result; + /* Update last seen timestamp for timeout handling. */ + flow_basic_to_process->last_seen = time_ms; + /* TCP-FIN: indicates that at least one side wants to end the connection (timeout handling as well) */ + if (flow_basic.tcp_fin_rst_seen != 0) + { + flow_basic_to_process->tcp_fin_rst_seen = 1; + } + + if (flow_basic_to_process->type != FT_INFO) + { + return; + } + flow_to_process = (struct nDPId_flow_info *)flow_basic_to_process; if (direction_changed != 0) { @@ -1997,13 +2096,10 @@ static void ndpi_process_packet(uint8_t * const args, flow_to_process->packets_processed++; flow_to_process->total_l4_data_len += l4_len; - /* update timestamps, important for timeout handling */ if (flow_to_process->first_seen == 0) { flow_to_process->first_seen = time_ms; } - flow_to_process->last_seen = time_ms; - if (l4_len > flow_to_process->max_l4_data_len) { flow_to_process->max_l4_data_len = l4_len; @@ -2022,12 +2118,6 @@ static void ndpi_process_packet(uint8_t * const args, jsonize_packet_event(reader_thread, header, packet, type, ip_offset, flow_to_process, PACKET_EVENT_PAYLOAD_FLOW); - /* TCP-FIN: indicates that at least one side wants to end the connection */ - if (flow.flow_fin_rst_seen != 0) - { - flow_to_process->flow_fin_rst_seen = 1; - } - /* We currently process max. 254 packets per flow. TODO: The user should decide this! */ if (flow_to_process->ndpi_flow.num_processed_pkts == 0xFF) { @@ -2210,11 +2300,11 @@ static int start_reader_threads(void) static void ndpi_shutdown_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) { struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; - struct nDPId_flow_info * const flow = *(struct nDPId_flow_info **)A; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; (void)depth; - if (workflow == NULL || flow == NULL) + if (workflow == NULL || flow_basic == NULL) { return; } @@ -2226,8 +2316,11 @@ static void ndpi_shutdown_walker(void const * const A, ndpi_VISIT which, int dep if (which == ndpi_preorder || which == ndpi_leaf) { - workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow; - workflow->total_idle_flows++; + workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic; + if (flow_basic->type == FT_INFO) + { + workflow->total_idle_flows++; + } } } @@ -2235,6 +2328,7 @@ static int stop_reader_threads(void) { unsigned long long int total_packets_processed = 0; unsigned long long int total_l4_data_len = 0; + unsigned long long int total_flows_skipped = 0; unsigned long long int total_flows_captured = 0; unsigned long long int total_flows_idle = 0; unsigned long long int total_flows_detected = 0; @@ -2285,16 +2379,18 @@ static int stop_reader_threads(void) total_packets_processed += reader_threads[i].workflow->packets_processed; total_l4_data_len += reader_threads[i].workflow->total_l4_data_len; + total_flows_skipped += reader_threads[i].workflow->total_skipped_flows; total_flows_captured += reader_threads[i].workflow->total_active_flows; total_flows_idle += reader_threads[i].workflow->total_idle_flows; total_flows_detected += reader_threads[i].workflow->detected_flow_protocols; printf( - "Stopping Thread %d, processed %10llu packets, %12llu bytes, total flows: %8llu, " + "Stopping Thread %d, processed %10llu packets, %12llu bytes, skipped flows: %8llu, processed flows: %8llu, " "idle flows: %8llu, detected flows: %8llu\n", reader_threads[i].array_index, reader_threads[i].workflow->packets_processed, reader_threads[i].workflow->total_l4_data_len, + reader_threads[i].workflow->total_skipped_flows, reader_threads[i].workflow->total_active_flows, reader_threads[i].workflow->total_idle_flows, reader_threads[i].workflow->detected_flow_protocols); @@ -2303,7 +2399,8 @@ static int stop_reader_threads(void) printf("Total packets captured.: %llu\n", reader_threads[0].workflow->packets_captured); printf("Total packets processed: %llu\n", total_packets_processed); printf("Total layer4 data size.: %llu\n", total_l4_data_len); - printf("Total flows captured...: %llu\n", total_flows_captured); + printf("Total flows ignopred...: %llu\n", total_flows_skipped); + printf("Total flows processed..: %llu\n", total_flows_captured); printf("Total flows timed out..: %llu\n", total_flows_idle); printf("Total flows detected...: %llu\n", total_flows_detected); @@ -2374,8 +2471,8 @@ static void print_subopt_usage(void) case MAX_IDLE_TIME: fprintf(stderr, "%llu\n", max_idle_time); break; - case MAX_POST_END_FLOW_TIME: - fprintf(stderr, "%llu\n", max_post_end_flow_time); + case TCP_MAX_POST_END_FLOW_TIME: + fprintf(stderr, "%llu\n", tcp_max_post_end_flow_time); break; case MAX_PACKETS_PER_FLOW_TO_SEND: fprintf(stderr, "%llu\n", max_packets_per_flow_to_send); @@ -2518,8 +2615,8 @@ static int parse_options(int argc, char ** argv) case MAX_IDLE_TIME: max_idle_time = value_llu; break; - case MAX_POST_END_FLOW_TIME: - max_post_end_flow_time = value_llu; + case TCP_MAX_POST_END_FLOW_TIME: + tcp_max_post_end_flow_time = value_llu; break; case MAX_PACKETS_PER_FLOW_TO_SEND: max_packets_per_flow_to_send = value_llu; @@ -2592,12 +2689,12 @@ static int validate_options(char const * const arg0) fprintf(stderr, "%s: Value not in range: max-idle-time[%llu] > 60\n", arg0, max_idle_time); retval = 1; } - if (max_post_end_flow_time > max_idle_time) + if (tcp_max_post_end_flow_time > max_idle_time) { fprintf(stderr, "%s: Value not in range: max-post-end-flow-time[%llu] < max_idle_time[%llu]\n", arg0, - max_post_end_flow_time, + tcp_max_post_end_flow_time, max_idle_time); retval = 1; } @@ -2608,6 +2705,12 @@ static int validate_options(char const * const arg0) arg0); retval = 1; } + if (process_internal_initial_direction != 0 || process_external_initial_direction != 0) + { + fprintf(stderr, + "%s: Internal and External packet processing may lead to incorrect results for flows that were active before the daemon started.\n", + arg0); + } return retval; } |