diff options
Diffstat (limited to 'nDPId.c')
-rw-r--r-- | nDPId.c | 5826 |
1 files changed, 5826 insertions, 0 deletions
diff --git a/nDPId.c b/nDPId.c new file mode 100644 index 000000000..61b17cd0f --- /dev/null +++ b/nDPId.c @@ -0,0 +1,5826 @@ +#if defined(__FreeBSD__) || defined(__APPLE__) +#include <sys/types.h> +#endif +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <ifaddrs.h> +#include <net/ethernet.h> +#include <net/if.h> +#include <netinet/in.h> +#include <ndpi_api.h> +#include <ndpi_classify.h> +#include <ndpi_main.h> +#include <ndpi_typedefs.h> +#include <pcap/dlt.h> +#include <pcap/pcap.h> +#include <pthread.h> +#include <signal.h> +#include <stdarg.h> +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L +#include <stddef.h> +#endif +#include <stdio.h> +#include <stdlib.h> +#include <sys/ioctl.h> +#if !defined(__FreeBSD__) && !defined(__APPLE__) +#include <sys/signalfd.h> +#endif +#include <sys/un.h> +#include <unistd.h> +#ifdef ENABLE_ZLIB +#include <zlib.h> +#endif + +#include "config.h" +#include "nDPIsrvd.h" +#include "nio.h" +#ifdef ENABLE_PFRING +#include "npfring.h" +#endif +#include "utils.h" + +#ifndef ETHERTYPE_DCE +#define ETHERTYPE_DCE 0x8903 +#endif + +#ifndef ETHERTYPE_PAE +#define ETHERTYPE_PAE 0x888e +#endif + +#ifndef DLT_DSA_TAG_DSA +#define DLT_DSA_TAG_DSA 284 +#endif + +#ifndef DLT_DSA_TAG_EDSA +#define DLT_DSA_TAG_EDSA 285 +#endif + +#define NDPI_VERSION_CHECK ((NDPI_MAJOR == 4 && NDPI_MINOR < 9) || NDPI_MAJOR < 4) + +#if NDPI_VERSION_CHECK +#error "nDPI >= 4.9.0 required" +#endif + +#if nDPId_MAX_READER_THREADS <= 0 +#error "Invalid value for nDPId_MAX_READER_THREADS" +#endif + +#if nDPId_FLOW_SCAN_INTERVAL > nDPId_GENERIC_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_ICMP_IDLE_TIME || \ + nDPId_FLOW_SCAN_INTERVAL > nDPId_TCP_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_UDP_IDLE_TIME +#error "Invalid value for nDPId_FLOW_SCAN_INTERVAL" +#endif + +#if (nDPId_PACKETS_PLEN_MAX * 3) /* base64 encoded! */ > NETWORK_BUFFER_MAX_SIZE +#error "Invalid value for nDPId_PACKETS_PLEN_MAX" +#endif + +#if defined(ENABLE_MEMORY_PROFILING) && !defined(ENABLE_MEMORY_STATUS) +#error "ENABLE_MEMORY_PROFILING requires ENABLE_MEMORY_STATUS to make it work!" +#endif + +/* MIPS* does not support Compare and Swap. Use traditional locking as fallback. */ +#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8) +#define MT_VALUE(name, type) \ + struct name \ + { \ + volatile uint64_t var; \ + pthread_mutex_t var_mutex; \ + } name +#define MT_INIT(value) \ + { \ + value, PTHREAD_MUTEX_INITIALIZER \ + } +#define MT_INIT2(name, value) \ + do \ + { \ + name.var = value; \ + pthread_mutex_init(&name.var_mutex, NULL); \ + } while (0) + +static inline uint64_t mt_pt_get_and_add(volatile uint64_t * value, uint64_t add, pthread_mutex_t * mutex) +{ + uint64_t result; + pthread_mutex_lock(mutex); + result = *value; + *value += add; + pthread_mutex_unlock(mutex); + return result; +} + +#define MT_GET_AND_ADD(name, value) mt_pt_get_and_add(&name.var, value, &name.var_mutex) + +static inline uint64_t mt_pt_get_and_sub(volatile uint64_t * value, uint64_t sub, pthread_mutex_t * mutex) +{ + uint64_t result; + pthread_mutex_lock(mutex); + result = *value; + *value -= sub; + pthread_mutex_unlock(mutex); + return result; +} +#define MT_GET_AND_SUB(name, value) mt_pt_get_and_sub(&name.var, value, &name.var_mutex) +#else +#define MT_VALUE(name, type) volatile type name +#define MT_INIT(value) value +#define MT_INIT2(name, value) \ + do \ + { \ + name = value; \ + } while (0) +#define MT_GET_AND_ADD(name, value) __sync_fetch_and_add(&name, value) +#define MT_GET_AND_SUB(name, value) __sync_fetch_and_sub(&name, value) +#endif + +enum nDPId_l3_type +{ + L3_IP, + L3_IP6 +}; + +union nDPId_ip +{ + struct + { + uint32_t ip; + } v4; + struct + { + union + { + uint64_t ip[2]; + uint32_t ip_u32[4]; + }; + } v6; +}; + +enum nDPId_flow_state +{ + FS_UNKNOWN = 0, // should never happen, bug otherwise + FS_SKIPPED, // flow should not be processed, see command line args -I and -E + FS_FINISHED, // detection done and detection data free'd + FS_INFO, // detection in progress, detection data allocated + FS_COUNT +}; + +enum nDPId_flow_direction +{ + FD_SRC2DST = 0, + FD_DST2SRC, + FD_COUNT +}; + +struct nDPId_flow_analysis +{ + struct ndpi_analyze_struct iat; + struct ndpi_analyze_struct pktlen; + uint8_t * directions; + struct ndpi_bin payload_len_bin[FD_COUNT]; + float * entropies; +}; + +/* + * Minimal per-flow information required for flow mgmt and timeout handling. + */ +struct nDPId_flow_basic +{ + enum nDPId_flow_state state; + enum nDPId_l3_type l3_type; + uint64_t hashval; + union nDPId_ip src; + union nDPId_ip dst; + uint8_t l4_protocol; + uint8_t tcp_fin_rst_seen : 1; + uint8_t tcp_is_midstream_flow : 1; + uint8_t reserved_00 : 6; + uint8_t reserved_01[2]; + uint16_t src_port; + uint16_t dst_port; + uint64_t last_pkt_time[FD_COUNT]; +}; + +/* + * Information required for a full detection cycle. + */ +struct nDPId_flow_extended +{ + struct nDPId_flow_basic flow_basic; // Do not move this element! + + unsigned long long int flow_id; + + uint16_t min_l4_payload_len[FD_COUNT]; + uint16_t max_l4_payload_len[FD_COUNT]; + ; + + unsigned long long int packets_processed[FD_COUNT]; + uint64_t first_seen; + uint64_t last_flow_update; + + struct nDPId_flow_analysis * flow_analysis; + unsigned long long int total_l4_payload_len[FD_COUNT]; + struct ndpi_proto detected_l7_protocol; +}; +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L +_Static_assert(offsetof(struct nDPId_flow_extended, flow_basic) == 0, + "Offset of flow_basic is not zero any more. nDPId won't work anymore w/o changing it's core!"); +#endif + +/* + * Skipped flows need at least some information. + */ +struct nDPId_flow_skipped +{ + struct nDPId_flow_basic flow_basic; // Do not move this element! +}; +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L +_Static_assert(offsetof(struct nDPId_flow_skipped, flow_basic) == 0, + "Offset of flow_basic is not zero any more. nDPId won't work anymore w/o changing it's core!"); +#endif + +/* + * Structure which is important for the detection process. + * The structure is also a compression target, if activated. + */ +struct nDPId_detection_data +{ + uint32_t last_ndpi_flow_struct_hash; + struct ndpi_proto guessed_l7_protocol; + struct ndpi_flow_struct flow; +}; + +struct nDPId_flow +{ + struct nDPId_flow_extended flow_extended; // Do not move this element! + + union + { + struct + { + uint8_t detection_completed : 1; // Flow was detected. Detection updates may still occur. + uint8_t reserved_00 : 7; + uint8_t reserved_01[1]; +#ifdef ENABLE_ZLIB + uint16_t detection_data_compressed_size; +#endif + struct nDPId_detection_data * detection_data; + } info; + struct + { + ndpi_risk risk; + ndpi_confidence_t confidence; + char * hostname; + } finished; + }; +}; +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L +_Static_assert(offsetof(struct nDPId_flow, flow_extended) == 0, + "Offset of flow_extended is not zero any more. nDPId won't work anymore w/o changing it's core!"); +#endif + +struct nDPId_workflow +{ +#ifdef ENABLE_PFRING + struct npfring npf; +#endif + pcap_t * pcap_handle; + + MT_VALUE(error_or_eof, uint8_t); + uint8_t is_pcap_file; + + uint16_t error_count; + + unsigned long long int packets_captured; + unsigned long long int packets_processed; + unsigned long long int total_skipped_flows; + unsigned long long int total_l4_payload_len; + + unsigned long long int libnDPI_errors; + + unsigned long long int total_not_detected_flows; + unsigned long long int total_guessed_flows; + unsigned long long int total_detected_flows; + unsigned long long int total_flow_detection_updates; + unsigned long long int total_flow_updates; + +#ifdef ENABLE_MEMORY_PROFILING + uint64_t last_memory_usage_log_time; +#endif + +#ifdef ENABLE_ZLIB + uint64_t last_compression_scan_time; + uint64_t total_compressions; + uint64_t total_compression_diff; + uint64_t current_compression_diff; +#endif + + uint64_t last_scan_time; + uint64_t last_status_time; + uint64_t last_global_time; + uint64_t last_thread_time; + uint64_t last_error_time; + + void ** ndpi_flows_active; + unsigned long long int max_active_flows; + unsigned long long int cur_active_flows; + unsigned long long int total_active_flows; + + void ** ndpi_flows_idle; + unsigned long long int max_idle_flows; + unsigned long long int cur_idle_flows; + unsigned long long int total_idle_flows; + + unsigned long long int total_events_serialized; + + ndpi_serializer ndpi_serializer; + struct ndpi_detection_module_struct * ndpi_struct; +}; + +struct nDPId_reader_thread +{ + struct nDPId_workflow * workflow; + pthread_t thread; + int collector_sockfd; + int collector_sock_last_errno; + size_t array_index; +}; + +enum packet_event +{ + PACKET_EVENT_INVALID = 0, + + PACKET_EVENT_PAYLOAD, // A single packet that does not belong to a flow for whatever reasons. + // E.g. it could be malformed and thus no flow handling is done. + // There may be additional use-cases in the future. + PACKET_EVENT_PAYLOAD_FLOW, // Special case; A packet event that belongs to a flow but does not include all + // information a flow event requires. + + PACKET_EVENT_COUNT +}; + +enum flow_event +{ + FLOW_EVENT_INVALID = 0, + + FLOW_EVENT_NEW, + FLOW_EVENT_END, // TCP only: FIN/RST packet seen. + FLOW_EVENT_IDLE, // Flow timed out. + FLOW_EVENT_UPDATE, // Inform distributor applications about flows with a long lifetime. + FLOW_EVENT_ANALYSE, // Print information regarding a flow analysis, see `struct nDPId_flow_analysis'. + + FLOW_EVENT_GUESSED, + FLOW_EVENT_DETECTED, + FLOW_EVENT_DETECTION_UPDATE, // Some information in `struct ndpi_flow_struct' changed. + FLOW_EVENT_NOT_DETECTED, + + FLOW_EVENT_COUNT +}; + +enum error_event +{ + ERROR_EVENT_INVALID = 0, + + UNKNOWN_DATALINK_LAYER, + UNKNOWN_L3_PROTOCOL, + UNSUPPORTED_DATALINK_LAYER, + PACKET_TOO_SHORT, + PACKET_TYPE_UNKNOWN, + PACKET_HEADER_INVALID, + IP4_PACKET_TOO_SHORT, + IP4_SIZE_SMALLER_THAN_HEADER, + IP4_L4_PAYLOAD_DETECTION_FAILED, + IP6_PACKET_TOO_SHORT, // 10 + IP6_SIZE_SMALLER_THAN_HEADER, + IP6_L4_PAYLOAD_DETECTION_FAILED, + TCP_PACKET_TOO_SHORT, + UDP_PACKET_TOO_SHORT, + CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE, + MAX_FLOW_TO_TRACK, + FLOW_MEMORY_ALLOCATION_FAILED, + + ERROR_EVENT_COUNT // 17 +}; + +enum daemon_event +{ + DAEMON_EVENT_INVALID = 0, + + DAEMON_EVENT_INIT, + DAEMON_EVENT_RECONNECT, + DAEMON_EVENT_SHUTDOWN, + DAEMON_EVENT_STATUS, + + DAEMON_EVENT_COUNT +}; + +static char const * const flow_state_name_table[FS_COUNT] = { + [FS_UNKNOWN] = "unknown", [FS_SKIPPED] = "skipped", [FS_FINISHED] = "finished", [FS_INFO] = "info"}; + +static char const * const packet_event_name_table[PACKET_EVENT_COUNT] = { + [PACKET_EVENT_INVALID] = "invalid", [PACKET_EVENT_PAYLOAD] = "packet", [PACKET_EVENT_PAYLOAD_FLOW] = "packet-flow"}; + +static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT_INVALID] = "invalid", + [FLOW_EVENT_NEW] = "new", + [FLOW_EVENT_END] = "end", + [FLOW_EVENT_IDLE] = "idle", + [FLOW_EVENT_UPDATE] = "update", + [FLOW_EVENT_ANALYSE] = "analyse", + [FLOW_EVENT_GUESSED] = "guessed", + [FLOW_EVENT_DETECTED] = "detected", + [FLOW_EVENT_DETECTION_UPDATE] = "detection-update", + [FLOW_EVENT_NOT_DETECTED] = "not-detected"}; +static char const * const error_event_name_table[ERROR_EVENT_COUNT] = { + [ERROR_EVENT_INVALID] = "invalid", + [UNKNOWN_DATALINK_LAYER] = "Unknown datalink layer packet", + [UNKNOWN_L3_PROTOCOL] = "Unknown L3 protocol", + [UNSUPPORTED_DATALINK_LAYER] = "Unsupported datalink layer", + [PACKET_TOO_SHORT] = "Packet too short", + [PACKET_TYPE_UNKNOWN] = "Unknown packet type", + [PACKET_HEADER_INVALID] = "Packet header invalid", + [IP4_PACKET_TOO_SHORT] = "IP4 packet too short", + [IP4_SIZE_SMALLER_THAN_HEADER] = "Packet smaller than IP4 header", + [IP4_L4_PAYLOAD_DETECTION_FAILED] = "nDPI IPv4/L4 payload detection failed", + [IP6_PACKET_TOO_SHORT] = "IP6 packet too short", + [IP6_SIZE_SMALLER_THAN_HEADER] = "Packet smaller than IP6 header", + [IP6_L4_PAYLOAD_DETECTION_FAILED] = "nDPI IPv6/L4 payload detection failed", + [TCP_PACKET_TOO_SHORT] = "TCP packet smaller than expected", + [UDP_PACKET_TOO_SHORT] = "UDP packet smaller than expected", + [CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE] = "Captured packet size is smaller than expected packet size", + [MAX_FLOW_TO_TRACK] = "Max flows to track reached", + [FLOW_MEMORY_ALLOCATION_FAILED] = "Flow memory allocation failed", +}; + +static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = { + [DAEMON_EVENT_INVALID] = "invalid", + [DAEMON_EVENT_INIT] = "init", + [DAEMON_EVENT_RECONNECT] = "reconnect", + [DAEMON_EVENT_SHUTDOWN] = "shutdown", + [DAEMON_EVENT_STATUS] = "status", +}; + +static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; +static MT_VALUE(nDPId_main_thread_shutdown, int) = MT_INIT(0); +static MT_VALUE(global_flow_id, uint64_t) = MT_INIT(1); + +#ifdef ENABLE_MEMORY_STATUS +static MT_VALUE(ndpi_memory_alloc_count, uint64_t) = MT_INIT(0); +static MT_VALUE(ndpi_memory_alloc_bytes, uint64_t) = MT_INIT(0); +static MT_VALUE(ndpi_memory_free_count, uint64_t) = MT_INIT(0); +static MT_VALUE(ndpi_memory_free_bytes, uint64_t) = MT_INIT(0); +#ifdef ENABLE_ZLIB +static MT_VALUE(zlib_compressions, uint64_t) = MT_INIT(0); +static MT_VALUE(zlib_decompressions, uint64_t) = MT_INIT(0); +static MT_VALUE(zlib_compression_diff, uint64_t) = MT_INIT(0); +static MT_VALUE(zlib_compression_bytes, uint64_t) = MT_INIT(0); +#endif +#endif + +static struct +{ + /* options which are resolved automatically */ + struct nDPIsrvd_address parsed_collector_address; + union nDPId_ip pcap_dev_ip4, pcap_dev_ip6; + union nDPId_ip pcap_dev_netmask4, pcap_dev_netmask6; + union nDPId_ip pcap_dev_subnet4, pcap_dev_subnet6; + /* opts */ + struct cmdarg config_file; + struct cmdarg pcap_file_or_interface; + struct cmdarg bpf_str; + struct cmdarg pidfile; + struct cmdarg user; + struct cmdarg group; + struct cmdarg custom_risk_domain_file; + struct cmdarg custom_protocols_file; + struct cmdarg custom_categories_file; + struct cmdarg custom_ja3_file; + struct cmdarg custom_sha1_file; + struct cmdarg collector_address; + struct cmdarg instance_alias; + struct cmdarg process_internal_initial_direction; + struct cmdarg process_external_initial_direction; +#ifdef ENABLE_ZLIB + struct cmdarg enable_zlib_compression; +#endif + struct cmdarg enable_data_analysis; +#ifdef ENABLE_EPOLL + struct cmdarg use_poll; +#endif +#ifdef ENABLE_PFRING + struct cmdarg use_pfring; +#endif + /* subopts */ + struct cmdarg max_flows_per_thread; + struct cmdarg max_idle_flows_per_thread; + struct cmdarg reader_thread_count; + struct cmdarg daemon_status_interval; +#ifdef ENABLE_MEMORY_PROFILING + struct cmdarg memory_profiling_log_interval; +#endif +#ifdef ENABLE_ZLIB + struct cmdarg compression_scan_interval; + struct cmdarg compression_flow_inactivity; +#endif + struct cmdarg flow_scan_interval; + struct cmdarg generic_max_idle_time; + struct cmdarg icmp_max_idle_time; + struct cmdarg udp_max_idle_time; + struct cmdarg tcp_max_idle_time; + struct cmdarg tcp_max_post_end_flow_time; + struct cmdarg max_packets_per_flow_to_send; + struct cmdarg max_packets_per_flow_to_process; + struct cmdarg max_packets_per_flow_to_analyse; + struct cmdarg error_event_threshold_n; + struct cmdarg error_event_threshold_time; +} nDPId_options = {.config_file = CMDARG_STR(NULL), + .pcap_file_or_interface = CMDARG_STR(NULL), + .bpf_str = CMDARG_STR(NULL), + .pidfile = CMDARG_STR(nDPId_PIDFILE), + .user = CMDARG_STR(DEFAULT_CHUSER), + .group = CMDARG_STR(NULL), + .custom_risk_domain_file = CMDARG_STR(NULL), + .custom_protocols_file = CMDARG_STR(NULL), + .custom_categories_file = CMDARG_STR(NULL), + .custom_ja3_file = CMDARG_STR(NULL), + .custom_sha1_file = CMDARG_STR(NULL), + .collector_address = CMDARG_STR(COLLECTOR_UNIX_SOCKET), + .instance_alias = CMDARG_STR(NULL), + .process_internal_initial_direction = CMDARG_BOOL(0), + .process_external_initial_direction = CMDARG_BOOL(0), +#ifdef ENABLE_ZLIB + .enable_zlib_compression = CMDARG_BOOL(0), +#endif + .enable_data_analysis = CMDARG_BOOL(0), +#ifdef ENABLE_EPOLL + .use_poll = CMDARG_BOOL(0), +#endif +#ifdef ENABLE_PFRING + .use_pfring = CMDARG_BOOL(0), +#endif + .max_flows_per_thread = CMDARG_ULL(nDPId_MAX_FLOWS_PER_THREAD / 2), + .max_idle_flows_per_thread = CMDARG_ULL(nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2), +#ifdef CROSS_COMPILATION + /* + * We are assuming that in the cross compilation case + * our target system is an embedded one with not much memory available. + * To further reduce memory consumption caused by allocating nDPId / nDPI workflows per thread, + * we set the default reader thread count to two. + */ + .reader_thread_count = CMDARG_ULL(2), +#else + .reader_thread_count = CMDARG_ULL(nDPId_MAX_READER_THREADS / 3), +#endif + .daemon_status_interval = CMDARG_ULL(nDPId_DAEMON_STATUS_INTERVAL), +#ifdef ENABLE_MEMORY_PROFILING + .memory_profiling_log_interval = CMDARG_ULL(nDPId_MEMORY_PROFILING_LOG_INTERVAL), +#endif +#ifdef ENABLE_ZLIB + .compression_scan_interval = CMDARG_ULL(nDPId_COMPRESSION_SCAN_INTERVAL), + .compression_flow_inactivity = CMDARG_ULL(nDPId_COMPRESSION_FLOW_INACTIVITY), +#endif + .flow_scan_interval = CMDARG_ULL(nDPId_FLOW_SCAN_INTERVAL), + .generic_max_idle_time = CMDARG_ULL(nDPId_GENERIC_IDLE_TIME), + .icmp_max_idle_time = CMDARG_ULL(nDPId_ICMP_IDLE_TIME), + .udp_max_idle_time = CMDARG_ULL(nDPId_UDP_IDLE_TIME), + .tcp_max_idle_time = CMDARG_ULL(nDPId_TCP_IDLE_TIME), + .tcp_max_post_end_flow_time = CMDARG_ULL(nDPId_TCP_POST_END_FLOW_TIME), + .max_packets_per_flow_to_send = CMDARG_ULL(nDPId_PACKETS_PER_FLOW_TO_SEND), + .max_packets_per_flow_to_process = CMDARG_ULL(nDPId_PACKETS_PER_FLOW_TO_PROCESS), + .max_packets_per_flow_to_analyse = CMDARG_ULL(nDPId_PACKETS_PER_FLOW_TO_ANALYZE), + .error_event_threshold_n = CMDARG_ULL(nDPId_ERROR_EVENT_THRESHOLD_N), + .error_event_threshold_time = CMDARG_ULL(nDPId_ERROR_EVENT_THRESHOLD_TIME)}; +struct confopt general_config_map[] = {CONFOPT("netif", &nDPId_options.pcap_file_or_interface), + CONFOPT("bpf", &nDPId_options.bpf_str), + CONFOPT("pidfile", &nDPId_options.pidfile), + CONFOPT("user", &nDPId_options.user), + CONFOPT("group", &nDPId_options.group), + CONFOPT("riskdomains", &nDPId_options.custom_risk_domain_file), + CONFOPT("protocols", &nDPId_options.custom_protocols_file), + CONFOPT("categories", &nDPId_options.custom_categories_file), + CONFOPT("ja3", &nDPId_options.custom_ja3_file), + CONFOPT("sha1", &nDPId_options.custom_sha1_file), + CONFOPT("collector", &nDPId_options.collector_address), + CONFOPT("alias", &nDPId_options.instance_alias), + CONFOPT("internal", &nDPId_options.process_internal_initial_direction), + CONFOPT("external", &nDPId_options.process_external_initial_direction), +#ifdef ENABLE_ZLIB + CONFOPT("compression", &nDPId_options.enable_zlib_compression), +#endif + CONFOPT("analysis", &nDPId_options.enable_data_analysis), +#ifdef ENABLE_EPOLL + CONFOPT("poll", &nDPId_options.use_poll), +#endif +#ifdef ENABLE_PFRING + CONFOPT("pfring", &nDPId_options.use_pfring) +#endif +}; +struct confopt tuning_config_map[] = { + CONFOPT("max-flows-per-thread", &nDPId_options.max_flows_per_thread), + CONFOPT("max-idle-flows-per-thread", &nDPId_options.max_idle_flows_per_thread), + CONFOPT("max-reader-threads", &nDPId_options.reader_thread_count), + CONFOPT("daemon-status-interval", &nDPId_options.daemon_status_interval), +#ifdef ENABLE_MEMORY_PROFILING + CONFOPT("memory-profiling-log-interval", &nDPId_options.memory_profiling_log_interval), +#endif +#ifdef ENABLE_ZLIB + CONFOPT("compression-scan-interval", &nDPId_options.compression_scan_interval), + CONFOPT("compression-flow-inactivity", &nDPId_options.compression_flow_inactivity), +#endif + CONFOPT("flow-scan-interval", &nDPId_options.flow_scan_interval), + CONFOPT("generic-max-idle-time", &nDPId_options.generic_max_idle_time), + CONFOPT("icmp-max-idle-time", &nDPId_options.icmp_max_idle_time), + CONFOPT("udp-max-idle-time", &nDPId_options.udp_max_idle_time), + CONFOPT("tcp-max-idle-time", &nDPId_options.tcp_max_idle_time), + CONFOPT("tcp-max-post-end-flow-time", &nDPId_options.tcp_max_post_end_flow_time), + CONFOPT("max-packets-per-flow-to-send", &nDPId_options.max_packets_per_flow_to_send), + CONFOPT("max-packets-per-flow-to-process", &nDPId_options.max_packets_per_flow_to_process), + CONFOPT("max-packets-per-flow-to-analyse", &nDPId_options.max_packets_per_flow_to_analyse), + CONFOPT("error-event-threshold-n", &nDPId_options.error_event_threshold_n), + CONFOPT("error-event-threshold-time", &nDPId_options.error_event_threshold_time), +}; + +static void sighandler(int signum); +static WARN_UNUSED int processing_threads_error_or_eof(void); +static void free_workflow(struct nDPId_workflow ** const workflow); +static void serialize_and_send(struct nDPId_reader_thread * const reader_thread); +static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow_extended * const flow_ext, + enum flow_event event); +static void jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow * const flow, + enum flow_event event); + +static int set_collector_nonblock(struct nDPId_reader_thread * const reader_thread) +{ + int current_flags; + + while ((current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0)) == -1 && errno == EINTR) {} + if (current_flags == -1) {} + + while ((current_flags = fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags | O_NONBLOCK)) == -1 && + errno == EINTR) + { + // Retry if interrupted by a signal. + } + if (current_flags == -1) + { + reader_thread->collector_sock_last_errno = errno; + logger(1, + "[%8llu] Could not set collector fd %d to non-blocking mode: %s", + reader_thread->workflow->packets_processed, + reader_thread->collector_sockfd, + strerror(errno)); + return 1; + } + + return 0; +} + +static int set_collector_block(struct nDPId_reader_thread * const reader_thread) +{ + int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0); + + if (current_flags == -1 || fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags & ~O_NONBLOCK) == -1) + { + reader_thread->collector_sock_last_errno = errno; + logger(1, + "[%8llu] Could not set collector fd %d to blocking mode: %s", + reader_thread->workflow->packets_processed, + reader_thread->collector_sockfd, + strerror(errno)); + return 1; + } + + return 0; +} + +u_int8_t plen2slot(u_int16_t plen) +{ + if (plen > nDPId_ANALYZE_PLEN_MAX) + { + return nDPId_ANALYZE_PLEN_NUM_BINS - 1; + } + else + { + return plen / nDPId_ANALYZE_PLEN_BIN_LEN; + } +} + +static uint64_t get_last_pkt_time(struct nDPId_flow_basic const * const flow_basic) +{ + return ndpi_max(flow_basic->last_pkt_time[FD_SRC2DST], flow_basic->last_pkt_time[FD_DST2SRC]); +} + +static uint64_t timer_sub(uint64_t a, uint64_t b) +{ + if (b > a) + { + return 0; + } + + return a - b; +} + +#ifdef ENABLE_ZLIB +static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen) +{ + z_stream strm = {0}; + strm.total_in = strm.avail_in = srcLen; + strm.total_out = strm.avail_out = dstLen; + strm.next_in = (Bytef *)src; + strm.next_out = (Bytef *)dst; + + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + int err = -1; + int ret = -1; + + err = deflateInit2(&strm, Z_BEST_COMPRESSION, Z_BINARY, 15, 9, Z_HUFFMAN_ONLY); + if (err != Z_OK) + { + err = deflateInit(&strm, Z_BEST_COMPRESSION); + } + if (err == Z_OK) + { + err = deflate(&strm, Z_FINISH); + if (err == Z_STREAM_END) + { + ret = strm.total_out; +#ifdef ENABLE_MEMORY_STATUS + MT_GET_AND_ADD(zlib_compressions, 1); + MT_GET_AND_ADD(zlib_compression_diff, srcLen - ret); + MT_GET_AND_ADD(zlib_compression_bytes, ret); +#endif + } + else + { + deflateEnd(&strm); + return err; + } + } + else + { + deflateEnd(&strm); + return err; + } + + deflateEnd(&strm); + return ret; +} + +static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen) +{ + z_stream strm = {0}; + strm.total_in = strm.avail_in = srcLen; + strm.total_out = strm.avail_out = dstLen; + strm.next_in = (Bytef *)src; + strm.next_out = (Bytef *)dst; + + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + int err = -1; + int ret = -1; + + err = inflateInit2(&strm, (15 + 32)); // 15 window bits, and the +32 tells zlib to to detect if using gzip or zlib + if (err == Z_OK) + { + err = inflate(&strm, Z_FINISH); + if (err == Z_STREAM_END) + { + ret = strm.total_out; +#ifdef ENABLE_MEMORY_STATUS + MT_GET_AND_ADD(zlib_decompressions, 1); + MT_GET_AND_SUB(zlib_compression_diff, ret - srcLen); +#endif + } + else + { + inflateEnd(&strm); + return err; + } + } + else + { + inflateEnd(&strm); + return err; + } + + inflateEnd(&strm); + return ret; +} + +static int detection_data_deflate(struct nDPId_flow * const flow) +{ + uint8_t tmpOut[sizeof(*flow->info.detection_data)]; + int ret; + + if (flow->info.detection_data_compressed_size > 0) + { + return -7; + } + + ret = zlib_deflate(flow->info.detection_data, sizeof(*flow->info.detection_data), tmpOut, sizeof(tmpOut)); + if (ret <= 0) + { + return ret; + } + + struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret); + if (new_det_data == NULL) + { + return -8; + } + ndpi_free(flow->info.detection_data); + flow->info.detection_data = new_det_data; + + memcpy(flow->info.detection_data, tmpOut, ret); + flow->info.detection_data_compressed_size = ret; + + return ret; +} + +static int detection_data_inflate(struct nDPId_flow * const flow) +{ + uint8_t tmpOut[sizeof(*flow->info.detection_data)]; + int ret; + + if (flow->info.detection_data_compressed_size == 0) + { + return -7; + } + + ret = zlib_inflate(flow->info.detection_data, flow->info.detection_data_compressed_size, tmpOut, sizeof(tmpOut)); + if (ret <= 0) + { + return ret; + } + + struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret); + if (new_det_data == NULL) + { + return -8; + } + ndpi_free(flow->info.detection_data); + flow->info.detection_data = new_det_data; + + memcpy(flow->info.detection_data, tmpOut, ret); + flow->info.detection_data_compressed_size = 0; + + return ret; +} + +static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + + if (workflow == NULL || flow_basic == NULL) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) + { + switch (flow_basic->state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_SKIPPED: + case FS_FINISHED: + break; + + case FS_INFO: + { + if (get_last_pkt_time(flow_basic) + GET_CMDARG_ULL(nDPId_options.compression_flow_inactivity) < + workflow->last_thread_time) + { + struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; + + if (flow->info.detection_data_compressed_size > 0) + { + break; + } + + int ret = detection_data_deflate(flow); + + if (ret <= 0) + { + logger(1, + "zLib compression failed for flow %llu with error code: %d", + flow->flow_extended.flow_id, + ret); + } + else + { + workflow->total_compressions++; + workflow->total_compression_diff += ret; + workflow->current_compression_diff += ret; + } + } + break; + } + } + } +} + +static void check_for_compressable_flows(struct nDPId_reader_thread * const reader_thread) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + if (workflow->last_compression_scan_time + GET_CMDARG_ULL(nDPId_options.compression_scan_interval) < + workflow->last_thread_time) + { + for (size_t comp_scan_index = 0; comp_scan_index < workflow->max_active_flows; ++comp_scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[comp_scan_index], ndpi_comp_scan_walker, workflow); + } + + workflow->last_compression_scan_time = workflow->last_thread_time; + } +} +#endif + +static void ip_netmask_to_subnet(union nDPId_ip const * const ip, + union nDPId_ip const * const netmask, + union nDPId_ip * const subnet, + enum nDPId_l3_type type) +{ + switch (type) + { + case L3_IP: + subnet->v4.ip = ip->v4.ip & netmask->v4.ip; + break; + case L3_IP6: + subnet->v6.ip[0] = ip->v6.ip[0] & netmask->v6.ip[0]; + subnet->v6.ip[1] = ip->v6.ip[1] & netmask->v6.ip[1]; + break; + } +} + +static int is_ip_in_subnet(union nDPId_ip const * const cmp_ip, + union nDPId_ip const * const netmask, + union nDPId_ip const * const cmp_subnet, + enum nDPId_l3_type const type) +{ + switch (type) + { + case L3_IP: + return (cmp_ip->v4.ip & netmask->v4.ip) == cmp_subnet->v4.ip; + case L3_IP6: + return (cmp_ip->v6.ip[0] & netmask->v6.ip[0]) == cmp_subnet->v6.ip[0] && + (cmp_ip->v6.ip[1] & netmask->v6.ip[1]) == cmp_subnet->v6.ip[1]; + } + + return 0; +} + +static void get_ip4_from_sockaddr(struct sockaddr_in const * const saddr, union nDPId_ip * dest) +{ + switch (saddr->sin_family) + { + case AF_INET: + dest->v4.ip = saddr->sin_addr.s_addr; + break; + case AF_INET6: + return; + } +} + +static void get_ip6_from_sockaddr(struct sockaddr_in6 const * const saddr, union nDPId_ip * dest) +{ + switch (saddr->sin6_family) + { + case AF_INET6: +#if defined(__FreeBSD__) || defined(__APPLE__) + dest->v6.ip_u32[0] = saddr->sin6_addr.__u6_addr.__u6_addr32[0]; + dest->v6.ip_u32[1] = saddr->sin6_addr.__u6_addr.__u6_addr32[1]; + dest->v6.ip_u32[2] = saddr->sin6_addr.__u6_addr.__u6_addr32[2]; + dest->v6.ip_u32[3] = saddr->sin6_addr.__u6_addr.__u6_addr32[3]; +#else + dest->v6.ip_u32[0] = saddr->sin6_addr.s6_addr32[0]; + dest->v6.ip_u32[1] = saddr->sin6_addr.s6_addr32[1]; + dest->v6.ip_u32[2] = saddr->sin6_addr.s6_addr32[2]; + dest->v6.ip_u32[3] = saddr->sin6_addr.s6_addr32[3]; +#endif + break; + default: + return; + } +} + +static void get_ip6_address_and_netmask(struct ifaddrs const * const ifaddr) +{ + get_ip6_from_sockaddr((struct sockaddr_in6 *)ifaddr->ifa_netmask, &nDPId_options.pcap_dev_netmask6); + get_ip6_from_sockaddr((struct sockaddr_in6 *)ifaddr->ifa_addr, &nDPId_options.pcap_dev_ip6); + ip_netmask_to_subnet(&nDPId_options.pcap_dev_ip6, + &nDPId_options.pcap_dev_netmask6, + &nDPId_options.pcap_dev_subnet6, + L3_IP6); + { + char addr[INET6_ADDRSTRLEN]; + char netm[INET6_ADDRSTRLEN]; + char subn[INET6_ADDRSTRLEN]; + void * saddr = &nDPId_options.pcap_dev_ip6.v6.ip; + void * snetm = &nDPId_options.pcap_dev_netmask6.v6.ip; + void * ssubn = &nDPId_options.pcap_dev_subnet6.v6.ip; + logger(0, + "%s IPv6 address netmask subnet: %s %s %s", + GET_CMDARG_STR(nDPId_options.pcap_file_or_interface), + inet_ntop(AF_INET6, saddr, addr, sizeof(addr)), + inet_ntop(AF_INET6, snetm, netm, sizeof(netm)), + inet_ntop(AF_INET6, ssubn, subn, sizeof(subn))); + } +} + +static void get_ip4_address_and_netmask(struct ifaddrs const * const ifaddr) +{ + get_ip4_from_sockaddr((struct sockaddr_in *)ifaddr->ifa_netmask, &nDPId_options.pcap_dev_netmask4); + get_ip4_from_sockaddr((struct sockaddr_in *)ifaddr->ifa_addr, &nDPId_options.pcap_dev_ip4); + ip_netmask_to_subnet(&nDPId_options.pcap_dev_ip4, + &nDPId_options.pcap_dev_netmask4, + &nDPId_options.pcap_dev_subnet4, + L3_IP); + { + char addr[INET_ADDRSTRLEN]; + char netm[INET_ADDRSTRLEN]; + char subn[INET_ADDRSTRLEN]; + void * saddr = &nDPId_options.pcap_dev_ip4.v4.ip; + void * snetm = &nDPId_options.pcap_dev_netmask4.v4.ip; + void * ssubn = &nDPId_options.pcap_dev_subnet4.v4.ip; + logger(0, + "%s IPv4 address netmask subnet: %s %s %s", + GET_CMDARG_STR(nDPId_options.pcap_file_or_interface), + inet_ntop(AF_INET, saddr, addr, sizeof(addr)), + inet_ntop(AF_INET, snetm, netm, sizeof(netm)), + inet_ntop(AF_INET, ssubn, subn, sizeof(subn))); + } +} + +static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) +{ + int retval = 0, found_dev = 0; + int ip4_interface_avail = 0; + int ip6_interface_avail = 0; + struct ifaddrs * ifaddrs = NULL; + struct ifaddrs * ifa; + + if (getifaddrs(&ifaddrs) != 0 || ifaddrs == NULL) + { + int saved_errno = errno; + logger_early(1, "Interface retrieval failed with: %s", strerror(saved_errno)); + errno = saved_errno; + return 1; + } + + for (ifa = ifaddrs; ifa != NULL; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == NULL || (ifa->ifa_flags & IFF_RUNNING) == 0) + { + continue; + } + + size_t ifnamelen = strnlen(ifa->ifa_name, IFNAMSIZ); + if (strncmp(ifa->ifa_name, pcap_dev, IFNAMSIZ) == 0 && ifnamelen == strnlen(pcap_dev, IFNAMSIZ)) + { + found_dev = 1; + switch (ifa->ifa_addr->sa_family) + { + case AF_INET: + get_ip4_address_and_netmask(ifa); + ip4_interface_avail = 1; + break; + case AF_INET6: + get_ip6_address_and_netmask(ifa); + ip6_interface_avail = 1; + break; + default: + break; + } + } + } + + if (retval == 0 && found_dev != 0 && + (GET_CMDARG_BOOL(nDPId_options.process_internal_initial_direction) != 0 || + GET_CMDARG_BOOL(nDPId_options.process_external_initial_direction) != 0) && + ip4_interface_avail == 0 && ip6_interface_avail == 0) + { + logger_early(1, "Interface %s does not have any IPv4 / IPv6 address set, -I / -E won't work.", pcap_dev); + retval = 1; + } + + freeifaddrs(ifaddrs); + return retval; +} + +#ifdef ENABLE_MEMORY_STATUS +static void * ndpi_malloc_wrapper(size_t const size) +{ + void * p = malloc(sizeof(uint64_t) + size); + + if (p == NULL) + { + return NULL; + } + *(uint64_t *)p = size; + + MT_GET_AND_ADD(ndpi_memory_alloc_count, 1); + MT_GET_AND_ADD(ndpi_memory_alloc_bytes, size); + + return (uint8_t *)p + sizeof(uint64_t); +} + +static void ndpi_free_wrapper(void * const freeable) +{ + void * p = (uint8_t *)freeable - sizeof(uint64_t); + + MT_GET_AND_ADD(ndpi_memory_free_count, 1); + MT_GET_AND_ADD(ndpi_memory_free_bytes, *(uint64_t *)p); + + free(p); +} +#endif + +#ifdef ENABLE_MEMORY_PROFILING +static void log_memory_usage(struct nDPId_reader_thread const * const reader_thread) +{ + if (reader_thread->array_index == 0) + { + uint64_t alloc_count = MT_GET_AND_ADD(ndpi_memory_alloc_count, 0); + uint64_t free_count = MT_GET_AND_ADD(ndpi_memory_free_count, 0); + uint64_t alloc_bytes = MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0); + uint64_t free_bytes = MT_GET_AND_ADD(ndpi_memory_free_bytes, 0); + + logger(0, + "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in " + "use, " + "%llu bytes in use", + (long long unsigned int)alloc_count, + (long long unsigned int)free_count, + (long long unsigned int)alloc_bytes, + (long long unsigned int)free_bytes, + (long long unsigned int)(alloc_count - free_count), + (long long unsigned int)(alloc_bytes - free_bytes)); +#ifdef ENABLE_ZLIB + uint64_t zlib_compression_count = MT_GET_AND_ADD(zlib_compressions, 0); + uint64_t zlib_decompression_count = MT_GET_AND_ADD(zlib_decompressions, 0); + uint64_t zlib_bytes_diff = MT_GET_AND_ADD(zlib_compression_diff, 0); + uint64_t zlib_bytes_total = MT_GET_AND_ADD(zlib_compression_bytes, 0); + + logger(0, + "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu " + "bytes diff, %llu bytes total compressed", + (long long unsigned int)zlib_compression_count, + (long long unsigned int)zlib_decompression_count, + (long long unsigned int)zlib_compression_count - (long long unsigned int)zlib_decompression_count, + (long long unsigned int)zlib_bytes_diff, + (long long unsigned int)zlib_bytes_total); +#endif + } +} +#endif + +static void ndpi_debug_printf(unsigned int proto, + struct ndpi_detection_module_struct * ndpi_struct, + ndpi_log_level_t log_level, + const char * file_name, + const char * func_name, + unsigned int line_number, + const char * format, + ...) +{ + va_list vl; + int written, is_log_err = 0; + char buf[128]; + struct nDPId_workflow * const workflow = (struct nDPId_workflow *)ndpi_get_user_data(ndpi_struct); + + va_start(vl, format); + if ((written = vsnprintf(buf, nDPIsrvd_ARRAY_LENGTH(buf), format, vl)) >= (int)nDPIsrvd_ARRAY_LENGTH(buf)) + { + logger(1, + "[libnDPI] Logging failure due to buffer size; current: %zu, required: %d", + nDPIsrvd_ARRAY_LENGTH(buf), + written); + } + va_end(vl); + + switch (log_level) + { + case NDPI_LOG_ERROR: + workflow->libnDPI_errors++; + is_log_err = 1; + break; + case NDPI_LOG_TRACE: + is_log_err = 1; + break; + case NDPI_LOG_DEBUG: + case NDPI_LOG_DEBUG_EXTRA: + is_log_err = 0; + break; + } + + logger(is_log_err, + "[libnDPI@%s.%s.%u] protocol %u.%s: %s", + file_name, + func_name, + line_number, + proto, + ndpi_get_proto_name(ndpi_struct, proto), + buf); +} + +static char const * cfg_err2str(ndpi_cfg_error err) +{ + switch (err) + { + case NDPI_CFG_INVALID_CONTEXT: + return "Invalid context"; + case NDPI_CFG_NOT_FOUND: + return "Not found"; + case NDPI_CFG_INVALID_PARAM: + return "Invalid parameter"; + case NDPI_CFG_CONTEXT_ALREADY_INITIALIZED: + return "Context already initialized"; + case NDPI_CFG_CALLBACK_ERROR: + return "Callback error"; + case NDPI_CFG_OK: + return "Success"; + } + + return "Unknown error"; +} + +static int cfg_set_u64(struct nDPId_workflow * const workflow, + char const * const proto, + char const * const param, + uint64_t const value) +{ + ndpi_cfg_error cfg_err; + + cfg_err = ndpi_set_config_u64(workflow->ndpi_struct, proto, param, value); + if (cfg_err != NDPI_CFG_OK) + { + logger_early(1, "Could not set nDPI configuration (numeric value): %s", cfg_err2str(cfg_err)); + return 1; + } + + return 0; +} + +static int cfg_set(struct nDPId_workflow * const workflow, const char * proto, const char * param, const char * value) +{ + ndpi_cfg_error cfg_err; + + cfg_err = ndpi_set_config(workflow->ndpi_struct, proto, param, value); + if (cfg_err != NDPI_CFG_OK) + { + if (proto != NULL) + { + logger_early(1, + "Could not set nDPI configuration for protocol `%s' with key `%s' and value `%s': %s", + proto, + param, + value, + cfg_err2str(cfg_err)); + } + else + { + logger_early(1, + "Could not set nDPI configuration for key `%s' with value `%s': %s", + param, + value, + cfg_err2str(cfg_err)); + } + return 1; + } + + return 0; +} + +static int libnDPI_parsed_config_line( + int lineno, char const * const section, char const * const name, char const * const value, void * const user_data) +{ + struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; + + if ((strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("general") && + strncmp(section, "general", INI_MAX_SECTION) == 0) || + (strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("tuning") && + strncmp(section, "tuning", INI_MAX_SECTION) == 0)) + { + // Nothing to do here right now (already initialized) + return 1; + } + else if (strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("ndpi") && + strncmp(section, "ndpi", INI_MAX_SECTION) == 0) + { + return (cfg_set(workflow, NULL, name, value) == 0); + } + else if (strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("protos") && + strncmp(section, "protos", INI_MAX_SECTION) == 0) + { + char const * const first_sep = strchr(name, '.'); + char proto[INI_MAX_NAME]; + + if (first_sep == NULL) + { + logger_early(1, + "Missing first `.' for section `protos' at line %d with key `%s' and value `%s'", + lineno, + name, + value); + return 0; + } + int s_ret = snprintf(proto, sizeof(proto), "%.*s", (int)(first_sep - name), name); + if (s_ret < 0) + { + logger_early(1, + "Could not format protocol at line %d with key `%s' and value `%s': snprintf returnded %d, " + "buffer size %zu", + lineno, + name, + value, + s_ret, + sizeof(proto)); + return 0; + } + + return (cfg_set(workflow, proto, first_sep + 1, value) == 0); + } + else + { + logger_early( + 1, "Invalid config section `%s' at line %d with key `%s' and value `%s'", section, lineno, name, value); + } + + return 1; +} + +static struct nDPId_workflow * init_workflow(char const * const file_or_device) +{ + char pcap_error_buffer[PCAP_ERRBUF_SIZE]; + struct nDPId_workflow * workflow; + +#ifdef ENABLE_MEMORY_STATUS + set_ndpi_malloc(ndpi_malloc_wrapper); + set_ndpi_free(ndpi_free_wrapper); + set_ndpi_flow_malloc(NULL); + set_ndpi_flow_free(NULL); +#endif + + workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow)); + if (workflow == NULL) + { + return NULL; + } + + MT_INIT2(workflow->error_or_eof, 0); + +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + errno = 0; + + if (npfring_init(file_or_device, PFRING_BUFFER_SIZE, &workflow->npf) != 0) + { + logger_early(1, "PF_RING open device %s failed: %s", file_or_device, strerror(errno)); + free_workflow(&workflow); + return NULL; + } + + if (IS_CMDARG_SET(nDPId_options.bpf_str) != 0) + { + if (npfring_set_bpf(&workflow->npf, GET_CMDARG_STR(nDPId_options.bpf_str)) != 0) + { + logger_early(1, "%s", "PF_RING set bpf filter failed"); + free_workflow(&workflow); + return NULL; + } + } + } + else +#endif + { + errno = 0; + + if (access(file_or_device, R_OK) != 0 && errno == ENOENT) + { + workflow->pcap_handle = pcap_open_live(file_or_device, 65535, 1, 250, pcap_error_buffer); + } + else + { + workflow->pcap_handle = + pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO, pcap_error_buffer); + workflow->is_pcap_file = 1; + } + + if (workflow->pcap_handle == NULL) + { + logger_early(1, + (workflow->is_pcap_file == 0 ? "pcap_open_live: %.*s" + : "pcap_open_offline_with_tstamp_precision: %.*s"), + (int)PCAP_ERRBUF_SIZE, + pcap_error_buffer); + free_workflow(&workflow); + return NULL; + } + + if (workflow->is_pcap_file == 0 && pcap_setnonblock(workflow->pcap_handle, 1, pcap_error_buffer) == PCAP_ERROR) + { + logger_early(1, "pcap_setnonblock: %.*s", (int)PCAP_ERRBUF_SIZE, pcap_error_buffer); + free_workflow(&workflow); + return NULL; + } + + if (IS_CMDARG_SET(nDPId_options.bpf_str) != 0) + { + struct bpf_program fp; + if (pcap_compile( + workflow->pcap_handle, &fp, GET_CMDARG_STR(nDPId_options.bpf_str), 1, PCAP_NETMASK_UNKNOWN) != 0) + { + logger_early(1, "pcap_compile: %s", pcap_geterr(workflow->pcap_handle)); + free_workflow(&workflow); + return NULL; + } + if (pcap_setfilter(workflow->pcap_handle, &fp) != 0) + { + logger_early(1, "pcap_setfilter: %s", pcap_geterr(workflow->pcap_handle)); + free_workflow(&workflow); + pcap_freecode(&fp); + return NULL; + } + pcap_freecode(&fp); + } + } + +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + if (npfring_enable(&workflow->npf) != 0) + { + logger_early(1, "%s", "Could not enable PF_RING"); + free_workflow(&workflow); + return NULL; + } + } +#endif + + workflow->ndpi_struct = ndpi_init_detection_module(NULL); + if (workflow->ndpi_struct == NULL) + { + logger_early(1, "%s", "BUG: Could not init ndpi detection module"); + free_workflow(&workflow); + return NULL; + } + + ndpi_set_user_data(workflow->ndpi_struct, workflow); + set_ndpi_debug_function(workflow->ndpi_struct, ndpi_debug_printf); + + { + int ret; + + if (IS_CMDARG_SET(nDPId_options.config_file) != 0 && + (ret = + parse_config_file(GET_CMDARG_STR(nDPId_options.config_file), libnDPI_parsed_config_line, workflow)) != + 0) + { + if (ret > 0) + { + logger_early(1, "Config file `%s' is malformed", GET_CMDARG_STR(nDPId_options.config_file)); + } + else if (ret == -ENOENT) + { + logger_early(1, "Path `%s' is not a regular file", GET_CMDARG_STR(nDPId_options.config_file)); + } + else + { + logger_early(1, + "Could not open file `%s' for reading: %s", + GET_CMDARG_STR(nDPId_options.config_file), + strerror(errno)); + } + free_workflow(&workflow); + return NULL; + } + } + + cfg_set_u64(workflow, NULL, "log.level", 3); + cfg_set_u64(workflow, + NULL, + "packets_limit_per_flow", + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_process)); + cfg_set_u64(workflow, "tls", "application_blocks_tracking", 1); + cfg_set_u64(workflow, "tls", "certificate_expiration_threshold", 5); + + workflow->total_skipped_flows = 0; + workflow->total_active_flows = 0; + workflow->max_active_flows = GET_CMDARG_ULL(nDPId_options.max_flows_per_thread); + workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *)); + if (workflow->ndpi_flows_active == NULL) + { + logger_early(1, + "Could not allocate %llu bytes for (active) flow tracking", + workflow->max_active_flows * sizeof(void *)); + free_workflow(&workflow); + return NULL; + } + + workflow->total_idle_flows = 0; + workflow->max_idle_flows = GET_CMDARG_ULL(nDPId_options.max_idle_flows_per_thread); + workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *)); + if (workflow->ndpi_flows_idle == NULL) + { + logger_early(1, + "Could not allocate %llu bytes for (idle) flow tracking", + workflow->max_idle_flows * sizeof(void *)); + free_workflow(&workflow); + return NULL; + } + + NDPI_PROTOCOL_BITMASK protos; + NDPI_BITMASK_SET_ALL(protos); + ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos); + if (IS_CMDARG_SET(nDPId_options.custom_risk_domain_file) != 0) + { + ndpi_load_risk_domain_file(workflow->ndpi_struct, GET_CMDARG_STR(nDPId_options.custom_risk_domain_file)); + } + if (IS_CMDARG_SET(nDPId_options.custom_protocols_file) != 0) + { + ndpi_load_protocols_file(workflow->ndpi_struct, GET_CMDARG_STR(nDPId_options.custom_protocols_file)); + } + if (IS_CMDARG_SET(nDPId_options.custom_categories_file) != 0) + { + ndpi_load_categories_file(workflow->ndpi_struct, GET_CMDARG_STR(nDPId_options.custom_categories_file), NULL); + } + if (IS_CMDARG_SET(nDPId_options.custom_ja3_file) != 0) + { + ndpi_load_malicious_ja3_file(workflow->ndpi_struct, GET_CMDARG_STR(nDPId_options.custom_ja3_file)); + } + if (IS_CMDARG_SET(nDPId_options.custom_sha1_file) != 0) + { + ndpi_load_malicious_sha1_file(workflow->ndpi_struct, GET_CMDARG_STR(nDPId_options.custom_sha1_file)); + } + ndpi_finalize_initialization(workflow->ndpi_struct); + + if (ndpi_init_serializer_ll(&workflow->ndpi_serializer, ndpi_serialization_format_json, NETWORK_BUFFER_MAX_SIZE) != + 0) + { + logger_early(1, "BUG: Could not init JSON serializer with buffer size: %u bytes", NETWORK_BUFFER_MAX_SIZE); + free_workflow(&workflow); + return NULL; + } + + return workflow; +} + +static void free_analysis_data(struct nDPId_flow_extended * const flow_ext) +{ + if (GET_CMDARG_BOOL(nDPId_options.enable_data_analysis) != 0 && flow_ext->flow_analysis != NULL) + { + ndpi_free_data_analysis(&flow_ext->flow_analysis->iat, 0); + ndpi_free_data_analysis(&flow_ext->flow_analysis->pktlen, 0); + ndpi_free(flow_ext->flow_analysis->directions); + ndpi_free_bin(&flow_ext->flow_analysis->payload_len_bin[FD_SRC2DST]); + ndpi_free_bin(&flow_ext->flow_analysis->payload_len_bin[FD_DST2SRC]); + ndpi_free(flow_ext->flow_analysis->entropies); + ndpi_free(flow_ext->flow_analysis); + flow_ext->flow_analysis = NULL; + } +} + +static void free_detection_data(struct nDPId_flow * const flow) +{ + if (flow->info.detection_data != NULL) + { + ndpi_free_flow_data(&flow->info.detection_data->flow); + ndpi_free(flow->info.detection_data); + flow->info.detection_data = NULL; + } +} + +static int alloc_detection_data(struct nDPId_flow * const flow) +{ + flow->info.detection_data = (struct nDPId_detection_data *)ndpi_flow_malloc(sizeof(*flow->info.detection_data)); + + if (flow->info.detection_data == NULL) + { + goto error; + } + + memset(flow->info.detection_data, 0, sizeof(*flow->info.detection_data)); + + if (GET_CMDARG_BOOL(nDPId_options.enable_data_analysis) != 0) + { + flow->flow_extended.flow_analysis = + (struct nDPId_flow_analysis *)ndpi_malloc(sizeof(*flow->flow_extended.flow_analysis)); + if (flow->flow_extended.flow_analysis == NULL) + { + goto error; + } + + ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat, + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse) - + 1 /* first packet IAT is always 0 */); + ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->pktlen, + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)); + flow->flow_extended.flow_analysis->directions = + (uint8_t *)ndpi_malloc(sizeof(*flow->flow_extended.flow_analysis->directions) * + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)); + flow->flow_extended.flow_analysis->entropies = + (float *)ndpi_malloc(sizeof(*flow->flow_extended.flow_analysis->entropies) * + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)); + + if (ndpi_init_bin(&flow->flow_extended.flow_analysis->payload_len_bin[FD_SRC2DST], + ndpi_bin_family8, + nDPId_ANALYZE_PLEN_NUM_BINS) != 0 || + ndpi_init_bin(&flow->flow_extended.flow_analysis->payload_len_bin[FD_DST2SRC], + ndpi_bin_family8, + nDPId_ANALYZE_PLEN_NUM_BINS) != 0 || + flow->flow_extended.flow_analysis->iat.values == NULL || + flow->flow_extended.flow_analysis->pktlen.values == NULL || + flow->flow_extended.flow_analysis->directions == NULL || + flow->flow_extended.flow_analysis->entropies == NULL) + { + goto error; + } + } + + return 0; +error: + free_detection_data(flow); + flow->info.detection_completed = 1; + return 1; +} + +static void ndpi_flow_info_free(void * const node) +{ + struct nDPId_flow_basic * const flow_basic = (struct nDPId_flow_basic *)node; + + switch (flow_basic->state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_SKIPPED: + break; + + case FS_FINISHED: + { + struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; + free(flow->finished.hostname); + free_analysis_data(&flow->flow_extended); + break; + } + + case FS_INFO: + { + struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; + free_analysis_data(&flow->flow_extended); + free_detection_data(flow); + break; + } + } + ndpi_free(flow_basic); +} + +static void free_workflow(struct nDPId_workflow ** const workflow) +{ + struct nDPId_workflow * const w = *workflow; + + if (w == NULL) + { + return; + } + +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + npfring_close(&w->npf); + } +#endif + + if (w->pcap_handle != NULL) + { + pcap_close(w->pcap_handle); + w->pcap_handle = NULL; + } + + if (w->ndpi_struct != NULL) + { + ndpi_exit_detection_module(w->ndpi_struct); + } + for (size_t i = 0; i < w->max_active_flows; i++) + { + if (w->ndpi_flows_active != NULL) + { + ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_free); + } + } + ndpi_free(w->ndpi_flows_active); + ndpi_free(w->ndpi_flows_idle); + ndpi_term_serializer(&w->ndpi_serializer); + ndpi_free(w); + *workflow = NULL; +} + +static char * get_default_pcapdev(char * errbuf) +{ + char * ifname; + pcap_if_t * all_devices = NULL; + + if (pcap_findalldevs(&all_devices, errbuf) != 0) + { + return NULL; + } + + ifname = strdup(all_devices[0].name); + pcap_freealldevs(all_devices); + + return ifname; +} + +static int setup_reader_threads(void) +{ + char pcap_error_buffer[PCAP_ERRBUF_SIZE]; + + if (GET_CMDARG_ULL(nDPId_options.reader_thread_count) > nDPId_MAX_READER_THREADS) + { + return 1; + } + + if (IS_CMDARG_SET(nDPId_options.pcap_file_or_interface) == 0) + { + char * const pcapdev = get_default_pcapdev(pcap_error_buffer); + set_cmdarg_string(&nDPId_options.pcap_file_or_interface, pcapdev); + free(pcapdev); + if (IS_CMDARG_SET(nDPId_options.pcap_file_or_interface) == 0) + { + logger_early(1, "pcap_lookupdev: %.*s", (int)PCAP_ERRBUF_SIZE, pcap_error_buffer); + return 1; + } + logger_early(0, + "Capturing packets from default device: %s", + GET_CMDARG_STR(nDPId_options.pcap_file_or_interface)); + } + + errno = 0; + if (access(GET_CMDARG_STR(nDPId_options.pcap_file_or_interface), R_OK) != 0 && errno == ENOENT) + { + errno = 0; + if (get_ip_netmask_from_pcap_dev(GET_CMDARG_STR(nDPId_options.pcap_file_or_interface)) != 0) + { + if (errno != 0) + { + logger_early(1, + "Could not get netmask for pcap device %s: %s", + GET_CMDARG_STR(nDPId_options.pcap_file_or_interface), + strerror(errno)); + } + else + { + logger_early(1, + "Unexpected error while retrieving netmask for pcap device %s", + GET_CMDARG_STR(nDPId_options.pcap_file_or_interface)); + } + return 1; + } + } + + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + reader_threads[i].workflow = init_workflow(GET_CMDARG_STR(nDPId_options.pcap_file_or_interface)); + if (reader_threads[i].workflow == NULL) + { + return 1; + } + } + + return 0; +} + +static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B) +{ + // generate a warning if the enum changes + switch (A->l3_type) + { + case L3_IP: + case L3_IP6: + break; + } + + if (A->l3_type == L3_IP && B->l3_type == L3_IP) + { + if (A->src.v4.ip < B->src.v4.ip) + { + return -1; + } + if (A->src.v4.ip > B->src.v4.ip) + { + return 1; + } + if (A->dst.v4.ip < B->dst.v4.ip) + { + return -1; + } + if (A->dst.v4.ip > B->dst.v4.ip) + { + return 1; + } + } + else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6) + { + if (A->src.v6.ip[0] < B->src.v6.ip[0] && A->src.v6.ip[1] < B->src.v6.ip[1]) + { + return -1; + } + if (A->src.v6.ip[0] > B->src.v6.ip[0] && A->src.v6.ip[1] > B->src.v6.ip[1]) + { + return 1; + } + if (A->dst.v6.ip[0] < B->dst.v6.ip[0] && A->dst.v6.ip[1] < B->dst.v6.ip[1]) + { + return -1; + } + if (A->dst.v6.ip[0] > B->dst.v6.ip[0] && A->dst.v6.ip[1] > B->dst.v6.ip[1]) + { + return 1; + } + } + + if (A->src_port < B->src_port) + { + return -1; + } + if (A->src_port > B->src_port) + { + return 1; + } + if (A->dst_port < B->dst_port) + { + return -1; + } + if (A->dst_port > B->dst_port) + { + return 1; + } + + return 0; +} + +static uint64_t get_l4_protocol_idle_time(uint8_t l4_protocol) +{ + switch (l4_protocol) + { + case IPPROTO_ICMP: + case IPPROTO_ICMPV6: + return GET_CMDARG_ULL(nDPId_options.icmp_max_idle_time); + case IPPROTO_TCP: + return GET_CMDARG_ULL(nDPId_options.tcp_max_idle_time); + case IPPROTO_UDP: + return GET_CMDARG_ULL(nDPId_options.udp_max_idle_time); + default: + return GET_CMDARG_ULL(nDPId_options.generic_max_idle_time); + } +} + +static uint64_t get_l4_protocol_idle_time_external(uint8_t l4_protocol) +{ + uint64_t idle_time = get_l4_protocol_idle_time(l4_protocol); + + idle_time += GET_CMDARG_ULL(nDPId_options.flow_scan_interval) * 2; + if (l4_protocol == IPPROTO_TCP) + { + idle_time += GET_CMDARG_ULL(nDPId_options.tcp_max_post_end_flow_time); + } + + return idle_time; +} + +static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow, + struct nDPId_flow_basic const * const flow_basic) +{ + uint64_t itime = get_l4_protocol_idle_time(flow_basic->l4_protocol); + + return flow_basic->tcp_fin_rst_seen == 1 || get_last_pkt_time(flow_basic) + itime <= workflow->last_thread_time; +} + +static int is_tcp_post_end(struct nDPId_workflow const * const workflow, + struct nDPId_flow_basic const * const flow_basic) +{ + return flow_basic->l4_protocol != IPPROTO_TCP || flow_basic->tcp_fin_rst_seen == 0 || + (flow_basic->tcp_fin_rst_seen == 1 && + get_last_pkt_time(flow_basic) + GET_CMDARG_ULL(nDPId_options.tcp_max_post_end_flow_time) <= + workflow->last_thread_time); +} + +static int is_flow_update_required(struct nDPId_workflow const * const workflow, + struct nDPId_flow_extended const * const flow_ext) +{ + uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol); + + if (flow_ext->flow_basic.l4_protocol != IPPROTO_TCP) + { + return flow_ext->last_flow_update + itime / 4 <= workflow->last_thread_time; + } + return flow_ext->last_flow_update + itime <= workflow->last_thread_time; +} + +static int is_error_event_threshold(struct nDPId_workflow * const workflow) +{ + if (workflow->last_global_time - workflow->last_error_time > + GET_CMDARG_ULL(nDPId_options.error_event_threshold_time)) + { + workflow->error_count = 0; + } + + workflow->last_error_time = workflow->last_global_time; + if (workflow->error_count >= GET_CMDARG_ULL(nDPId_options.error_event_threshold_n)) + { + return 1; + } + + workflow->error_count++; + return 0; +} + +static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + + if (workflow == NULL || flow_basic == NULL) + { + return; + } + + if (workflow->cur_idle_flows == GET_CMDARG_ULL(nDPId_options.max_idle_flows_per_thread)) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) + { + if (is_l4_protocol_timed_out(workflow, flow_basic) != 0) + { + if (is_tcp_post_end(workflow, flow_basic) != 0) + { + workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic; + switch (flow_basic->state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_SKIPPED: + break; + + case FS_FINISHED: + case FS_INFO: + workflow->total_idle_flows++; + break; + } + } + } + } +} + +static int ndpi_workflow_node_cmp(void const * const A, void const * const B) +{ + struct nDPId_flow_basic const * const flow_basic_a = (struct nDPId_flow_basic *)A; + struct nDPId_flow_basic const * const flow_basic_b = (struct nDPId_flow_basic *)B; + + if (flow_basic_a->hashval < flow_basic_b->hashval) + { + return -1; + } + else if (flow_basic_a->hashval > flow_basic_b->hashval) + { + return 1; + } + + /* flows have the same hash */ + if (flow_basic_a->l4_protocol < flow_basic_b->l4_protocol) + { + return -1; + } + else if (flow_basic_a->l4_protocol > flow_basic_b->l4_protocol) + { + return 1; + } + + return ip_tuples_compare(flow_basic_a, flow_basic_b); +} + +static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, size_t idle_scan_index) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + while (workflow->cur_idle_flows > 0) + { + struct nDPId_flow_basic * const flow_basic = + (struct nDPId_flow_basic *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows]; + + switch (flow_basic->state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_SKIPPED: + break; + + case FS_FINISHED: + { + struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; + + if (flow->flow_extended.flow_basic.tcp_fin_rst_seen != 0) + { + jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_END); + } + else + { + jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_IDLE); + } + break; + } + + case FS_INFO: + { + struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; + +#ifdef ENABLE_ZLIB + if (GET_CMDARG_BOOL(nDPId_options.enable_zlib_compression) != 0 && + flow->info.detection_data_compressed_size > 0) + { + workflow->current_compression_diff -= flow->info.detection_data_compressed_size; + int ret = detection_data_inflate(flow); + if (ret <= 0) + { + workflow->current_compression_diff += flow->info.detection_data_compressed_size; + logger(1, "zLib decompression failed with error code: %d", ret); + return; + } + } +#endif + + if (flow->info.detection_completed == 0) + { + uint8_t protocol_was_guessed = 0; + + if (ndpi_is_protocol_detected(flow->info.detection_data->guessed_l7_protocol) == 0) + { + flow->info.detection_data->guessed_l7_protocol = + ndpi_detection_giveup(workflow->ndpi_struct, + &flow->info.detection_data->flow, + &protocol_was_guessed); + } + else + { + protocol_was_guessed = 1; + } + + if (protocol_was_guessed != 0) + { + workflow->total_guessed_flows++; + jsonize_flow_detection_event(reader_thread, flow, FLOW_EVENT_GUESSED); + } + else + { + workflow->total_not_detected_flows++; + jsonize_flow_detection_event(reader_thread, flow, FLOW_EVENT_NOT_DETECTED); + } + } + if (flow->flow_extended.flow_basic.tcp_fin_rst_seen != 0) + { + jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_END); + } + else + { + jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_IDLE); + } + break; + } + } + + ndpi_tdelete(flow_basic, &workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp); + ndpi_flow_info_free(flow_basic); + workflow->cur_active_flows--; + } +} + +static void check_for_idle_flows(struct nDPId_reader_thread * const reader_thread) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow); + process_idle_flow(reader_thread, idle_scan_index); + } +} + +static void ndpi_flow_update_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)user_data; + struct nDPId_workflow * const workflow = reader_thread->workflow; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + + if (workflow == NULL || flow_basic == NULL) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) + { + switch (flow_basic->state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_SKIPPED: + break; + + case FS_FINISHED: + case FS_INFO: + { + struct nDPId_flow_extended * const flow_ext = (struct nDPId_flow_extended *)flow_basic; + + if (is_flow_update_required(workflow, flow_ext) != 0) + { + workflow->total_flow_updates++; + jsonize_flow_event(reader_thread, flow_ext, FLOW_EVENT_UPDATE); + flow_ext->last_flow_update = workflow->last_thread_time; + } + break; + } + } + } +} + +static void check_for_flow_updates(struct nDPId_reader_thread * const reader_thread) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + for (size_t update_scan_index = 0; update_scan_index < workflow->max_active_flows; ++update_scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[update_scan_index], ndpi_flow_update_scan_walker, reader_thread); + } +} + +static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_flow_basic const * const flow_basic) +{ + ndpi_serializer * const serializer = &workflow->ndpi_serializer; + char src_name[48] = {}; + char dst_name[48] = {}; + + switch (flow_basic->l3_type) + { + case L3_IP: + ndpi_serialize_string_string(serializer, "l3_proto", "ip4"); + if (inet_ntop(AF_INET, &flow_basic->src.v4.ip, src_name, sizeof(src_name)) == NULL) + { + logger(1, "Could not convert IPv4 source ip to string: %s", strerror(errno)); + } + if (inet_ntop(AF_INET, &flow_basic->dst.v4.ip, dst_name, sizeof(dst_name)) == NULL) + { + logger(1, "Could not convert IPv4 destination ip to string: %s", strerror(errno)); + } + break; + case L3_IP6: + ndpi_serialize_string_string(serializer, "l3_proto", "ip6"); + if (inet_ntop(AF_INET6, &flow_basic->src.v6.ip[0], src_name, sizeof(src_name)) == NULL) + { + logger(1, "Could not convert IPv6 source ip to string: %s", strerror(errno)); + } + if (inet_ntop(AF_INET6, &flow_basic->dst.v6.ip[0], dst_name, sizeof(dst_name)) == NULL) + { + logger(1, "Could not convert IPv6 destination ip to string: %s", strerror(errno)); + } + + /* For consistency across platforms replace :0: with :: */ + ndpi_patchIPv6Address(src_name), ndpi_patchIPv6Address(dst_name); + break; + default: + ndpi_serialize_string_string(serializer, "l3_proto", "unknown"); + } + + ndpi_serialize_string_string(serializer, "src_ip", src_name); + ndpi_serialize_string_string(serializer, "dst_ip", dst_name); + if (flow_basic->src_port) + { + ndpi_serialize_string_uint32(serializer, "src_port", flow_basic->src_port); + } + if (flow_basic->dst_port) + { + ndpi_serialize_string_uint32(serializer, "dst_port", flow_basic->dst_port); + } + + switch (flow_basic->l4_protocol) + { + case IPPROTO_TCP: + ndpi_serialize_string_string(serializer, "l4_proto", "tcp"); + break; + case IPPROTO_UDP: + ndpi_serialize_string_string(serializer, "l4_proto", "udp"); + break; + case IPPROTO_ICMP: + ndpi_serialize_string_string(serializer, "l4_proto", "icmp"); + break; + case IPPROTO_ICMPV6: + ndpi_serialize_string_string(serializer, "l4_proto", "icmp6"); + break; + default: + ndpi_serialize_string_uint32(serializer, "l4_proto", flow_basic->l4_protocol); + break; + } +} + +static void jsonize_basic(struct nDPId_reader_thread * const reader_thread, int serialize_thread_id) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + + if (serialize_thread_id != 0) + { + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "thread_id", reader_thread->array_index); + } + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "packet_id", workflow->packets_captured); + ndpi_serialize_string_string(&workflow->ndpi_serializer, + "source", + GET_CMDARG_STR(nDPId_options.pcap_file_or_interface)); + ndpi_serialize_string_string(&workflow->ndpi_serializer, "alias", GET_CMDARG_STR(nDPId_options.instance_alias)); +} + +static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enum daemon_event event) +{ + char const ev[] = "daemon_event_name"; + struct nDPId_workflow * const workflow = reader_thread->workflow; + + if (event == DAEMON_EVENT_RECONNECT) + { + ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer); + } + + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "daemon_event_id", event); + if (event > DAEMON_EVENT_INVALID && event < DAEMON_EVENT_COUNT) + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, daemon_event_name_table[event]); + } + else + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, daemon_event_name_table[DAEMON_EVENT_INVALID]); + } + + jsonize_basic(reader_thread, 1); +#ifndef PKG_VERSION + ndpi_serialize_string_string(&workflow->ndpi_serializer, "version", "unknown"); +#else + ndpi_serialize_string_string(&workflow->ndpi_serializer, "version", PKG_VERSION); +#endif + ndpi_serialize_string_string(&workflow->ndpi_serializer, "ndpi_version", ndpi_revision()); + + switch (event) + { + case DAEMON_EVENT_INVALID: + case DAEMON_EVENT_COUNT: + break; + + case DAEMON_EVENT_INIT: + case DAEMON_EVENT_RECONNECT: + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-flows-per-thread", + GET_CMDARG_ULL(nDPId_options.max_flows_per_thread)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-idle-flows-per-thread", + GET_CMDARG_ULL(nDPId_options.max_idle_flows_per_thread)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "reader-thread-count", + GET_CMDARG_ULL(nDPId_options.reader_thread_count)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow-scan-interval", + GET_CMDARG_ULL(nDPId_options.flow_scan_interval)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "generic-max-idle-time", + GET_CMDARG_ULL(nDPId_options.generic_max_idle_time)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "icmp-max-idle-time", + GET_CMDARG_ULL(nDPId_options.icmp_max_idle_time)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "udp-max-idle-time", + GET_CMDARG_ULL(nDPId_options.udp_max_idle_time)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "tcp-max-idle-time", + GET_CMDARG_ULL(nDPId_options.tcp_max_idle_time) + + GET_CMDARG_ULL(nDPId_options.tcp_max_post_end_flow_time)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-packets-per-flow-to-send", + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_send)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-packets-per-flow-to-process", + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_process)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "max-packets-per-flow-to-analyse", + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)); + break; + + case DAEMON_EVENT_STATUS: + case DAEMON_EVENT_SHUTDOWN: + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-captured", workflow->packets_captured); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-processed", workflow->packets_processed); +#ifdef ENABLE_PFRING + { + int rc; + struct npfring_stats stats = {}; + + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + if ((rc = npfring_stats(&workflow->npf, &stats)) != 0) + { + logger(1, "[%8llu] PF_RING stats returned: %d", reader_thread->workflow->packets_processed, rc); + } + ndpi_serialize_string_boolean(&workflow->ndpi_serializer, "pfring_active", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_recv", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_drop", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_shunt", 0); + } + else + { + ndpi_serialize_string_boolean(&workflow->ndpi_serializer, + "pfring_active", + GET_CMDARG_BOOL(nDPId_options.use_pfring)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_recv", stats.recv); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_drop", stats.drop); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_shunt", stats.shunt); + } + } +#else + ndpi_serialize_string_boolean(&workflow->ndpi_serializer, "pfring_active", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_recv", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_drop", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "pfring_shunt", 0); +#endif + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-skipped-flows", + workflow->total_skipped_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-l4-payload-len", + workflow->total_l4_payload_len); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-not-detected-flows", + workflow->total_not_detected_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-guessed-flows", + workflow->total_guessed_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-detected-flows", + workflow->total_detected_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-detection-updates", + workflow->total_flow_detection_updates); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-updates", workflow->total_flow_updates); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "current-active-flows", + workflow->cur_active_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-active-flows", + workflow->total_active_flows); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-idle-flows", workflow->total_idle_flows); +#if defined(ENABLE_ZLIB) && !defined(NO_MAIN) + /* Compression diff's may very from run to run. Due to this, `nDPId-test' would be inconsistent. */ + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-compressions", + workflow->total_compressions); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-compression-diff", + workflow->total_compression_diff); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "current-compression-diff", + workflow->current_compression_diff); +#else + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compressions", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compression-diff", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "current-compression-diff", 0); +#endif +#if defined(ENABLE_MEMORY_STATUS) && !defined(NO_MAIN) + /* + * Global memory stats may very from run to run. + * Due to this, `nDPId-test' results would be inconsistent and is disabled if NO_MAIN defined. + */ + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "global-alloc-count", + MT_GET_AND_ADD(ndpi_memory_alloc_count, 0)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "global-free-count", + MT_GET_AND_ADD(ndpi_memory_free_count, 0)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "global-alloc-bytes", + MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "global-free-bytes", + MT_GET_AND_ADD(ndpi_memory_free_bytes, 0)); +#else + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global-alloc-count", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global-free-count", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global-alloc-bytes", 0); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global-free-bytes", 0); +#endif + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "total-events-serialized", + workflow->total_events_serialized + + 1 /* DAEMON_EVENT_SHUTDOWN is an event as well */); + break; + } + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_usec", workflow->last_global_time); + serialize_and_send(reader_thread); +} + +static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_flow_extended const * const flow_ext) +{ + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id); + ndpi_serialize_string_string(&workflow->ndpi_serializer, + "flow_state", + flow_state_name_table[flow_ext->flow_basic.state]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_packets_processed", + flow_ext->packets_processed[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_packets_processed", + flow_ext->packets_processed[FD_DST2SRC]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow_ext->first_seen); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_idle_time", + get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol)); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_min_l4_payload_len", + flow_ext->min_l4_payload_len[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_min_l4_payload_len", + flow_ext->min_l4_payload_len[FD_DST2SRC]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_max_l4_payload_len", + flow_ext->max_l4_payload_len[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_max_l4_payload_len", + flow_ext->max_l4_payload_len[FD_DST2SRC]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_tot_l4_payload_len", + flow_ext->total_l4_payload_len[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_tot_l4_payload_len", + flow_ext->total_l4_payload_len[FD_DST2SRC]); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time); +} + +static int connect_to_collector(struct nDPId_reader_thread * const reader_thread) +{ + if (reader_thread->collector_sockfd >= 0) + { + close(reader_thread->collector_sockfd); + } + + int sock_type = (nDPId_options.parsed_collector_address.raw.sa_family == AF_UNIX ? SOCK_STREAM : SOCK_DGRAM); + reader_thread->collector_sockfd = socket(nDPId_options.parsed_collector_address.raw.sa_family, sock_type, 0); + if (reader_thread->collector_sockfd < 0 || set_fd_cloexec(reader_thread->collector_sockfd) < 0) + { + reader_thread->collector_sock_last_errno = errno; + return 1; + } + + int opt = NETWORK_BUFFER_MAX_SIZE; + if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0) + { + return 1; + } + + if (set_collector_nonblock(reader_thread) != 0) + { + return 1; + } + + if (connect(reader_thread->collector_sockfd, + &nDPId_options.parsed_collector_address.raw, + nDPId_options.parsed_collector_address.size) < 0) + { + reader_thread->collector_sock_last_errno = errno; + return 1; + } + + if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0) + { + reader_thread->collector_sock_last_errno = errno; + return 1; + } + + reader_thread->collector_sock_last_errno = 0; + + return 0; +} + +static void send_to_collector(struct nDPId_reader_thread * const reader_thread, + char const * const json_msg, + size_t json_msg_len) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + int saved_errno; + int s_ret; + char newline_json_msg[NETWORK_BUFFER_MAX_SIZE]; + + s_ret = snprintf(newline_json_msg, + sizeof(newline_json_msg), + "%0" NETWORK_BUFFER_LENGTH_DIGITS_STR "zu%.*s\n", + json_msg_len + 1, + (int)json_msg_len, + json_msg); + + if (s_ret < 0 || s_ret >= (int)sizeof(newline_json_msg)) + { + logger(1, + "[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu", + workflow->packets_captured, + reader_thread->array_index, + s_ret, + sizeof(newline_json_msg)); + if (s_ret >= (int)sizeof(newline_json_msg)) + { + logger(1, + "[%8llu, %zu] JSON message: %.*s...", + workflow->packets_captured, + reader_thread->array_index, + ndpi_min(512, NETWORK_BUFFER_MAX_SIZE), + newline_json_msg); + } + return; + } + + if (reader_thread->collector_sock_last_errno != 0) + { + saved_errno = reader_thread->collector_sock_last_errno; + + if (connect_to_collector(reader_thread) == 0) + { + if (nDPId_options.parsed_collector_address.raw.sa_family == AF_UNIX) + { + logger(1, + "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s", + workflow->packets_captured, + reader_thread->array_index, + GET_CMDARG_STR(nDPId_options.collector_address)); + jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); + } + } + else + { + if (saved_errno != reader_thread->collector_sock_last_errno) + { + logger(1, + "[%8llu, %zu] Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", + workflow->packets_captured, + reader_thread->array_index, + GET_CMDARG_STR(nDPId_options.collector_address), + (reader_thread->collector_sock_last_errno != 0 + ? strerror(reader_thread->collector_sock_last_errno) + : "Internal Error.")); + } + return; + } + } + + errno = 0; + ssize_t written; + if (reader_thread->collector_sock_last_errno == 0 && + (written = write(reader_thread->collector_sockfd, newline_json_msg, s_ret)) != s_ret) + { + saved_errno = errno; + if (saved_errno == EPIPE || written == 0) + { + logger(1, + "[%8llu, %zu] Lost connection to nDPIsrvd Collector", + workflow->packets_captured, + reader_thread->array_index); + } + if (saved_errno != EAGAIN) + { + if (saved_errno == ECONNREFUSED) + { + logger(1, + "[%8llu, %zu] %s to %s refused by endpoint", + workflow->packets_captured, + reader_thread->array_index, + (nDPId_options.parsed_collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"), + GET_CMDARG_STR(nDPId_options.collector_address)); + } + reader_thread->collector_sock_last_errno = saved_errno; + } + else if (nDPId_options.parsed_collector_address.raw.sa_family == AF_UNIX) + { + size_t pos = (written < 0 ? 0 : written); + set_collector_block(reader_thread); + while ((size_t)(written = write(reader_thread->collector_sockfd, newline_json_msg + pos, s_ret - pos)) != + s_ret - pos) + { + saved_errno = errno; + if (saved_errno == EPIPE || written == 0) + { + logger(1, + "[%8llu, %zu] Lost connection to nDPIsrvd Collector", + workflow->packets_captured, + reader_thread->array_index); + reader_thread->collector_sock_last_errno = saved_errno; + break; + } + else if (written < 0) + { + logger(1, + "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s", + workflow->packets_captured, + reader_thread->array_index, + GET_CMDARG_STR(nDPId_options.collector_address), + strerror(saved_errno)); + reader_thread->collector_sock_last_errno = saved_errno; + break; + } + else + { + pos += written; + } + } + set_collector_nonblock(reader_thread); + } + } +} + +static void serialize_and_send(struct nDPId_reader_thread * const reader_thread) +{ + char * json_msg; + uint32_t json_msg_len; + + json_msg = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_msg_len); + if (json_msg == NULL || json_msg_len == 0) + { + logger(1, + "[%8llu, %zu] jsonize failed, buffer length: %u", + reader_thread->workflow->packets_captured, + reader_thread->array_index, + json_msg_len); + } + else + { + reader_thread->workflow->total_events_serialized++; + send_to_collector(reader_thread, json_msg, json_msg_len); + } + ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer); +} + +/* Slightly modified code from: https://en.wikibooks.org/wiki/Algorithm_Implementation/Miscellaneous/Base64 */ +static int base64_encode(uint8_t const * const data_buf, + size_t dataLength, + char * const result, + size_t * const resultSize) +{ + static const char base64chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + const uint8_t * data = (const uint8_t *)data_buf; + size_t resultIndex = 0; + size_t x; + uint32_t n = 0; + int padCount = dataLength % 3; + uint8_t n0, n1, n2, n3; + + /* increment over the length of the string, three characters at a time */ + for (x = 0; x < dataLength; x += 3) + { + /* these three 8-bit (ASCII) characters become one 24-bit number */ + n = ((uint32_t)data[x]) << 16; // parenthesis needed, compiler depending on flags can do the shifting before + // conversion to uint32_t, resulting to 0 + + if ((x + 1) < dataLength) + n += ((uint32_t)data[x + 1]) << 8; // parenthesis needed, compiler depending on flags can do the shifting + // before conversion to uint32_t, resulting to 0 + + if ((x + 2) < dataLength) + n += data[x + 2]; + + /* this 24-bit number gets separated into four 6-bit numbers */ + n0 = (uint8_t)(n >> 18) & 63; + n1 = (uint8_t)(n >> 12) & 63; + n2 = (uint8_t)(n >> 6) & 63; + n3 = (uint8_t)n & 63; + + /* + * if we have one byte available, then its encoding is spread + * out over two characters + */ + if (resultIndex >= *resultSize) + return 1; /* indicate failure: buffer too small */ + result[resultIndex++] = base64chars[n0]; + if (resultIndex >= *resultSize) + return 1; /* indicate failure: buffer too small */ + result[resultIndex++] = base64chars[n1]; + + /* + * if we have only two bytes available, then their encoding is + * spread out over three chars + */ + if ((x + 1) < dataLength) + { + if (resultIndex >= *resultSize) + return 1; /* indicate failure: buffer too small */ + result[resultIndex++] = base64chars[n2]; + } + + /* + * if we have all three bytes available, then their encoding is spread + * out over four characters + */ + if ((x + 2) < dataLength) + { + if (resultIndex >= *resultSize) + return 1; /* indicate failure: buffer too small */ + result[resultIndex++] = base64chars[n3]; + } + } + + /* + * create and add padding that is required if we did not have a multiple of 3 + * number of characters available + */ + if (padCount > 0) + { + for (; padCount < 3; padCount++) + { + if (resultIndex >= *resultSize) + return 1; /* indicate failure: buffer too small */ + result[resultIndex++] = '='; + } + } + if (resultIndex >= *resultSize) + return 1; /* indicate failure: buffer too small */ + result[resultIndex] = 0; + *resultSize = resultIndex; + return 0; /* indicate success */ +} + +static void jsonize_data_analysis(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow_extended const * const flow_ext) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + struct nDPId_flow_analysis * const analysis = (struct nDPId_flow_analysis *)flow_ext->flow_analysis; + + if (GET_CMDARG_BOOL(nDPId_options.enable_data_analysis) != 0 && flow_ext->flow_analysis != NULL) + { + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "data_analysis"); + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "iat"); + + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "min", ndpi_data_min(&analysis->iat)); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "avg", ndpi_data_average(&analysis->iat), "%.1f"); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "max", ndpi_data_max(&analysis->iat)); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "stddev", ndpi_data_stddev(&analysis->iat), "%.1f"); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "var", ndpi_data_variance(&analysis->iat), "%.1f"); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "ent", ndpi_data_entropy(&analysis->iat), "%.1f"); + + ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "data"); + for (uint16_t i = 0; i < analysis->iat.num_values_array_len; ++i) + { + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->iat.values[i]); + } + ndpi_serialize_end_of_list(&workflow->ndpi_serializer); + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "pktlen"); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "min", ndpi_data_min(&analysis->pktlen)); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "avg", ndpi_data_average(&analysis->pktlen), "%.1f"); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "max", ndpi_data_max(&analysis->pktlen)); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "stddev", ndpi_data_stddev(&analysis->pktlen), "%.1f"); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "var", ndpi_data_variance(&analysis->pktlen), "%.1f"); + ndpi_serialize_string_float(&workflow->ndpi_serializer, "ent", ndpi_data_entropy(&analysis->pktlen), "%.1f"); + + ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "data"); + for (uint16_t i = 0; i < ndpi_min(analysis->pktlen.num_data_entries, analysis->pktlen.num_values_array_len); + ++i) + { + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->pktlen.values[i]); + } + ndpi_serialize_end_of_list(&workflow->ndpi_serializer); + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "bins"); + ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "c_to_s"); + for (uint16_t i = 0; i < analysis->payload_len_bin[FD_SRC2DST].num_bins; ++i) + { + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "", + analysis->payload_len_bin[FD_SRC2DST].u.bins8[i]); + } + ndpi_serialize_end_of_list(&workflow->ndpi_serializer); + ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "s_to_c"); + for (uint16_t i = 0; i < analysis->payload_len_bin[FD_DST2SRC].num_bins; ++i) + { + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "", + analysis->payload_len_bin[FD_DST2SRC].u.bins8[i]); + } + ndpi_serialize_end_of_list(&workflow->ndpi_serializer); + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + + ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "directions"); + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse); ++i) + { + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->directions[i]); + } + ndpi_serialize_end_of_list(&workflow->ndpi_serializer); + + ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "entropies"); + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse); ++i) + { + ndpi_serialize_string_float(&workflow->ndpi_serializer, "", analysis->entropies[i], "%.9f"); + } + ndpi_serialize_end_of_list(&workflow->ndpi_serializer); + + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + } +} + +static void jsonize_packet_event(struct nDPId_reader_thread * const reader_thread, + struct pcap_pkthdr const * const header, + uint8_t const * const packet, + uint16_t pkt_type, + uint16_t pkt_l3_offset, + uint16_t pkt_l4_offset, + uint16_t pkt_l4_len, + struct nDPId_flow_extended const * const flow_ext, + enum packet_event event) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + char const ev[] = "packet_event_name"; + + if (event == PACKET_EVENT_PAYLOAD_FLOW) + { + if (flow_ext == NULL) + { + logger(1, + "[%8llu, %zu] BUG: got a PACKET_EVENT_PAYLOAD_FLOW with a flow pointer equals NULL", + reader_thread->workflow->packets_captured, + reader_thread->array_index); + return; + } + if (flow_ext->packets_processed[FD_SRC2DST] + flow_ext->packets_processed[FD_DST2SRC] > + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_send)) + { + return; + } + } + + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "packet_event_id", event); + if (event > PACKET_EVENT_INVALID && event < PACKET_EVENT_COUNT) + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, packet_event_name_table[event]); + } + else + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, packet_event_name_table[PACKET_EVENT_INVALID]); + } + + jsonize_basic(reader_thread, (event == PACKET_EVENT_PAYLOAD_FLOW ? 1 : 0)); + + if (event == PACKET_EVENT_PAYLOAD_FLOW) + { + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_packet_id", + flow_ext->packets_processed[FD_SRC2DST] + flow_ext->packets_processed[FD_DST2SRC]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_src_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_dst_last_pkt_time", + flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, + "flow_idle_time", + get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol)); + } + +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + ndpi_serialize_string_int32(&workflow->ndpi_serializer, + "pkt_datalink", + npfring_datalink(&reader_thread->workflow->npf)); + } + else +#endif + { + ndpi_serialize_string_int32(&workflow->ndpi_serializer, + "pkt_datalink", + pcap_datalink(reader_thread->workflow->pcap_handle)); + } + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_caplen", header->caplen); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_type", pkt_type); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l3_offset", pkt_l3_offset); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->len); + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len); + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time); + + char base64_data[nDPId_PACKETS_PLEN_MAX * 4]; + size_t base64_data_len = sizeof(base64_data); + if (base64_encode(packet, + (header->caplen > nDPId_PACKETS_PLEN_MAX ? nDPId_PACKETS_PLEN_MAX : header->caplen), + base64_data, + &base64_data_len) != 0) + { + logger(1, + "[%8llu, %zu] Base64 encoding failed.", + reader_thread->workflow->packets_captured, + reader_thread->array_index); + } + else if (base64_data_len > 0 && + ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, (uint16_t)base64_data_len) != + 0) + { + logger(1, + "[%8llu, %zu] JSON serializing base64 packet buffer failed", + reader_thread->workflow->packets_captured, + reader_thread->array_index); + } + serialize_and_send(reader_thread); +} + +/* I decided against ndpi_flow2json as it does not fulfill my needs. */ +static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow_extended * const flow_ext, + enum flow_event event) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + char const ev[] = "flow_event_name"; + + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_event_id", event); + if (event > FLOW_EVENT_INVALID && event < FLOW_EVENT_COUNT) + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[event]); + } + else + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); + } + jsonize_basic(reader_thread, 1); + jsonize_flow(workflow, flow_ext); + jsonize_l3_l4(workflow, &flow_ext->flow_basic); + + switch (event) + { + case FLOW_EVENT_INVALID: + case FLOW_EVENT_COUNT: + break; + + case FLOW_EVENT_NEW: + case FLOW_EVENT_END: + case FLOW_EVENT_IDLE: + case FLOW_EVENT_UPDATE: + case FLOW_EVENT_ANALYSE: +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + ndpi_serialize_string_int32(&workflow->ndpi_serializer, + "flow_datalink", + npfring_datalink(&reader_thread->workflow->npf)); + } + else +#endif + { + ndpi_serialize_string_int32(&workflow->ndpi_serializer, + "flow_datalink", + pcap_datalink(reader_thread->workflow->pcap_handle)); + } + ndpi_serialize_string_uint32(&workflow->ndpi_serializer, + "flow_max_packets", + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_send)); + + if (event == FLOW_EVENT_ANALYSE) + { + jsonize_data_analysis(reader_thread, flow_ext); + } + if (flow_ext->flow_basic.state == FS_FINISHED) + { + struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_ext; + + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "ndpi"); + ndpi_serialize_proto(workflow->ndpi_struct, + &workflow->ndpi_serializer, + flow->finished.risk, + flow->finished.confidence, + flow->flow_extended.detected_l7_protocol); + if (flow->finished.hostname != NULL) + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, "hostname", flow->finished.hostname); + } + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + } + else if (flow_ext->flow_basic.state == FS_INFO) + { + struct nDPId_flow * const flow = (struct nDPId_flow *)flow_ext; + +#ifdef ENABLE_ZLIB + if (GET_CMDARG_BOOL(nDPId_options.enable_zlib_compression) != 0 && + flow->info.detection_data_compressed_size > 0) + { + workflow->current_compression_diff -= flow->info.detection_data_compressed_size; + int ret = detection_data_inflate(flow); + if (ret <= 0) + { + workflow->current_compression_diff += flow->info.detection_data_compressed_size; + logger(1, "zLib decompression failed with error code: %d", ret); + return; + } + } +#endif + + if (flow->info.detection_completed != 0) + { + ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "ndpi"); + ndpi_serialize_proto(workflow->ndpi_struct, + &workflow->ndpi_serializer, + flow->info.detection_data->flow.risk, + flow->info.detection_data->flow.confidence, + flow->flow_extended.detected_l7_protocol); + ndpi_serialize_end_of_block(&workflow->ndpi_serializer); + } + } + break; + + case FLOW_EVENT_NOT_DETECTED: + case FLOW_EVENT_GUESSED: + case FLOW_EVENT_DETECTED: + case FLOW_EVENT_DETECTION_UPDATE: + logger(1, + "[%8llu, %4llu] internal error / invalid function call", + workflow->packets_captured, + flow_ext->flow_id); + break; + } + + serialize_and_send(reader_thread); +} + +static void jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread, + struct nDPId_flow * const flow, + enum flow_event event) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + char const ev[] = "flow_event_name"; + + ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_event_id", event); + if (event > FLOW_EVENT_INVALID && event < FLOW_EVENT_COUNT) + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[event]); + } + else + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); + } + jsonize_basic(reader_thread, 1); + jsonize_flow(workflow, &flow->flow_extended); + jsonize_l3_l4(workflow, &flow->flow_extended.flow_basic); + + switch (event) + { + case FLOW_EVENT_INVALID: + case FLOW_EVENT_COUNT: + break; + + case FLOW_EVENT_NEW: + case FLOW_EVENT_END: + case FLOW_EVENT_IDLE: + case FLOW_EVENT_UPDATE: + case FLOW_EVENT_ANALYSE: + logger(1, + "[%8llu, %4llu] internal error / invalid function call", + workflow->packets_captured, + flow->flow_extended.flow_id); + break; + + case FLOW_EVENT_NOT_DETECTED: + case FLOW_EVENT_GUESSED: + if (ndpi_dpi2json(workflow->ndpi_struct, + &flow->info.detection_data->flow, + flow->info.detection_data->guessed_l7_protocol, + &workflow->ndpi_serializer) != 0) + { + logger(1, + "[%8llu, %4llu] ndpi_dpi2json failed for not-detected/guessed flow", + workflow->packets_captured, + flow->flow_extended.flow_id); + } + break; + + case FLOW_EVENT_DETECTED: + case FLOW_EVENT_DETECTION_UPDATE: + if (ndpi_dpi2json(workflow->ndpi_struct, + &flow->info.detection_data->flow, + flow->flow_extended.detected_l7_protocol, + &workflow->ndpi_serializer) != 0) + { + logger(1, + "[%8llu, %4llu] ndpi_dpi2json failed for detected/detection-update flow", + workflow->packets_captured, + flow->flow_extended.flow_id); + } + break; + } + + serialize_and_send(reader_thread); +} + +static void internal_format_error(ndpi_serializer * const serializer, char const * const format, uint32_t format_index) +{ + logger(1, "BUG: Internal error detected for format string `%s' at format index %u", format, format_index); + ndpi_reset_serializer(serializer); +} + +static void vjsonize_error_eventf(struct nDPId_reader_thread * const reader_thread, char const * format, va_list ap) +{ + uint8_t got_jsonkey = 0; + uint8_t is_long_long = 0; + char json_key[NETWORK_BUFFER_MAX_SIZE]; + uint32_t format_index = 0; + + while (*format) + { + if (got_jsonkey == 0) + { + json_key[0] = '\0'; + } + + switch (*format++) + { + case 's': + { + format_index++; + char * value = va_arg(ap, char *); + if (got_jsonkey == 0) + { + int s_ret = snprintf(json_key, sizeof(json_key), "%s", value); + if (s_ret < 0) + { + logger(1, + "[%8llu, %zu] Error event format failed: snprintf returned %d, buffer size %zu", + reader_thread->workflow->packets_captured, + reader_thread->array_index, + s_ret, + sizeof(json_key)); + } + got_jsonkey = 1; + } + else + { + ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer, json_key, value); + got_jsonkey = 0; + } + break; + } + case 'f': + { + format_index++; + if (got_jsonkey == 1) + { + float value = va_arg(ap, double); + ndpi_serialize_string_float(&reader_thread->workflow->ndpi_serializer, json_key, value, "%.2f"); + got_jsonkey = 0; + } + else + { + internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); + return; + } + break; + } + case 'z': + case 'l': + format_index++; + if (got_jsonkey != 1) + { + internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); + return; + } + if (*format == 'l') + { + format++; + is_long_long = 1; + } + else + { + is_long_long = 0; + } + if (*format == 'd') + { + long long int value; + if (is_long_long != 0) + { + value = va_arg(ap, long long int); + } + else + { + value = va_arg(ap, long int); + } + ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer, json_key, value); + got_jsonkey = 0; + } + else if (*format == 'u') + { + unsigned long long int value; + if (is_long_long != 0) + { + value = va_arg(ap, unsigned long long int); + } + else + { + value = va_arg(ap, unsigned long int); + } + ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, json_key, value); + got_jsonkey = 0; + } + else + { + internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); + return; + } + format++; + break; + case 'u': + format_index++; + if (got_jsonkey == 1) + { + unsigned int value = va_arg(ap, unsigned int); + ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, json_key, value); + got_jsonkey = 0; + } + else + { + internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); + return; + } + break; + case 'd': + format_index++; + if (got_jsonkey == 1) + { + int value = va_arg(ap, int); + ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer, json_key, value); + got_jsonkey = 0; + } + else + { + internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); + return; + } + break; + /* format string separators */ + case ' ': + case ',': + case '%': + break; + default: + internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); + return; + } + } +} + +__attribute__((format(printf, 3, 4))) static void jsonize_error_eventf(struct nDPId_reader_thread * const reader_thread, + enum error_event event, + char const * format, + ...) +{ + struct nDPId_workflow * const workflow = reader_thread->workflow; + va_list ap; + char const ev[] = "error_event_name"; + + ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer, "error_event_id", event); + if (event > ERROR_EVENT_INVALID && event < ERROR_EVENT_COUNT) + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, error_event_name_table[event]); + } + else + { + ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, error_event_name_table[ERROR_EVENT_INVALID]); + } + ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, "threshold_n", workflow->error_count); + ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, + "threshold_n_max", + GET_CMDARG_ULL(nDPId_options.error_event_threshold_n)); + ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, + "threshold_time", + GET_CMDARG_ULL(nDPId_options.error_event_threshold_time)); + ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, + "threshold_ts_usec", + workflow->last_error_time); + + switch (event) + { + case MAX_FLOW_TO_TRACK: + case FLOW_MEMORY_ALLOCATION_FAILED: + jsonize_basic(reader_thread, 1); + break; + default: + jsonize_basic(reader_thread, 0); + break; + } + + if (format != NULL) + { + va_start(ap, format); + vjsonize_error_eventf(reader_thread, format, ap); + va_end(ap); + } + + ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_usec", workflow->last_global_time); + serialize_and_send(reader_thread); +} + +/* See: https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */ +static inline uint32_t murmur_32_scramble(uint32_t k) +{ + k *= 0xcc9e2d51; + k = (k << 15) | (k >> 17); + k *= 0x1b873593; + return k; +} + +/* See: https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */ +static uint32_t murmur3_32(uint8_t const * key, size_t len, uint32_t seed) +{ + uint32_t h = seed; + uint32_t k; + /* Read in groups of 4. */ + for (size_t i = len >> 2; i; i--) + { + k = htole32(*(uint32_t *)key); + key += sizeof(uint32_t); + h ^= murmur_32_scramble(k); + h = (h << 13) | (h >> 19); + h = h * 5 + 0xe6546b64; + } + /* Read the rest. */ + k = 0; + for (size_t i = len & 3; i; i--) + { + k <<= 8; + k |= key[i - 1]; + } + // A swap is *not* necessary here because the preceding loop already + // places the low bytes in the low places according to whatever endianness + // we use. Swaps only apply when the memory is copied in a chunk. + h ^= murmur_32_scramble(k); + /* Finalize. */ + h ^= len; + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + return h; +} + +static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const * const ndpi_flow) +{ + /* + * This is a kludge, but necessary for now as I do not want to spam nDPIsrvd and clients + * with the same detection JSON message over and over again. + * So we are building a hash over the more "stable" parts of the ndpi flow struct. + * Stable in terms of they should only change if the detection changes for whatever reason. + * At the time of writing, nDPI has no API function to check if the detection changed + * or has some new information available. This is far from perfect. + */ + uint32_t hash = murmur3_32((uint8_t const *)&ndpi_flow->protos, sizeof(ndpi_flow->protos), nDPId_FLOW_STRUCT_SEED); + hash += ndpi_flow->category; + hash += (ndpi_flow->risk & 0xFFFFFFFF) + (ndpi_flow->risk >> 32); // nDPI Risks are u64's (might change in the + // future) + hash += ndpi_flow->confidence; + + const size_t protocol_bitmask_size = sizeof(ndpi_flow->excluded_protocol_bitmask.fds_bits) / + sizeof(ndpi_flow->excluded_protocol_bitmask.fds_bits[0]); + for (size_t i = 0; i < protocol_bitmask_size; ++i) + { + hash += ndpi_flow->excluded_protocol_bitmask.fds_bits[i]; + hash += ndpi_flow->excluded_protocol_bitmask.fds_bits[i]; + } + + size_t host_server_name_len = + strnlen((const char *)ndpi_flow->host_server_name, sizeof(ndpi_flow->host_server_name)); + hash += host_server_name_len; + hash += murmur3_32((uint8_t const *)&ndpi_flow->host_server_name, + sizeof(ndpi_flow->host_server_name), + nDPId_FLOW_STRUCT_SEED); + + return hash; +} + +/* Some constants stolen from ndpiReader. */ +#define SNAP 0xaa +/* mask for FCF */ +#define WIFI_DATA 0x2 +#define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */ +#define FCF_TO_DS(fc) ((fc) & 0x0100) +#define FCF_FROM_DS(fc) ((fc) & 0x0200) +/* mask for Bad FCF presence */ +#define BAD_FCS 0x50 /* 0101 0000 */ +static int process_datalink_layer(struct nDPId_reader_thread * const reader_thread, + struct pcap_pkthdr const * const header, + uint8_t const * const packet, + uint16_t * ip_offset, + uint16_t * layer3_type) +{ + const uint16_t eth_offset = 0; + int datalink_type; + const struct ndpi_ethhdr * ethernet; + +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + datalink_type = npfring_datalink(&reader_thread->workflow->npf); + } + else +#endif + { + datalink_type = pcap_datalink(reader_thread->workflow->pcap_handle); + } + + switch (datalink_type) + { + case DLT_NULL: + { + /* DLT header values can be stored as big or little endian. */ + + uint32_t dlt_hdr = *((uint32_t *)&packet[eth_offset]); + + if (dlt_hdr == 0x02000000 || dlt_hdr == 0x02) + { + *layer3_type = ETH_P_IP; + } + else if (dlt_hdr == 0x24000000 || dlt_hdr == 0x24 || dlt_hdr == 0x28000000 || dlt_hdr == 0x28 || + dlt_hdr == 0x30000000 || dlt_hdr == 0x30) + { + *layer3_type = ETH_P_IPV6; + } + else + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + UNKNOWN_DATALINK_LAYER, + "%s%u", + "layer_type", + ntohl(*((uint32_t *)&packet[eth_offset]))); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + *ip_offset = sizeof(dlt_hdr) + eth_offset; + break; + } + case DLT_PPP_SERIAL: + { + if (header->caplen < sizeof(struct ndpi_chdlc)) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_chdlc)); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + *ip_offset = sizeof(struct ndpi_chdlc); + *layer3_type = ntohs(chdlc->proto_code); + break; + } + case DLT_C_HDLC: + case DLT_PPP: + if (header->caplen < sizeof(struct ndpi_chdlc)) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_chdlc)); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + if (packet[0] == 0x0f || packet[0] == 0x8f) + { + struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; + *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ + *layer3_type = ntohs(chdlc->proto_code); + } + else + { + *ip_offset = 2; + *layer3_type = ntohs(*((u_int16_t *)&packet[eth_offset])); + } + break; + case DLT_LINUX_SLL: + if (header->caplen < 16) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf( + reader_thread, PACKET_TOO_SHORT, "%s%u %s%u", "size", header->caplen, "expected", 16); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + *layer3_type = (packet[eth_offset + 14] << 8) + packet[eth_offset + 15]; + *ip_offset = 16 + eth_offset; + break; + case DLT_IEEE802_11_RADIO: + { + if (header->caplen < sizeof(struct ndpi_radiotap_header)) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_radiotap_header)); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + struct ndpi_radiotap_header const * const radiotap = + (struct ndpi_radiotap_header const * const)&packet[eth_offset]; + uint16_t radio_len = radiotap->len; + + /* Check Bad FCS presence */ + if ((radiotap->flags & BAD_FCS) == BAD_FCS) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, PACKET_HEADER_INVALID, "%s%s", "reason", "Bad FCS presence"); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + if (header->caplen < (eth_offset + radio_len + sizeof(struct ndpi_wifi_header))) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + (eth_offset + radio_len + sizeof(struct ndpi_wifi_header))); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + /* Calculate 802.11 header length (variable) */ + struct ndpi_wifi_header const * const wifi = + (struct ndpi_wifi_header const * const)(packet + eth_offset + radio_len); + uint16_t fc = wifi->fc; + int wifi_len = 0; + + /* check wifi data presence */ + if (FCF_TYPE(fc) == WIFI_DATA) + { + if ((FCF_TO_DS(fc) && FCF_FROM_DS(fc) == 0x0) || (FCF_TO_DS(fc) == 0x0 && FCF_FROM_DS(fc))) + { + wifi_len = 26; /* + 4 byte fcs */ + } + } + else + { + /* no data frames */ + break; + } + + /* Check ether_type from LLC */ + if (header->caplen < (eth_offset + wifi_len + radio_len + sizeof(struct ndpi_llc_header_snap))) + { + return 1; + } + + struct ndpi_llc_header_snap const * const llc = + (struct ndpi_llc_header_snap const * const)(packet + eth_offset + wifi_len + radio_len); + if (llc->dsap == SNAP) + { + *layer3_type = ntohs(llc->snap.proto_ID); + } + + /* Set IP header offset */ + *ip_offset = wifi_len + radio_len + sizeof(struct ndpi_llc_header_snap) + eth_offset; + break; + } + case DLT_RAW: + *ip_offset = 0; + if (header->caplen < 1) + { + return 1; + } + switch ((packet[0] & 0xF0) >> 4) + { + case 4: + *layer3_type = ETH_P_IP; + break; + case 6: + *layer3_type = ETH_P_IPV6; + break; + default: + return 1; + } + break; + case DLT_EN10MB: + if (header->caplen < sizeof(struct ndpi_ethhdr)) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_ethhdr)); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + ethernet = (struct ndpi_ethhdr *)&packet[eth_offset]; + *ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset; + *layer3_type = ntohs(ethernet->h_proto); + + /* Cisco FabricPath (data center ethernet devices) */ + if (*layer3_type == ETHERTYPE_DCE) + { + if (header->caplen < sizeof(struct ndpi_ethhdr) + 20 /* sizeof(Ethernet/DCE-header) */) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_ethhdr) + 2); + jsonize_packet_event( + reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + ethernet = (struct ndpi_ethhdr *)&packet[eth_offset + 20]; + *ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset; + *layer3_type = ntohs(ethernet->h_proto); + } + + /* 802.1Q VLAN */ + if (*layer3_type == ETHERTYPE_VLAN) + { + if (header->caplen < sizeof(struct ndpi_ethhdr) + 4 /* sizeof(802.1Q-header) */) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_ethhdr) + 4); + jsonize_packet_event( + reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + *layer3_type = ntohs(*(uint16_t *)&packet[*ip_offset + 2]); + *ip_offset += 4; + } + + switch (*layer3_type) + { + case ETH_P_IP: /* IPv4 */ + if (header->caplen < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + IP4_PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)); + jsonize_packet_event(reader_thread, + header, + packet, + *layer3_type, + *ip_offset, + 0, + 0, + NULL, + PACKET_EVENT_PAYLOAD); + } + return 1; + } + break; + case ETH_P_IPV6: /* IPV6 */ + if (header->caplen < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + IP6_PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)); + jsonize_packet_event(reader_thread, + header, + packet, + *layer3_type, + *ip_offset, + 0, + 0, + NULL, + PACKET_EVENT_PAYLOAD); + } + return 1; + } + break; + case ETHERTYPE_PAE: /* 802.1X Authentication */ + return 1; + case ETHERTYPE_ARP: /* ARP */ + return 1; + default: + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, PACKET_TYPE_UNKNOWN, "%s%u", "layer_type", *layer3_type); + jsonize_packet_event( + reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + break; + case DLT_IPV4: + *layer3_type = ETH_P_IP; + *ip_offset = 0; + break; + case DLT_IPV6: + *layer3_type = ETH_P_IPV6; + *ip_offset = 0; + break; + /* Switch tag datalinks are not supported for now. */ + case DLT_DSA_TAG_DSA: + return 1; + case DLT_DSA_TAG_EDSA: + return 1; + default: + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + UNKNOWN_DATALINK_LAYER, + "%s%u", + "layer_type", + ntohl(*((uint32_t *)&packet[eth_offset]))); + jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return 1; + } + + return 0; +} + +static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const workflow, + struct nDPId_flow_basic * orig_flow_basic, + enum nDPId_flow_state state, + size_t hashed_index) +{ + size_t s; + + switch (state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_FINISHED: // do not allocate something for FS_FINISHED as we are re-using memory allocated by FS_INFO + return NULL; + + case FS_SKIPPED: + workflow->total_skipped_flows++; + s = sizeof(struct nDPId_flow_skipped); + break; + + case FS_INFO: + s = sizeof(struct nDPId_flow); + break; + } + + struct nDPId_flow_basic * flow_basic = (struct nDPId_flow_basic *)ndpi_malloc(s); + if (flow_basic == NULL) + { + return NULL; + } + memset(flow_basic, 0, s); + *flow_basic = *orig_flow_basic; + flow_basic->state = state; + if (ndpi_tsearch(flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL) + { + ndpi_free(flow_basic); + return NULL; + } + + workflow->cur_active_flows++; + return flow_basic; +} + +static void do_periodically_work(struct nDPId_reader_thread * const reader_thread) +{ + if (reader_thread->workflow->last_scan_time + GET_CMDARG_ULL(nDPId_options.flow_scan_interval) <= + reader_thread->workflow->last_global_time) + { + check_for_idle_flows(reader_thread); + check_for_flow_updates(reader_thread); + reader_thread->workflow->last_scan_time = reader_thread->workflow->last_global_time; + } + if (reader_thread->workflow->last_status_time + GET_CMDARG_ULL(nDPId_options.daemon_status_interval) + + reader_thread->array_index * 1000 <= + reader_thread->workflow->last_global_time) + { + jsonize_daemon(reader_thread, DAEMON_EVENT_STATUS); + reader_thread->workflow->last_status_time = + reader_thread->workflow->last_global_time + reader_thread->array_index * 1000; + } +#ifdef ENABLE_MEMORY_PROFILING + if (reader_thread->workflow->last_memory_usage_log_time + + GET_CMDARG_ULL(nDPId_options.memory_profiling_log_interval) <= + reader_thread->workflow->last_global_time) + { + log_memory_usage(reader_thread); + reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_global_time; + } +#endif +} + +static int distribute_single_packet(struct nDPId_reader_thread * const reader_thread) +{ + return (reader_thread->workflow->packets_captured % GET_CMDARG_ULL(nDPId_options.reader_thread_count) == + reader_thread->array_index); +} + +static void ndpi_process_packet(uint8_t * const args, + struct pcap_pkthdr const * const header, + uint8_t const * const packet) +{ + struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)args; + struct nDPId_workflow * workflow; + struct nDPId_flow_basic flow_basic = {}; + enum nDPId_flow_direction direction; + + size_t hashed_index; + void * tree_result; + struct nDPId_flow * flow_to_process; + + uint8_t is_new_flow = 0; + + const struct ndpi_iphdr * ip; + struct ndpi_ipv6hdr * ip6; + const struct ndpi_tcphdr * tcp = NULL; + + uint64_t time_us; + uint64_t last_pkt_time; + + uint16_t ip_offset = 0; + uint16_t ip_size; + + const uint8_t * l4_ptr = NULL; + uint16_t l4_len = 0; + uint16_t l4_payload_len = 0; + + uint16_t type = 0; + size_t thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd' + + if (reader_thread == NULL) + { + return; + } + workflow = reader_thread->workflow; + + if (workflow == NULL) + { + return; + } + + workflow->packets_captured++; + time_us = ndpi_timeval_to_microseconds(header->ts); + if (workflow->last_global_time < time_us) + { + workflow->last_global_time = time_us; + } + if (workflow->last_thread_time == 0) + { + workflow->last_thread_time = time_us; + } + + do_periodically_work(reader_thread); + + if (process_datalink_layer(reader_thread, header, packet, &ip_offset, &type) != 0) + { + return; + } + + if (type == ETH_P_IP) + { + ip = (struct ndpi_iphdr *)&packet[ip_offset]; + ip6 = NULL; + } + else if (type == ETH_P_IPV6) + { + ip = NULL; + ip6 = (struct ndpi_ipv6hdr *)&packet[ip_offset]; + } + else + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return; + } + ip_size = header->caplen - ip_offset; + + if (type == ETH_P_IP && header->caplen >= ip_offset) + { + if (header->caplen < header->len) + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE, + "%s%u %s%u", + "size", + header->caplen, + "expected", + header->len); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + } + } + + /* process layer3 e.g. IPv4 / IPv6 */ + if (ip != NULL && ip->version == 4) + { + if (ip_size < sizeof(*ip)) + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + IP4_SIZE_SMALLER_THAN_HEADER, + "%s%u %s%zu", + "size", + ip_size, + "expected", + sizeof(*ip)); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return; + } + + flow_basic.l3_type = L3_IP; + + if (ndpi_detection_get_l4( + (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0) + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf( + reader_thread, IP4_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return; + } + + flow_basic.src.v4.ip = ip->saddr; + flow_basic.dst.v4.ip = ip->daddr; + uint32_t min_addr = (flow_basic.src.v4.ip > flow_basic.dst.v4.ip ? flow_basic.dst.v4.ip : flow_basic.src.v4.ip); + thread_index = min_addr + ip->protocol; + } + else if (ip6 != NULL) + { + if (ip_size < sizeof(ip6->ip6_hdr)) + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + IP6_SIZE_SMALLER_THAN_HEADER, + "%s%u %s%zu", + "size", + ip_size, + "expected", + sizeof(ip6->ip6_hdr)); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return; + } + + flow_basic.l3_type = L3_IP6; + if (ndpi_detection_get_l4( + (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0) + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf( + reader_thread, IP6_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return; + } + + flow_basic.src.v6.ip[0] = ip6->ip6_src.u6_addr.u6_addr64[0]; + flow_basic.src.v6.ip[1] = ip6->ip6_src.u6_addr.u6_addr64[1]; + flow_basic.dst.v6.ip[0] = ip6->ip6_dst.u6_addr.u6_addr64[0]; + flow_basic.dst.v6.ip[1] = ip6->ip6_dst.u6_addr.u6_addr64[1]; + + uint64_t min_addr[2]; + if (flow_basic.src.v6.ip[0] > flow_basic.dst.v6.ip[0] || + (flow_basic.src.v6.ip[0] == flow_basic.dst.v6.ip[0] && flow_basic.src.v6.ip[1] > flow_basic.dst.v6.ip[1])) + { + min_addr[0] = flow_basic.dst.v6.ip[0]; + min_addr[1] = flow_basic.dst.v6.ip[1]; + } + else + { + min_addr[0] = flow_basic.src.v6.ip[0]; + min_addr[1] = flow_basic.src.v6.ip[1]; + } + thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt; + } + else + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); + jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); + } + return; + } + + /* process layer4 e.g. TCP / UDP */ + if (flow_basic.l4_protocol == IPPROTO_TCP) + { + if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + TCP_PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + } + return; + } + tcp = (struct ndpi_tcphdr *)l4_ptr; + l4_payload_len = ndpi_max(0, l4_len - 4 * tcp->doff); + flow_basic.tcp_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0); + flow_basic.tcp_is_midstream_flow = (tcp->syn == 0 ? 1 : 0); + flow_basic.src_port = ntohs(tcp->source); + flow_basic.dst_port = ntohs(tcp->dest); + } + else if (flow_basic.l4_protocol == IPPROTO_UDP) + { + const struct ndpi_udphdr * udp; + + if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) + { + if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + UDP_PACKET_TOO_SHORT, + "%s%u %s%zu", + "size", + header->caplen, + "expected", + (l4_ptr - packet) + sizeof(struct ndpi_udphdr)); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + } + return; + } + udp = (struct ndpi_udphdr *)l4_ptr; + l4_payload_len = (l4_len > sizeof(struct ndpi_udphdr)) ? l4_len - sizeof(struct ndpi_udphdr) : 0; + flow_basic.src_port = ntohs(udp->source); + flow_basic.dst_port = ntohs(udp->dest); + } + else + { + /* Use layer4 length returned from libnDPI. */ + l4_payload_len = l4_len; + } + + /* distribute flows to threads while keeping stability (same flow goes always to same thread) */ + thread_index += (flow_basic.src_port < flow_basic.dst_port ? flow_basic.dst_port : flow_basic.src_port); + thread_index %= GET_CMDARG_ULL(nDPId_options.reader_thread_count); + if (thread_index != reader_thread->array_index) + { + return; + } + + if (workflow->last_thread_time < time_us) + { + workflow->last_thread_time = time_us; + } + + /* calculate flow hash for btree find, search(insert) */ + switch (flow_basic.l3_type) + { + case L3_IP: + if (ndpi_flowv4_flow_hash(flow_basic.l4_protocol, + flow_basic.src.v4.ip, + flow_basic.dst.v4.ip, + flow_basic.src_port, + flow_basic.dst_port, + 0, + 0, + (uint8_t *)&flow_basic.hashval, + sizeof(flow_basic.hashval)) != 0) + { + flow_basic.hashval = flow_basic.src.v4.ip + flow_basic.dst.v4.ip; // fallback + } + break; + case L3_IP6: + if (ndpi_flowv6_flow_hash(flow_basic.l4_protocol, + &ip6->ip6_src, + &ip6->ip6_dst, + flow_basic.src_port, + flow_basic.dst_port, + 0, + 0, + (uint8_t *)&flow_basic.hashval, + sizeof(flow_basic.hashval)) != 0) + { + flow_basic.hashval = flow_basic.src.v6.ip[0] + flow_basic.src.v6.ip[1]; + flow_basic.hashval += flow_basic.dst.v6.ip[0] + flow_basic.dst.v6.ip[1]; + } + break; + } + flow_basic.hashval += flow_basic.l4_protocol + flow_basic.src_port + flow_basic.dst_port; + + hashed_index = flow_basic.hashval % workflow->max_active_flows; + direction = FD_SRC2DST; + tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp); + if (tree_result == NULL) + { + direction = FD_DST2SRC; + + /* flow not found in btree: switch src <-> dst and try to find it again */ + uint64_t orig_src_ip[2] = {flow_basic.src.v6.ip[0], flow_basic.src.v6.ip[1]}; + uint64_t orig_dst_ip[2] = {flow_basic.dst.v6.ip[0], flow_basic.dst.v6.ip[1]}; + uint16_t orig_src_port = flow_basic.src_port; + uint16_t orig_dst_port = flow_basic.dst_port; + + flow_basic.src.v6.ip[0] = orig_dst_ip[0]; + flow_basic.src.v6.ip[1] = orig_dst_ip[1]; + flow_basic.dst.v6.ip[0] = orig_src_ip[0]; + flow_basic.dst.v6.ip[1] = orig_src_ip[1]; + flow_basic.src_port = orig_dst_port; + flow_basic.dst_port = orig_src_port; + + tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp); + + flow_basic.src.v6.ip[0] = orig_src_ip[0]; + flow_basic.src.v6.ip[1] = orig_src_ip[1]; + flow_basic.dst.v6.ip[0] = orig_dst_ip[0]; + flow_basic.dst.v6.ip[1] = orig_dst_ip[1]; + flow_basic.src_port = orig_src_port; + flow_basic.dst_port = orig_dst_port; + } + + if (tree_result == NULL) + { + /* flow still not found, must be new or midstream */ + direction = FD_SRC2DST; + + union nDPId_ip const * netmask = NULL; + union nDPId_ip const * subnet = NULL; + switch (flow_basic.l3_type) + { + case L3_IP: + netmask = &nDPId_options.pcap_dev_netmask4; + subnet = &nDPId_options.pcap_dev_subnet4; + break; + case L3_IP6: + netmask = &nDPId_options.pcap_dev_netmask6; + subnet = &nDPId_options.pcap_dev_subnet6; + break; + } + if (GET_CMDARG_BOOL(nDPId_options.process_internal_initial_direction) != 0 && + flow_basic.tcp_is_midstream_flow == 0) + { + if (is_ip_in_subnet(&flow_basic.src, netmask, subnet, flow_basic.l3_type) == 0) + { + if (add_new_flow(workflow, &flow_basic, FS_SKIPPED, hashed_index) == NULL && + is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + FLOW_MEMORY_ALLOCATION_FAILED, + "%s%zu", + "size", + sizeof(struct nDPId_flow_skipped)); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + } + return; + } + } + else if (GET_CMDARG_BOOL(nDPId_options.process_external_initial_direction) != 0 && + flow_basic.tcp_is_midstream_flow == 0) + { + if (is_ip_in_subnet(&flow_basic.src, netmask, subnet, flow_basic.l3_type) != 0) + { + if (add_new_flow(workflow, &flow_basic, FS_SKIPPED, hashed_index) == NULL && + is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + FLOW_MEMORY_ALLOCATION_FAILED, + "%s%zu", + "size", + sizeof(struct nDPId_flow_skipped)); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + } + return; + } + } + + if (workflow->cur_active_flows == workflow->max_active_flows) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf(reader_thread, + MAX_FLOW_TO_TRACK, + "%s%llu %s%llu %s%llu %s%llu", + "current_active", + workflow->cur_active_flows, + "current_idle", + workflow->cur_idle_flows, + "max_active", + workflow->max_active_flows, + "max_idle", + workflow->max_idle_flows); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + } + return; + } + + flow_to_process = (struct nDPId_flow *)add_new_flow(workflow, &flow_basic, FS_INFO, hashed_index); + if (flow_to_process == NULL) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf( + reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process)); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + } + return; + } + + workflow->total_active_flows++; + flow_to_process->flow_extended.flow_id = MT_GET_AND_ADD(global_flow_id, 1); + + if (alloc_detection_data(flow_to_process) != 0) + { + if (is_error_event_threshold(reader_thread->workflow) == 0) + { + jsonize_error_eventf( + reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process)); + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + NULL, + PACKET_EVENT_PAYLOAD); + } + return; + } + + is_new_flow = 1; + } + else + { + /* flow already exists in the tree */ + + struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result; + /* Update last seen timestamp for timeout handling. */ + last_pkt_time = flow_basic_to_process->last_pkt_time[direction]; + flow_basic_to_process->last_pkt_time[direction] = workflow->last_thread_time; + /* TCP-FIN/TCP-RST: indicates that at least one side wants to end the connection. */ + if (flow_basic.tcp_fin_rst_seen != 0) + { + flow_basic_to_process->tcp_fin_rst_seen = 1; + } + + switch (flow_basic_to_process->state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_SKIPPED: + return; + + case FS_FINISHED: + case FS_INFO: + break; + } + flow_to_process = (struct nDPId_flow *)flow_basic_to_process; + + if (flow_to_process->flow_extended.flow_basic.state == FS_INFO) + { +#ifdef ENABLE_ZLIB + if (GET_CMDARG_BOOL(nDPId_options.enable_zlib_compression) != 0 && + flow_to_process->info.detection_data_compressed_size > 0) + { + workflow->current_compression_diff -= flow_to_process->info.detection_data_compressed_size; + int ret = detection_data_inflate(flow_to_process); + if (ret <= 0) + { + workflow->current_compression_diff += flow_to_process->info.detection_data_compressed_size; + logger(1, + "zLib decompression failed for existing flow %llu with error code: %d", + flow_to_process->flow_extended.flow_id, + ret); + return; + } + } +#endif + } + } + + flow_to_process->flow_extended.packets_processed[direction]++; + flow_to_process->flow_extended.total_l4_payload_len[direction] += l4_payload_len; + workflow->packets_processed++; + workflow->total_l4_payload_len += l4_payload_len; + + if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len[direction]) + { + flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len; + } + if (l4_payload_len < flow_to_process->flow_extended.min_l4_payload_len[direction]) + { + flow_to_process->flow_extended.min_l4_payload_len[direction] = l4_payload_len; + } + + if (is_new_flow != 0) + { + last_pkt_time = flow_to_process->flow_extended.first_seen = + flow_to_process->flow_extended.flow_basic.last_pkt_time[direction] = + flow_to_process->flow_extended.flow_basic.last_pkt_time[1 - direction] = + flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time; + flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len; + flow_to_process->flow_extended.min_l4_payload_len[direction] = l4_payload_len; + jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_NEW); + } + + if (GET_CMDARG_BOOL(nDPId_options.enable_data_analysis) != 0 && + flow_to_process->flow_extended.flow_analysis != NULL && + flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + + flow_to_process->flow_extended.packets_processed[FD_DST2SRC] <= + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)) + { + unsigned long long int total_flow_packets = flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + + flow_to_process->flow_extended.packets_processed[FD_DST2SRC]; + uint64_t tdiff_us = timer_sub(workflow->last_thread_time, last_pkt_time); + + if (total_flow_packets > 1) + { + ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->iat, tdiff_us); + } + ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->pktlen, ip_size); + flow_to_process->flow_extended.flow_analysis + ->directions[(total_flow_packets - 1) % GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)] = + direction; + ndpi_inc_bin(&flow_to_process->flow_extended.flow_analysis->payload_len_bin[direction], + plen2slot(l4_payload_len), + 1); + flow_to_process->flow_extended.flow_analysis + ->entropies[(total_flow_packets - 1) % GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)] = + ndpi_entropy((ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6), ip_size); + + if (total_flow_packets == GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_analyse)) + { + jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_ANALYSE); + free_analysis_data(&flow_to_process->flow_extended); + } + } + + jsonize_packet_event(reader_thread, + header, + packet, + type, + ip_offset, + (l4_ptr - packet), + l4_len, + &flow_to_process->flow_extended, + PACKET_EVENT_PAYLOAD_FLOW); + + if (flow_to_process->flow_extended.flow_basic.state != FS_INFO || flow_to_process->info.detection_data == NULL) + { + /* Only FS_INFO goes through the whole detection process. */ + return; + } + + flow_to_process->flow_extended.detected_l7_protocol = + ndpi_detection_process_packet(workflow->ndpi_struct, + &flow_to_process->info.detection_data->flow, + ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6, + ip_size, + workflow->last_thread_time / 1000, + NULL); + + if (ndpi_is_protocol_detected(flow_to_process->flow_extended.detected_l7_protocol) != 0 && + flow_to_process->info.detection_completed == 0) + { + flow_to_process->info.detection_completed = 1; + workflow->total_detected_flows++; + /* + * The following needs to be done, because a successful classification may happen after the first packet. + * If there is no further extra dissection possible for this protocol, we may be saving an invalid risk. + */ + if (flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + + flow_to_process->flow_extended.packets_processed[FD_DST2SRC] == + 1) + { + ndpi_unset_risk(&flow_to_process->info.detection_data->flow, NDPI_UNIDIRECTIONAL_TRAFFIC); + } + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED); + flow_to_process->info.detection_data->last_ndpi_flow_struct_hash = + calculate_ndpi_flow_struct_hash(&flow_to_process->info.detection_data->flow); + } + else if (flow_to_process->info.detection_completed == 1) + { + uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->info.detection_data->flow); + if (hash != flow_to_process->info.detection_data->last_ndpi_flow_struct_hash) + { + workflow->total_flow_detection_updates++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); + flow_to_process->info.detection_data->last_ndpi_flow_struct_hash = hash; + } + } + + if (flow_to_process->info.detection_data->flow.num_processed_pkts == + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_process) && + flow_to_process->info.detection_completed == 0) + { + /* last chance to guess something, better then nothing */ + uint8_t protocol_was_guessed = 0; + flow_to_process->info.detection_data->guessed_l7_protocol = + ndpi_detection_giveup(workflow->ndpi_struct, + &flow_to_process->info.detection_data->flow, + &protocol_was_guessed); + if (protocol_was_guessed != 0) + { + workflow->total_guessed_flows++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED); + } + else + { + reader_thread->workflow->total_not_detected_flows++; + jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_NOT_DETECTED); + } + } + + if (flow_to_process->info.detection_data->flow.num_processed_pkts == + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_process) || + (ndpi_is_protocol_detected(flow_to_process->flow_extended.detected_l7_protocol) != 0 && + ndpi_extra_dissection_possible(workflow->ndpi_struct, &flow_to_process->info.detection_data->flow) == 0)) + { + struct ndpi_proto detected_l7_protocol = flow_to_process->flow_extended.detected_l7_protocol; + if (ndpi_is_protocol_detected(detected_l7_protocol) == 0) + { + detected_l7_protocol = flow_to_process->info.detection_data->guessed_l7_protocol; + } + + ndpi_risk risk = flow_to_process->info.detection_data->flow.risk; + ndpi_confidence_t confidence = flow_to_process->info.detection_data->flow.confidence; + char * hostname = NULL; + if (flow_to_process->info.detection_data->flow.host_server_name[0] != '\0') + { +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + _Static_assert(sizeof(flow_to_process->info.detection_data->flow.host_server_name) == 80, + "Size of nDPI flow host server name changed. Please review manually."); +#endif + hostname = strndup(&flow_to_process->info.detection_data->flow.host_server_name[0], + sizeof(flow_to_process->info.detection_data->flow.host_server_name)); + } + + free_detection_data(flow_to_process); + + flow_to_process->flow_extended.flow_basic.state = FS_FINISHED; + struct nDPId_flow * const flow = flow_to_process; + flow->flow_extended.detected_l7_protocol = detected_l7_protocol; + flow->finished.risk = risk; + flow->finished.confidence = confidence; + flow->finished.hostname = hostname; + } + +#ifdef ENABLE_ZLIB + if (GET_CMDARG_BOOL(nDPId_options.enable_zlib_compression) != 0) + { + check_for_compressable_flows(reader_thread); + } +#endif +} + +static void get_current_time(struct timeval * const tval) +{ + gettimeofday(tval, NULL); +} + +#if !defined(__FreeBSD__) && !defined(__APPLE__) +static void ndpi_log_flow_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_reader_thread const * const reader_thread = (struct nDPId_reader_thread *)user_data; + struct nDPId_flow_basic const * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + (void)user_data; + + if (flow_basic == NULL) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) + { + switch (flow_basic->state) + { + case FS_UNKNOWN: + break; + + case FS_COUNT: + break; + + case FS_SKIPPED: + break; + + case FS_FINISHED: + { + struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic; + + uint64_t last_seen = get_last_pkt_time(flow_basic); + uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol); + logger(0, + "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " + "%7llu]", + reader_thread->array_index, + flow->flow_extended.flow_id, + (unsigned long long int)last_seen, + (unsigned long long int)flow->flow_extended.last_flow_update, + (unsigned long long int)idle_time, + (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time + ? last_seen + idle_time - reader_thread->workflow->last_thread_time + : 0)); + break; + } + + case FS_INFO: + { + struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic; + + uint64_t last_seen = get_last_pkt_time(flow_basic); + uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol); + logger(0, + "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " + "%7llu]", + reader_thread->array_index, + flow->flow_extended.flow_id, + (unsigned long long int)last_seen, + (unsigned long long int)flow->flow_extended.last_flow_update, + (unsigned long long int)idle_time, + (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time + ? last_seen + idle_time - reader_thread->workflow->last_thread_time + : 0)); + break; + } + } + } +} + +static void log_all_flows(struct nDPId_reader_thread const * const reader_thread) +{ + struct nDPId_workflow const * const workflow = reader_thread->workflow; + + logger(0, + "[%2zu][last-global-time: %13llu][last-thread-time: %13llu][last-scan-time: %13llu]", + reader_thread->array_index, + (unsigned long long int)workflow->last_global_time, + (unsigned long long int)workflow->last_thread_time, + (unsigned long long int)workflow->last_scan_time); + for (size_t scan_index = 0; scan_index < workflow->max_active_flows; ++scan_index) + { + ndpi_twalk(workflow->ndpi_flows_active[scan_index], ndpi_log_flow_walker, (void *)reader_thread); + } +} +#endif + +static void run_capture_loop(struct nDPId_reader_thread * const reader_thread) +{ + if (reader_thread->workflow == NULL || (reader_thread->workflow->pcap_handle == NULL +#ifdef ENABLE_PFRING + && reader_thread->workflow->npf.pfring_desc == NULL +#endif + )) + { + return; + } + + if (reader_thread->workflow->is_pcap_file != 0) + { + switch (pcap_loop(reader_thread->workflow->pcap_handle, -1, &ndpi_process_packet, (uint8_t *)reader_thread)) + { + case PCAP_ERROR: + logger(1, "Error while reading pcap file: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle)); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + return; + case PCAP_ERROR_BREAK: + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + return; + default: + return; + } + } + else + { +#if !defined(__FreeBSD__) && !defined(__APPLE__) + sigset_t thread_signal_set, old_signal_set; + sigfillset(&thread_signal_set); + if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) + { + logger(1, "pthread_sigmask: %s", strerror(errno)); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + return; + } + + sigaddset(&thread_signal_set, SIGINT); + sigaddset(&thread_signal_set, SIGTERM); + sigaddset(&thread_signal_set, SIGUSR1); + int signal_fd = signalfd(-1, &thread_signal_set, SFD_NONBLOCK); + if (signal_fd < 0 || set_fd_cloexec(signal_fd) < 0) + { + logger(1, "signalfd: %s", strerror(errno)); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + return; + } +#endif + + int capture_fd = -1; +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + capture_fd = npfring_get_selectable_fd(&reader_thread->workflow->npf); + } + else +#endif + { + capture_fd = pcap_get_selectable_fd(reader_thread->workflow->pcap_handle); + } + if (capture_fd < 0) + { + logger(1, + "Got an invalid %s fd", + ( +#ifdef ENABLE_PFRING + GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0 ? "PF_RING" : +#endif + "PCAP")); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + return; + } + + struct nio io; + nio_init(&io); +#ifdef ENABLE_EPOLL + if ((GET_CMDARG_BOOL(nDPId_options.use_poll) == 0 && nio_use_epoll(&io, 32) != NIO_SUCCESS) || + (GET_CMDARG_BOOL(nDPId_options.use_poll) != 0 && + nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)) +#else + if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) +#endif + { + logger(1, "%s", "Event I/O poll/epoll setup failed"); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); + return; + } + + errno = 0; + if (nio_add_fd(&io, capture_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) + { + logger(1, "Could not add pcap fd to event queue: %s", (errno != 0 ? strerror(errno) : "Internal Error")); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); + return; + } +#if !defined(__FreeBSD__) && !defined(__APPLE__) + errno = 0; + if (nio_add_fd(&io, signal_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) + { + logger(1, "Could not add signal fd to event queue: %s", (errno != 0 ? strerror(errno) : "Internal Error")); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); + return; + } +#endif + + int const timeout_ms = 1000; /* TODO: Configurable? */ + struct timeval tval_before_epoll, tval_after_epoll; + while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) + { + get_current_time(&tval_before_epoll); + errno = 0; + if (nio_run(&io, timeout_ms) != NIO_SUCCESS) + { + logger(1, "Event I/O returned error: %s", strerror(errno)); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + break; + } + + int nready = nio_get_nready(&io); + + if (nready == 0) + { + struct timeval tval_diff; + get_current_time(&tval_after_epoll); + ndpi_timer_sub(&tval_after_epoll, &tval_before_epoll, &tval_diff); + uint64_t tdiff_us = tval_diff.tv_sec * 1000 * 1000 + tval_diff.tv_usec; + + reader_thread->workflow->last_global_time += tdiff_us; + reader_thread->workflow->last_thread_time += tdiff_us; + + do_periodically_work(reader_thread); + } + + for (int i = 0; i < nready; ++i) + { + if (nio_has_error(&io, i) == NIO_SUCCESS) + { + logger(1, "%s", "Event I/O error"); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + } + + int fd = nio_get_fd(&io, i); + +#if !defined(__FreeBSD__) && !defined(__APPLE__) + if (fd == signal_fd) + { + struct signalfd_siginfo fdsi; + if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi)) + { + if (errno != EAGAIN) + { + logger(1, "Could not read signal data from fd %d: %s", signal_fd, strerror(errno)); + } + } + else + { + int is_valid_signal = 0; + char const * signame = "unknown"; + switch (fdsi.ssi_signo) + { + case SIGINT: + is_valid_signal = 1; + signame = "SIGINT"; + sighandler(SIGINT); + break; + case SIGTERM: + is_valid_signal = 1; + signame = "SIGTERM"; + sighandler(SIGTERM); + break; + case SIGUSR1: + is_valid_signal = 1; + signame = "SIGUSR1"; + log_all_flows(reader_thread); + break; + } + if (is_valid_signal != 0) + { + logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame); + } + else + { + logger(1, "Received signal %d (%s), ignored", fdsi.ssi_signo, signame); + } + } + } + else +#endif + if (fd == capture_fd) + { +#ifdef ENABLE_PFRING + if (GET_CMDARG_BOOL(nDPId_options.use_pfring) != 0) + { + struct pcap_pkthdr hdr; + + int rc = npfring_recv(&reader_thread->workflow->npf, &hdr); + if (rc == 0) + { + logger(1, "Error while reading packets from PF_RING: %d", rc); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); + return; + } + + ndpi_process_packet((uint8_t *)reader_thread, + &hdr, + &reader_thread->workflow->npf.pfring_buffer[0]); + } + else +#endif + { + switch (pcap_dispatch( + reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) + { + case PCAP_ERROR: + logger(1, + "Error while reading from pcap device: '%s'", + pcap_geterr(reader_thread->workflow->pcap_handle)); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + break; + case PCAP_ERROR_BREAK: + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + nio_free(&io); + return; + default: + break; + } + } + } + else + { + logger(1, "Unknown event descriptor or data returned: %p", nio_get_ptr(&io, i)); + } + } + } + + nio_free(&io); + } +} + +static void break_pcap_loop(struct nDPId_reader_thread * const reader_thread) +{ + if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) + { + pcap_breakloop(reader_thread->workflow->pcap_handle); + } +} + +static void * processing_thread(void * const ndpi_thread_arg) +{ + struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg; + + reader_thread->collector_sockfd = -1; + + if (connect_to_collector(reader_thread) != 0) + { + logger(1, + "Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", + reader_thread->array_index, + GET_CMDARG_STR(nDPId_options.collector_address), + (reader_thread->collector_sock_last_errno != 0 ? strerror(reader_thread->collector_sock_last_errno) + : "Internal Error.")); + } + else + { + jsonize_daemon(reader_thread, DAEMON_EVENT_INIT); + } + + run_capture_loop(reader_thread); + set_collector_block(reader_thread); + MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); + return NULL; +} + +static WARN_UNUSED int processing_threads_error_or_eof(void) +{ + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + if (MT_GET_AND_ADD(reader_threads[i].workflow->error_or_eof, 0) == 0) + { + return 0; + } + } + return 1; +} + +static int start_reader_threads(void) +{ + sigset_t thread_signal_set, old_signal_set; + + sigfillset(&thread_signal_set); + sigdelset(&thread_signal_set, SIGINT); + sigdelset(&thread_signal_set, SIGTERM); + if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) + { + logger_early(1, "pthread_sigmask: %s", strerror(errno)); + return 1; + } + + if (daemonize_with_pidfile(GET_CMDARG_STR(nDPId_options.pidfile)) != 0) + { + return 1; + } + + int ret = change_user_group(GET_CMDARG_STR(nDPId_options.user), + GET_CMDARG_STR(nDPId_options.group), + GET_CMDARG_STR(nDPId_options.pidfile)); + if (ret != 0 && ret != -EPERM) + { + if (GET_CMDARG_STR(nDPId_options.group) != NULL) + { + logger(1, + "Change user/group to %s/%s failed: %s", + GET_CMDARG_STR(nDPId_options.user), + GET_CMDARG_STR(nDPId_options.group), + strerror(-ret)); + } + else + { + logger(1, "Change user to %s failed: %s", GET_CMDARG_STR(nDPId_options.user), strerror(-ret)); + } + return 1; + } + + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + reader_threads[i].array_index = i; + + if (reader_threads[i].workflow == NULL) + { + /* no more threads should be started */ + break; + } + + if (pthread_create(&reader_threads[i].thread, NULL, processing_thread, &reader_threads[i]) != 0) + { + logger(1, "pthread_create: %s", strerror(errno)); + return 1; + } + } + + if (pthread_sigmask(SIG_BLOCK, &old_signal_set, NULL) != 0) + { + logger(1, "pthread_sigmask: %s", strerror(errno)); + return 1; + } + + return 0; +} + +static void ndpi_shutdown_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) +{ + struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; + struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; + + (void)depth; + + if (workflow == NULL || flow_basic == NULL) + { + return; + } + + if (workflow->cur_idle_flows == GET_CMDARG_ULL(nDPId_options.max_idle_flows_per_thread)) + { + return; + } + + if (which == ndpi_preorder || which == ndpi_leaf) + { + workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic; + switch (flow_basic->state) + { + case FS_UNKNOWN: + case FS_COUNT: + + case FS_SKIPPED: + break; + case FS_INFO: + case FS_FINISHED: + workflow->total_idle_flows++; + break; + } + } +} + +static void process_remaining_flows(void) +{ + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + set_collector_block(&reader_threads[i]); + + for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows; + ++idle_scan_index) + { + ndpi_twalk(reader_threads[i].workflow->ndpi_flows_active[idle_scan_index], + ndpi_shutdown_walker, + reader_threads[i].workflow); + process_idle_flow(&reader_threads[i], idle_scan_index); + } + + jsonize_daemon(&reader_threads[i], DAEMON_EVENT_SHUTDOWN); + } +} + +static int stop_reader_threads(void) +{ + unsigned long long int total_packets_processed = 0; + unsigned long long int total_l4_payload_len = 0; + unsigned long long int total_flows_skipped = 0; + unsigned long long int total_flows_captured = 0; + unsigned long long int total_flows_idle = 0; + unsigned long long int total_not_detected = 0; + unsigned long long int total_flows_guessed = 0; + unsigned long long int total_flows_detected = 0; + unsigned long long int total_flow_detection_updates = 0; + unsigned long long int total_flow_updates = 0; + + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + break_pcap_loop(&reader_threads[i]); + } + + printf("------------------------------------ Stopping reader threads\n"); + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + if (reader_threads[i].workflow == NULL) + { + continue; + } + + if (pthread_join(reader_threads[i].thread, NULL) != 0) + { + logger(1, "pthread_join: %s", strerror(errno)); + } + } + + printf("------------------------------------ Processing remaining flows\n"); + process_remaining_flows(); + + printf("------------------------------------ Results\n"); + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + if (reader_threads[i].workflow == NULL) + { + continue; + } + + total_packets_processed += reader_threads[i].workflow->packets_processed; + total_l4_payload_len += reader_threads[i].workflow->total_l4_payload_len; + total_flows_skipped += reader_threads[i].workflow->total_skipped_flows; + total_flows_captured += reader_threads[i].workflow->total_active_flows; + total_flows_idle += reader_threads[i].workflow->total_idle_flows; + total_not_detected += reader_threads[i].workflow->total_not_detected_flows; + total_flows_guessed += reader_threads[i].workflow->total_guessed_flows; + total_flows_detected += reader_threads[i].workflow->total_detected_flows; + total_flow_detection_updates += reader_threads[i].workflow->total_flow_detection_updates; + total_flow_updates += reader_threads[i].workflow->total_flow_updates; + + printf( + "Stopping Thread %2zu, processed %llu packets, %llu bytes\n" + "\tskipped flows.....: %8llu, processed flows: %8llu, idle flows....: %8llu\n" + "\tnot detected flows: %8llu, guessed flows..: %8llu, detected flows: %8llu\n" + "\tdetection updates.: %8llu, updated flows..: %8llu\n", + reader_threads[i].array_index, + reader_threads[i].workflow->packets_processed, + reader_threads[i].workflow->total_l4_payload_len, + reader_threads[i].workflow->total_skipped_flows, + reader_threads[i].workflow->total_active_flows, + reader_threads[i].workflow->total_idle_flows, + reader_threads[i].workflow->total_not_detected_flows, + reader_threads[i].workflow->total_guessed_flows, + reader_threads[i].workflow->total_detected_flows, + reader_threads[i].workflow->total_flow_detection_updates, + reader_threads[i].workflow->total_flow_updates); + } + /* total packets captured: same value for all threads as packet2thread distribution happens later */ + printf("Total packets captured.......: %llu\n", + (reader_threads[0].workflow != NULL ? reader_threads[0].workflow->packets_captured : 0)); + printf("Total packets processed......: %llu\n", total_packets_processed); + printf("Total layer4 payload size....: %llu\n", total_l4_payload_len); + printf("Total flows ignopred.........: %llu\n", total_flows_skipped); + printf("Total flows processed........: %llu\n", total_flows_captured); + printf("Total flows timed out........: %llu\n", total_flows_idle); + printf("Total flows detected.........: %llu\n", total_flows_detected); + printf("Total flows guessed..........: %llu\n", total_flows_guessed); + printf("Total flows not detected.....: %llu\n", total_not_detected); + printf("Total flow updates...........: %llu\n", total_flow_updates); + printf("Total flow detections updates: %llu\n", total_flow_detection_updates); + + return 0; +} + +static void free_reader_threads(void) +{ + for (unsigned long long int i = 0; i < GET_CMDARG_ULL(nDPId_options.reader_thread_count); ++i) + { + if (reader_threads[i].workflow == NULL) + { + continue; + } + + free_workflow(&reader_threads[i].workflow); + } +} + +static void sighandler(int signum) +{ + (void)signum; + + if (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0) + { + MT_GET_AND_ADD(nDPId_main_thread_shutdown, 1); + } +} + +static void print_subopt_usage(void) +{ + fprintf(stderr, "\tsubopts:\n"); + for (size_t i = 0; i < nDPIsrvd_ARRAY_LENGTH(tuning_config_map); ++i) + { + fprintf(stderr, "\t\t%s = %llu\n", tuning_config_map[i].key, tuning_config_map[i].opt->ull.default_value); + } +} + +static void print_usage(char const * const arg0) +{ + static char const usage[] = + "Usage: %s " + "[-f config-file]\n" + "[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n" + "\t \t" + "[-l] [-L logfile] [-c address] [-e]" + "[-d] [-p pidfile]\n" + "\t \t" + "[-u user] [-g group] " + "[-P path] [-C path] [-J path]\n" + "\t \t" + "[-a instance-alias] [-A]\n" + "\t \t" + "[-o subopt=value]\n" + "\t \t" + "[-v] [-h]\n\n" + "\t-f\tLoad nDPId/libnDPI options from a configuration file.\n" + "\t-i\tInterface or file from where to read packets from.\n" +#ifdef ENABLE_PFRING + "\t-r\tUse PFRING to capture packets instead of libpcap.\n" +#endif + "\t-I\tProcess only packets where the source address of the first packet\n" + "\t \tis part of the interface subnet. (Internal mode)\n" + "\t-E\tProcess only packets where the source address of the first packet\n" + "\t \tis *NOT* part of the interface subnet. (External mode)\n" + "\t-B\tSet an optional PCAP filter string. (BPF format)\n" + "\t-l\tLog all messages to stderr.\n" + "\t-L\tLog all messages to a log file.\n" + "\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n" + "\t \tDefault: %s\n" + "\t-e\tUse poll() instead of epoll().\n" + "\t \tDefault: epoll() on Linux, poll() otherwise\n" + "\t-d\tFork into background after initialization.\n" + "\t-p\tWrite the daemon PID to the given file path.\n" + "\t \tDefault: %s\n" + "\t-u\tChange UID to the numeric value of user.\n" + "\t \tDefault: %s\n" + "\t-g\tChange GID to the numeric value of group.\n" + "\t-R\tLoad a nDPI custom risk domain file.\n" + "\t-P\tLoad a nDPI custom protocols file.\n" + "\t-C\tLoad a nDPI custom categories file.\n" + "\t-J\tLoad a nDPI JA3 hash blacklist file.\n" + "\t \tSee: https://sslbl.abuse.ch/blacklist/ja3_fingerprints.csv\n" + "\t-S\tLoad a nDPI SSL SHA1 hash blacklist file.\n" + "\t \tSee: https://sslbl.abuse.ch/blacklist/sslblacklist.csv\n" + "\t-a\tSet an alias name of this daemon instance which will\n" + "\t \tbe part of every JSON message.\n" + "\t \tThis value is required for correct flow handling of\n" + "\t \tmultiple instances and should be unique.\n" + "\t \tDefaults to your hostname.\n" + "\t-A\tEnable flow analysis aka feature extraction. Requires more memory and cpu usage.\n" + "\t \tExperimental, do not rely on those values.\n" +#ifdef ENABLE_ZLIB + "\t-z\tEnable flow memory zLib compression.\n" +#endif + "\t-o\t(Carefully) Tune some daemon options. See subopts below.\n" + "\t-v\tversion\n" + "\t-h\tthis\n\n"; + fprintf(stderr, + usage, + arg0, + nDPId_options.collector_address.string.default_value, + nDPId_options.pidfile.string.default_value, + nDPId_options.user.string.default_value); +} + +static void nDPId_print_deps_version(FILE * const out) +{ + fprintf(out, + "-------------------------------------------------------\n" +#ifdef LIBNDPI_STATIC + "nDPI version...: %s (statically linked)\n" +#else + "nDPI version...: %s\n" +#endif + " API version...: %u\n" + "pcap version...: %s\n", + ndpi_revision(), + ndpi_get_api_version(), + pcap_lib_version() + strlen("libpcap version ")); + if (ndpi_get_gcrypt_version() != NULL) + { + fprintf(out, "gcrypt version.: %s\n", ndpi_get_gcrypt_version()); + } +#ifdef ENABLE_PFRING + npfring_print_version(out); +#endif + fprintf(out, "%s", "-------------------------------------------------------\n"); +} + +static int nDPId_parse_options(int argc, char ** argv) +{ + int opt; + + while ((opt = getopt(argc, argv, "f:i:rIEB:lL:c:edp:u:g:R:P:C:J:S:a:Azo:vh")) != -1) + { + switch (opt) + { + case 'f': + set_cmdarg_string(&nDPId_options.config_file, optarg); + break; + case 'i': + set_cmdarg_string(&nDPId_options.pcap_file_or_interface, optarg); + break; + case 'r': +#ifdef ENABLE_PFRING + set_cmdarg_boolean(&nDPId_options.use_pfring, 1); + break; +#else + logger_early(1, "%s", "nDPId was built w/o PFRING support"); + return 1; +#endif + case 'I': + set_cmdarg_boolean(&nDPId_options.process_internal_initial_direction, 1); + break; + case 'E': + set_cmdarg_boolean(&nDPId_options.process_external_initial_direction, 1); + break; + case 'B': + set_cmdarg_string(&nDPId_options.bpf_str, optarg); + break; + case 'l': + enable_console_logger(); + break; + case 'L': + if (enable_file_logger(optarg) != 0) + { + return 1; + } + break; + case 'c': + set_cmdarg_string(&nDPId_options.collector_address, optarg); + break; + case 'e': +#ifdef ENABLE_EPOLL + set_cmdarg_boolean(&nDPId_options.use_poll, 1); +#else + logger_early(1, "%s", "nDPId was built w/o epoll() support, poll() is already the default"); +#endif + break; + case 'd': + daemonize_enable(); + break; + case 'p': + set_cmdarg_string(&nDPId_options.pidfile, optarg); + break; + case 'u': + set_cmdarg_string(&nDPId_options.user, optarg); + break; + case 'g': + set_cmdarg_string(&nDPId_options.group, optarg); + break; + case 'R': + set_cmdarg_string(&nDPId_options.custom_risk_domain_file, optarg); + break; + case 'P': + set_cmdarg_string(&nDPId_options.custom_protocols_file, optarg); + break; + case 'C': + set_cmdarg_string(&nDPId_options.custom_categories_file, optarg); + break; + case 'J': + set_cmdarg_string(&nDPId_options.custom_ja3_file, optarg); + break; + case 'S': + set_cmdarg_string(&nDPId_options.custom_sha1_file, optarg); + break; + case 'a': + set_cmdarg_string(&nDPId_options.instance_alias, optarg); + break; + case 'A': + set_cmdarg_boolean(&nDPId_options.enable_data_analysis, 1); + break; + case 'z': +#ifdef ENABLE_ZLIB + set_cmdarg_boolean(&nDPId_options.enable_zlib_compression, 1); + break; +#else + logger_early(1, "%s", "nDPId was built w/o zLib compression"); + return 1; +#endif + case 'o': + { + int errfnd = 0; + char * subopts = optarg; + char * value; + char * subopt_tokens[nDPIsrvd_ARRAY_LENGTH(tuning_config_map) + 1] = {}; + + for (size_t i = 0; i < nDPIsrvd_ARRAY_LENGTH(tuning_config_map); ++i) + { + subopt_tokens[i] = strdup(tuning_config_map[i].key); + } + while (*subopts != '\0' && errfnd == 0) + { + int subopt = getsubopt(&subopts, subopt_tokens, &value); + if (value == NULL && subopt != -1) + { + logger_early(1, "Missing value for `%s'", subopt_tokens[subopt]); + fprintf(stderr, "%s", "\n"); + print_usage(argv[0]); + print_subopt_usage(); + errfnd = 1; + break; + } + if (subopt == -1) + { + logger_early(1, "Invalid subopt: %s", value); + fprintf(stderr, "%s", "\n"); + print_usage(argv[0]); + print_subopt_usage(); + errfnd = 1; + break; + } + + if (set_config_from(&tuning_config_map[subopt], value) != 0) + { + logger_early(1, "Could not set subopt: %s", tuning_config_map[subopt].key); + errfnd = 1; + break; + } + } + for (size_t i = 0; i < nDPIsrvd_ARRAY_LENGTH(tuning_config_map); ++i) + { + free(subopt_tokens[i]); + } + if (errfnd != 0) + { + return 1; + } + break; + } + case 'v': + fprintf(stderr, "%s", get_nDPId_version()); + nDPId_print_deps_version(stderr); + return 1; + case 'h': + default: + fprintf(stderr, "%s\n", get_nDPId_version()); + print_usage(argv[0]); + print_subopt_usage(); + return 1; + } + } + + if (optind < argc) + { + logger_early(1, "%s", "Unexpected argument after options"); + fprintf(stderr, "%s", "\n"); + print_usage(argv[0]); + print_subopt_usage(); + return 1; + } + + return 0; +} + +static int validate_options(void) +{ + int retval = 0; + + if (is_daemonize_enabled() != 0 && is_console_logger_enabled() != 0) + { + logger_early(1, + "%s", + "Daemon mode `-d' and `-l' can not be used together, " + "because stdout/stderr is beeing redirected to /dev/null"); + retval = 1; + } +#ifdef ENABLE_ZLIB + if (GET_CMDARG_BOOL(nDPId_options.enable_zlib_compression) != 0) + { + if (GET_CMDARG_ULL(nDPId_options.compression_flow_inactivity) < TIME_S_TO_US(6u) || + GET_CMDARG_ULL(nDPId_options.compression_scan_interval) < TIME_S_TO_US(4u)) + { + logger_early(1, + "Setting compression-scan-interval / compression-flow-inactivity " + "to values lower than %llu / %llu are not recommended.", + TIME_S_TO_US(4u), + TIME_S_TO_US(6u)); + logger_early(1, "%s", "Your CPU usage may increase heavily."); + } + } +#endif + if (nDPIsrvd_setup_address(&nDPId_options.parsed_collector_address, + GET_CMDARG_STR(nDPId_options.collector_address)) != 0) + { + retval = 1; + logger_early(1, "Collector socket invalid address: %s.", GET_CMDARG_STR(nDPId_options.collector_address)); + } + if (IS_CMDARG_SET(nDPId_options.instance_alias) == 0) + { + char hname[256]; + + errno = 0; + if (gethostname(hname, sizeof(hname)) != 0) + { + logger_early(1, "Could not retrieve your hostname: %s", strerror(errno)); + retval = 1; + } + else + { + set_cmdarg_string(&nDPId_options.instance_alias, hname); + logger_early(1, + "No instance alias given, using your hostname '%s'", + GET_CMDARG_STR(nDPId_options.instance_alias)); + if (IS_CMDARG_SET(nDPId_options.instance_alias) == 0) + { + retval = 1; + } + } + } + if (GET_CMDARG_ULL(nDPId_options.max_flows_per_thread) < 128 || + GET_CMDARG_ULL(nDPId_options.max_flows_per_thread) > nDPId_MAX_FLOWS_PER_THREAD) + { + logger_early(1, + "Value not in range: 128 < max-flows-per-thread[%llu] < %d", + GET_CMDARG_ULL(nDPId_options.max_flows_per_thread), + nDPId_MAX_FLOWS_PER_THREAD); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.max_idle_flows_per_thread) < 64 || + GET_CMDARG_ULL(nDPId_options.max_idle_flows_per_thread) > nDPId_MAX_IDLE_FLOWS_PER_THREAD) + { + logger_early(1, + "Value not in range: 64 < max-idle-flows-per-thread[%llu] < %d", + GET_CMDARG_ULL(nDPId_options.max_idle_flows_per_thread), + nDPId_MAX_IDLE_FLOWS_PER_THREAD); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.reader_thread_count) < 1 || + GET_CMDARG_ULL(nDPId_options.reader_thread_count) > nDPId_MAX_READER_THREADS) + { + logger_early(1, + "Value not in range: 1 < reader-thread-count[%llu] < %d", + GET_CMDARG_ULL(nDPId_options.reader_thread_count), + nDPId_MAX_READER_THREADS); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.flow_scan_interval) < TIME_S_TO_US(5u)) + { + logger_early(1, + "Value not in range: idle-scan-interval[%llu] > %llu", + GET_CMDARG_ULL(nDPId_options.flow_scan_interval), + TIME_S_TO_US(5u)); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.flow_scan_interval) >= GET_CMDARG_ULL(nDPId_options.generic_max_idle_time)) + { + logger_early(1, + "Value not in range: flow-scan-interval[%llu] < generic-max-idle-time[%llu]", + GET_CMDARG_ULL(nDPId_options.flow_scan_interval), + GET_CMDARG_ULL(nDPId_options.generic_max_idle_time)); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.flow_scan_interval) >= GET_CMDARG_ULL(nDPId_options.icmp_max_idle_time)) + { + logger_early(1, + "Value not in range: flow-scan-interval[%llu] < icmp-max-idle-time[%llu]", + GET_CMDARG_ULL(nDPId_options.flow_scan_interval), + GET_CMDARG_ULL(nDPId_options.icmp_max_idle_time)); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.flow_scan_interval) >= GET_CMDARG_ULL(nDPId_options.tcp_max_idle_time)) + { + logger_early(1, + "Value not in range: flow-scan-interval[%llu] < generic-max-idle-time[%llu]", + GET_CMDARG_ULL(nDPId_options.flow_scan_interval), + GET_CMDARG_ULL(nDPId_options.tcp_max_idle_time)); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.flow_scan_interval) >= GET_CMDARG_ULL(nDPId_options.udp_max_idle_time)) + { + logger_early(1, + "Value not in range:flow-scan-interval[%llu] < udp-max-idle-time[%llu]", + GET_CMDARG_ULL(nDPId_options.flow_scan_interval), + GET_CMDARG_ULL(nDPId_options.udp_max_idle_time)); + retval = 1; + } + if (GET_CMDARG_BOOL(nDPId_options.process_internal_initial_direction) != 0 && + GET_CMDARG_BOOL(nDPId_options.process_external_initial_direction) != 0) + { + logger_early(1, "%s", "Internal and External packet processing does not make sense as this is the default."); + retval = 1; + } + if (GET_CMDARG_BOOL(nDPId_options.process_internal_initial_direction) != 0 || + GET_CMDARG_BOOL(nDPId_options.process_external_initial_direction) != 0) + { + logger_early(1, + "%s", + "Internal and External packet processing may lead to incorrect results for flows that were active " + "before the daemon started."); + } + if (GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_process) < 1 || + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_process) > 65535) + { + logger_early(1, + "Value not in range: 1 =< max-packets-per-flow-to-process[%llu] =< 65535", + GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_process)); + retval = 1; + } + if (GET_CMDARG_ULL(nDPId_options.max_packets_per_flow_to_send) > 30) + { + logger_early(1, "%s", "Higher values of max-packets-per-flow-to-send may cause superfluous network usage."); + } + + return retval; +} + +static int nDPId_parsed_config_line( + int lineno, char const * const section, char const * const name, char const * const value, void * const user_data) +{ + (void)user_data; + + if (strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("general") && + strncmp(section, "general", INI_MAX_SECTION) == 0) + { + size_t i; + + for (i = 0; i < nDPIsrvd_ARRAY_LENGTH(general_config_map); ++i) + { + if (strnlen(name, INI_MAX_NAME) == strnlen(general_config_map[i].key, INI_MAX_NAME) && + strncmp(name, general_config_map[i].key, INI_MAX_NAME) == 0) + { + if (IS_CMDARG_SET(*general_config_map[i].opt) != 0) + { + logger_early(1, "General config key `%s' already set, ignoring value `%s'", name, value); + } + else + { + if (set_config_from(&general_config_map[i], value) != 0) + { + return 0; + } + } + break; + } + } + if (i == nDPIsrvd_ARRAY_LENGTH(general_config_map)) + { + logger_early(1, "Invalid general config key `%s' at line %d", name, lineno); + } + } + else if (strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("tuning") && + strncmp(section, "tuning", INI_MAX_SECTION) == 0) + { + size_t i; + + for (i = 0; i < nDPIsrvd_ARRAY_LENGTH(tuning_config_map); ++i) + { + if (strnlen(name, INI_MAX_NAME) == strnlen(tuning_config_map[i].key, INI_MAX_NAME) && + strncmp(name, tuning_config_map[i].key, INI_MAX_NAME) == 0) + { + if (set_config_from(&tuning_config_map[i], value) != 0) + { + logger_early( + 1, "Non numeric tuning config value `%s' for key `%s' at line %d", value, name, lineno); + return 0; + } + break; + } + } + if (i == nDPIsrvd_ARRAY_LENGTH(tuning_config_map)) + { + logger_early(1, "Invalid tuning config key `%s' at line %d", name, lineno); + } + } + else if ((strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("ndpi") && + strncmp(section, "ndpi", INI_MAX_SECTION) == 0) || + (strnlen(section, INI_MAX_SECTION) == nDPIsrvd_STRLEN_SZ("protos") && + strncmp(section, "protos", INI_MAX_SECTION) == 0)) + { + // Nothing to do here right now (workflow not initialized yet) + return 1; + } + else + { + logger_early( + 1, "Invalid config section `%s' at line %d with key `%s' and value `%s'", section, lineno, name, value); + } + + return 1; +} + +#ifndef NO_MAIN +int main(int argc, char ** argv) +{ + if (argc == 0 || stdout == NULL || stderr == NULL) + { + return 1; + } + + init_logging("nDPId"); + + if (nDPId_parse_options(argc, argv) != 0) + { + return 1; + } + set_config_defaults(&general_config_map[0], nDPIsrvd_ARRAY_LENGTH(general_config_map)); + set_config_defaults(&tuning_config_map[0], nDPIsrvd_ARRAY_LENGTH(tuning_config_map)); + { + int ret; + + if (IS_CMDARG_SET(nDPId_options.config_file) != 0 && + (ret = parse_config_file(GET_CMDARG_STR(nDPId_options.config_file), nDPId_parsed_config_line, NULL)) != 0) + { + if (ret > 0) + { + logger_early(1, "Config file `%s' is malformed", GET_CMDARG_STR(nDPId_options.config_file)); + } + else if (ret == -ENOENT) + { + logger_early(1, "Path `%s' is not a regular file", GET_CMDARG_STR(nDPId_options.config_file)); + } + else + { + logger_early(1, + "Could not open file `%s' for reading: %s", + GET_CMDARG_STR(nDPId_options.config_file), + strerror(errno)); + } + return 1; + } + } + if (validate_options() != 0) + { + logger_early(1, "%s", "Option validation failed."); + return 1; + } + + log_app_info(); + + nDPId_print_deps_version(stdout); + + if (NDPI_API_VERSION != ndpi_get_api_version()) + { + logger_early(1, + "Unforeseen Consequences; nDPId was compiled with libnDPI api version %u, but the api version of " + "the shared library is: %u.", + NDPI_API_VERSION, + ndpi_get_api_version()); + } + if (sizeof(struct ndpi_flow_struct) != ndpi_detection_get_sizeof_ndpi_flow_struct()) + { + logger_early(1, + "FATAL: nDPI flow struct size inconsistent; expected %zu bytes, got %u bytes.", + sizeof(struct ndpi_flow_struct), + ndpi_detection_get_sizeof_ndpi_flow_struct()); + return 1; + } + +#ifdef ENABLE_MEMORY_PROFILING + logger_early(0, "size/workflow....: %zu bytes", sizeof(struct nDPId_workflow)); + logger_early(0, "size/flow........: %zu bytes", sizeof(struct nDPId_flow) + sizeof(struct nDPId_detection_data)); + logger_early(0, "size/flow-analyse: %zu bytes", sizeof(struct nDPId_flow_analysis)); +#endif + + if (setup_reader_threads() != 0) + { + return 1; + } + + if (start_reader_threads() != 0) + { + return 1; + } + + signal(SIGINT, sighandler); + signal(SIGTERM, sighandler); + signal(SIGPIPE, SIG_IGN); + + while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) + { + sleep(1); + } + + if (stop_reader_threads() != 0) + { + return 1; + } + free_reader_threads(); + + daemonize_shutdown(GET_CMDARG_STR(nDPId_options.pidfile)); + logger(0, "%s", "Bye."); + shutdown_logging(); + + return 0; +} +#endif |