#if defined(__FreeBSD__) || defined(__APPLE__) #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if !defined(__FreeBSD__) && !defined(__APPLE__) #include #endif #include #include #ifdef ENABLE_ZLIB #include #endif #include "config.h" #include "nDPIsrvd.h" #include "nio.h" #include "utils.h" #ifndef UNIX_PATH_MAX #define UNIX_PATH_MAX 108 #endif #ifndef ETHERTYPE_DCE #define ETHERTYPE_DCE 0x8903 #endif #ifndef ETHERTYPE_PAE #define ETHERTYPE_PAE 0x888e #endif #ifndef DLT_DSA_TAG_DSA #define DLT_DSA_TAG_DSA 284 #endif #ifndef DLT_DSA_TAG_EDSA #define DLT_DSA_TAG_EDSA 285 #endif #if ((NDPI_MAJOR == 4 && NDPI_MINOR < 8) || NDPI_MAJOR < 4) && NDPI_API_VERSION < 9000 #error "nDPI >= 4.8.0 or API version >= 9000 required" #endif #if nDPId_MAX_READER_THREADS <= 0 #error "Invalid value for nDPId_MAX_READER_THREADS" #endif #if nDPId_FLOW_SCAN_INTERVAL > nDPId_GENERIC_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_ICMP_IDLE_TIME || \ nDPId_FLOW_SCAN_INTERVAL > nDPId_TCP_IDLE_TIME || nDPId_FLOW_SCAN_INTERVAL > nDPId_UDP_IDLE_TIME #error "Invalid value for nDPId_FLOW_SCAN_INTERVAL" #endif #if (nDPId_PACKETS_PLEN_MAX * 3) /* base64 encoded! */ > NETWORK_BUFFER_MAX_SIZE #error "Invalid value for nDPId_PACKETS_PLEN_MAX" #endif /* MIPS* does not support Compare and Swap. Use traditional locking as fallback. 
 */
#if !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8)
#define MT_VALUE(name, type) \
    struct name \
    { \
        volatile uint64_t var; \
        pthread_mutex_t var_mutex; \
    } name
#define MT_INIT(value) \
    { \
        value, PTHREAD_MUTEX_INITIALIZER \
    }
#define MT_INIT2(name, value) \
    do \
    { \
        name.var = value; \
        pthread_mutex_init(&name.var_mutex, NULL); \
    } while (0)

static inline uint64_t mt_pt_get_and_add(volatile uint64_t * value, uint64_t add, pthread_mutex_t * mutex)
{
    uint64_t result;
    pthread_mutex_lock(mutex);
    result = *value;
    *value += add;
    pthread_mutex_unlock(mutex);
    return result;
}
#define MT_GET_AND_ADD(name, value) mt_pt_get_and_add(&name.var, value, &name.var_mutex)

static inline uint64_t mt_pt_get_and_sub(volatile uint64_t * value, uint64_t sub, pthread_mutex_t * mutex)
{
    uint64_t result;
    pthread_mutex_lock(mutex);
    result = *value;
    *value -= sub;
    pthread_mutex_unlock(mutex);
    return result;
}
#define MT_GET_AND_SUB(name, value) mt_pt_get_and_sub(&name.var, value, &name.var_mutex)
#else
#define MT_VALUE(name, type) volatile type name
#define MT_INIT(value) value
#define MT_INIT2(name, value) \
    do \
    { \
        name = value; \
    } while (0)
#define MT_GET_AND_ADD(name, value) __sync_fetch_and_add(&name, value)
#define MT_GET_AND_SUB(name, value) __sync_fetch_and_sub(&name, value)
#endif

enum nDPId_l3_type
{
    L3_IP,
    L3_IP6
};

union nDPId_ip
{
    struct
    {
        uint32_t ip;
    } v4;
    struct
    {
        union
        {
            uint64_t ip[2];
            uint32_t ip_u32[4];
        };
    } v6;
};

enum nDPId_flow_state
{
    FS_UNKNOWN = 0, // should never happen, bug otherwise
    FS_SKIPPED,     // flow should not be processed, see command line args -I and -E
    FS_FINISHED,    // detection done and detection data free'd
    FS_INFO,        // detection in progress, detection data allocated
    FS_COUNT
};

enum nDPId_flow_direction
{
    FD_SRC2DST = 0,
    FD_DST2SRC,
    FD_COUNT
};

struct nDPId_flow_analysis
{
    struct ndpi_analyze_struct iat;
    struct ndpi_analyze_struct pktlen;
    uint8_t * directions;
    struct ndpi_bin payload_len_bin[FD_COUNT];
    float * entropies;
};

/*
 * Minimal per-flow information required for flow mgmt and timeout handling.
 */
struct nDPId_flow_basic
{
    enum nDPId_flow_state state;
    enum nDPId_l3_type l3_type;
    uint64_t hashval;
    union nDPId_ip src;
    union nDPId_ip dst;
    uint8_t l4_protocol;
    uint8_t tcp_fin_rst_seen : 1;
    uint8_t tcp_is_midstream_flow : 1;
    uint8_t reserved_00 : 6;
    uint8_t reserved_01[2];
    uint16_t src_port;
    uint16_t dst_port;
    uint64_t last_pkt_time[FD_COUNT];
};

/*
 * Information required for a full detection cycle.
 */
struct nDPId_flow_extended
{
    /* Keep this the first member: flows are cast to and from `struct nDPId_flow_basic'. */
    struct nDPId_flow_basic flow_basic;

    unsigned long long int flow_id;

    uint16_t min_l4_payload_len[FD_COUNT];
    uint16_t max_l4_payload_len[FD_COUNT];

    unsigned long long int packets_processed[FD_COUNT];
    uint64_t first_seen;
    uint64_t last_flow_update;

    struct nDPId_flow_analysis * flow_analysis;
    unsigned long long int total_l4_payload_len[FD_COUNT];
    struct ndpi_proto detected_l7_protocol;
};

/*
 * Skipped flows need at least some information.
 */
struct nDPId_flow_skipped
{
    struct nDPId_flow_basic flow_basic;
};

/*
 * Structure which is important for the detection process.
 * The structure is also a compression target, if activated.
 */
struct nDPId_detection_data
{
    uint32_t last_ndpi_flow_struct_hash;
    struct ndpi_proto guessed_l7_protocol;
    struct ndpi_flow_struct flow;
};

struct nDPId_flow
{
    struct nDPId_flow_extended flow_extended;

    union
    {
        struct
        {
            uint8_t detection_completed : 1; // Flow was detected. Detection updates may still occur.
            uint8_t reserved_00 : 7;
            uint8_t reserved_01[1];
#ifdef ENABLE_ZLIB
            uint16_t detection_data_compressed_size;
#endif
            struct nDPId_detection_data * detection_data;
        } info;
        struct
        {
            ndpi_risk risk;
            ndpi_confidence_t confidence;
        } finished;
    };
};

struct nDPId_workflow
{
    pcap_t * pcap_handle;

    MT_VALUE(error_or_eof, uint8_t);
    uint8_t is_pcap_file;

    uint16_t error_count;

    unsigned long long int packets_captured;
    unsigned long long int packets_processed;
    unsigned long long int total_skipped_flows;
    unsigned long long int total_l4_payload_len;

    unsigned long long int libnDPI_errors;

    unsigned long long int total_not_detected_flows;
    unsigned long long int total_guessed_flows;
    unsigned long long int total_detected_flows;
    unsigned long long int total_flow_detection_updates;
    unsigned long long int total_flow_updates;

#ifdef ENABLE_MEMORY_PROFILING
    uint64_t last_memory_usage_log_time;
#endif

#ifdef ENABLE_ZLIB
    uint64_t last_compression_scan_time;
    uint64_t total_compressions;
    uint64_t total_compression_diff;
    uint64_t current_compression_diff;
#endif

    uint64_t last_scan_time;
    uint64_t last_status_time;
    uint64_t last_global_time;
    uint64_t last_thread_time;
    uint64_t last_error_time;

    void ** ndpi_flows_active;
    unsigned long long int max_active_flows;
    unsigned long long int cur_active_flows;
    unsigned long long int total_active_flows;

    void ** ndpi_flows_idle;
    unsigned long long int max_idle_flows;
    unsigned long long int cur_idle_flows;
    unsigned long long int total_idle_flows;

    unsigned long long int total_events_serialized;

    ndpi_serializer ndpi_serializer;
    struct ndpi_detection_module_struct * ndpi_struct;
};

struct nDPId_reader_thread
{
    struct nDPId_workflow * workflow;
    pthread_t thread;
    int collector_sockfd;
    int collector_sock_last_errno;
    size_t array_index;
};

enum packet_event
{
    PACKET_EVENT_INVALID = 0,

    PACKET_EVENT_PAYLOAD, // A single packet that does not belong to a flow for whatever reasons.
                          // E.g. it could be malformed and thus no flow handling is done.
                          // There may be additional use-cases in the future.
    PACKET_EVENT_PAYLOAD_FLOW, // Special case; A packet event that belongs to a flow but does not include all
                               // information a flow event requires.

    PACKET_EVENT_COUNT
};

enum flow_event
{
    FLOW_EVENT_INVALID = 0,

    FLOW_EVENT_NEW,
    FLOW_EVENT_END,     // TCP only: FIN/RST packet seen.
    FLOW_EVENT_IDLE,    // Flow timed out.
    FLOW_EVENT_UPDATE,  // Inform distributor applications about flows with a long lifetime.
    FLOW_EVENT_ANALYSE, // Print information regarding a flow analysis, see `struct nDPId_flow_analysis'.

    FLOW_EVENT_GUESSED,
    FLOW_EVENT_DETECTED,
    FLOW_EVENT_DETECTION_UPDATE, // Some information in `struct ndpi_flow_struct' changed.
    FLOW_EVENT_NOT_DETECTED,

    FLOW_EVENT_COUNT
};

enum error_event
{
    ERROR_EVENT_INVALID = 0,

    UNKNOWN_DATALINK_LAYER,
    UNKNOWN_L3_PROTOCOL,
    UNSUPPORTED_DATALINK_LAYER,
    PACKET_TOO_SHORT,
    PACKET_TYPE_UNKNOWN,
    PACKET_HEADER_INVALID,
    IP4_PACKET_TOO_SHORT,
    IP4_SIZE_SMALLER_THAN_HEADER,
    IP4_L4_PAYLOAD_DETECTION_FAILED,
    IP6_PACKET_TOO_SHORT, // 10
    IP6_SIZE_SMALLER_THAN_HEADER,
    IP6_L4_PAYLOAD_DETECTION_FAILED,
    TCP_PACKET_TOO_SHORT,
    UDP_PACKET_TOO_SHORT,
    CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE,
    MAX_FLOW_TO_TRACK,
    FLOW_MEMORY_ALLOCATION_FAILED,

    ERROR_EVENT_COUNT // 18
};

enum daemon_event
{
    DAEMON_EVENT_INVALID = 0,

    DAEMON_EVENT_INIT,
    DAEMON_EVENT_RECONNECT,
    DAEMON_EVENT_SHUTDOWN,
    DAEMON_EVENT_STATUS,

    DAEMON_EVENT_COUNT
};

static char const * const flow_state_name_table[FS_COUNT] = {
    [FS_UNKNOWN] = "unknown", [FS_SKIPPED] = "skipped", [FS_FINISHED] = "finished", [FS_INFO] = "info"};

static char const * const packet_event_name_table[PACKET_EVENT_COUNT] = {
    [PACKET_EVENT_INVALID] = "invalid", [PACKET_EVENT_PAYLOAD] = "packet", [PACKET_EVENT_PAYLOAD_FLOW] = "packet-flow"};

static char const * const flow_event_name_table[FLOW_EVENT_COUNT] = {[FLOW_EVENT_INVALID] = "invalid",
                                                                     [FLOW_EVENT_NEW] = "new",
                                                                     [FLOW_EVENT_END] = "end",
                                                                     [FLOW_EVENT_IDLE] = "idle",
                                                                     [FLOW_EVENT_UPDATE] = "update",
                                                                     [FLOW_EVENT_ANALYSE] = "analyse",
[FLOW_EVENT_GUESSED] = "guessed", [FLOW_EVENT_DETECTED] = "detected", [FLOW_EVENT_DETECTION_UPDATE] = "detection-update", [FLOW_EVENT_NOT_DETECTED] = "not-detected"}; static char const * const error_event_name_table[ERROR_EVENT_COUNT] = { [ERROR_EVENT_INVALID] = "invalid", [UNKNOWN_DATALINK_LAYER] = "Unknown datalink layer packet", [UNKNOWN_L3_PROTOCOL] = "Unknown L3 protocol", [UNSUPPORTED_DATALINK_LAYER] = "Unsupported datalink layer", [PACKET_TOO_SHORT] = "Packet too short", [PACKET_TYPE_UNKNOWN] = "Unknown packet type", [PACKET_HEADER_INVALID] = "Packet header invalid", [IP4_PACKET_TOO_SHORT] = "IP4 packet too short", [IP4_SIZE_SMALLER_THAN_HEADER] = "Packet smaller than IP4 header", [IP4_L4_PAYLOAD_DETECTION_FAILED] = "nDPI IPv4/L4 payload detection failed", [IP6_PACKET_TOO_SHORT] = "IP6 packet too short", [IP6_SIZE_SMALLER_THAN_HEADER] = "Packet smaller than IP6 header", [IP6_L4_PAYLOAD_DETECTION_FAILED] = "nDPI IPv6/L4 payload detection failed", [TCP_PACKET_TOO_SHORT] = "TCP packet smaller than expected", [UDP_PACKET_TOO_SHORT] = "UDP packet smaller than expected", [CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE] = "Captured packet size is smaller than expected packet size", [MAX_FLOW_TO_TRACK] = "Max flows to track reached", [FLOW_MEMORY_ALLOCATION_FAILED] = "Flow memory allocation failed", }; static char const * const daemon_event_name_table[DAEMON_EVENT_COUNT] = { [DAEMON_EVENT_INVALID] = "invalid", [DAEMON_EVENT_INIT] = "init", [DAEMON_EVENT_RECONNECT] = "reconnect", [DAEMON_EVENT_SHUTDOWN] = "shutdown", [DAEMON_EVENT_STATUS] = "status", }; static struct nDPId_reader_thread reader_threads[nDPId_MAX_READER_THREADS] = {}; static struct nDPIsrvd_address collector_address; static MT_VALUE(nDPId_main_thread_shutdown, int) = MT_INIT(0); static MT_VALUE(global_flow_id, uint64_t) = MT_INIT(1); #ifdef ENABLE_MEMORY_PROFILING static MT_VALUE(ndpi_memory_alloc_count, uint64_t) = MT_INIT(0); static MT_VALUE(ndpi_memory_alloc_bytes, uint64_t) = MT_INIT(0); static 
MT_VALUE(ndpi_memory_free_count, uint64_t) = MT_INIT(0); static MT_VALUE(ndpi_memory_free_bytes, uint64_t) = MT_INIT(0); #ifdef ENABLE_ZLIB static MT_VALUE(zlib_compressions, uint64_t) = MT_INIT(0); static MT_VALUE(zlib_decompressions, uint64_t) = MT_INIT(0); static MT_VALUE(zlib_compression_diff, uint64_t) = MT_INIT(0); static MT_VALUE(zlib_compression_bytes, uint64_t) = MT_INIT(0); #endif #endif static struct { /* opts */ struct cmdarg pcap_file_or_interface; struct cmdarg bpf_str; struct cmdarg pidfile; struct cmdarg user; struct cmdarg group; struct cmdarg custom_protocols_file; struct cmdarg custom_categories_file; struct cmdarg custom_ja3_file; struct cmdarg custom_sha1_file; struct cmdarg collector_address; struct cmdarg instance_alias; union nDPId_ip pcap_dev_ip4, pcap_dev_ip6; union nDPId_ip pcap_dev_netmask4, pcap_dev_netmask6; union nDPId_ip pcap_dev_subnet4, pcap_dev_subnet6; uint8_t process_internal_initial_direction; uint8_t process_external_initial_direction; #ifdef ENABLE_ZLIB uint8_t enable_zlib_compression; #endif uint8_t enable_data_analysis; #ifdef ENABLE_EPOLL uint8_t use_poll; #endif /* subopts */ unsigned long long int max_flows_per_thread; unsigned long long int max_idle_flows_per_thread; unsigned long long int reader_thread_count; unsigned long long int daemon_status_interval; #ifdef ENABLE_MEMORY_PROFILING unsigned long long int memory_profiling_log_interval; #endif #ifdef ENABLE_ZLIB unsigned long long int compression_scan_interval; unsigned long long int compression_flow_inactivity; #endif unsigned long long int flow_scan_interval; unsigned long long int generic_max_idle_time; unsigned long long int icmp_max_idle_time; unsigned long long int udp_max_idle_time; unsigned long long int tcp_max_idle_time; unsigned long long int tcp_max_post_end_flow_time; unsigned long long int max_packets_per_flow_to_send; unsigned long long int max_packets_per_flow_to_process; unsigned long long int max_packets_per_flow_to_analyse; unsigned long long int 
error_event_threshold_n; unsigned long long int error_event_threshold_time; } nDPId_options = {.pcap_file_or_interface = CMDARG(NULL), .bpf_str = CMDARG(NULL), .pidfile = CMDARG(nDPId_PIDFILE), .user = CMDARG(DEFAULT_CHUSER), .group = CMDARG(NULL), .custom_protocols_file = CMDARG(NULL), .custom_categories_file = CMDARG(NULL), .custom_ja3_file = CMDARG(NULL), .custom_sha1_file = CMDARG(NULL), .collector_address = CMDARG(COLLECTOR_UNIX_SOCKET), .instance_alias = CMDARG(NULL), .max_flows_per_thread = nDPId_MAX_FLOWS_PER_THREAD / 2, .max_idle_flows_per_thread = nDPId_MAX_IDLE_FLOWS_PER_THREAD / 2, #ifdef CROSS_COMPILATION /* * We are assuming that in the cross compilation case * our target system is an embedded one with not much memory available. * To further reduce memory consumption caused by allocating nDPId / nDPI workflows per thread, * we set the default reader thread count to two. */ .reader_thread_count = 2, #else .reader_thread_count = nDPId_MAX_READER_THREADS / 3, #endif .daemon_status_interval = nDPId_DAEMON_STATUS_INTERVAL, #ifdef ENABLE_MEMORY_PROFILING .memory_profiling_log_interval = nDPId_MEMORY_PROFILING_LOG_INTERVAL, #endif #ifdef ENABLE_ZLIB .compression_scan_interval = nDPId_COMPRESSION_SCAN_INTERVAL, .compression_flow_inactivity = nDPId_COMPRESSION_FLOW_INACTIVITY, #endif .flow_scan_interval = nDPId_FLOW_SCAN_INTERVAL, .generic_max_idle_time = nDPId_GENERIC_IDLE_TIME, .icmp_max_idle_time = nDPId_ICMP_IDLE_TIME, .udp_max_idle_time = nDPId_UDP_IDLE_TIME, .tcp_max_idle_time = nDPId_TCP_IDLE_TIME, .tcp_max_post_end_flow_time = nDPId_TCP_POST_END_FLOW_TIME, .max_packets_per_flow_to_send = nDPId_PACKETS_PER_FLOW_TO_SEND, .max_packets_per_flow_to_process = nDPId_PACKETS_PER_FLOW_TO_PROCESS, .max_packets_per_flow_to_analyse = nDPId_PACKETS_PER_FLOW_TO_ANALYZE, .error_event_threshold_n = nDPId_ERROR_EVENT_THRESHOLD_N, .error_event_threshold_time = nDPId_ERROR_EVENT_THRESHOLD_TIME}; enum nDPId_subopts { MAX_FLOWS_PER_THREAD = 0, MAX_IDLE_FLOWS_PER_THREAD, 
MAX_READER_THREADS, DAEMON_STATUS_INTERVAL, #ifdef ENABLE_MEMORY_PROFILING MEMORY_PROFILING_LOG_INTERVAL, #endif #ifdef ENABLE_ZLIB COMPRESSION_SCAN_INTERVAL, COMPRESSION_FLOW_INACTIVITY, #endif FLOW_SCAN_INTVERAL, GENERIC_MAX_IDLE_TIME, ICMP_MAX_IDLE_TIME, UDP_MAX_IDLE_TIME, TCP_MAX_IDLE_TIME, TCP_MAX_POST_END_FLOW_TIME, MAX_PACKETS_PER_FLOW_TO_SEND, MAX_PACKETS_PER_FLOW_TO_PROCESS, MAX_PACKETS_PER_FLOW_TO_ANALYZE, ERROR_EVENT_THRESHOLD_N, ERROR_EVENT_THRESHOLD_TIME, }; static char * const subopt_token[] = {[MAX_FLOWS_PER_THREAD] = "max-flows-per-thread", [MAX_IDLE_FLOWS_PER_THREAD] = "max-idle-flows-per-thread", [MAX_READER_THREADS] = "max-reader-threads", [DAEMON_STATUS_INTERVAL] = "daemon-status-interval", #ifdef ENABLE_MEMORY_PROFILING [MEMORY_PROFILING_LOG_INTERVAL] = "memory-profiling-log-interval", #endif #ifdef ENABLE_ZLIB [COMPRESSION_SCAN_INTERVAL] = "compression-scan-interval", [COMPRESSION_FLOW_INACTIVITY] = "compression-flow-inactivity", #endif [FLOW_SCAN_INTVERAL] = "flow-scan-interval", [GENERIC_MAX_IDLE_TIME] = "generic-max-idle-time", [ICMP_MAX_IDLE_TIME] = "icmp-max-idle-time", [UDP_MAX_IDLE_TIME] = "udp-max-idle-time", [TCP_MAX_IDLE_TIME] = "tcp-max-idle-time", [TCP_MAX_POST_END_FLOW_TIME] = "tcp-max-post-end-flow-time", [MAX_PACKETS_PER_FLOW_TO_SEND] = "max-packets-per-flow-to-send", [MAX_PACKETS_PER_FLOW_TO_PROCESS] = "max-packets-per-flow-to-process", [MAX_PACKETS_PER_FLOW_TO_ANALYZE] = "max-packets-per-flow-to-analyse", [ERROR_EVENT_THRESHOLD_N] = "error-event-threshold-n", [ERROR_EVENT_THRESHOLD_TIME] = "error-event-threshold-time", NULL}; static void sighandler(int signum); static int processing_threads_error_or_eof(void); static void free_workflow(struct nDPId_workflow ** const workflow); static void serialize_and_send(struct nDPId_reader_thread * const reader_thread); static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, struct nDPId_flow_extended * const flow_ext, enum flow_event event); static void 
jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread,
                             struct nDPId_flow * const flow,
                             enum flow_event event);

static int set_collector_nonblock(struct nDPId_reader_thread * const reader_thread)
{
    int current_flags;
    int rc = -1;

    // Retry if interrupted by a signal.
    while ((current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0)) == -1 && errno == EINTR)
    {
    }

    if (current_flags != -1)
    {
        // Keep `current_flags' untouched, so that a retry never passes (-1 | O_NONBLOCK) to F_SETFL.
        while ((rc = fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags | O_NONBLOCK)) == -1 &&
               errno == EINTR)
        {
        }
    }

    // `rc' is still -1 if F_GETFL failed, so both failure paths end up here.
    if (rc == -1)
    {
        reader_thread->collector_sock_last_errno = errno;
        logger(1,
               "[%8llu] Could not set collector fd %d to non-blocking mode: %s",
               reader_thread->workflow->packets_processed,
               reader_thread->collector_sockfd,
               strerror(errno));
        return 1;
    }

    return 0;
}

static int set_collector_block(struct nDPId_reader_thread * const reader_thread)
{
    int current_flags = fcntl(reader_thread->collector_sockfd, F_GETFL, 0);

    if (current_flags == -1 ||
        fcntl(reader_thread->collector_sockfd, F_SETFL, current_flags & ~O_NONBLOCK) == -1)
    {
        reader_thread->collector_sock_last_errno = errno;
        logger(1,
               "[%8llu] Could not set collector fd %d to blocking mode: %s",
               reader_thread->workflow->packets_processed,
               reader_thread->collector_sockfd,
               strerror(errno));
        return 1;
    }

    return 0;
}

u_int8_t plen2slot(u_int16_t plen)
{
    if (plen > nDPId_ANALYZE_PLEN_MAX)
    {
        return nDPId_ANALYZE_PLEN_NUM_BINS - 1;
    }
    else
    {
        return plen / nDPId_ANALYZE_PLEN_BIN_LEN;
    }
}

static uint64_t get_last_pkt_time(struct nDPId_flow_basic const * const flow_basic)
{
    return ndpi_max(flow_basic->last_pkt_time[FD_SRC2DST], flow_basic->last_pkt_time[FD_DST2SRC]);
}

/* Saturating subtraction: returns 0 instead of wrapping around if b > a. */
static uint64_t timer_sub(uint64_t a, uint64_t b)
{
    if (b > a)
    {
        return 0;
    }

    return a - b;
}

#ifdef ENABLE_ZLIB
static int zlib_deflate(const void * const src, int srcLen, void * dst, int dstLen)
{
    z_stream strm = {0};
    strm.total_in = strm.avail_in = srcLen;
    strm.total_out = strm.avail_out = dstLen;
    strm.next_in = (Bytef
                        *)src;
    strm.next_out = (Bytef *)dst;

    strm.zalloc = Z_NULL;
    strm.zfree = Z_NULL;
    strm.opaque = Z_NULL;

    int err = -1;
    int ret = -1;

    err = deflateInit(&strm, Z_BEST_COMPRESSION);
    if (err == Z_OK)
    {
        err = deflate(&strm, Z_FINISH);
        if (err == Z_STREAM_END)
        {
            ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING
            MT_GET_AND_ADD(zlib_compressions, 1);
            MT_GET_AND_ADD(zlib_compression_diff, srcLen - ret);
            MT_GET_AND_ADD(zlib_compression_bytes, ret);
#endif
        }
        else
        {
            deflateEnd(&strm);
            return err;
        }
    }
    else
    {
        deflateEnd(&strm);
        return err;
    }

    deflateEnd(&strm);
    return ret;
}

static int zlib_inflate(const void * src, int srcLen, void * dst, int dstLen)
{
    z_stream strm = {0};
    strm.total_in = strm.avail_in = srcLen;
    strm.total_out = strm.avail_out = dstLen;
    strm.next_in = (Bytef *)src;
    strm.next_out = (Bytef *)dst;

    strm.zalloc = Z_NULL;
    strm.zfree = Z_NULL;
    strm.opaque = Z_NULL;

    int err = -1;
    int ret = -1;

    // 15 window bits; the +32 tells zlib to detect whether the stream uses the gzip or the zlib format.
    err = inflateInit2(&strm, (15 + 32));
    if (err == Z_OK)
    {
        err = inflate(&strm, Z_FINISH);
        if (err == Z_STREAM_END)
        {
            ret = strm.total_out;
#ifdef ENABLE_MEMORY_PROFILING
            MT_GET_AND_ADD(zlib_decompressions, 1);
            MT_GET_AND_SUB(zlib_compression_diff, ret - srcLen);
#endif
        }
        else
        {
            inflateEnd(&strm);
            return err;
        }
    }
    else
    {
        inflateEnd(&strm);
        return err;
    }

    inflateEnd(&strm);
    return ret;
}

static int detection_data_deflate(struct nDPId_flow * const flow)
{
    uint8_t tmpOut[sizeof(*flow->info.detection_data)];
    int ret;

    // Already compressed.
    if (flow->info.detection_data_compressed_size > 0)
    {
        return -7;
    }

    ret = zlib_deflate(flow->info.detection_data, sizeof(*flow->info.detection_data), tmpOut, sizeof(tmpOut));
    if (ret <= 0)
    {
        return ret;
    }

    // Replace the detection data with a smaller buffer that holds only the compressed bytes.
    struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret);
    if (new_det_data == NULL)
    {
        return -8;
    }
    ndpi_free(flow->info.detection_data);
    flow->info.detection_data = new_det_data;

    memcpy(flow->info.detection_data, tmpOut, ret);
    flow->info.detection_data_compressed_size = ret;

    return ret;
}

static int detection_data_inflate(struct nDPId_flow * const flow) { uint8_t tmpOut[sizeof(*flow->info.detection_data)]; int ret; if (flow->info.detection_data_compressed_size == 0) { return -7; } ret = zlib_inflate(flow->info.detection_data, flow->info.detection_data_compressed_size, tmpOut, sizeof(tmpOut)); if (ret <= 0) { return ret; } struct nDPId_detection_data * const new_det_data = ndpi_malloc(ret); if (new_det_data == NULL) { return -8; } ndpi_free(flow->info.detection_data); flow->info.detection_data = new_det_data; memcpy(flow->info.detection_data, tmpOut, ret); flow->info.detection_data_compressed_size = 0; return ret; } static void ndpi_comp_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) { struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; (void)depth; if (workflow == NULL || flow_basic == NULL) { return; } if (which == ndpi_preorder || which == ndpi_leaf) { switch (flow_basic->state) { case FS_UNKNOWN: case FS_COUNT: case FS_SKIPPED: case FS_FINISHED: break; case FS_INFO: { if (get_last_pkt_time(flow_basic) + nDPId_options.compression_flow_inactivity < workflow->last_thread_time) { struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic; if (flow->info.detection_data_compressed_size > 0) { break; } int ret = detection_data_deflate(flow); if (ret <= 0) { logger(1, "zLib compression failed for flow %llu with error code: %d", flow->flow_extended.flow_id, ret); } else { workflow->total_compressions++; workflow->total_compression_diff += ret; workflow->current_compression_diff += ret; } } break; } } } } static void check_for_compressable_flows(struct nDPId_reader_thread * const reader_thread) { struct nDPId_workflow * const workflow = reader_thread->workflow; if (workflow->last_compression_scan_time + nDPId_options.compression_scan_interval < workflow->last_thread_time) { for (size_t comp_scan_index = 0; 
comp_scan_index < workflow->max_active_flows; ++comp_scan_index) { ndpi_twalk(workflow->ndpi_flows_active[comp_scan_index], ndpi_comp_scan_walker, workflow); } workflow->last_compression_scan_time = workflow->last_thread_time; } } #endif static void ip_netmask_to_subnet(union nDPId_ip const * const ip, union nDPId_ip const * const netmask, union nDPId_ip * const subnet, enum nDPId_l3_type type) { switch (type) { case L3_IP: subnet->v4.ip = ip->v4.ip & netmask->v4.ip; break; case L3_IP6: subnet->v6.ip[0] = ip->v6.ip[0] & netmask->v6.ip[0]; subnet->v6.ip[1] = ip->v6.ip[1] & netmask->v6.ip[1]; break; } } static int is_ip_in_subnet(union nDPId_ip const * const cmp_ip, union nDPId_ip const * const netmask, union nDPId_ip const * const cmp_subnet, enum nDPId_l3_type const type) { switch (type) { case L3_IP: return (cmp_ip->v4.ip & netmask->v4.ip) == cmp_subnet->v4.ip; case L3_IP6: return (cmp_ip->v6.ip[0] & netmask->v6.ip[0]) == cmp_subnet->v6.ip[0] && (cmp_ip->v6.ip[1] & netmask->v6.ip[1]) == cmp_subnet->v6.ip[1]; } return 0; } static void get_ip4_from_sockaddr(struct sockaddr_in const * const saddr, union nDPId_ip * dest) { switch (saddr->sin_family) { case AF_INET: dest->v4.ip = saddr->sin_addr.s_addr; break; case AF_INET6: return; } } static void get_ip6_from_sockaddr(struct sockaddr_in6 const * const saddr, union nDPId_ip * dest) { switch (saddr->sin6_family) { case AF_INET6: #if defined(__FreeBSD__) || defined(__APPLE__) dest->v6.ip_u32[0] = saddr->sin6_addr.__u6_addr.__u6_addr32[0]; dest->v6.ip_u32[1] = saddr->sin6_addr.__u6_addr.__u6_addr32[1]; dest->v6.ip_u32[2] = saddr->sin6_addr.__u6_addr.__u6_addr32[2]; dest->v6.ip_u32[3] = saddr->sin6_addr.__u6_addr.__u6_addr32[3]; #else dest->v6.ip_u32[0] = saddr->sin6_addr.s6_addr32[0]; dest->v6.ip_u32[1] = saddr->sin6_addr.s6_addr32[1]; dest->v6.ip_u32[2] = saddr->sin6_addr.s6_addr32[2]; dest->v6.ip_u32[3] = saddr->sin6_addr.s6_addr32[3]; #endif break; default: return; } } static int get_ip6_address_and_netmask(char const 
* const ifa_name, size_t ifnamelen) { FILE * f; char addr6[INET6_ADDRSTRLEN], netmask6[INET6_ADDRSTRLEN], subnet6[INET6_ADDRSTRLEN], devname[21]; struct sockaddr_in6 sap; int plen, scope, dad_status, if_idx, retval = 0; char addr6p[8][5]; f = fopen("/proc/net/if_inet6", "r"); if (f == NULL) { return 1; } while (fscanf(f, "%4s%4s%4s%4s%4s%4s%4s%4s %08x %02x %02x %02x %20s\n", addr6p[0], addr6p[1], addr6p[2], addr6p[3], addr6p[4], addr6p[5], addr6p[6], addr6p[7], &if_idx, &plen, &scope, &dad_status, devname) != EOF) { if (strncmp(devname, ifa_name, ifnamelen) == 0) { sprintf(addr6, "%s:%s:%s:%s:%s:%s:%s:%s", addr6p[0], addr6p[1], addr6p[2], addr6p[3], addr6p[4], addr6p[5], addr6p[6], addr6p[7]); memset(&sap, 0, sizeof(sap)); if (inet_pton(AF_INET6, addr6, (struct sockaddr *)&sap.sin6_addr) != 1) { retval = 1; goto error; } inet_ntop(AF_INET6, &sap.sin6_addr, addr6, sizeof(addr6)); sap.sin6_family = AF_INET6; get_ip6_from_sockaddr(&sap, &nDPId_options.pcap_dev_ip6); memset(&sap, 0, sizeof(sap)); memset(&sap.sin6_addr.s6_addr, 0xFF, plen / 8); if (plen < 128 && (plen % 32) != 0) { #if defined(__FreeBSD__) || defined(__APPLE__) sap.sin6_addr.__u6_addr.__u6_addr32[plen / 32] = 0xFFFFFFFF << (32 - (plen % 32)); #else sap.sin6_addr.s6_addr32[plen / 32] = 0xFFFFFFFF << (32 - (plen % 32)); #endif } inet_ntop(AF_INET6, &sap.sin6_addr, netmask6, sizeof(netmask6)); sap.sin6_family = AF_INET6; get_ip6_from_sockaddr(&sap, &nDPId_options.pcap_dev_netmask6); ip_netmask_to_subnet(&nDPId_options.pcap_dev_ip6, &nDPId_options.pcap_dev_netmask6, &nDPId_options.pcap_dev_subnet6, L3_IP6); inet_ntop(AF_INET6, &nDPId_options.pcap_dev_subnet6.v6, subnet6, sizeof(subnet6)); logger(0, "%s IPv6 address/prefix netmask subnet: %s/%u %s %s", get_cmdarg(&nDPId_options.pcap_file_or_interface), addr6, plen, netmask6, subnet6); } } error: fclose(f); return retval; } static void get_ip4_address_and_netmask(struct ifaddrs const * const ifaddr) { get_ip4_from_sockaddr((struct sockaddr_in 
*)ifaddr->ifa_netmask, &nDPId_options.pcap_dev_netmask4); get_ip4_from_sockaddr((struct sockaddr_in *)ifaddr->ifa_addr, &nDPId_options.pcap_dev_ip4); ip_netmask_to_subnet(&nDPId_options.pcap_dev_ip4, &nDPId_options.pcap_dev_netmask4, &nDPId_options.pcap_dev_subnet4, L3_IP); { char addr[INET_ADDRSTRLEN]; char netm[INET_ADDRSTRLEN]; char subn[INET_ADDRSTRLEN]; void * saddr = &nDPId_options.pcap_dev_ip4.v4.ip; void * snetm = &nDPId_options.pcap_dev_netmask4.v4.ip; void * ssubn = &nDPId_options.pcap_dev_subnet4.v4.ip; logger(0, "%s IPv4 address netmask subnet: %s %s %s", get_cmdarg(&nDPId_options.pcap_file_or_interface), inet_ntop(AF_INET, saddr, addr, sizeof(addr)), inet_ntop(AF_INET, snetm, netm, sizeof(netm)), inet_ntop(AF_INET, ssubn, subn, sizeof(subn))); } } static int get_ip_netmask_from_pcap_dev(char const * const pcap_dev) { int retval = 0, found_dev = 0; int ip4_interface_avail = 0; int ip6_interface_avail = 0; struct ifaddrs * ifaddrs = NULL; struct ifaddrs * ifa; if (getifaddrs(&ifaddrs) != 0 || ifaddrs == NULL) { return 1; } for (ifa = ifaddrs; ifa != NULL; ifa = ifa->ifa_next) { if (ifa->ifa_addr == NULL || (ifa->ifa_flags & IFF_RUNNING) == 0) { continue; } size_t ifnamelen = strnlen(ifa->ifa_name, IFNAMSIZ); if (strncmp(ifa->ifa_name, pcap_dev, IFNAMSIZ) == 0 && ifnamelen == strnlen(pcap_dev, IFNAMSIZ)) { found_dev = 1; switch (ifa->ifa_addr->sa_family) { case AF_INET: get_ip4_address_and_netmask(ifa); ip4_interface_avail = 1; break; case AF_INET6: if (ip6_interface_avail == 0 && get_ip6_address_and_netmask(ifa->ifa_name, ifnamelen) != 0) { retval = 1; } ip6_interface_avail = 1; break; default: break; } } } if (found_dev != 0 && (nDPId_options.process_internal_initial_direction != 0 || nDPId_options.process_external_initial_direction != 0) && ip4_interface_avail == 0 && ip6_interface_avail == 0) { logger_early(1, "Interface %s does not have any IPv4 / IPv6 address set, -I / -E won't work.", pcap_dev); retval = 1; } freeifaddrs(ifaddrs); return retval; } 
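/*
 * Illustrative sketch, not part of nDPId: get_ip6_address_and_netmask() above
 * turns an IPv6 prefix length into a netmask by filling whole bytes with
 * memset() and shifting a 32 bit word for the remaining bits. A byte-wise
 * variant of the same idea might look like the block below; it does not
 * depend on the host byte order. The helper names prefix_to_netmask6() and
 * in_subnet6() are hypothetical and exist only for this example.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: expand an IPv6 prefix length (0..128) into a
 * 16 byte netmask in network byte order. */
static void prefix_to_netmask6(unsigned int prefix_len, uint8_t mask[16])
{
    assert(prefix_len <= 128);

    memset(mask, 0, 16);
    memset(mask, 0xFF, prefix_len / 8); /* whole bytes covered by the prefix */
    if ((prefix_len % 8) != 0)
    {
        /* Remaining bits fill the next byte, starting at the most significant bit. */
        mask[prefix_len / 8] = (uint8_t)(0xFFu << (8 - (prefix_len % 8)));
    }
}

/* Hypothetical helper: returns 1 if `addr' masked with `mask' equals `subnet',
 * the same comparison is_ip_in_subnet() performs on two 64 bit words. */
static int in_subnet6(uint8_t const addr[16], uint8_t const mask[16], uint8_t const subnet[16])
{
    for (size_t i = 0; i < 16; ++i)
    {
        if ((addr[i] & mask[i]) != subnet[i])
        {
            return 0;
        }
    }

    return 1;
}
```

/*
 * Usage idea: call prefix_to_netmask6(plen, mask) with the prefix length read
 * from /proc/net/if_inet6, then test candidate addresses with in_subnet6().
 */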
#ifdef ENABLE_MEMORY_PROFILING static void * ndpi_malloc_wrapper(size_t const size) { void * p = malloc(sizeof(uint64_t) + size); if (p == NULL) { return NULL; } *(uint64_t *)p = size; MT_GET_AND_ADD(ndpi_memory_alloc_count, 1); MT_GET_AND_ADD(ndpi_memory_alloc_bytes, size); return (uint8_t *)p + sizeof(uint64_t); } static void ndpi_free_wrapper(void * const freeable) { void * p = (uint8_t *)freeable - sizeof(uint64_t); MT_GET_AND_ADD(ndpi_memory_free_count, 1); MT_GET_AND_ADD(ndpi_memory_free_bytes, *(uint64_t *)p); free(p); } static void log_memory_usage(struct nDPId_reader_thread const * const reader_thread) { if (reader_thread->array_index == 0) { uint64_t alloc_count = MT_GET_AND_ADD(ndpi_memory_alloc_count, 0); uint64_t free_count = MT_GET_AND_ADD(ndpi_memory_free_count, 0); uint64_t alloc_bytes = MT_GET_AND_ADD(ndpi_memory_alloc_bytes, 0); uint64_t free_bytes = MT_GET_AND_ADD(ndpi_memory_free_bytes, 0); logger(0, "MemoryProfiler: %llu allocs, %llu frees, %llu bytes allocated, %llu bytes freed, %llu blocks in " "use, " "%llu bytes in use", (long long unsigned int)alloc_count, (long long unsigned int)free_count, (long long unsigned int)alloc_bytes, (long long unsigned int)free_bytes, (long long unsigned int)(alloc_count - free_count), (long long unsigned int)(alloc_bytes - free_bytes)); #ifdef ENABLE_ZLIB uint64_t zlib_compression_count = MT_GET_AND_ADD(zlib_compressions, 0); uint64_t zlib_decompression_count = MT_GET_AND_ADD(zlib_decompressions, 0); uint64_t zlib_bytes_diff = MT_GET_AND_ADD(zlib_compression_diff, 0); uint64_t zlib_bytes_total = MT_GET_AND_ADD(zlib_compression_bytes, 0); logger(0, "MemoryProfiler (zLib): %llu compressions, %llu decompressions, %llu compressed blocks in use, %llu " "bytes diff, %llu bytes total compressed", (long long unsigned int)zlib_compression_count, (long long unsigned int)zlib_decompression_count, (long long unsigned int)zlib_compression_count - (long long unsigned int)zlib_decompression_count, (long long unsigned 
int)zlib_bytes_diff, (long long unsigned int)zlib_bytes_total); #endif } } #endif static void ndpi_debug_printf(unsigned int proto, struct ndpi_detection_module_struct * ndpi_struct, ndpi_log_level_t log_level, const char * file_name, const char * func_name, unsigned int line_number, const char * format, ...) { va_list vl; int written, is_log_err = 0; char buf[128]; struct nDPId_workflow * const workflow = (struct nDPId_workflow *)ndpi_get_user_data(ndpi_struct); va_start(vl, format); if ((written = vsnprintf(buf, nDPIsrvd_ARRAY_LENGTH(buf), format, vl)) >= (int)nDPIsrvd_ARRAY_LENGTH(buf)) { logger(1, "[libnDPI] Logging failure due to buffer size; current: %zu, required: %d", nDPIsrvd_ARRAY_LENGTH(buf), written); } va_end(vl); switch (log_level) { case NDPI_LOG_ERROR: workflow->libnDPI_errors++; is_log_err = 1; break; case NDPI_LOG_TRACE: is_log_err = 1; break; case NDPI_LOG_DEBUG: case NDPI_LOG_DEBUG_EXTRA: is_log_err = 0; break; } logger(is_log_err, "[libnDPI@%s.%s.%u] protocol %u.%s: %s", file_name, func_name, line_number, proto, ndpi_get_proto_name(ndpi_struct, proto), buf); } static struct nDPId_workflow * init_workflow(char const * const file_or_device) { char pcap_error_buffer[PCAP_ERRBUF_SIZE]; struct nDPId_workflow * workflow; #ifdef ENABLE_MEMORY_PROFILING set_ndpi_malloc(ndpi_malloc_wrapper); set_ndpi_free(ndpi_free_wrapper); set_ndpi_flow_malloc(NULL); set_ndpi_flow_free(NULL); #endif workflow = (struct nDPId_workflow *)ndpi_calloc(1, sizeof(*workflow)); if (workflow == NULL) { return NULL; } MT_INIT2(workflow->error_or_eof, 0); errno = 0; if (access(file_or_device, R_OK) != 0 && errno == ENOENT) { workflow->pcap_handle = pcap_open_live(file_or_device, 65535, 1, 250, pcap_error_buffer); } else { workflow->pcap_handle = pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO, pcap_error_buffer); workflow->is_pcap_file = 1; } if (workflow->pcap_handle == NULL) { logger_early(1, (workflow->is_pcap_file == 0 ? 
"pcap_open_live: %.*s" : "pcap_open_offline_with_tstamp_precision: %.*s"), (int)PCAP_ERRBUF_SIZE, pcap_error_buffer); free_workflow(&workflow); return NULL; } if (workflow->is_pcap_file == 0 && pcap_setnonblock(workflow->pcap_handle, 1, pcap_error_buffer) == PCAP_ERROR) { logger_early(1, "pcap_setnonblock: %.*s", (int)PCAP_ERRBUF_SIZE, pcap_error_buffer); free_workflow(&workflow); return NULL; } if (is_cmdarg_set(&nDPId_options.bpf_str) != 0) { struct bpf_program fp; if (pcap_compile(workflow->pcap_handle, &fp, get_cmdarg(&nDPId_options.bpf_str), 1, PCAP_NETMASK_UNKNOWN) != 0) { logger_early(1, "pcap_compile: %s", pcap_geterr(workflow->pcap_handle)); free_workflow(&workflow); return NULL; } if (pcap_setfilter(workflow->pcap_handle, &fp) != 0) { logger_early(1, "pcap_setfilter: %s", pcap_geterr(workflow->pcap_handle)); free_workflow(&workflow); pcap_freecode(&fp); return NULL; } pcap_freecode(&fp); } ndpi_init_prefs init_prefs = ndpi_no_prefs; workflow->ndpi_struct = ndpi_init_detection_module(init_prefs); if (workflow->ndpi_struct == NULL) { logger_early(1, "%s", "BUG: Could not init ndpi detection module"); free_workflow(&workflow); return NULL; } ndpi_set_user_data(workflow->ndpi_struct, workflow); set_ndpi_debug_function(workflow->ndpi_struct, ndpi_debug_printf); ndpi_set_log_level(workflow->ndpi_struct, NDPI_LOG_DEBUG_EXTRA); workflow->total_skipped_flows = 0; workflow->total_active_flows = 0; workflow->max_active_flows = nDPId_options.max_flows_per_thread; workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *)); if (workflow->ndpi_flows_active == NULL) { logger_early(1, "Could not allocate %llu bytes for (active) flow tracking", workflow->max_active_flows * sizeof(void *)); free_workflow(&workflow); return NULL; } workflow->total_idle_flows = 0; workflow->max_idle_flows = nDPId_options.max_idle_flows_per_thread; workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *)); if 
(workflow->ndpi_flows_idle == NULL) { logger_early(1, "Could not allocate %llu bytes for (idle) flow tracking", workflow->max_idle_flows * sizeof(void *)); free_workflow(&workflow); return NULL; } NDPI_PROTOCOL_BITMASK protos; NDPI_BITMASK_SET_ALL(protos); ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos); if (is_cmdarg_set(&nDPId_options.custom_protocols_file) != 0) { ndpi_load_protocols_file(workflow->ndpi_struct, get_cmdarg(&nDPId_options.custom_protocols_file)); } if (is_cmdarg_set(&nDPId_options.custom_categories_file) != 0) { ndpi_load_categories_file(workflow->ndpi_struct, get_cmdarg(&nDPId_options.custom_categories_file), NULL); } if (is_cmdarg_set(&nDPId_options.custom_ja3_file) != 0) { ndpi_load_malicious_ja3_file(workflow->ndpi_struct, get_cmdarg(&nDPId_options.custom_ja3_file)); } if (is_cmdarg_set(&nDPId_options.custom_sha1_file) != 0) { ndpi_load_malicious_sha1_file(workflow->ndpi_struct, get_cmdarg(&nDPId_options.custom_sha1_file)); } ndpi_finalize_initialization(workflow->ndpi_struct); ndpi_set_detection_preferences(workflow->ndpi_struct, ndpi_pref_enable_tls_block_dissection, 1); if (ndpi_init_serializer_ll(&workflow->ndpi_serializer, ndpi_serialization_format_json, NETWORK_BUFFER_MAX_SIZE) != 0) { logger_early(1, "BUG: Could not init JSON serializer with buffer size: %u bytes", NETWORK_BUFFER_MAX_SIZE); free_workflow(&workflow); return NULL; } return workflow; } static void free_analysis_data(struct nDPId_flow_extended * const flow_ext) { if (nDPId_options.enable_data_analysis != 0 && flow_ext->flow_analysis != NULL) { ndpi_free_data_analysis(&flow_ext->flow_analysis->iat, 0); ndpi_free_data_analysis(&flow_ext->flow_analysis->pktlen, 0); ndpi_free(flow_ext->flow_analysis->directions); ndpi_free_bin(&flow_ext->flow_analysis->payload_len_bin[FD_SRC2DST]); ndpi_free_bin(&flow_ext->flow_analysis->payload_len_bin[FD_DST2SRC]); ndpi_free(flow_ext->flow_analysis->entropies); ndpi_free(flow_ext->flow_analysis); flow_ext->flow_analysis = 
NULL; } } static void free_detection_data(struct nDPId_flow * const flow) { if (flow->info.detection_data != NULL) { ndpi_free_flow_data(&flow->info.detection_data->flow); ndpi_free(flow->info.detection_data); flow->info.detection_data = NULL; } } static int alloc_detection_data(struct nDPId_flow * const flow) { flow->info.detection_data = (struct nDPId_detection_data *)ndpi_flow_malloc(sizeof(*flow->info.detection_data)); if (flow->info.detection_data == NULL) { goto error; } memset(flow->info.detection_data, 0, sizeof(*flow->info.detection_data)); if (nDPId_options.enable_data_analysis != 0) { flow->flow_extended.flow_analysis = (struct nDPId_flow_analysis *)ndpi_malloc(sizeof(*flow->flow_extended.flow_analysis)); if (flow->flow_extended.flow_analysis == NULL) { goto error; } ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->iat, nDPId_options.max_packets_per_flow_to_analyse - 1 /* first packet IAT is always 0 */); ndpi_init_data_analysis(&flow->flow_extended.flow_analysis->pktlen, nDPId_options.max_packets_per_flow_to_analyse); flow->flow_extended.flow_analysis->directions = (uint8_t *)ndpi_malloc( sizeof(*flow->flow_extended.flow_analysis->directions) * nDPId_options.max_packets_per_flow_to_analyse); flow->flow_extended.flow_analysis->entropies = (float *)ndpi_malloc( sizeof(*flow->flow_extended.flow_analysis->entropies) * nDPId_options.max_packets_per_flow_to_analyse); if (ndpi_init_bin(&flow->flow_extended.flow_analysis->payload_len_bin[FD_SRC2DST], ndpi_bin_family8, nDPId_ANALYZE_PLEN_NUM_BINS) != 0 || ndpi_init_bin(&flow->flow_extended.flow_analysis->payload_len_bin[FD_DST2SRC], ndpi_bin_family8, nDPId_ANALYZE_PLEN_NUM_BINS) != 0 || flow->flow_extended.flow_analysis->iat.values == NULL || flow->flow_extended.flow_analysis->pktlen.values == NULL || flow->flow_extended.flow_analysis->directions == NULL || flow->flow_extended.flow_analysis->entropies == NULL) { goto error; } } return 0; error: free_detection_data(flow); flow->info.detection_completed 
        = 1;
    return 1;
}

static void ndpi_flow_info_free(void * const node)
{
    struct nDPId_flow_basic * const flow_basic = (struct nDPId_flow_basic *)node;

    switch (flow_basic->state)
    {
        case FS_UNKNOWN:
        case FS_COUNT:
        case FS_SKIPPED:
            break;

        case FS_FINISHED:
        {
            struct nDPId_flow_extended * const flow_ext = (struct nDPId_flow_extended *)flow_basic;

            free_analysis_data(flow_ext);
            break;
        }

        case FS_INFO:
        {
            struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic;

            free_analysis_data(&flow->flow_extended);
            free_detection_data(flow);
            break;
        }
    }
    ndpi_free(flow_basic);
}

static void free_workflow(struct nDPId_workflow ** const workflow)
{
    struct nDPId_workflow * const w = *workflow;

    if (w == NULL)
    {
        return;
    }

    if (w->pcap_handle != NULL)
    {
        pcap_close(w->pcap_handle);
        w->pcap_handle = NULL;
    }

    if (w->ndpi_struct != NULL)
    {
        ndpi_exit_detection_module(w->ndpi_struct);
    }
    for (size_t i = 0; i < w->max_active_flows; i++)
    {
        if (w->ndpi_flows_active != NULL)
        {
            ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_free);
        }
    }
    ndpi_free(w->ndpi_flows_active);
    ndpi_free(w->ndpi_flows_idle);
    ndpi_term_serializer(&w->ndpi_serializer);
    ndpi_free(w);
    *workflow = NULL;
}

static char * get_default_pcapdev(char * errbuf)
{
    char * ifname;
    pcap_if_t * all_devices = NULL;

    if (pcap_findalldevs(&all_devices, errbuf) != 0)
    {
        return NULL;
    }
    /* pcap_findalldevs() reports success with an empty list if no capture device was found. */
    if (all_devices == NULL)
    {
        snprintf(errbuf, PCAP_ERRBUF_SIZE, "%s", "no capture device found");
        return NULL;
    }

    ifname = strdup(all_devices[0].name);
    pcap_freealldevs(all_devices);

    return ifname;
}

static int setup_reader_threads(void)
{
    char pcap_error_buffer[PCAP_ERRBUF_SIZE];

    if (nDPId_options.reader_thread_count > nDPId_MAX_READER_THREADS)
    {
        return 1;
    }

    if (is_cmdarg_set(&nDPId_options.pcap_file_or_interface) == 0)
    {
        char * const pcapdev = get_default_pcapdev(pcap_error_buffer);

        if (pcapdev != NULL)
        {
            set_cmdarg(&nDPId_options.pcap_file_or_interface, pcapdev);
        }
        free(pcapdev);
        if (is_cmdarg_set(&nDPId_options.pcap_file_or_interface) == 0)
        {
            logger_early(1, "pcap_findalldevs: %.*s", (int)PCAP_ERRBUF_SIZE, pcap_error_buffer);
            return 1;
        }
        logger_early(0,
                     "Capturing packets from default device: %s",
                     get_cmdarg(&nDPId_options.pcap_file_or_interface));
    }

    errno = 0;
    if (access(get_cmdarg(&nDPId_options.pcap_file_or_interface), R_OK) != 0 && errno == ENOENT)
    {
        errno = 0;
        if (get_ip_netmask_from_pcap_dev(get_cmdarg(&nDPId_options.pcap_file_or_interface)) != 0)
        {
            if (errno != 0)
            {
                logger_early(1,
                             "Could not get netmask for pcap device %s: %s",
                             get_cmdarg(&nDPId_options.pcap_file_or_interface),
                             strerror(errno));
            }
            else
            {
                logger_early(1,
                             "Unexpected error while retrieving netmask for pcap device %s",
                             get_cmdarg(&nDPId_options.pcap_file_or_interface));
            }
            return 1;
        }
    }
    else
    {
        if (nDPId_options.process_internal_initial_direction != 0)
        {
            logger_early(1, "%s", "You are processing a PCAP file, `-I' ignored");
            nDPId_options.process_internal_initial_direction = 0;
        }
        if (nDPId_options.process_external_initial_direction != 0)
        {
            logger_early(1, "%s", "You are processing a PCAP file, `-E' ignored");
            nDPId_options.process_external_initial_direction = 0;
        }
    }

    for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
    {
        reader_threads[i].workflow = init_workflow(get_cmdarg(&nDPId_options.pcap_file_or_interface));
        if (reader_threads[i].workflow == NULL)
        {
            return 1;
        }
    }

    return 0;
}

static int ip_tuples_compare(struct nDPId_flow_basic const * const A, struct nDPId_flow_basic const * const B)
{
    /* generate a warning if the enum changes */
    switch (A->l3_type)
    {
        case L3_IP:
        case L3_IP6:
            break;
    }

    if (A->l3_type == L3_IP && B->l3_type == L3_IP)
    {
        if (A->src.v4.ip < B->src.v4.ip)
        {
            return -1;
        }
        if (A->src.v4.ip > B->src.v4.ip)
        {
            return 1;
        }
        if (A->dst.v4.ip < B->dst.v4.ip)
        {
            return -1;
        }
        if (A->dst.v4.ip > B->dst.v4.ip)
        {
            return 1;
        }
    }
    else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6)
    {
        /*
         * Compare the two 64-bit halves lexicographically. Requiring both halves
         * to be smaller (or greater) at once would let different addresses fall
         * through to the port comparison and compare as equal.
         */
        for (size_t i = 0; i < 2; ++i)
        {
            if (A->src.v6.ip[i] < B->src.v6.ip[i])
            {
                return -1;
            }
            if (A->src.v6.ip[i] > B->src.v6.ip[i])
            {
                return 1;
            }
        }
        for (size_t i = 0; i < 2; ++i)
        {
            if (A->dst.v6.ip[i] < B->dst.v6.ip[i])
            {
                return -1;
            }
            if (A->dst.v6.ip[i] > B->dst.v6.ip[i])
            {
                return 1;
            }
        }
    }

    if (A->src_port < B->src_port)
    {
        return -1;
    }
    if (A->src_port > B->src_port)
    {
        return 1;
    }
    if (A->dst_port < B->dst_port)
    {
        return -1;
    }
    if (A->dst_port > B->dst_port)
    {
        return 1;
    }

    return 0;
}

static uint64_t get_l4_protocol_idle_time(uint8_t l4_protocol)
{
    switch (l4_protocol)
    {
        case IPPROTO_ICMP:
        case IPPROTO_ICMPV6:
            return nDPId_options.icmp_max_idle_time;
        case IPPROTO_TCP:
            return nDPId_options.tcp_max_idle_time;
        case IPPROTO_UDP:
            return nDPId_options.udp_max_idle_time;
        default:
            return nDPId_options.generic_max_idle_time;
    }
}

/* Idle time as seen by external consumers: includes scan slack and the TCP post-end grace period. */
static uint64_t get_l4_protocol_idle_time_external(uint8_t l4_protocol)
{
    uint64_t idle_time = get_l4_protocol_idle_time(l4_protocol);

    idle_time += nDPId_options.flow_scan_interval * 2;
    if (l4_protocol == IPPROTO_TCP)
    {
        idle_time += nDPId_options.tcp_max_post_end_flow_time;
    }

    return idle_time;
}

static int is_l4_protocol_timed_out(struct nDPId_workflow const * const workflow,
                                    struct nDPId_flow_basic const * const flow_basic)
{
    uint64_t itime = get_l4_protocol_idle_time(flow_basic->l4_protocol);

    return flow_basic->tcp_fin_rst_seen == 1 || get_last_pkt_time(flow_basic) + itime <= workflow->last_thread_time;
}

static int is_tcp_post_end(struct nDPId_workflow const * const workflow,
                           struct nDPId_flow_basic const * const flow_basic)
{
    return flow_basic->l4_protocol != IPPROTO_TCP || flow_basic->tcp_fin_rst_seen == 0 ||
           (flow_basic->tcp_fin_rst_seen == 1 &&
            get_last_pkt_time(flow_basic) + nDPId_options.tcp_max_post_end_flow_time <= workflow->last_thread_time);
}

static int is_flow_update_required(struct nDPId_workflow const * const workflow,
                                   struct nDPId_flow_extended const * const flow_ext)
{
    uint64_t itime = get_l4_protocol_idle_time(flow_ext->flow_basic.l4_protocol);

    if (flow_ext->flow_basic.l4_protocol != IPPROTO_TCP)
    {
        return flow_ext->last_flow_update + itime / 4 <= workflow->last_thread_time;
    }
    return flow_ext->last_flow_update + itime <=
workflow->last_thread_time; } static int is_error_event_threshold(struct nDPId_workflow * const workflow) { if (workflow->last_global_time - workflow->last_error_time > nDPId_options.error_event_threshold_time) { workflow->error_count = 0; } workflow->last_error_time = workflow->last_global_time; if (workflow->error_count >= nDPId_options.error_event_threshold_n) { return 1; } workflow->error_count++; return 0; } static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) { struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; (void)depth; if (workflow == NULL || flow_basic == NULL) { return; } if (workflow->cur_idle_flows == nDPId_options.max_idle_flows_per_thread) { return; } if (which == ndpi_preorder || which == ndpi_leaf) { if (is_l4_protocol_timed_out(workflow, flow_basic) != 0) { if (is_tcp_post_end(workflow, flow_basic) != 0) { workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic; switch (flow_basic->state) { case FS_UNKNOWN: case FS_COUNT: case FS_SKIPPED: break; case FS_FINISHED: case FS_INFO: workflow->total_idle_flows++; break; } } } } } static int ndpi_workflow_node_cmp(void const * const A, void const * const B) { struct nDPId_flow_basic const * const flow_basic_a = (struct nDPId_flow_basic *)A; struct nDPId_flow_basic const * const flow_basic_b = (struct nDPId_flow_basic *)B; if (flow_basic_a->hashval < flow_basic_b->hashval) { return -1; } else if (flow_basic_a->hashval > flow_basic_b->hashval) { return 1; } /* flows have the same hash */ if (flow_basic_a->l4_protocol < flow_basic_b->l4_protocol) { return -1; } else if (flow_basic_a->l4_protocol > flow_basic_b->l4_protocol) { return 1; } return ip_tuples_compare(flow_basic_a, flow_basic_b); } static void process_idle_flow(struct nDPId_reader_thread * const reader_thread, size_t idle_scan_index) { struct nDPId_workflow * const workflow = 
        reader_thread->workflow;

    while (workflow->cur_idle_flows > 0)
    {
        struct nDPId_flow_basic * const flow_basic =
            (struct nDPId_flow_basic *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];

        switch (flow_basic->state)
        {
            case FS_UNKNOWN:
            case FS_COUNT:
            case FS_SKIPPED:
                break;

            case FS_FINISHED:
            {
                struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic;

                if (flow->flow_extended.flow_basic.tcp_fin_rst_seen != 0)
                {
                    jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_END);
                }
                else
                {
                    jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_IDLE);
                }
                break;
            }

            case FS_INFO:
            {
                struct nDPId_flow * const flow = (struct nDPId_flow *)flow_basic;

#ifdef ENABLE_ZLIB
                if (nDPId_options.enable_zlib_compression != 0 && flow->info.detection_data_compressed_size > 0)
                {
                    workflow->current_compression_diff -= flow->info.detection_data_compressed_size;
                    int ret = detection_data_inflate(flow);
                    if (ret <= 0)
                    {
                        workflow->current_compression_diff += flow->info.detection_data_compressed_size;
                        logger(1, "zLib decompression failed with error code: %d", ret);
                        return;
                    }
                }
#endif

                if (flow->info.detection_completed == 0)
                {
                    uint8_t protocol_was_guessed = 0;

                    if (ndpi_is_protocol_detected(workflow->ndpi_struct,
                                                  flow->info.detection_data->guessed_l7_protocol) == 0)
                    {
                        flow->info.detection_data->guessed_l7_protocol = ndpi_detection_giveup(
                            workflow->ndpi_struct, &flow->info.detection_data->flow, 1, &protocol_was_guessed);
                    }
                    else
                    {
                        protocol_was_guessed = 1;
                    }
                    if (protocol_was_guessed != 0)
                    {
                        workflow->total_guessed_flows++;
                        jsonize_flow_detection_event(reader_thread, flow, FLOW_EVENT_GUESSED);
                    }
                    else
                    {
                        workflow->total_not_detected_flows++;
                        jsonize_flow_detection_event(reader_thread, flow, FLOW_EVENT_NOT_DETECTED);
                    }
                }
                if (flow->flow_extended.flow_basic.tcp_fin_rst_seen != 0)
                {
                    jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_END);
                }
                else
                {
                    jsonize_flow_event(reader_thread, &flow->flow_extended, FLOW_EVENT_IDLE);
                }
                break;
            }
        }

        ndpi_tdelete(flow_basic,
&workflow->ndpi_flows_active[idle_scan_index], ndpi_workflow_node_cmp); ndpi_flow_info_free(flow_basic); workflow->cur_active_flows--; } } static void check_for_idle_flows(struct nDPId_reader_thread * const reader_thread) { struct nDPId_workflow * const workflow = reader_thread->workflow; for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) { ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow); process_idle_flow(reader_thread, idle_scan_index); } } static void ndpi_flow_update_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) { struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)user_data; struct nDPId_workflow * const workflow = reader_thread->workflow; struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; (void)depth; if (workflow == NULL || flow_basic == NULL) { return; } if (which == ndpi_preorder || which == ndpi_leaf) { switch (flow_basic->state) { case FS_UNKNOWN: case FS_COUNT: case FS_SKIPPED: break; case FS_FINISHED: case FS_INFO: { struct nDPId_flow_extended * const flow_ext = (struct nDPId_flow_extended *)flow_basic; if (is_flow_update_required(workflow, flow_ext) != 0) { workflow->total_flow_updates++; jsonize_flow_event(reader_thread, flow_ext, FLOW_EVENT_UPDATE); flow_ext->last_flow_update = workflow->last_thread_time; } break; } } } } static void check_for_flow_updates(struct nDPId_reader_thread * const reader_thread) { struct nDPId_workflow * const workflow = reader_thread->workflow; for (size_t update_scan_index = 0; update_scan_index < workflow->max_active_flows; ++update_scan_index) { ndpi_twalk(workflow->ndpi_flows_active[update_scan_index], ndpi_flow_update_scan_walker, reader_thread); } } static void jsonize_l3_l4(struct nDPId_workflow * const workflow, struct nDPId_flow_basic const * const flow_basic) { ndpi_serializer * const serializer = &workflow->ndpi_serializer; char 
src_name[48] = {}; char dst_name[48] = {}; switch (flow_basic->l3_type) { case L3_IP: ndpi_serialize_string_string(serializer, "l3_proto", "ip4"); if (inet_ntop(AF_INET, &flow_basic->src.v4.ip, src_name, sizeof(src_name)) == NULL) { logger(1, "Could not convert IPv4 source ip to string: %s", strerror(errno)); } if (inet_ntop(AF_INET, &flow_basic->dst.v4.ip, dst_name, sizeof(dst_name)) == NULL) { logger(1, "Could not convert IPv4 destination ip to string: %s", strerror(errno)); } break; case L3_IP6: ndpi_serialize_string_string(serializer, "l3_proto", "ip6"); if (inet_ntop(AF_INET6, &flow_basic->src.v6.ip[0], src_name, sizeof(src_name)) == NULL) { logger(1, "Could not convert IPv6 source ip to string: %s", strerror(errno)); } if (inet_ntop(AF_INET6, &flow_basic->dst.v6.ip[0], dst_name, sizeof(dst_name)) == NULL) { logger(1, "Could not convert IPv6 destination ip to string: %s", strerror(errno)); } /* For consistency across platforms replace :0: with :: */ ndpi_patchIPv6Address(src_name), ndpi_patchIPv6Address(dst_name); break; default: ndpi_serialize_string_string(serializer, "l3_proto", "unknown"); } ndpi_serialize_string_string(serializer, "src_ip", src_name); ndpi_serialize_string_string(serializer, "dst_ip", dst_name); if (flow_basic->src_port) { ndpi_serialize_string_uint32(serializer, "src_port", flow_basic->src_port); } if (flow_basic->dst_port) { ndpi_serialize_string_uint32(serializer, "dst_port", flow_basic->dst_port); } switch (flow_basic->l4_protocol) { case IPPROTO_TCP: ndpi_serialize_string_string(serializer, "l4_proto", "tcp"); break; case IPPROTO_UDP: ndpi_serialize_string_string(serializer, "l4_proto", "udp"); break; case IPPROTO_ICMP: ndpi_serialize_string_string(serializer, "l4_proto", "icmp"); break; case IPPROTO_ICMPV6: ndpi_serialize_string_string(serializer, "l4_proto", "icmp6"); break; default: ndpi_serialize_string_uint32(serializer, "l4_proto", flow_basic->l4_protocol); break; } } static void jsonize_basic(struct nDPId_reader_thread * const 
                              reader_thread,
                          int serialize_thread_id)
{
    struct nDPId_workflow * const workflow = reader_thread->workflow;

    if (serialize_thread_id != 0)
    {
        ndpi_serialize_string_int32(&workflow->ndpi_serializer, "thread_id", reader_thread->array_index);
    }
    ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "packet_id", workflow->packets_captured);
    ndpi_serialize_string_string(&workflow->ndpi_serializer,
                                 "source",
                                 get_cmdarg(&nDPId_options.pcap_file_or_interface));
    ndpi_serialize_string_string(&workflow->ndpi_serializer, "alias", get_cmdarg(&nDPId_options.instance_alias));
}

static void jsonize_daemon(struct nDPId_reader_thread * const reader_thread, enum daemon_event event)
{
    char const ev[] = "daemon_event_name";
    struct nDPId_workflow * const workflow = reader_thread->workflow;

    if (event == DAEMON_EVENT_RECONNECT)
    {
        ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer);
    }

    ndpi_serialize_string_int32(&workflow->ndpi_serializer, "daemon_event_id", event);
    if (event > DAEMON_EVENT_INVALID && event < DAEMON_EVENT_COUNT)
    {
        ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, daemon_event_name_table[event]);
    }
    else
    {
        ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, daemon_event_name_table[DAEMON_EVENT_INVALID]);
    }

    jsonize_basic(reader_thread, 1);
#ifndef PKG_VERSION
    ndpi_serialize_string_string(&workflow->ndpi_serializer, "version", "unknown");
#else
    ndpi_serialize_string_string(&workflow->ndpi_serializer, "version", PKG_VERSION);
#endif
    ndpi_serialize_string_string(&workflow->ndpi_serializer, "ndpi_version", ndpi_revision());

    switch (event)
    {
        case DAEMON_EVENT_INVALID:
        case DAEMON_EVENT_COUNT:
            break;

        case DAEMON_EVENT_INIT:
        case DAEMON_EVENT_RECONNECT:
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "max-flows-per-thread",
                                         nDPId_options.max_flows_per_thread);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "max-idle-flows-per-thread",
                                         nDPId_options.max_idle_flows_per_thread);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "reader-thread-count",
                                         nDPId_options.reader_thread_count);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "flow-scan-interval",
                                         nDPId_options.flow_scan_interval);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "generic-max-idle-time",
                                         nDPId_options.generic_max_idle_time);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "icmp-max-idle-time",
                                         nDPId_options.icmp_max_idle_time);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "udp-max-idle-time",
                                         nDPId_options.udp_max_idle_time);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "tcp-max-idle-time",
                                         nDPId_options.tcp_max_idle_time + nDPId_options.tcp_max_post_end_flow_time);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "max-packets-per-flow-to-send",
                                         nDPId_options.max_packets_per_flow_to_send);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "max-packets-per-flow-to-process",
                                         nDPId_options.max_packets_per_flow_to_process);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "max-packets-per-flow-to-analyse",
                                         nDPId_options.max_packets_per_flow_to_analyse);
            break;

        case DAEMON_EVENT_STATUS:
        case DAEMON_EVENT_SHUTDOWN:
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-captured", workflow->packets_captured);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "packets-processed", workflow->packets_processed);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-skipped-flows",
                                         workflow->total_skipped_flows);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-l4-payload-len",
                                         workflow->total_l4_payload_len);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-not-detected-flows",
                                         workflow->total_not_detected_flows);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-guessed-flows",
                                         workflow->total_guessed_flows);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-detected-flows",
                                         workflow->total_detected_flows);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-detection-updates",
                                         workflow->total_flow_detection_updates);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-updates", workflow->total_flow_updates);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "current-active-flows",
                                         workflow->cur_active_flows);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-active-flows",
                                         workflow->total_active_flows);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-idle-flows", workflow->total_idle_flows);
#if defined(ENABLE_ZLIB) && !defined(NO_MAIN)
            /* Compression diffs may vary from run to run, which would make `nDPId-test' inconsistent. */
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-compressions",
                                         workflow->total_compressions);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-compression-diff",
                                         workflow->total_compression_diff);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "current-compression-diff",
                                         workflow->current_compression_diff);
#else
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compressions", 0);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "total-compression-diff", 0);
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "current-compression-diff", 0);
#endif
            ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
                                         "total-events-serialized",
                                         workflow->total_events_serialized +
                                             1 /* DAEMON_EVENT_SHUTDOWN is an event as well */);
            break;
    }
    ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_usec", workflow->last_global_time);
    serialize_and_send(reader_thread);
}

static void jsonize_flow(struct nDPId_workflow * const workflow, struct nDPId_flow_extended const * const flow_ext)
{
    ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id);
    ndpi_serialize_string_string(&workflow->ndpi_serializer,
                                 "flow_state",
                                 flow_state_name_table[flow_ext->flow_basic.state]);
    ndpi_serialize_string_uint64(&workflow->ndpi_serializer,
"flow_src_packets_processed", flow_ext->packets_processed[FD_SRC2DST]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_dst_packets_processed", flow_ext->packets_processed[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_first_seen", flow_ext->first_seen); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_src_last_pkt_time", flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_dst_last_pkt_time", flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_idle_time", get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol)); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_src_min_l4_payload_len", flow_ext->min_l4_payload_len[FD_SRC2DST]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_dst_min_l4_payload_len", flow_ext->min_l4_payload_len[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_src_max_l4_payload_len", flow_ext->max_l4_payload_len[FD_SRC2DST]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_dst_max_l4_payload_len", flow_ext->max_l4_payload_len[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_src_tot_l4_payload_len", flow_ext->total_l4_payload_len[FD_SRC2DST]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_dst_tot_l4_payload_len", flow_ext->total_l4_payload_len[FD_DST2SRC]); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "midstream", flow_ext->flow_basic.tcp_is_midstream_flow); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time); } static int connect_to_collector(struct nDPId_reader_thread * const reader_thread) { if (reader_thread->collector_sockfd >= 0) { close(reader_thread->collector_sockfd); } int sock_type = (collector_address.raw.sa_family == AF_UNIX ? 
SOCK_STREAM : SOCK_DGRAM); reader_thread->collector_sockfd = socket(collector_address.raw.sa_family, sock_type, 0); if (reader_thread->collector_sockfd < 0 || set_fd_cloexec(reader_thread->collector_sockfd) < 0) { reader_thread->collector_sock_last_errno = errno; return 1; } int opt = NETWORK_BUFFER_MAX_SIZE; if (setsockopt(reader_thread->collector_sockfd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt)) < 0) { return 1; } if (set_collector_nonblock(reader_thread) != 0) { return 1; } if (connect(reader_thread->collector_sockfd, &collector_address.raw, collector_address.size) < 0) { reader_thread->collector_sock_last_errno = errno; return 1; } if (shutdown(reader_thread->collector_sockfd, SHUT_RD) != 0) { reader_thread->collector_sock_last_errno = errno; return 1; } reader_thread->collector_sock_last_errno = 0; return 0; } static void send_to_collector(struct nDPId_reader_thread * const reader_thread, char const * const json_str, size_t json_str_len) { struct nDPId_workflow * const workflow = reader_thread->workflow; int saved_errno; int s_ret; char newline_json_str[NETWORK_BUFFER_MAX_SIZE]; s_ret = snprintf(newline_json_str, sizeof(newline_json_str), "%0" NETWORK_BUFFER_LENGTH_DIGITS_STR "zu%.*s\n", json_str_len + 1, (int)json_str_len, json_str); if (s_ret < 0 || s_ret >= (int)sizeof(newline_json_str)) { logger(1, "[%8llu, %zu] JSON buffer prepare failed: snprintf returned %d, buffer size %zu", workflow->packets_captured, reader_thread->array_index, s_ret, sizeof(newline_json_str)); if (s_ret >= (int)sizeof(newline_json_str)) { logger(1, "[%8llu, %zu] JSON string: %.*s...", workflow->packets_captured, reader_thread->array_index, ndpi_min(512, NETWORK_BUFFER_MAX_SIZE), newline_json_str); } return; } if (reader_thread->collector_sock_last_errno != 0) { saved_errno = reader_thread->collector_sock_last_errno; if (connect_to_collector(reader_thread) == 0) { if (collector_address.raw.sa_family == AF_UNIX) { logger(1, "[%8llu, %zu] Reconnected to nDPIsrvd Collector at %s", 
workflow->packets_captured, reader_thread->array_index, get_cmdarg(&nDPId_options.collector_address)); jsonize_daemon(reader_thread, DAEMON_EVENT_RECONNECT); } } else { if (saved_errno != reader_thread->collector_sock_last_errno) { logger(1, "[%8llu, %zu] Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", workflow->packets_captured, reader_thread->array_index, get_cmdarg(&nDPId_options.collector_address), (reader_thread->collector_sock_last_errno != 0 ? strerror(reader_thread->collector_sock_last_errno) : "Internal Error.")); } return; } } errno = 0; ssize_t written; if (reader_thread->collector_sock_last_errno == 0 && (written = write(reader_thread->collector_sockfd, newline_json_str, s_ret)) != s_ret) { saved_errno = errno; if (saved_errno == EPIPE || written == 0) { logger(1, "[%8llu, %zu] Lost connection to nDPIsrvd Collector", workflow->packets_captured, reader_thread->array_index); } if (saved_errno != EAGAIN) { if (saved_errno == ECONNREFUSED) { logger(1, "[%8llu, %zu] %s to %s refused by endpoint", workflow->packets_captured, reader_thread->array_index, (collector_address.raw.sa_family == AF_UNIX ? "Connection" : "Datagram"), get_cmdarg(&nDPId_options.collector_address)); } reader_thread->collector_sock_last_errno = saved_errno; } else if (collector_address.raw.sa_family == AF_UNIX) { size_t pos = (written < 0 ? 
0 : written); set_collector_block(reader_thread); while ((size_t)(written = write(reader_thread->collector_sockfd, newline_json_str + pos, s_ret - pos)) != s_ret - pos) { saved_errno = errno; if (saved_errno == EPIPE || written == 0) { logger(1, "[%8llu, %zu] Lost connection to nDPIsrvd Collector", workflow->packets_captured, reader_thread->array_index); reader_thread->collector_sock_last_errno = saved_errno; break; } else if (written < 0) { logger(1, "[%8llu, %zu] Send data (blocking I/O) to nDPIsrvd Collector at %s failed: %s", workflow->packets_captured, reader_thread->array_index, get_cmdarg(&nDPId_options.collector_address), strerror(saved_errno)); reader_thread->collector_sock_last_errno = saved_errno; break; } else { pos += written; } } set_collector_nonblock(reader_thread); } } } static void serialize_and_send(struct nDPId_reader_thread * const reader_thread) { char * json_str; uint32_t json_str_len; json_str = ndpi_serializer_get_buffer(&reader_thread->workflow->ndpi_serializer, &json_str_len); if (json_str == NULL || json_str_len == 0) { logger(1, "[%8llu, %zu] jsonize failed, buffer length: %u", reader_thread->workflow->packets_captured, reader_thread->array_index, json_str_len); } else { reader_thread->workflow->total_events_serialized++; send_to_collector(reader_thread, json_str, json_str_len); } ndpi_reset_serializer(&reader_thread->workflow->ndpi_serializer); } /* Slightly modified code from: https://en.wikibooks.org/wiki/Algorithm_Implementation/Miscellaneous/Base64 */ static int base64_encode(uint8_t const * const data_buf, size_t dataLength, char * const result, size_t * const resultSize) { static const char base64chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; const uint8_t * data = (const uint8_t *)data_buf; size_t resultIndex = 0; size_t x; uint32_t n = 0; int padCount = dataLength % 3; uint8_t n0, n1, n2, n3; /* increment over the length of the string, three characters at a time */ for (x = 0; x < dataLength; x += 
3)
    {
        /* these three 8-bit (ASCII) characters become one 24-bit number */
        n = ((uint32_t)data[x]) << 16; // parentheses make sure the conversion to uint32_t
                                       // happens before the shift

        if ((x + 1) < dataLength)
            n += ((uint32_t)data[x + 1]) << 8; // parentheses make sure the conversion to uint32_t
                                               // happens before the shift

        if ((x + 2) < dataLength)
            n += data[x + 2];

        /* this 24-bit number gets separated into four 6-bit numbers */
        n0 = (uint8_t)(n >> 18) & 63;
        n1 = (uint8_t)(n >> 12) & 63;
        n2 = (uint8_t)(n >> 6) & 63;
        n3 = (uint8_t)n & 63;

        /*
         * if we have one byte available, then its encoding is spread
         * out over two characters
         */
        if (resultIndex >= *resultSize)
            return 1; /* indicate failure: buffer too small */
        result[resultIndex++] = base64chars[n0];
        if (resultIndex >= *resultSize)
            return 1; /* indicate failure: buffer too small */
        result[resultIndex++] = base64chars[n1];

        /*
         * if we have only two bytes available, then their encoding is
         * spread out over three chars
         */
        if ((x + 1) < dataLength)
        {
            if (resultIndex >= *resultSize)
                return 1; /* indicate failure: buffer too small */
            result[resultIndex++] = base64chars[n2];
        }

        /*
         * if we have all three bytes available, then their encoding is spread
         * out over four characters
         */
        if ((x + 2) < dataLength)
        {
            if (resultIndex >= *resultSize)
                return 1; /* indicate failure: buffer too small */
            result[resultIndex++] = base64chars[n3];
        }
    }

    /*
     * create and add padding that is required if we did not have a multiple of 3
     * number of characters available
     */
    if (padCount > 0)
    {
        for (; padCount < 3; padCount++)
        {
            if (resultIndex >= *resultSize)
                return 1; /* indicate failure: buffer too small */
            result[resultIndex++] = '=';
        }
    }
    if (resultIndex >= *resultSize)
        return 1; /* indicate failure: buffer too small */
    result[resultIndex] = 0;
    *resultSize = resultIndex;
    return 0; /* indicate success */
}

static void jsonize_data_analysis(struct nDPId_reader_thread *
const reader_thread, struct nDPId_flow_extended const * const flow_ext) { struct nDPId_workflow * const workflow = reader_thread->workflow; struct nDPId_flow_analysis * const analysis = (struct nDPId_flow_analysis *)flow_ext->flow_analysis; if (nDPId_options.enable_data_analysis != 0 && flow_ext->flow_analysis != NULL) { ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "data_analysis"); ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "iat"); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "min", ndpi_data_min(&analysis->iat)); ndpi_serialize_string_float(&workflow->ndpi_serializer, "avg", ndpi_data_average(&analysis->iat), "%.1f"); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "max", ndpi_data_max(&analysis->iat)); ndpi_serialize_string_float(&workflow->ndpi_serializer, "stddev", ndpi_data_stddev(&analysis->iat), "%.1f"); ndpi_serialize_string_float(&workflow->ndpi_serializer, "var", ndpi_data_variance(&analysis->iat), "%.1f"); ndpi_serialize_string_float(&workflow->ndpi_serializer, "ent", ndpi_data_entropy(&analysis->iat), "%.1f"); ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "data"); for (uint16_t i = 0; i < analysis->iat.num_values_array_len; ++i) { ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->iat.values[i]); } ndpi_serialize_end_of_list(&workflow->ndpi_serializer); ndpi_serialize_end_of_block(&workflow->ndpi_serializer); ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "pktlen"); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "min", ndpi_data_min(&analysis->pktlen)); ndpi_serialize_string_float(&workflow->ndpi_serializer, "avg", ndpi_data_average(&analysis->pktlen), "%.1f"); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "max", ndpi_data_max(&analysis->pktlen)); ndpi_serialize_string_float(&workflow->ndpi_serializer, "stddev", ndpi_data_stddev(&analysis->pktlen), "%.1f"); ndpi_serialize_string_float(&workflow->ndpi_serializer, "var", 
ndpi_data_variance(&analysis->pktlen), "%.1f"); ndpi_serialize_string_float(&workflow->ndpi_serializer, "ent", ndpi_data_entropy(&analysis->pktlen), "%.1f"); ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "data"); for (uint16_t i = 0; i < ndpi_min(analysis->pktlen.num_data_entries, analysis->pktlen.num_values_array_len); ++i) { ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->pktlen.values[i]); } ndpi_serialize_end_of_list(&workflow->ndpi_serializer); ndpi_serialize_end_of_block(&workflow->ndpi_serializer); ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "bins"); ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "c_to_s"); for (uint16_t i = 0; i < analysis->payload_len_bin[FD_SRC2DST].num_bins; ++i) { ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->payload_len_bin[FD_SRC2DST].u.bins8[i]); } ndpi_serialize_end_of_list(&workflow->ndpi_serializer); ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "s_to_c"); for (uint16_t i = 0; i < analysis->payload_len_bin[FD_DST2SRC].num_bins; ++i) { ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->payload_len_bin[FD_DST2SRC].u.bins8[i]); } ndpi_serialize_end_of_list(&workflow->ndpi_serializer); ndpi_serialize_end_of_block(&workflow->ndpi_serializer); ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "directions"); for (unsigned long long int i = 0; i < nDPId_options.max_packets_per_flow_to_analyse; ++i) { ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "", analysis->directions[i]); } ndpi_serialize_end_of_list(&workflow->ndpi_serializer); ndpi_serialize_start_of_list(&workflow->ndpi_serializer, "entropies"); for (unsigned long long int i = 0; i < nDPId_options.max_packets_per_flow_to_analyse; ++i) { ndpi_serialize_string_float(&workflow->ndpi_serializer, "", analysis->entropies[i], "%.9f"); } ndpi_serialize_end_of_list(&workflow->ndpi_serializer); ndpi_serialize_end_of_block(&workflow->ndpi_serializer); } } static 
void jsonize_packet_event(struct nDPId_reader_thread * const reader_thread, struct pcap_pkthdr const * const header, uint8_t const * const packet, uint16_t pkt_type, uint16_t pkt_l3_offset, uint16_t pkt_l4_offset, uint16_t pkt_l4_len, struct nDPId_flow_extended const * const flow_ext, enum packet_event event) { struct nDPId_workflow * const workflow = reader_thread->workflow; char const ev[] = "packet_event_name"; if (event == PACKET_EVENT_PAYLOAD_FLOW) { if (flow_ext == NULL) { logger(1, "[%8llu, %zu] BUG: got a PACKET_EVENT_PAYLOAD_FLOW with a flow pointer equals NULL", reader_thread->workflow->packets_captured, reader_thread->array_index); return; } if (flow_ext->packets_processed[FD_SRC2DST] + flow_ext->packets_processed[FD_DST2SRC] > nDPId_options.max_packets_per_flow_to_send) { return; } } ndpi_serialize_string_int32(&workflow->ndpi_serializer, "packet_event_id", event); if (event > PACKET_EVENT_INVALID && event < PACKET_EVENT_COUNT) { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, packet_event_name_table[event]); } else { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, packet_event_name_table[PACKET_EVENT_INVALID]); } jsonize_basic(reader_thread, (event == PACKET_EVENT_PAYLOAD_FLOW ? 
1 : 0)); if (event == PACKET_EVENT_PAYLOAD_FLOW) { ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_id", flow_ext->flow_id); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_packet_id", flow_ext->packets_processed[FD_SRC2DST] + flow_ext->packets_processed[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_src_last_pkt_time", flow_ext->flow_basic.last_pkt_time[FD_SRC2DST]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_dst_last_pkt_time", flow_ext->flow_basic.last_pkt_time[FD_DST2SRC]); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "flow_idle_time", get_l4_protocol_idle_time_external(flow_ext->flow_basic.l4_protocol)); } ndpi_serialize_string_int32(&workflow->ndpi_serializer, "pkt_datalink", pcap_datalink(reader_thread->workflow->pcap_handle)); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_caplen", header->caplen); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_type", pkt_type); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l3_offset", pkt_l3_offset); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_offset", pkt_l4_offset); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_len", header->len); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "pkt_l4_len", pkt_l4_len); ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "thread_ts_usec", workflow->last_thread_time); char base64_data[nDPId_PACKETS_PLEN_MAX * 4]; size_t base64_data_len = sizeof(base64_data); if (base64_encode(packet, (header->caplen > nDPId_PACKETS_PLEN_MAX ? 
nDPId_PACKETS_PLEN_MAX : header->caplen), base64_data, &base64_data_len) != 0) { logger(1, "[%8llu, %zu] Base64 encoding failed.", reader_thread->workflow->packets_captured, reader_thread->array_index); } else if (base64_data_len > 0 && ndpi_serialize_string_binary(&workflow->ndpi_serializer, "pkt", base64_data, (uint16_t)base64_data_len) != 0) { logger(1, "[%8llu, %zu] JSON serializing base64 packet buffer failed", reader_thread->workflow->packets_captured, reader_thread->array_index); } serialize_and_send(reader_thread); } /* I decided against ndpi_flow2json as it does not fulfill my needs. */ static void jsonize_flow_event(struct nDPId_reader_thread * const reader_thread, struct nDPId_flow_extended * const flow_ext, enum flow_event event) { struct nDPId_workflow * const workflow = reader_thread->workflow; char const ev[] = "flow_event_name"; ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_event_id", event); if (event > FLOW_EVENT_INVALID && event < FLOW_EVENT_COUNT) { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[event]); } else { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); } jsonize_basic(reader_thread, 1); jsonize_flow(workflow, flow_ext); jsonize_l3_l4(workflow, &flow_ext->flow_basic); switch (event) { case FLOW_EVENT_INVALID: case FLOW_EVENT_COUNT: break; case FLOW_EVENT_NEW: case FLOW_EVENT_END: case FLOW_EVENT_IDLE: case FLOW_EVENT_UPDATE: case FLOW_EVENT_ANALYSE: ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_datalink", pcap_datalink(reader_thread->workflow->pcap_handle)); ndpi_serialize_string_uint32(&workflow->ndpi_serializer, "flow_max_packets", nDPId_options.max_packets_per_flow_to_send); if (event == FLOW_EVENT_ANALYSE) { jsonize_data_analysis(reader_thread, flow_ext); } if (flow_ext->flow_basic.state == FS_FINISHED) { struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_ext; 
                ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "ndpi");
                ndpi_serialize_proto(workflow->ndpi_struct,
                                     &workflow->ndpi_serializer,
                                     flow->finished.risk,
                                     flow->finished.confidence,
                                     flow->flow_extended.detected_l7_protocol);
                ndpi_serialize_end_of_block(&workflow->ndpi_serializer);
            }
            else if (flow_ext->flow_basic.state == FS_INFO)
            {
                struct nDPId_flow * const flow = (struct nDPId_flow *)flow_ext;

#ifdef ENABLE_ZLIB
                if (nDPId_options.enable_zlib_compression != 0 && flow->info.detection_data_compressed_size > 0)
                {
                    workflow->current_compression_diff -= flow->info.detection_data_compressed_size;
                    int ret = detection_data_inflate(flow);
                    if (ret <= 0)
                    {
                        workflow->current_compression_diff += flow->info.detection_data_compressed_size;
                        logger(1, "zLib decompression failed with error code: %d", ret);
                        /* Drop the partially built event so it does not leak into the next one. */
                        ndpi_reset_serializer(&workflow->ndpi_serializer);
                        return;
                    }
                }
#endif

                if (flow->info.detection_completed != 0)
                {
                    ndpi_serialize_start_of_block(&workflow->ndpi_serializer, "ndpi");
                    ndpi_serialize_proto(workflow->ndpi_struct,
                                         &workflow->ndpi_serializer,
                                         flow->info.detection_data->flow.risk,
                                         flow->info.detection_data->flow.confidence,
                                         flow->flow_extended.detected_l7_protocol);
                    ndpi_serialize_end_of_block(&workflow->ndpi_serializer);
                }
            }
            break;

        case FLOW_EVENT_NOT_DETECTED:
        case FLOW_EVENT_GUESSED:
        case FLOW_EVENT_DETECTED:
        case FLOW_EVENT_DETECTION_UPDATE:
            logger(1,
                   "[%8llu, %4llu] internal error / invalid function call",
                   workflow->packets_captured,
                   flow_ext->flow_id);
            break;
    }

    serialize_and_send(reader_thread);
}

static void jsonize_flow_detection_event(struct nDPId_reader_thread * const reader_thread,
                                         struct nDPId_flow * const flow,
                                         enum flow_event event)
{
    struct nDPId_workflow * const workflow = reader_thread->workflow;
    char const ev[] = "flow_event_name";

    ndpi_serialize_string_int32(&workflow->ndpi_serializer, "flow_event_id", event);
    if (event > FLOW_EVENT_INVALID && event < FLOW_EVENT_COUNT)
    {
        ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[event]);
    }
    else
    {
ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, flow_event_name_table[FLOW_EVENT_INVALID]); } jsonize_basic(reader_thread, 1); jsonize_flow(workflow, &flow->flow_extended); jsonize_l3_l4(workflow, &flow->flow_extended.flow_basic); switch (event) { case FLOW_EVENT_INVALID: case FLOW_EVENT_COUNT: break; case FLOW_EVENT_NEW: case FLOW_EVENT_END: case FLOW_EVENT_IDLE: case FLOW_EVENT_UPDATE: case FLOW_EVENT_ANALYSE: logger(1, "[%8llu, %4llu] internal error / invalid function call", workflow->packets_captured, flow->flow_extended.flow_id); break; case FLOW_EVENT_NOT_DETECTED: case FLOW_EVENT_GUESSED: if (ndpi_dpi2json(workflow->ndpi_struct, &flow->info.detection_data->flow, flow->info.detection_data->guessed_l7_protocol, &workflow->ndpi_serializer) != 0) { logger(1, "[%8llu, %4llu] ndpi_dpi2json failed for not-detected/guessed flow", workflow->packets_captured, flow->flow_extended.flow_id); } break; case FLOW_EVENT_DETECTED: case FLOW_EVENT_DETECTION_UPDATE: if (ndpi_dpi2json(workflow->ndpi_struct, &flow->info.detection_data->flow, flow->flow_extended.detected_l7_protocol, &workflow->ndpi_serializer) != 0) { logger(1, "[%8llu, %4llu] ndpi_dpi2json failed for detected/detection-update flow", workflow->packets_captured, flow->flow_extended.flow_id); } break; } serialize_and_send(reader_thread); } static void internal_format_error(ndpi_serializer * const serializer, char const * const format, uint32_t format_index) { logger(1, "BUG: Internal error detected for format string `%s' at format index %u", format, format_index); ndpi_reset_serializer(serializer); } static void vjsonize_error_eventf(struct nDPId_reader_thread * const reader_thread, char const * format, va_list ap) { uint8_t got_jsonkey = 0; uint8_t is_long_long = 0; char json_key[NETWORK_BUFFER_MAX_SIZE]; uint32_t format_index = 0; while (*format) { if (got_jsonkey == 0) { json_key[0] = '\0'; } switch (*format++) { case 's': { format_index++; char * value = va_arg(ap, char *); if (got_jsonkey == 0) { 
snprintf(json_key, sizeof(json_key), "%s", value); got_jsonkey = 1; } else { ndpi_serialize_string_string(&reader_thread->workflow->ndpi_serializer, json_key, value); got_jsonkey = 0; } break; } case 'f': { format_index++; if (got_jsonkey == 1) { float value = va_arg(ap, double); ndpi_serialize_string_float(&reader_thread->workflow->ndpi_serializer, json_key, value, "%.2f"); got_jsonkey = 0; } else { internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); return; } break; } case 'z': case 'l': format_index++; if (got_jsonkey != 1) { internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); return; } if (*format == 'l') { format++; is_long_long = 1; } else { is_long_long = 0; } if (*format == 'd') { long long int value; if (is_long_long != 0) { value = va_arg(ap, long long int); } else { value = va_arg(ap, long int); } ndpi_serialize_string_int64(&reader_thread->workflow->ndpi_serializer, json_key, value); got_jsonkey = 0; } else if (*format == 'u') { unsigned long long int value; if (is_long_long != 0) { value = va_arg(ap, unsigned long long int); } else { value = va_arg(ap, unsigned long int); } ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, json_key, value); got_jsonkey = 0; } else { internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); return; } format++; break; case 'u': format_index++; if (got_jsonkey == 1) { unsigned int value = va_arg(ap, unsigned int); ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, json_key, value); got_jsonkey = 0; } else { internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); return; } break; case 'd': format_index++; if (got_jsonkey == 1) { int value = va_arg(ap, int); ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer, json_key, value); got_jsonkey = 0; } else { internal_format_error(&reader_thread->workflow->ndpi_serializer, format, 
format_index); return; } break; /* format string separators */ case ' ': case ',': case '%': break; default: internal_format_error(&reader_thread->workflow->ndpi_serializer, format, format_index); return; } } } __attribute__((format(printf, 3, 4))) static void jsonize_error_eventf(struct nDPId_reader_thread * const reader_thread, enum error_event event, char const * format, ...) { struct nDPId_workflow * const workflow = reader_thread->workflow; va_list ap; char const ev[] = "error_event_name"; ndpi_serialize_string_int32(&reader_thread->workflow->ndpi_serializer, "error_event_id", event); if (event > ERROR_EVENT_INVALID && event < ERROR_EVENT_COUNT) { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, error_event_name_table[event]); } else { ndpi_serialize_string_string(&workflow->ndpi_serializer, ev, error_event_name_table[ERROR_EVENT_INVALID]); } ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, "threshold_n", workflow->error_count); ndpi_serialize_string_uint32(&reader_thread->workflow->ndpi_serializer, "threshold_n_max", nDPId_options.error_event_threshold_n); ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, "threshold_time", nDPId_options.error_event_threshold_time); ndpi_serialize_string_uint64(&reader_thread->workflow->ndpi_serializer, "threshold_ts_usec", workflow->last_error_time); switch (event) { case MAX_FLOW_TO_TRACK: case FLOW_MEMORY_ALLOCATION_FAILED: jsonize_basic(reader_thread, 1); break; default: jsonize_basic(reader_thread, 0); break; } if (format != NULL) { va_start(ap, format); vjsonize_error_eventf(reader_thread, format, ap); va_end(ap); } ndpi_serialize_string_uint64(&workflow->ndpi_serializer, "global_ts_usec", workflow->last_global_time); serialize_and_send(reader_thread); } /* See: https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */ static inline uint32_t murmur_32_scramble(uint32_t k) { k *= 0xcc9e2d51; k = (k << 15) | (k >> 17); k *= 0x1b873593; return k; } /* See: 
https://en.wikipedia.org/wiki/MurmurHash#MurmurHash3 */
static uint32_t murmur3_32(uint8_t const * key, size_t len, uint32_t seed)
{
    uint32_t h = seed;
    uint32_t k;

    /* Read in groups of 4. */
    for (size_t i = len >> 2; i; i--)
    {
        /* memcpy avoids an unaligned load through a cast pointer; `key' has no alignment guarantee. */
        memcpy(&k, key, sizeof(uint32_t));
        k = htole32(k);
        key += sizeof(uint32_t);
        h ^= murmur_32_scramble(k);
        h = (h << 13) | (h >> 19);
        h = h * 5 + 0xe6546b64;
    }

    /* Read the rest. */
    k = 0;
    for (size_t i = len & 3; i; i--)
    {
        k <<= 8;
        k |= key[i - 1];
    }

    // A swap is *not* necessary here because the preceding loop already
    // places the low bytes in the low places according to whatever endianness
    // we use. Swaps only apply when the memory is copied in a chunk.
    h ^= murmur_32_scramble(k);

    /* Finalize. */
    h ^= len;
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;

    return h;
}

static uint32_t calculate_ndpi_flow_struct_hash(struct ndpi_flow_struct const * const ndpi_flow)
{
    /*
     * This is a kludge, but necessary for now as I do not want to spam nDPIsrvd and clients
     * with the same detection json string over and over again.
     * So we are building a hash over the more "stable" parts of the ndpi flow struct.
     * Stable in the sense that they should only change if the detection changes for whatever reason.
     * At the time of writing, nDPI has no API function to check if the detection changed
     * or has some new information available. This is far from perfect.
     */
    uint32_t hash = murmur3_32((uint8_t const *)&ndpi_flow->protos, sizeof(ndpi_flow->protos), nDPId_FLOW_STRUCT_SEED);
    hash += ndpi_flow->category;
    hash += (ndpi_flow->risk & 0xFFFFFFFF) + (ndpi_flow->risk >> 32); // nDPI Risks are u64's (might change in the
                                                                      // future)
    hash += ndpi_flow->confidence;

    const size_t protocol_bitmask_size = sizeof(ndpi_flow->excluded_protocol_bitmask.fds_bits) /
                                         sizeof(ndpi_flow->excluded_protocol_bitmask.fds_bits[0]);
    for (size_t i = 0; i < protocol_bitmask_size; ++i)
    {
        hash += ndpi_flow->excluded_protocol_bitmask.fds_bits[i];
        hash += ndpi_flow->excluded_protocol_bitmask.fds_bits[i];
    }

    size_t host_server_name_len =
        strnlen((const char *)ndpi_flow->host_server_name, sizeof(ndpi_flow->host_server_name));
    hash += host_server_name_len;
    hash += murmur3_32((uint8_t const *)&ndpi_flow->host_server_name,
                       sizeof(ndpi_flow->host_server_name),
                       nDPId_FLOW_STRUCT_SEED);

    return hash;
}

/* Some constants stolen from ndpiReader. */
#define SNAP 0xaa
/* mask for FCF */
#define WIFI_DATA 0x2
#define FCF_TYPE(fc) (((fc) >> 2) & 0x3) /* 0000 0011 = 0x3 */
#define FCF_TO_DS(fc) ((fc)&0x0100)
#define FCF_FROM_DS(fc) ((fc)&0x0200)
/* mask for Bad FCS presence */
#define BAD_FCS 0x50 /* 0101 0000 */

static int process_datalink_layer(struct nDPId_reader_thread * const reader_thread,
                                  struct pcap_pkthdr const * const header,
                                  uint8_t const * const packet,
                                  uint16_t * ip_offset,
                                  uint16_t * layer3_type)
{
    const uint16_t eth_offset = 0;
    const int datalink_type = pcap_datalink(reader_thread->workflow->pcap_handle);
    const struct ndpi_ethhdr * ethernet;

    switch (datalink_type)
    {
        case DLT_NULL:
        {
            /* DLT header values can be stored as big or little endian.
*/ uint32_t dlt_hdr = *((uint32_t *)&packet[eth_offset]); if (dlt_hdr == 0x02000000 || dlt_hdr == 0x02) { *layer3_type = ETH_P_IP; } else if (dlt_hdr == 0x24000000 || dlt_hdr == 0x24 || dlt_hdr == 0x28000000 || dlt_hdr == 0x28 || dlt_hdr == 0x30000000 || dlt_hdr == 0x30) { *layer3_type = ETH_P_IPV6; } else { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, UNKNOWN_DATALINK_LAYER, "%s%u", "layer_type", ntohl(*((uint32_t *)&packet[eth_offset]))); jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } *ip_offset = sizeof(dlt_hdr) + eth_offset; break; } case DLT_PPP_SERIAL: { if (header->caplen < sizeof(struct ndpi_chdlc)) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", sizeof(struct ndpi_chdlc)); jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); *layer3_type = ntohs(chdlc->proto_code); break; } case DLT_C_HDLC: case DLT_PPP: if (header->caplen < sizeof(struct ndpi_chdlc)) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", sizeof(struct ndpi_chdlc)); jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } if (packet[0] == 0x0f || packet[0] == 0x8f) { struct ndpi_chdlc const * const chdlc = (struct ndpi_chdlc const * const)&packet[eth_offset]; *ip_offset = sizeof(struct ndpi_chdlc); /* CHDLC_OFF = 4 */ *layer3_type = ntohs(chdlc->proto_code); } else { *ip_offset = 2; *layer3_type = ntohs(*((u_int16_t *)&packet[eth_offset])); } break; case DLT_LINUX_SLL: if (header->caplen < 16) { if 
(is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf( reader_thread, PACKET_TOO_SHORT, "%s%u %s%u", "size", header->caplen, "expected", 16); jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } *layer3_type = (packet[eth_offset + 14] << 8) + packet[eth_offset + 15]; *ip_offset = 16 + eth_offset; break; case DLT_IEEE802_11_RADIO: { if (header->caplen < sizeof(struct ndpi_radiotap_header)) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", sizeof(struct ndpi_radiotap_header)); jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } struct ndpi_radiotap_header const * const radiotap = (struct ndpi_radiotap_header const * const)&packet[eth_offset]; uint16_t radio_len = radiotap->len; /* Check Bad FCS presence */ if ((radiotap->flags & BAD_FCS) == BAD_FCS) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, PACKET_HEADER_INVALID, "%s%s", "reason", "Bad FCS presence"); jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } if (header->caplen < (eth_offset + radio_len + sizeof(struct ndpi_wifi_header))) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", (eth_offset + radio_len + sizeof(struct ndpi_wifi_header))); jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } /* Calculate 802.11 header length (variable) */ struct ndpi_wifi_header const * const wifi = (struct ndpi_wifi_header const * const)(packet + eth_offset + radio_len); uint16_t fc = wifi->fc; int wifi_len = 0; /* check wifi data presence */ if (FCF_TYPE(fc) == WIFI_DATA) { if ((FCF_TO_DS(fc) && 
FCF_FROM_DS(fc) == 0x0) || (FCF_TO_DS(fc) == 0x0 && FCF_FROM_DS(fc)))
                {
                    wifi_len = 26; /* + 4 byte fcs */
                }
            }
            else
            {
                /* no data frames */
                break;
            }

            /* Check ether_type from LLC */
            if (header->caplen < (eth_offset + wifi_len + radio_len + sizeof(struct ndpi_llc_header_snap)))
            {
                return 1;
            }

            struct ndpi_llc_header_snap const * const llc =
                (struct ndpi_llc_header_snap const * const)(packet + eth_offset + wifi_len + radio_len);
            if (llc->dsap == SNAP)
            {
                *layer3_type = ntohs(llc->snap.proto_ID);
            }

            /* Set IP header offset */
            *ip_offset = wifi_len + radio_len + sizeof(struct ndpi_llc_header_snap) + eth_offset;
            break;
        }
        case DLT_RAW:
            *ip_offset = 0;
            if (header->caplen < 1)
            {
                return 1;
            }
            switch ((packet[0] & 0xF0) >> 4)
            {
                case 4:
                    *layer3_type = ETH_P_IP;
                    break;
                case 6:
                    *layer3_type = ETH_P_IPV6;
                    break;
                default:
                    return 1;
            }
            break;
        case DLT_EN10MB:
            if (header->caplen < sizeof(struct ndpi_ethhdr))
            {
                if (is_error_event_threshold(reader_thread->workflow) == 0)
                {
                    jsonize_error_eventf(reader_thread,
                                         PACKET_TOO_SHORT,
                                         "%s%u %s%zu",
                                         "size",
                                         header->caplen,
                                         "expected",
                                         sizeof(struct ndpi_ethhdr));
                    jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
                }
                return 1;
            }

            ethernet = (struct ndpi_ethhdr *)&packet[eth_offset];
            *ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset;
            *layer3_type = ntohs(ethernet->h_proto);

            /* Cisco FabricPath (data center ethernet devices) */
            if (*layer3_type == ETHERTYPE_DCE)
            {
                if (header->caplen < sizeof(struct ndpi_ethhdr) + 20 /* sizeof(Ethernet/DCE-header) */)
                {
                    if (is_error_event_threshold(reader_thread->workflow) == 0)
                    {
                        /* Report the same minimum size that the check above enforces. */
                        jsonize_error_eventf(reader_thread,
                                             PACKET_TOO_SHORT,
                                             "%s%u %s%zu",
                                             "size",
                                             header->caplen,
                                             "expected",
                                             sizeof(struct ndpi_ethhdr) + 20);
                        jsonize_packet_event(
                            reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
                    }
                    return 1;
                }

                ethernet = (struct ndpi_ethhdr *)&packet[eth_offset + 20];
                *ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset;
                *layer3_type =
ntohs(ethernet->h_proto); } /* 802.1Q VLAN */ if (*layer3_type == ETHERTYPE_VLAN) { if (header->caplen < sizeof(struct ndpi_ethhdr) + 4 /* sizeof(802.1Q-header) */) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", sizeof(struct ndpi_ethhdr) + 4); jsonize_packet_event( reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } *layer3_type = ntohs(*(uint16_t *)&packet[*ip_offset + 2]); *ip_offset += 4; } switch (*layer3_type) { case ETH_P_IP: /* IPv4 */ if (header->caplen < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, IP4_PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)); jsonize_packet_event(reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } break; case ETH_P_IPV6: /* IPV6 */ if (header->caplen < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, IP6_PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)); jsonize_packet_event(reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } break; case ETHERTYPE_PAE: /* 802.1X Authentication */ return 1; case ETHERTYPE_ARP: /* ARP */ return 1; default: if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, PACKET_TYPE_UNKNOWN, "%s%u", "layer_type", *layer3_type); jsonize_packet_event( reader_thread, header, packet, *layer3_type, *ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return 1; } break; case DLT_IPV4: *layer3_type = ETH_P_IP; *ip_offset = 0; break; 
        case DLT_IPV6:
            *layer3_type = ETH_P_IPV6;
            *ip_offset = 0;
            break;
        /* Switch tag datalinks are not supported for now. */
        case DLT_DSA_TAG_DSA:
            return 1;
        case DLT_DSA_TAG_EDSA:
            return 1;
        default:
            if (is_error_event_threshold(reader_thread->workflow) == 0)
            {
                jsonize_error_eventf(reader_thread,
                                     UNKNOWN_DATALINK_LAYER,
                                     "%s%u",
                                     "layer_type",
                                     ntohl(*((uint32_t *)&packet[eth_offset])));
                jsonize_packet_event(reader_thread, header, packet, 0, 0, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
            }
            return 1;
    }

    return 0;
}

static struct nDPId_flow_basic * add_new_flow(struct nDPId_workflow * const workflow,
                                              struct nDPId_flow_basic * orig_flow_basic,
                                              enum nDPId_flow_state state,
                                              size_t hashed_index)
{
    size_t s;

    switch (state)
    {
        case FS_UNKNOWN:
        case FS_COUNT:
        case FS_FINISHED:
            // do not allocate something for FS_FINISHED as we are re-using memory allocated by FS_INFO
            return NULL;

        case FS_SKIPPED:
            workflow->total_skipped_flows++;
            s = sizeof(struct nDPId_flow_skipped);
            break;

        case FS_INFO:
            s = sizeof(struct nDPId_flow);
            break;
    }

    struct nDPId_flow_basic * flow_basic = (struct nDPId_flow_basic *)ndpi_malloc(s);
    if (flow_basic == NULL)
    {
        return NULL;
    }
    memset(flow_basic, 0, s);
    *flow_basic = *orig_flow_basic;
    flow_basic->state = state;
    if (ndpi_tsearch(flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL)
    {
        ndpi_free(flow_basic);
        return NULL;
    }

    workflow->cur_active_flows++;
    return flow_basic;
}

static void do_periodically_work(struct nDPId_reader_thread * const reader_thread)
{
    if (reader_thread->workflow->last_scan_time + nDPId_options.flow_scan_interval <=
        reader_thread->workflow->last_global_time)
    {
        check_for_idle_flows(reader_thread);
        check_for_flow_updates(reader_thread);
        reader_thread->workflow->last_scan_time = reader_thread->workflow->last_global_time;
    }
    if (reader_thread->workflow->last_status_time + nDPId_options.daemon_status_interval +
            reader_thread->array_index * 1000 <=
        reader_thread->workflow->last_global_time)
    {
        jsonize_daemon(reader_thread,
                       DAEMON_EVENT_STATUS);
        reader_thread->workflow->last_status_time =
            reader_thread->workflow->last_global_time + reader_thread->array_index * 1000;
    }
#ifdef ENABLE_MEMORY_PROFILING
    if (reader_thread->workflow->last_memory_usage_log_time + nDPId_options.memory_profiling_log_interval <=
        reader_thread->workflow->last_global_time)
    {
        log_memory_usage(reader_thread);
        reader_thread->workflow->last_memory_usage_log_time = reader_thread->workflow->last_global_time;
    }
#endif
}

static int distribute_single_packet(struct nDPId_reader_thread * const reader_thread)
{
    return (reader_thread->workflow->packets_captured % nDPId_options.reader_thread_count ==
            reader_thread->array_index);
}

static void ndpi_process_packet(uint8_t * const args,
                                struct pcap_pkthdr const * const header,
                                uint8_t const * const packet)
{
    struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)args;
    struct nDPId_workflow * workflow;
    struct nDPId_flow_basic flow_basic = {};
    enum nDPId_flow_direction direction;

    size_t hashed_index;
    void * tree_result;
    struct nDPId_flow * flow_to_process;

    uint8_t is_new_flow = 0;

    const struct ndpi_iphdr * ip;
    struct ndpi_ipv6hdr * ip6;
    const struct ndpi_tcphdr * tcp = NULL;

    uint64_t time_us;
    uint64_t last_pkt_time;

    uint16_t ip_offset = 0;
    uint16_t ip_size;

    const uint8_t * l4_ptr = NULL;
    uint16_t l4_len = 0;
    uint16_t l4_payload_len = 0;

    uint16_t type = 0;
    size_t thread_index = nDPId_THREAD_DISTRIBUTION_SEED; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'

    if (reader_thread == NULL)
    {
        return;
    }
    workflow = reader_thread->workflow;

    if (workflow == NULL)
    {
        return;
    }

    workflow->packets_captured++;
    time_us = ndpi_timeval_to_microseconds(header->ts);
    if (workflow->last_global_time < time_us)
    {
        workflow->last_global_time = time_us;
    }
    if (workflow->last_thread_time == 0)
    {
        workflow->last_thread_time = time_us;
    }

    do_periodically_work(reader_thread);

    if (process_datalink_layer(reader_thread, header, packet, &ip_offset, &type) != 0)
    {
        return;
    }

    if (type ==
ETH_P_IP) { ip = (struct ndpi_iphdr *)&packet[ip_offset]; ip6 = NULL; } else if (type == ETH_P_IPV6) { ip = NULL; ip6 = (struct ndpi_ipv6hdr *)&packet[ip_offset]; } else { if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return; } ip_size = header->caplen - ip_offset; if (type == ETH_P_IP && header->caplen >= ip_offset) { if (header->caplen < header->len) { if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, CAPTURE_SIZE_SMALLER_THAN_PACKET_SIZE, "%s%u %s%u", "size", header->caplen, "expected", header->len); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } } } /* process layer3 e.g. IPv4 / IPv6 */ if (ip != NULL && ip->version == 4) { if (ip_size < sizeof(*ip)) { if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, IP4_SIZE_SMALLER_THAN_HEADER, "%s%u %s%zu", "size", ip_size, "expected", sizeof(*ip)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return; } flow_basic.l3_type = L3_IP; if (ndpi_detection_get_l4( (uint8_t *)ip, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0) { if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf( reader_thread, IP4_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return; } flow_basic.src.v4.ip = ip->saddr; flow_basic.dst.v4.ip = ip->daddr; uint32_t 
        min_addr = (flow_basic.src.v4.ip > flow_basic.dst.v4.ip ? flow_basic.dst.v4.ip : flow_basic.src.v4.ip);
        thread_index = min_addr + ip->protocol;
    }
    else if (ip6 != NULL)
    {
        if (ip_size < sizeof(ip6->ip6_hdr))
        {
            if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0)
            {
                jsonize_error_eventf(reader_thread,
                                     IP6_SIZE_SMALLER_THAN_HEADER,
                                     "%s%u %s%zu",
                                     "size",
                                     ip_size,
                                     "expected",
                                     sizeof(ip6->ip6_hdr));
                jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
            }
            return;
        }

        flow_basic.l3_type = L3_IP6;
        if (ndpi_detection_get_l4(
                (uint8_t *)ip6, ip_size, &l4_ptr, &l4_len, &flow_basic.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
        {
            if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0)
            {
                /* Report the remaining length after the IPv6 header (not the IPv4 header). */
                jsonize_error_eventf(
                    reader_thread, IP6_L4_PAYLOAD_DETECTION_FAILED, "%s%zu", "l4_data_len", ip_size - sizeof(*ip6));
                jsonize_packet_event(reader_thread, header, packet, type, ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD);
            }
            return;
        }

        flow_basic.src.v6.ip[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
        flow_basic.src.v6.ip[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
        flow_basic.dst.v6.ip[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
        flow_basic.dst.v6.ip[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];

        uint64_t min_addr[2];
        if (flow_basic.src.v6.ip[0] > flow_basic.dst.v6.ip[0] ||
            (flow_basic.src.v6.ip[0] == flow_basic.dst.v6.ip[0] && flow_basic.src.v6.ip[1] > flow_basic.dst.v6.ip[1]))
        {
            min_addr[0] = flow_basic.dst.v6.ip[0];
            min_addr[1] = flow_basic.dst.v6.ip[1];
        }
        else
        {
            min_addr[0] = flow_basic.src.v6.ip[0];
            min_addr[1] = flow_basic.src.v6.ip[1];
        }
        thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt;
    }
    else
    {
        if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0)
        {
            jsonize_error_eventf(reader_thread, UNKNOWN_L3_PROTOCOL, "%s%u", "protocol", type);
            jsonize_packet_event(reader_thread, header, packet, type,
ip_offset, 0, 0, NULL, PACKET_EVENT_PAYLOAD); } return; } /* process layer4 e.g. TCP / UDP */ if (flow_basic.l4_protocol == IPPROTO_TCP) { if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) { if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, TCP_PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); } return; } tcp = (struct ndpi_tcphdr *)l4_ptr; l4_payload_len = ndpi_max(0, l4_len - 4 * tcp->doff); flow_basic.tcp_fin_rst_seen = (tcp->fin == 1 || tcp->rst == 1 ? 1 : 0); flow_basic.tcp_is_midstream_flow = (tcp->syn == 0 ? 1 : 0); flow_basic.src_port = ntohs(tcp->source); flow_basic.dst_port = ntohs(tcp->dest); } else if (flow_basic.l4_protocol == IPPROTO_UDP) { const struct ndpi_udphdr * udp; if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) { if (distribute_single_packet(reader_thread) != 0 && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, UDP_PACKET_TOO_SHORT, "%s%u %s%zu", "size", header->caplen, "expected", (l4_ptr - packet) + sizeof(struct ndpi_udphdr)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); } return; } udp = (struct ndpi_udphdr *)l4_ptr; l4_payload_len = (l4_len > sizeof(struct ndpi_udphdr)) ? l4_len - sizeof(struct ndpi_udphdr) : 0; flow_basic.src_port = ntohs(udp->source); flow_basic.dst_port = ntohs(udp->dest); } else { /* Use layer4 length returned from libnDPI. */ l4_payload_len = l4_len; } /* distribute flows to threads while keeping stability (same flow goes always to same thread) */ thread_index += (flow_basic.src_port < flow_basic.dst_port ? 
flow_basic.dst_port : flow_basic.src_port); thread_index %= nDPId_options.reader_thread_count; if (thread_index != reader_thread->array_index) { return; } if (workflow->last_thread_time < time_us) { workflow->last_thread_time = time_us; } /* calculate flow hash for btree find, search(insert) */ switch (flow_basic.l3_type) { case L3_IP: if (ndpi_flowv4_flow_hash(flow_basic.l4_protocol, flow_basic.src.v4.ip, flow_basic.dst.v4.ip, flow_basic.src_port, flow_basic.dst_port, 0, 0, (uint8_t *)&flow_basic.hashval, sizeof(flow_basic.hashval)) != 0) { flow_basic.hashval = flow_basic.src.v4.ip + flow_basic.dst.v4.ip; // fallback } break; case L3_IP6: if (ndpi_flowv6_flow_hash(flow_basic.l4_protocol, &ip6->ip6_src, &ip6->ip6_dst, flow_basic.src_port, flow_basic.dst_port, 0, 0, (uint8_t *)&flow_basic.hashval, sizeof(flow_basic.hashval)) != 0) { flow_basic.hashval = flow_basic.src.v6.ip[0] + flow_basic.src.v6.ip[1]; flow_basic.hashval += flow_basic.dst.v6.ip[0] + flow_basic.dst.v6.ip[1]; } break; } flow_basic.hashval += flow_basic.l4_protocol + flow_basic.src_port + flow_basic.dst_port; hashed_index = flow_basic.hashval % workflow->max_active_flows; direction = FD_SRC2DST; tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp); if (tree_result == NULL) { direction = FD_DST2SRC; /* flow not found in btree: switch src <-> dst and try to find it again */ uint64_t orig_src_ip[2] = {flow_basic.src.v6.ip[0], flow_basic.src.v6.ip[1]}; uint64_t orig_dst_ip[2] = {flow_basic.dst.v6.ip[0], flow_basic.dst.v6.ip[1]}; uint16_t orig_src_port = flow_basic.src_port; uint16_t orig_dst_port = flow_basic.dst_port; flow_basic.src.v6.ip[0] = orig_dst_ip[0]; flow_basic.src.v6.ip[1] = orig_dst_ip[1]; flow_basic.dst.v6.ip[0] = orig_src_ip[0]; flow_basic.dst.v6.ip[1] = orig_src_ip[1]; flow_basic.src_port = orig_dst_port; flow_basic.dst_port = orig_src_port; tree_result = ndpi_tfind(&flow_basic, &workflow->ndpi_flows_active[hashed_index], 
ndpi_workflow_node_cmp); flow_basic.src.v6.ip[0] = orig_src_ip[0]; flow_basic.src.v6.ip[1] = orig_src_ip[1]; flow_basic.dst.v6.ip[0] = orig_dst_ip[0]; flow_basic.dst.v6.ip[1] = orig_dst_ip[1]; flow_basic.src_port = orig_src_port; flow_basic.dst_port = orig_dst_port; } if (tree_result == NULL) { /* flow still not found, must be new or midstream */ direction = FD_SRC2DST; union nDPId_ip const * netmask = NULL; union nDPId_ip const * subnet = NULL; switch (flow_basic.l3_type) { case L3_IP: netmask = &nDPId_options.pcap_dev_netmask4; subnet = &nDPId_options.pcap_dev_subnet4; break; case L3_IP6: netmask = &nDPId_options.pcap_dev_netmask6; subnet = &nDPId_options.pcap_dev_subnet6; break; } if (nDPId_options.process_internal_initial_direction != 0 && flow_basic.tcp_is_midstream_flow == 0) { if (is_ip_in_subnet(&flow_basic.src, netmask, subnet, flow_basic.l3_type) == 0) { if (add_new_flow(workflow, &flow_basic, FS_SKIPPED, hashed_index) == NULL && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(struct nDPId_flow_skipped)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); } return; } } else if (nDPId_options.process_external_initial_direction != 0 && flow_basic.tcp_is_midstream_flow == 0) { if (is_ip_in_subnet(&flow_basic.src, netmask, subnet, flow_basic.l3_type) != 0) { if (add_new_flow(workflow, &flow_basic, FS_SKIPPED, hashed_index) == NULL && is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf(reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(struct nDPId_flow_skipped)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); } return; } } if (workflow->cur_active_flows == workflow->max_active_flows) { if (is_error_event_threshold(reader_thread->workflow) == 0) { 
jsonize_error_eventf(reader_thread, MAX_FLOW_TO_TRACK, "%s%llu %s%llu %s%llu %s%llu", "current_active", workflow->cur_active_flows, "current_idle", workflow->cur_idle_flows, "max_active", workflow->max_active_flows, "max_idle", workflow->max_idle_flows); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); } return; } flow_to_process = (struct nDPId_flow *)add_new_flow(workflow, &flow_basic, FS_INFO, hashed_index); if (flow_to_process == NULL) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf( reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); } return; } workflow->total_active_flows++; flow_to_process->flow_extended.flow_id = MT_GET_AND_ADD(global_flow_id, 1); if (alloc_detection_data(flow_to_process) != 0) { if (is_error_event_threshold(reader_thread->workflow) == 0) { jsonize_error_eventf( reader_thread, FLOW_MEMORY_ALLOCATION_FAILED, "%s%zu", "size", sizeof(*flow_to_process)); jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, NULL, PACKET_EVENT_PAYLOAD); } return; } is_new_flow = 1; } else { /* flow already exists in the tree */ struct nDPId_flow_basic * const flow_basic_to_process = *(struct nDPId_flow_basic **)tree_result; /* Update last seen timestamp for timeout handling. */ last_pkt_time = flow_basic_to_process->last_pkt_time[direction]; flow_basic_to_process->last_pkt_time[direction] = workflow->last_thread_time; /* TCP-FIN/TCP-RST: indicates that at least one side wants to end the connection. 
*/ if (flow_basic.tcp_fin_rst_seen != 0) { flow_basic_to_process->tcp_fin_rst_seen = 1; } switch (flow_basic_to_process->state) { case FS_UNKNOWN: case FS_COUNT: case FS_SKIPPED: return; case FS_FINISHED: case FS_INFO: break; } flow_to_process = (struct nDPId_flow *)flow_basic_to_process; if (flow_to_process->flow_extended.flow_basic.state == FS_INFO) { #ifdef ENABLE_ZLIB if (nDPId_options.enable_zlib_compression != 0 && flow_to_process->info.detection_data_compressed_size > 0) { workflow->current_compression_diff -= flow_to_process->info.detection_data_compressed_size; int ret = detection_data_inflate(flow_to_process); if (ret <= 0) { workflow->current_compression_diff += flow_to_process->info.detection_data_compressed_size; logger(1, "zLib decompression failed for existing flow %llu with error code: %d", flow_to_process->flow_extended.flow_id, ret); return; } } #endif } } flow_to_process->flow_extended.packets_processed[direction]++; flow_to_process->flow_extended.total_l4_payload_len[direction] += l4_payload_len; workflow->packets_processed++; workflow->total_l4_payload_len += l4_payload_len; if (l4_payload_len > flow_to_process->flow_extended.max_l4_payload_len[direction]) { flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len; } if (l4_payload_len < flow_to_process->flow_extended.min_l4_payload_len[direction]) { flow_to_process->flow_extended.min_l4_payload_len[direction] = l4_payload_len; } if (is_new_flow != 0) { last_pkt_time = flow_to_process->flow_extended.first_seen = flow_to_process->flow_extended.flow_basic.last_pkt_time[direction] = flow_to_process->flow_extended.flow_basic.last_pkt_time[1 - direction] = flow_to_process->flow_extended.last_flow_update = workflow->last_thread_time; flow_to_process->flow_extended.max_l4_payload_len[direction] = l4_payload_len; flow_to_process->flow_extended.min_l4_payload_len[direction] = l4_payload_len; jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_NEW); } if 
(nDPId_options.enable_data_analysis != 0 && flow_to_process->flow_extended.flow_analysis != NULL && flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + flow_to_process->flow_extended.packets_processed[FD_DST2SRC] <= nDPId_options.max_packets_per_flow_to_analyse) { unsigned long long int total_flow_packets = flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + flow_to_process->flow_extended.packets_processed[FD_DST2SRC]; uint64_t tdiff_us = timer_sub(workflow->last_thread_time, last_pkt_time); if (total_flow_packets > 1) { ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->iat, tdiff_us); } ndpi_data_add_value(&flow_to_process->flow_extended.flow_analysis->pktlen, ip_size); flow_to_process->flow_extended.flow_analysis ->directions[(total_flow_packets - 1) % nDPId_options.max_packets_per_flow_to_analyse] = direction; ndpi_inc_bin(&flow_to_process->flow_extended.flow_analysis->payload_len_bin[direction], plen2slot(l4_payload_len), 1); flow_to_process->flow_extended.flow_analysis ->entropies[(total_flow_packets - 1) % nDPId_options.max_packets_per_flow_to_analyse] = ndpi_entropy((ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6), ip_size); if (total_flow_packets == nDPId_options.max_packets_per_flow_to_analyse) { jsonize_flow_event(reader_thread, &flow_to_process->flow_extended, FLOW_EVENT_ANALYSE); free_analysis_data(&flow_to_process->flow_extended); } } jsonize_packet_event(reader_thread, header, packet, type, ip_offset, (l4_ptr - packet), l4_len, &flow_to_process->flow_extended, PACKET_EVENT_PAYLOAD_FLOW); if (flow_to_process->flow_extended.flow_basic.state != FS_INFO || flow_to_process->info.detection_data == NULL) { /* Only FS_INFO goes through the whole detection process. 
         */
        return;
    }

    if (flow_to_process->info.detection_data->flow.num_processed_pkts ==
        nDPId_options.max_packets_per_flow_to_process - 1)
    {
        if (flow_to_process->info.detection_completed != 0)
        {
            reader_thread->workflow->total_flow_detection_updates++;
            jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE);
        }
        else
        {
            /* last chance to guess something, better than nothing */
            uint8_t protocol_was_guessed = 0;
            flow_to_process->info.detection_data->guessed_l7_protocol = ndpi_detection_giveup(
                workflow->ndpi_struct, &flow_to_process->info.detection_data->flow, 1, &protocol_was_guessed);
            if (protocol_was_guessed != 0)
            {
                workflow->total_guessed_flows++;
                jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_GUESSED);
            }
            else
            {
                reader_thread->workflow->total_not_detected_flows++;
                jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_NOT_DETECTED);
            }
        }
    }

    flow_to_process->flow_extended.detected_l7_protocol =
        ndpi_detection_process_packet(workflow->ndpi_struct,
                                      &flow_to_process->info.detection_data->flow,
                                      ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
                                      ip_size,
                                      workflow->last_thread_time / 1000,
                                      NULL);

    if (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->flow_extended.detected_l7_protocol) != 0 &&
        flow_to_process->info.detection_completed == 0)
    {
        flow_to_process->info.detection_completed = 1;
        workflow->total_detected_flows++;
        /*
         * The following needs to be done, because a successful classification may happen after the first packet.
         * If there is no further extra dissection possible for this protocol, we may be saving an invalid risk.
*/ if (flow_to_process->flow_extended.packets_processed[FD_SRC2DST] + flow_to_process->flow_extended.packets_processed[FD_DST2SRC] == 1) { ndpi_unset_risk(workflow->ndpi_struct, &flow_to_process->info.detection_data->flow, NDPI_UNIDIRECTIONAL_TRAFFIC); } jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTED); flow_to_process->info.detection_data->last_ndpi_flow_struct_hash = calculate_ndpi_flow_struct_hash(&flow_to_process->info.detection_data->flow); } else if (flow_to_process->info.detection_completed == 1) { uint32_t hash = calculate_ndpi_flow_struct_hash(&flow_to_process->info.detection_data->flow); if (hash != flow_to_process->info.detection_data->last_ndpi_flow_struct_hash) { workflow->total_flow_detection_updates++; jsonize_flow_detection_event(reader_thread, flow_to_process, FLOW_EVENT_DETECTION_UPDATE); flow_to_process->info.detection_data->last_ndpi_flow_struct_hash = hash; } } if (flow_to_process->info.detection_data->flow.num_processed_pkts == nDPId_options.max_packets_per_flow_to_process || (ndpi_is_protocol_detected(workflow->ndpi_struct, flow_to_process->flow_extended.detected_l7_protocol) != 0 && ndpi_extra_dissection_possible(workflow->ndpi_struct, &flow_to_process->info.detection_data->flow) == 0)) { struct ndpi_proto detected_l7_protocol = flow_to_process->flow_extended.detected_l7_protocol; if (ndpi_is_protocol_detected(workflow->ndpi_struct, detected_l7_protocol) == 0) { detected_l7_protocol = flow_to_process->info.detection_data->guessed_l7_protocol; } ndpi_risk risk = flow_to_process->info.detection_data->flow.risk; ndpi_confidence_t confidence = flow_to_process->info.detection_data->flow.confidence; free_detection_data(flow_to_process); flow_to_process->flow_extended.flow_basic.state = FS_FINISHED; struct nDPId_flow * const flow = flow_to_process; flow->flow_extended.detected_l7_protocol = detected_l7_protocol; flow->finished.risk = risk; flow->finished.confidence = confidence; } #ifdef ENABLE_ZLIB if 
(nDPId_options.enable_zlib_compression != 0) { check_for_compressable_flows(reader_thread); } #endif } static void get_current_time(struct timeval * const tval) { gettimeofday(tval, NULL); } #if !defined(__FreeBSD__) && !defined(__APPLE__) static void ndpi_log_flow_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data) { struct nDPId_reader_thread const * const reader_thread = (struct nDPId_reader_thread *)user_data; struct nDPId_flow_basic const * const flow_basic = *(struct nDPId_flow_basic **)A; (void)depth; (void)user_data; if (flow_basic == NULL) { return; } if (which == ndpi_preorder || which == ndpi_leaf) { switch (flow_basic->state) { case FS_UNKNOWN: break; case FS_COUNT: break; case FS_SKIPPED: break; case FS_FINISHED: { struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic; uint64_t last_seen = get_last_pkt_time(flow_basic); uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol); logger(0, "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " "%7llu]", reader_thread->array_index, flow->flow_extended.flow_id, (unsigned long long int)last_seen, (unsigned long long int)flow->flow_extended.last_flow_update, (unsigned long long int)idle_time, (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time ? 
last_seen + idle_time - reader_thread->workflow->last_thread_time : 0)); break; } case FS_INFO: { struct nDPId_flow const * const flow = (struct nDPId_flow *)flow_basic; uint64_t last_seen = get_last_pkt_time(flow_basic); uint64_t idle_time = get_l4_protocol_idle_time_external(flow->flow_extended.flow_basic.l4_protocol); logger(0, "[%2zu][%4llu][last-seen: %13llu][last-update: %13llu][idle-time: %7llu][time-until-timeout: " "%7llu]", reader_thread->array_index, flow->flow_extended.flow_id, (unsigned long long int)last_seen, (unsigned long long int)flow->flow_extended.last_flow_update, (unsigned long long int)idle_time, (unsigned long long int)(last_seen + idle_time >= reader_thread->workflow->last_thread_time ? last_seen + idle_time - reader_thread->workflow->last_thread_time : 0)); break; } } } } static void log_all_flows(struct nDPId_reader_thread const * const reader_thread) { struct nDPId_workflow const * const workflow = reader_thread->workflow; logger(0, "[%2zu][last-global-time: %13llu][last-thread-time: %13llu][last-scan-time: %13llu]", reader_thread->array_index, (unsigned long long int)workflow->last_global_time, (unsigned long long int)workflow->last_thread_time, (unsigned long long int)workflow->last_scan_time); for (size_t scan_index = 0; scan_index < workflow->max_active_flows; ++scan_index) { ndpi_twalk(workflow->ndpi_flows_active[scan_index], ndpi_log_flow_walker, (void *)reader_thread); } } #endif static void run_pcap_loop(struct nDPId_reader_thread * const reader_thread) { if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) { if (reader_thread->workflow->is_pcap_file != 0) { switch (pcap_loop(reader_thread->workflow->pcap_handle, -1, &ndpi_process_packet, (uint8_t *)reader_thread)) { case PCAP_ERROR: logger(1, "Error while reading pcap file: '%s'", pcap_geterr(reader_thread->workflow->pcap_handle)); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; case PCAP_ERROR_BREAK: 
MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; default: return; } } else { #if !defined(__FreeBSD__) && !defined(__APPLE__) sigset_t thread_signal_set, old_signal_set; sigfillset(&thread_signal_set); if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) { logger(1, "pthread_sigmask: %s", strerror(errno)); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } sigaddset(&thread_signal_set, SIGINT); sigaddset(&thread_signal_set, SIGTERM); sigaddset(&thread_signal_set, SIGUSR1); int signal_fd = signalfd(-1, &thread_signal_set, SFD_NONBLOCK); if (signal_fd < 0 || set_fd_cloexec(signal_fd) < 0) { logger(1, "signalfd: %s", strerror(errno)); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } #endif int pcap_fd = pcap_get_selectable_fd(reader_thread->workflow->pcap_handle); if (pcap_fd < 0) { logger(1, "%s", "Got an invalid PCAP fd"); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); return; } struct nio io; nio_init(&io); #ifdef ENABLE_EPOLL if ((nDPId_options.use_poll == 0 && nio_use_epoll(&io, 32) != NIO_SUCCESS) || (nDPId_options.use_poll != 0 && nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS)) #else if (nio_use_poll(&io, nDPIsrvd_MAX_REMOTE_DESCRIPTORS) != NIO_SUCCESS) #endif { logger(1, "%s", "Event I/O poll/epoll setup failed"); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); nio_free(&io); return; } errno = 0; if (nio_add_fd(&io, pcap_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) { logger(1, "Could not add pcap fd to event queue: %s", (errno != 0 ? strerror(errno) : "Internal Error")); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); nio_free(&io); return; } #if !defined(__FreeBSD__) && !defined(__APPLE__) errno = 0; if (nio_add_fd(&io, signal_fd, NIO_EVENT_INPUT, NULL) != NIO_SUCCESS) { logger(1, "Could not add signal fd to event queue: %s", (errno != 0 ? 
strerror(errno) : "Internal Error")); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); nio_free(&io); return; } #endif int const timeout_ms = 1000; /* TODO: Configurable? */ struct timeval tval_before_epoll, tval_after_epoll; while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) { get_current_time(&tval_before_epoll); errno = 0; if (nio_run(&io, timeout_ms) != NIO_SUCCESS) { logger(1, "Event I/O returned error: %s", strerror(errno)); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); break; } int nready = nio_get_nready(&io); if (nready == 0) { struct timeval tval_diff; get_current_time(&tval_after_epoll); ndpi_timer_sub(&tval_after_epoll, &tval_before_epoll, &tval_diff); uint64_t tdiff_us = tval_diff.tv_sec * 1000 * 1000 + tval_diff.tv_usec; reader_thread->workflow->last_global_time += tdiff_us; reader_thread->workflow->last_thread_time += tdiff_us; do_periodically_work(reader_thread); } for (int i = 0; i < nready; ++i) { if (nio_has_error(&io, i) == NIO_SUCCESS) { logger(1, "%s", "Event I/O error"); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); } int fd = nio_get_fd(&io, i); #if !defined(__FreeBSD__) && !defined(__APPLE__) if (fd == signal_fd) { struct signalfd_siginfo fdsi; if (read(signal_fd, &fdsi, sizeof(fdsi)) != sizeof(fdsi)) { if (errno != EAGAIN) { logger(1, "Could not read signal data from fd %d: %s", signal_fd, strerror(errno)); } } else { char const * signame = "unknown"; switch (fdsi.ssi_signo) { case SIGINT: signame = "SIGINT"; sighandler(SIGINT); break; case SIGTERM: signame = "SIGTERM"; sighandler(SIGTERM); break; case SIGUSR1: signame = "SIGUSR1"; log_all_flows(reader_thread); break; } logger(1, "Received signal %d (%s)", fdsi.ssi_signo, signame); } } else #endif if (fd == pcap_fd) { switch (pcap_dispatch( reader_thread->workflow->pcap_handle, -1, ndpi_process_packet, (uint8_t *)reader_thread)) { case PCAP_ERROR: logger(1, "Error while reading from pcap device: '%s'", 
pcap_geterr(reader_thread->workflow->pcap_handle)); MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); break; case PCAP_ERROR_BREAK: MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1); nio_free(&io); return; default: break; } } else { logger(1, "Unknown event descriptor or data returned: %p", nio_get_ptr(&io, i)); } } } nio_free(&io); } } } static void break_pcap_loop(struct nDPId_reader_thread * const reader_thread) { if (reader_thread->workflow != NULL && reader_thread->workflow->pcap_handle != NULL) { pcap_breakloop(reader_thread->workflow->pcap_handle); } } static void * processing_thread(void * const ndpi_thread_arg) { struct nDPId_reader_thread * const reader_thread = (struct nDPId_reader_thread *)ndpi_thread_arg; reader_thread->collector_sockfd = -1; if (connect_to_collector(reader_thread) != 0) { logger(1, "Thread %zu: Could not connect to nDPIsrvd Collector at %s, will try again later. Error: %s", reader_thread->array_index, get_cmdarg(&nDPId_options.collector_address), (reader_thread->collector_sock_last_errno != 0 ? 
                                                                    strerror(reader_thread->collector_sock_last_errno)
                                                                  : "Internal Error."));
    }
    else
    {
        jsonize_daemon(reader_thread, DAEMON_EVENT_INIT);
    }

    run_pcap_loop(reader_thread);
    set_collector_block(reader_thread);
    MT_GET_AND_ADD(reader_thread->workflow->error_or_eof, 1);
    return NULL;
}

static int processing_threads_error_or_eof(void)
{
    for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
    {
        if (MT_GET_AND_ADD(reader_threads[i].workflow->error_or_eof, 0) == 0)
        {
            return 0;
        }
    }
    return 1;
}

static int start_reader_threads(void)
{
    sigset_t thread_signal_set, old_signal_set;
    int rc; /* pthread_*() functions return the error number and do not set errno */

    sigfillset(&thread_signal_set);
    sigdelset(&thread_signal_set, SIGINT);
    sigdelset(&thread_signal_set, SIGTERM);
    rc = pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set);
    if (rc != 0)
    {
        logger_early(1, "pthread_sigmask: %s", strerror(rc));
        return 1;
    }

    if (daemonize_with_pidfile(get_cmdarg(&nDPId_options.pidfile)) != 0)
    {
        return 1;
    }

    errno = 0;
    if (change_user_group(get_cmdarg(&nDPId_options.user),
                          get_cmdarg(&nDPId_options.group),
                          get_cmdarg(&nDPId_options.pidfile),
                          NULL,
                          NULL) != 0 &&
        errno != EPERM)
    {
        if (errno != 0)
        {
            logger(1,
                   "Change user/group to %s/%s failed: %s",
                   get_cmdarg(&nDPId_options.user),
                   get_cmdarg(&nDPId_options.group),
                   strerror(errno));
        }
        else
        {
            logger(1,
                   "Change user/group to %s/%s failed.",
                   get_cmdarg(&nDPId_options.user),
                   get_cmdarg(&nDPId_options.group));
        }
        return 1;
    }

    for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
    {
        reader_threads[i].array_index = i;

        if (reader_threads[i].workflow == NULL)
        {
            /* no more threads should be started */
            break;
        }

        rc = pthread_create(&reader_threads[i].thread, NULL, processing_thread, &reader_threads[i]);
        if (rc != 0)
        {
            logger(1, "pthread_create: %s", strerror(rc));
            return 1;
        }
    }

    rc = pthread_sigmask(SIG_BLOCK, &old_signal_set, NULL);
    if (rc != 0)
    {
        logger(1, "pthread_sigmask: %s", strerror(rc));
        return 1;
    }

    return 0;
}

static void ndpi_shutdown_walker(void const * const A, ndpi_VISIT which, int depth, void * const
user_data) { struct nDPId_workflow * const workflow = (struct nDPId_workflow *)user_data; struct nDPId_flow_basic * const flow_basic = *(struct nDPId_flow_basic **)A; (void)depth; if (workflow == NULL || flow_basic == NULL) { return; } if (workflow->cur_idle_flows == nDPId_options.max_idle_flows_per_thread) { return; } if (which == ndpi_preorder || which == ndpi_leaf) { workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow_basic; switch (flow_basic->state) { case FS_UNKNOWN: case FS_COUNT: case FS_SKIPPED: break; case FS_INFO: case FS_FINISHED: workflow->total_idle_flows++; break; } } } static void process_remaining_flows(void) { for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { set_collector_block(&reader_threads[i]); for (size_t idle_scan_index = 0; idle_scan_index < reader_threads[i].workflow->max_active_flows; ++idle_scan_index) { ndpi_twalk(reader_threads[i].workflow->ndpi_flows_active[idle_scan_index], ndpi_shutdown_walker, reader_threads[i].workflow); process_idle_flow(&reader_threads[i], idle_scan_index); } jsonize_daemon(&reader_threads[i], DAEMON_EVENT_SHUTDOWN); } } static int stop_reader_threads(void) { unsigned long long int total_packets_processed = 0; unsigned long long int total_l4_payload_len = 0; unsigned long long int total_flows_skipped = 0; unsigned long long int total_flows_captured = 0; unsigned long long int total_flows_idle = 0; unsigned long long int total_not_detected = 0; unsigned long long int total_flows_guessed = 0; unsigned long long int total_flows_detected = 0; unsigned long long int total_flow_detection_updates = 0; unsigned long long int total_flow_updates = 0; for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { break_pcap_loop(&reader_threads[i]); } printf("------------------------------------ Stopping reader threads\n"); for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { if (reader_threads[i].workflow == NULL) { continue; } if 
(pthread_join(reader_threads[i].thread, NULL) != 0) { logger(1, "pthread_join: %s", strerror(errno)); } } printf("------------------------------------ Processing remaining flows\n"); process_remaining_flows(); printf("------------------------------------ Results\n"); for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i) { if (reader_threads[i].workflow == NULL) { continue; } total_packets_processed += reader_threads[i].workflow->packets_processed; total_l4_payload_len += reader_threads[i].workflow->total_l4_payload_len; total_flows_skipped += reader_threads[i].workflow->total_skipped_flows; total_flows_captured += reader_threads[i].workflow->total_active_flows; total_flows_idle += reader_threads[i].workflow->total_idle_flows; total_not_detected += reader_threads[i].workflow->total_not_detected_flows; total_flows_guessed += reader_threads[i].workflow->total_guessed_flows; total_flows_detected += reader_threads[i].workflow->total_detected_flows; total_flow_detection_updates += reader_threads[i].workflow->total_flow_detection_updates; total_flow_updates += reader_threads[i].workflow->total_flow_updates; printf( "Stopping Thread %2zu, processed %llu packets, %llu bytes\n" "\tskipped flows.....: %8llu, processed flows: %8llu, idle flows....: %8llu\n" "\tnot detected flows: %8llu, guessed flows..: %8llu, detected flows: %8llu\n" "\tdetection updates.: %8llu, updated flows..: %8llu\n", reader_threads[i].array_index, reader_threads[i].workflow->packets_processed, reader_threads[i].workflow->total_l4_payload_len, reader_threads[i].workflow->total_skipped_flows, reader_threads[i].workflow->total_active_flows, reader_threads[i].workflow->total_idle_flows, reader_threads[i].workflow->total_not_detected_flows, reader_threads[i].workflow->total_guessed_flows, reader_threads[i].workflow->total_detected_flows, reader_threads[i].workflow->total_flow_detection_updates, reader_threads[i].workflow->total_flow_updates); } /* total packets captured: same value 
       for all threads as packet2thread distribution happens later */
    printf("Total packets captured.......: %llu\n",
           (reader_threads[0].workflow != NULL ? reader_threads[0].workflow->packets_captured : 0));
    printf("Total packets processed......: %llu\n", total_packets_processed);
    printf("Total layer4 payload size....: %llu\n", total_l4_payload_len);
    printf("Total flows ignored..........: %llu\n", total_flows_skipped);
    printf("Total flows processed........: %llu\n", total_flows_captured);
    printf("Total flows timed out........: %llu\n", total_flows_idle);
    printf("Total flows detected.........: %llu\n", total_flows_detected);
    printf("Total flows guessed..........: %llu\n", total_flows_guessed);
    printf("Total flows not detected.....: %llu\n", total_not_detected);
    printf("Total flow updates...........: %llu\n", total_flow_updates);
    printf("Total flow detections updates: %llu\n", total_flow_detection_updates);

    return 0;
}

static void free_reader_threads(void)
{
    for (unsigned long long int i = 0; i < nDPId_options.reader_thread_count; ++i)
    {
        if (reader_threads[i].workflow == NULL)
        {
            continue;
        }

        free_workflow(&reader_threads[i].workflow);
    }
}

static void sighandler(int signum)
{
    (void)signum;

    /* Adding 0 reads the current value; the flag is raised only once. */
    if (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0)
    {
        MT_GET_AND_ADD(nDPId_main_thread_shutdown, 1);
    }
}

static void print_subopt_usage(void)
{
    int index = MAX_FLOWS_PER_THREAD;
    char * const * token = &subopt_token[0];

    fprintf(stderr, "\tsubopts:\n");
    do
    {
        if (*token != NULL)
        {
            fprintf(stderr, "\t\t%s = ", *token);
            enum nDPId_subopts subopts = index++;
            switch (subopts)
            {
                case MAX_FLOWS_PER_THREAD:
                    fprintf(stderr, "%llu\n", nDPId_options.max_flows_per_thread);
                    break;
                case MAX_IDLE_FLOWS_PER_THREAD:
                    fprintf(stderr, "%llu\n", nDPId_options.max_idle_flows_per_thread);
                    break;
                case MAX_READER_THREADS:
                    fprintf(stderr, "%llu\n", nDPId_options.reader_thread_count);
                    break;
                case DAEMON_STATUS_INTERVAL:
                    fprintf(stderr, "%llu\n", nDPId_options.daemon_status_interval);
                    break;
#ifdef ENABLE_MEMORY_PROFILING
                case MEMORY_PROFILING_LOG_INTERVAL:
                    fprintf(stderr, "%llu\n", nDPId_options.memory_profiling_log_interval);
                    break;
#endif
#ifdef ENABLE_ZLIB
                case COMPRESSION_SCAN_INTERVAL:
                    fprintf(stderr, "%llu\n", nDPId_options.compression_scan_interval);
                    break;
                case COMPRESSION_FLOW_INACTIVITY:
                    fprintf(stderr, "%llu\n", nDPId_options.compression_flow_inactivity);
                    break;
#endif
                case FLOW_SCAN_INTVERAL:
                    fprintf(stderr, "%llu\n", nDPId_options.flow_scan_interval);
                    break;
                case GENERIC_MAX_IDLE_TIME:
                    fprintf(stderr, "%llu\n", nDPId_options.generic_max_idle_time);
                    break;
                case ICMP_MAX_IDLE_TIME:
                    fprintf(stderr, "%llu\n", nDPId_options.icmp_max_idle_time);
                    break;
                case UDP_MAX_IDLE_TIME:
                    fprintf(stderr, "%llu\n", nDPId_options.udp_max_idle_time);
                    break;
                case TCP_MAX_IDLE_TIME:
                    fprintf(stderr, "%llu\n", nDPId_options.tcp_max_idle_time);
                    break;
                case TCP_MAX_POST_END_FLOW_TIME:
                    fprintf(stderr, "%llu\n", nDPId_options.tcp_max_post_end_flow_time);
                    break;
                case MAX_PACKETS_PER_FLOW_TO_SEND:
                    fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_send);
                    break;
                case MAX_PACKETS_PER_FLOW_TO_PROCESS:
                    fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_process);
                    break;
                case MAX_PACKETS_PER_FLOW_TO_ANALYZE:
                    fprintf(stderr, "%llu\n", nDPId_options.max_packets_per_flow_to_analyse);
                    break;
                case ERROR_EVENT_THRESHOLD_N:
                    fprintf(stderr, "%llu\n", nDPId_options.error_event_threshold_n);
                    break;
                case ERROR_EVENT_THRESHOLD_TIME:
                    fprintf(stderr, "%llu\n", nDPId_options.error_event_threshold_time);
                    break;
            }
        }
        else
        {
            break;
        }
        token++;
    } while (1);
}

static void print_usage(char const * const arg0)
{
    static char const usage[] =
        "Usage: %s "
        "[-i pcap-file/interface] [-I] [-E] [-B bpf-filter]\n"
        "\t \t"
        "[-l] [-L logfile] [-c address] [-e] "
        "[-d] [-p pidfile]\n"
        "\t \t"
        "[-u user] [-g group] "
        "[-P path] [-C path] [-J path] [-S path]\n"
        "\t \t"
        "[-a instance-alias] [-A]\n"
        "\t \t"
        "[-o subopt=value]\n"
        "\t \t"
        "[-v] [-h]\n\n"
        "\t-i\tInterface or file from which to read packets.\n"
        "\t-I\tProcess only packets where the source address of the first packet\n"
        "\t \tis part of the interface subnet. (Internal mode)\n"
        "\t-E\tProcess only packets where the source address of the first packet\n"
        "\t \tis *NOT* part of the interface subnet. (External mode)\n"
        "\t-B\tSet an optional PCAP filter string. (BPF format)\n"
        "\t-l\tLog all messages to stderr.\n"
        "\t-L\tLog all messages to a log file.\n"
        "\t-c\tPath to a UNIX socket (nDPIsrvd Collector) or a custom UDP endpoint.\n"
        "\t \tDefault: %s\n"
        "\t-e\tUse poll() instead of epoll().\n"
        "\t \tDefault: epoll() on Linux, poll() otherwise\n"
        "\t-d\tFork into background after initialization.\n"
        "\t-p\tWrite the daemon PID to the given file path.\n"
        "\t \tDefault: %s\n"
        "\t-u\tChange UID to the numeric value of user.\n"
        "\t \tDefault: %s\n"
        "\t-g\tChange GID to the numeric value of group.\n"
        "\t-P\tLoad a nDPI custom protocols file.\n"
        "\t-C\tLoad a nDPI custom categories file.\n"
        "\t-J\tLoad a nDPI JA3 hash blacklist file.\n"
        "\t \tSee: https://sslbl.abuse.ch/blacklist/ja3_fingerprints.csv\n"
        "\t-S\tLoad a nDPI SSL SHA1 hash blacklist file.\n"
        "\t \tSee: https://sslbl.abuse.ch/blacklist/sslblacklist.csv\n"
        "\t-a\tSet an alias name of this daemon instance which will\n"
        "\t \tbe part of every JSON message.\n"
        "\t \tThis value is required for correct flow handling of\n"
        "\t \tmultiple instances and should be unique.\n"
        "\t \tDefaults to your hostname.\n"
        "\t-A\tEnable flow analysis aka feature extraction. Requires more memory and CPU time.\n"
        "\t \tExperimental, do not rely on those values.\n"
#ifdef ENABLE_ZLIB
        "\t-z\tEnable flow memory zLib compression.\n"
#endif
        "\t-o\t(Carefully) Tune some daemon options. See subopts below.\n"
        "\t-v\tversion\n"
        "\t-h\tthis\n\n";
    fprintf(stderr,
            usage,
            arg0,
            get_cmdarg(&nDPId_options.collector_address),
            get_cmdarg(&nDPId_options.pidfile),
            get_cmdarg(&nDPId_options.user));
}

static void nDPId_print_deps_version(FILE * const out)
{
    fprintf(out,
            "------------------------------------------------------\n"
#ifdef LIBNDPI_STATIC
            "nDPI version: %s (statically linked)\n"
#else
            "nDPI version: %s\n"
#endif
            " API version: %u\n"
            "pcap version: %s\n"
            "------------------------------------------------------\n",
            ndpi_revision(),
            ndpi_get_api_version(),
            pcap_lib_version() + strlen("libpcap version "));
    if (ndpi_get_gcrypt_version() != NULL)
    {
        fprintf(out,
                "gcrypt version: %s\n"
                "----------------------------------\n",
                ndpi_get_gcrypt_version());
    }
}

static int nDPId_parse_options(int argc, char ** argv)
{
    int opt;

    while ((opt = getopt(argc, argv, "i:IEB:lL:c:edp:u:g:P:C:J:S:a:Azo:vh")) != -1)
    {
        switch (opt)
        {
            case 'i':
                set_cmdarg(&nDPId_options.pcap_file_or_interface, optarg);
                break;
            case 'I':
                nDPId_options.process_internal_initial_direction = 1;
                break;
            case 'E':
                nDPId_options.process_external_initial_direction = 1;
                break;
            case 'B':
                set_cmdarg(&nDPId_options.bpf_str, optarg);
                break;
            case 'l':
                enable_console_logger();
                break;
            case 'L':
                if (enable_file_logger(optarg) != 0)
                {
                    return 1;
                }
                break;
            case 'c':
                set_cmdarg(&nDPId_options.collector_address, optarg);
                break;
            case 'e':
#ifdef ENABLE_EPOLL
                nDPId_options.use_poll = 1;
#else
                logger_early(1, "%s", "nDPId was built w/o epoll() support, poll() is already the default");
#endif
                break;
            case 'd':
                daemonize_enable();
                break;
            case 'p':
                set_cmdarg(&nDPId_options.pidfile, optarg);
                break;
            case 'u':
                set_cmdarg(&nDPId_options.user, optarg);
                break;
            case 'g':
                set_cmdarg(&nDPId_options.group, optarg);
                break;
            case 'P':
                set_cmdarg(&nDPId_options.custom_protocols_file, optarg);
                break;
            case 'C':
                set_cmdarg(&nDPId_options.custom_categories_file, optarg);
                break;
            case 'J':
                set_cmdarg(&nDPId_options.custom_ja3_file, optarg);
                break;
            case 'S':
                set_cmdarg(&nDPId_options.custom_sha1_file, optarg);
                break;
            case 'a':
                set_cmdarg(&nDPId_options.instance_alias, optarg);
                break;
            case 'A':
                nDPId_options.enable_data_analysis = 1;
                break;
            case 'z':
#ifdef ENABLE_ZLIB
                nDPId_options.enable_zlib_compression = 1;
                break;
#else
                logger_early(1, "%s", "nDPId was built w/o zLib compression");
                return 1;
#endif
            case 'o':
            {
                int errfnd = 0;
                char * subopts = optarg;
                char * value;

                while (*subopts != '\0' && !errfnd)
                {
                    char * endptr;
                    int subopt = getsubopt(&subopts, subopt_token, &value);
                    if (value == NULL && subopt != -1)
                    {
                        logger_early(1, "Missing value for `%s'", subopt_token[subopt]);
                        fprintf(stderr, "%s", "\n");
                        print_usage(argv[0]);
                        print_subopt_usage();
                        return 1;
                    }
                    if (subopt == -1)
                    {
                        logger_early(1, "Invalid subopt: %s", value);
                        fprintf(stderr, "%s", "\n");
                        print_usage(argv[0]);
                        print_subopt_usage();
                        return 1;
                    }

                    /* Reset errno so that ERANGE can only originate from this strtoull() call. */
                    errno = 0;
                    /* Keep the full width returned by strtoull(); `long int' may truncate on 32 bit targets. */
                    unsigned long long int value_llu = strtoull(value, &endptr, 10);
                    if (value == endptr)
                    {
                        logger_early(1,
                                     "Subopt `%s': Value `%s' is not a valid number.",
                                     subopt_token[subopt],
                                     value);
                        return 1;
                    }
                    if (errno == ERANGE)
                    {
                        logger_early(1, "Subopt `%s': Number too large.", subopt_token[subopt]);
                        return 1;
                    }

                    switch ((enum nDPId_subopts)subopt)
                    {
                        case MAX_FLOWS_PER_THREAD:
                            nDPId_options.max_flows_per_thread = value_llu;
                            break;
                        case MAX_IDLE_FLOWS_PER_THREAD:
                            nDPId_options.max_idle_flows_per_thread = value_llu;
                            break;
                        case MAX_READER_THREADS:
                            nDPId_options.reader_thread_count = value_llu;
                            break;
                        case DAEMON_STATUS_INTERVAL:
                            nDPId_options.daemon_status_interval = value_llu;
                            break;
#ifdef ENABLE_MEMORY_PROFILING
                        case MEMORY_PROFILING_LOG_INTERVAL:
                            nDPId_options.memory_profiling_log_interval = value_llu;
                            break;
#endif
#ifdef ENABLE_ZLIB
                        case COMPRESSION_SCAN_INTERVAL:
                            nDPId_options.compression_scan_interval = value_llu;
                            break;
                        case COMPRESSION_FLOW_INACTIVITY:
                            nDPId_options.compression_flow_inactivity = value_llu;
                            break;
#endif
                        case FLOW_SCAN_INTVERAL:
                            nDPId_options.flow_scan_interval = value_llu;
                            break;
                        case GENERIC_MAX_IDLE_TIME:
                            nDPId_options.generic_max_idle_time = value_llu;
                            break;
                        case ICMP_MAX_IDLE_TIME:
                            nDPId_options.icmp_max_idle_time = value_llu;
                            break;
                        case UDP_MAX_IDLE_TIME:
                            nDPId_options.udp_max_idle_time = value_llu;
                            break;
                        case TCP_MAX_IDLE_TIME:
                            nDPId_options.tcp_max_idle_time = value_llu;
                            break;
                        case TCP_MAX_POST_END_FLOW_TIME:
                            nDPId_options.tcp_max_post_end_flow_time = value_llu;
                            break;
                        case MAX_PACKETS_PER_FLOW_TO_SEND:
                            nDPId_options.max_packets_per_flow_to_send = value_llu;
                            break;
                        case MAX_PACKETS_PER_FLOW_TO_PROCESS:
                            nDPId_options.max_packets_per_flow_to_process = value_llu;
                            break;
                        case MAX_PACKETS_PER_FLOW_TO_ANALYZE:
                            nDPId_options.max_packets_per_flow_to_analyse = value_llu;
                            break;
                        case ERROR_EVENT_THRESHOLD_N:
                            nDPId_options.error_event_threshold_n = value_llu;
                            break;
                        case ERROR_EVENT_THRESHOLD_TIME:
                            nDPId_options.error_event_threshold_time = value_llu;
                            break;
                    }
                }
                break;
            }
            case 'v':
                fprintf(stderr, "%s", get_nDPId_version());
                nDPId_print_deps_version(stderr);
                return 1;
            case 'h':
            default:
                fprintf(stderr, "%s\n", get_nDPId_version());
                print_usage(argv[0]);
                print_subopt_usage();
                return 1;
        }
    }

    if (optind < argc)
    {
        logger_early(1, "%s", "Unexpected argument after options");
        fprintf(stderr, "%s", "\n");
        print_usage(argv[0]);
        print_subopt_usage();
        return 1;
    }

    return 0;
}

static int validate_options(void)
{
    int retval = 0;

    if (is_daemonize_enabled() != 0 && is_console_logger_enabled() != 0)
    {
        logger_early(1,
                     "%s",
                     "Daemon mode `-d' and `-l' cannot be used together, "
                     "because stdout/stderr is being redirected to /dev/null");
        retval = 1;
    }
#ifdef ENABLE_ZLIB
    if (nDPId_options.enable_zlib_compression != 0)
    {
        if (nDPId_options.compression_flow_inactivity < TIME_S_TO_US(6u) ||
            nDPId_options.compression_scan_interval < TIME_S_TO_US(4u))
        {
            /* Warning only: low values are allowed, so retval stays untouched. */
            logger_early(1,
                         "Setting compression-scan-interval / compression-flow-inactivity "
                         "to values lower than %llu / %llu is not recommended.",
                         TIME_S_TO_US(4u),
                         TIME_S_TO_US(6u));
            logger_early(1, "%s", "Your CPU usage may increase heavily.");
        }
    }
#endif
    if (nDPIsrvd_setup_address(&collector_address, get_cmdarg(&nDPId_options.collector_address)) != 0)
    {
        retval = 1;
        logger_early(1, "Collector socket invalid address: %s.", get_cmdarg(&nDPId_options.collector_address));
    }
    if (is_cmdarg_set(&nDPId_options.instance_alias) == 0)
    {
        char hname[256];

        errno = 0;
        if (gethostname(hname, sizeof(hname)) != 0)
        {
            logger_early(1, "Could not retrieve your hostname: %s", strerror(errno));
            retval = 1;
        }
        else
        {
            set_cmdarg(&nDPId_options.instance_alias, hname);
            logger_early(1,
                         "No instance alias given, using your hostname '%s'",
                         get_cmdarg(&nDPId_options.instance_alias));
            if (is_cmdarg_set(&nDPId_options.instance_alias) == 0)
            {
                retval = 1;
            }
        }
    }
    if (nDPId_options.max_flows_per_thread < 128 || nDPId_options.max_flows_per_thread > nDPId_MAX_FLOWS_PER_THREAD)
    {
        logger_early(1,
                     "Value not in range: 128 <= max-flows-per-thread[%llu] <= %d",
                     nDPId_options.max_flows_per_thread,
                     nDPId_MAX_FLOWS_PER_THREAD);
        retval = 1;
    }
    if (nDPId_options.max_idle_flows_per_thread < 64 ||
        nDPId_options.max_idle_flows_per_thread > nDPId_MAX_IDLE_FLOWS_PER_THREAD)
    {
        logger_early(1,
                     "Value not in range: 64 <= max-idle-flows-per-thread[%llu] <= %d",
                     nDPId_options.max_idle_flows_per_thread,
                     nDPId_MAX_IDLE_FLOWS_PER_THREAD);
        retval = 1;
    }
    if (nDPId_options.reader_thread_count < 1 || nDPId_options.reader_thread_count > nDPId_MAX_READER_THREADS)
    {
        logger_early(1,
                     "Value not in range: 1 <= reader-thread-count[%llu] <= %d",
                     nDPId_options.reader_thread_count,
                     nDPId_MAX_READER_THREADS);
        retval = 1;
    }
    if (nDPId_options.flow_scan_interval < TIME_S_TO_US(5u))
    {
        logger_early(1,
                     "Value not in range: flow-scan-interval[%llu] >= %llu",
                     nDPId_options.flow_scan_interval,
                     TIME_S_TO_US(5u));
        retval = 1;
    }
    if (nDPId_options.flow_scan_interval >= nDPId_options.generic_max_idle_time)
    {
        logger_early(1,
                     "Value not in range: flow-scan-interval[%llu] < generic-max-idle-time[%llu]",
                     nDPId_options.flow_scan_interval,
                     nDPId_options.generic_max_idle_time);
        retval = 1;
    }
    if (nDPId_options.flow_scan_interval >= nDPId_options.icmp_max_idle_time)
    {
        logger_early(1,
                     "Value not in range: flow-scan-interval[%llu] < icmp-max-idle-time[%llu]",
                     nDPId_options.flow_scan_interval,
                     nDPId_options.icmp_max_idle_time);
        retval = 1;
    }
    if (nDPId_options.flow_scan_interval >= nDPId_options.tcp_max_idle_time)
    {
        logger_early(1,
                     "Value not in range: flow-scan-interval[%llu] < tcp-max-idle-time[%llu]",
                     nDPId_options.flow_scan_interval,
                     nDPId_options.tcp_max_idle_time);
        retval = 1;
    }
    if (nDPId_options.flow_scan_interval >= nDPId_options.udp_max_idle_time)
    {
        logger_early(1,
                     "Value not in range: flow-scan-interval[%llu] < udp-max-idle-time[%llu]",
                     nDPId_options.flow_scan_interval,
                     nDPId_options.udp_max_idle_time);
        retval = 1;
    }
    if (nDPId_options.process_internal_initial_direction != 0 &&
        nDPId_options.process_external_initial_direction != 0)
    {
        logger_early(1,
                     "%s",
                     "Enabling both Internal and External packet processing does not make sense, "
                     "as processing both is the default.");
        retval = 1;
    }
    if (nDPId_options.process_internal_initial_direction != 0 ||
        nDPId_options.process_external_initial_direction != 0)
    {
        /* Warning only: the option is valid, so retval stays untouched. */
        logger_early(1,
                     "%s",
                     "Internal or External packet processing may lead to incorrect results for flows that were active "
                     "before the daemon started.");
    }
    if (nDPId_options.max_packets_per_flow_to_process < 1 || nDPId_options.max_packets_per_flow_to_process > 65535)
    {
        logger_early(1,
                     "Value not in range: 1 <= max-packets-per-flow-to-process[%llu] <= 65535",
                     nDPId_options.max_packets_per_flow_to_process);
        retval = 1;
    }
    if (nDPId_options.max_packets_per_flow_to_send > 30)
    {
        /* Warning only: higher values are accepted. */
        logger_early(1, "%s", "Higher values of max-packets-per-flow-to-send may cause superfluous network usage.");
    }

    return retval;
}

#ifndef NO_MAIN
int main(int argc, char ** argv)
{
    if (argc == 0 || stdout == NULL || stderr == NULL)
    {
        return 1;
    }

    init_logging("nDPId");

    if (nDPId_parse_options(argc, argv) != 0)
    {
        return 1;
    }
    if (validate_options() != 0)
    {
        logger_early(1, "%s", "Option validation failed.");
        return 1;
    }

    log_app_info();

    nDPId_print_deps_version(stdout);

    if (NDPI_API_VERSION != ndpi_get_api_version())
    {
        logger_early(1,
                     "Unforeseen Consequences; nDPId was compiled with libnDPI api version %u, but the api version of "
                     "the shared library is: %u.",
                     NDPI_API_VERSION,
                     ndpi_get_api_version());
    }
    if (sizeof(struct ndpi_flow_struct) != ndpi_detection_get_sizeof_ndpi_flow_struct())
    {
        logger_early(1,
                     "FATAL: nDPI flow struct size inconsistent; expected %zu bytes, got %u bytes.",
                     sizeof(struct ndpi_flow_struct),
                     ndpi_detection_get_sizeof_ndpi_flow_struct());
        return 1;
    }

#ifdef ENABLE_MEMORY_PROFILING
    logger_early(0, "size/workflow....: %zu bytes", sizeof(struct nDPId_workflow));
    logger_early(0,
                 "size/flow........: %zu bytes",
                 sizeof(struct nDPId_flow) + sizeof(struct nDPId_detection_data));
    logger_early(0, "size/flow-analyse: %zu bytes", sizeof(struct nDPId_flow_analysis));
#endif

    if (setup_reader_threads() != 0)
    {
        return 1;
    }

    if (start_reader_threads() != 0)
    {
        return 1;
    }

    signal(SIGINT, sighandler);
    signal(SIGTERM, sighandler);
    signal(SIGPIPE, SIG_IGN);

    while (MT_GET_AND_ADD(nDPId_main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0)
    {
        sleep(1);
    }

    if (stop_reader_threads() != 0)
    {
        return 1;
    }
    free_reader_threads();

    daemonize_shutdown(get_cmdarg(&nDPId_options.pidfile));
    logger(0, "%s", "Bye.");
    shutdown_logging();

    return 0;
}
#endif